import logging
from dataclasses import dataclass, field
from os import path
from pathlib import Path
from pprint import pformat
from shutil import rmtree
from typing import Callable

from .notifier import DoneNote, ErrorNote
from .package import Package, PackageManager
from .queuethread import QueueThread


@dataclass
class PendingItem:
    """
    Announce a job together with the handlers that must report back
    before the job is considered finished.

    jobid, jobspec and package are not passed in; they are extracted
    from queueitem after initialization.
    """
    # Must contain the keys 'jobid', 'jobspec' and 'existing'.
    queueitem: dict
    # Handler classes (not instances) expected to report on this job.
    handlers: list
    jobid: str = field(init=False)
    jobspec: dict = field(init=False)
    # Temporary copy of the package state from before this job.
    package: Package = field(init=False)

    def __post_init__(self):
        self.jobid = self.queueitem['jobid']
        self.jobspec = self.queueitem['jobspec']
        self.package = self.queueitem['existing']


@dataclass
class DoneItem:
    """
    Report that a handler has finished its work on a job.

    apply_func is called with the job's package, opened through
    PackageManager, to apply the handler's result.
    """
    jobid: str
    handler: object  # Handler, but circular import prevents it
    apply_func: Callable


@dataclass
class ErrorItem:
    """
    Report that a handler failed while working on a job.
    """
    jobid: str
    handler: object  # Handler, but circular import prevents it
    exception: Exception


@QueueThread.register
class Collector(QueueThread):
    """
    This class keeps track of pending actions and is responsible for
    cleaning up once all pending actions are done.

    PendingItem, DoneItem and ErrorItem objects put on the queue are
    dispatched to the matching _process_* method.
    """
    def __init__(self, notifier, packagedir, cachedir):
        """
        notifier:   object with a put() method that receives DoneNote and
                    ErrorNote objects.
        packagedir: base directory; a package lives at packagedir/<pkg_id>.
        cachedir:   directory where a file named after each jobid is
                    removed once that job is completely done.
        """
        super().__init__()
        self.packages = Path(packagedir)
        self.pkgid_cache = Path(cachedir)
        self.notifier = notifier
        # jobid -> Tracker
        self.trackers = {}
        self.actions = {PendingItem: self._process_pending,
                        DoneItem: self._process_done,
                        ErrorItem: self._process_error}

    def _process_pending(self, item):
        """
        Handle a pending notice.

        Throws an exception if the jobid is already known.
        """
        jobid = item.jobid
        if jobid in self.trackers:
            raise Exception(f"Job {jobid} has already been submitted")
        self.logger.info("%s - registering pending handlers %s",
                         jobid, pformat(item.handlers))
        self.trackers[jobid] = Tracker(item)

    def _process_done(self, item):
        """
        Handle a done notice.

        If the job has already failed, the handler is only marked as
        finished and its result is not applied. Otherwise the result is
        applied to the package and a DoneNote is sent. If applying fails,
        an ErrorNote is sent, the job is marked as failed and the handler
        is still marked as finished so the job can eventually complete.

        Discards the tracker if no more actions are pending.

        Throws an exception on unknown jobids.
        """
        jobid = item.jobid
        if jobid not in self.trackers:
            raise Exception(f"Job {jobid} not found in tracker")
        pending = self.trackers[jobid]
        url = pending.jobspec.get('notification_url', None)

        if pending.failed:
            self.logger.info("%s - job has been marked as failed, "
                             "will not apply %s",
                             jobid, pformat(item.handler))
            all_done = pending.finish(item.handler)
        else:
            if any(item.handler.apply_after(handler)
                   for handler in pending.handlers):
                # Defer processing if there are pending handlers that must
                # finish before this one
                # Not logging this due to unbelieveable spam
                return self.put(item)

            self.logger.info("%s - applying %s",
                             jobid, item.handler.__class__.__name__)
            package_id = pending.jobspec['pkg_id']
            try:
                with PackageManager(package_id,
                                    self.packages / package_id) as package:
                    item.apply_func(package)
            except Exception as e:
                self.logger.exception("%s - failed to apply %s",
                                      jobid,
                                      item.handler.__class__.__name__)
                self.notifier.put(ErrorNote(jobid, url,
                                            self.__class__.__name__, e))
                # Mark as failed before finishing so that finish() leaves
                # the uploaded files alone for manual cleanup.
                pending.failed = True
                # Retire the handler anyway: previously all_done was left
                # unbound here (UnboundLocalError below) and the handler
                # stayed pending forever, so the tracker never completed.
                all_done = pending.finish(item.handler)
            else:
                all_done = pending.finish(item.handler)
                # Computed after finish() so it lists the handlers that
                # are still outstanding.
                pending_handlers = [handlerclass.__name__
                                    for handlerclass in pending.handlers]
                notification = DoneNote(jobid,
                                        url,
                                        item.handler.__class__.__name__,
                                        package.asdict(),
                                        pending_handlers)
                self.notifier.put(notification)

        if all_done:
            self.trackers.pop(jobid)
            (self.pkgid_cache / jobid).unlink(missing_ok=True)
            self.logger.info("%s - all done", jobid)

    def _process_error(self, item):
        """
        Handle an error notice.

        Marks the job as failed and sends an ErrorNote naming the
        reporting handler. The handler is not removed from the pending
        list here.

        Throws an exception on unknown jobids.
        """
        jobid = item.jobid
        origin = item.handler
        exception = item.exception
        if jobid not in self.trackers:
            raise Exception(f"Job {jobid} not found in tracker")
        pending = self.trackers[jobid]
        pending.failed = True
        url = pending.jobspec.get('notification_url', None)
        self.notifier.put(ErrorNote(jobid, url,
                                    origin.__class__.__name__,
                                    exception))

    def _process(self, item):
        """
        Work through the queue.

        Dispatches on the exact type of item (subclasses are not
        matched).

        Throws an exception on invalid action values.
        """
        if type(item) not in self.actions:
            raise Exception(f"Invalid action {type(item)}")
        # Call the appropriate handler method
        self.actions[type(item)](item)


class Tracker:
    """
    Utility class to represent the pending actions of a single job.
    """
    def __init__(self, pending_item):
        self.logger = logging.getLogger(
            f'play-daemon.{self.__class__.__name__}')
        self.jobid = pending_item.jobid
        self.jobspec = pending_item.jobspec
        self.temp_copy = pending_item.package
        # Keep our own copy; finish() removes entries and must not mutate
        # the list owned by the submitted PendingItem.
        self.handlers = list(pending_item.handlers)
        self.failed = False

    def finish(self, handler):
        """
        Finish a pending action.

        Returns True if no more actions are pending, otherwise False.

        If no more actions are pending, the job's uploaded files are
        deleted (only when the job has not failed) and the temporary
        copy of the previous package state is deleted, before returning.

        Throws an exception if the given action is missing from the
        pending list.
        """
        handlerclass = handler.__class__
        if handlerclass not in self.handlers:
            raise Exception(f"{handler} missing from {self.handlers}")
        self.handlers.remove(handlerclass)
        if not self.handlers:
            # Clean up uploaded files once all handlers are done with the job
            # Failed jobs get cleaned manually for safety
            if 'upload_dir' in self.jobspec and not self.failed:
                self.logger.info("Deleting incoming files for job %s (%s)",
                                 self.jobid, self.jobspec['upload_dir'])
                rmtree(self.jobspec['upload_dir'])
            # Also remove the temporary copy of the previous package state
            # NOTE(review): assumes temp_copy is never None - confirm.
            if path.exists(self.temp_copy.basedir):
                self.logger.info("Deleting temporary files for job %s (%s)",
                                 self.jobid, self.temp_copy.basedir)
                rmtree(self.temp_copy.basedir)
            return True
        else:
            return False