play-daemon-threaded/pipeline/collector.py

199 lines
7.1 KiB
Python

import logging
from dataclasses import dataclass, field
from os import path
from pathlib import Path
from pprint import pformat
from shutil import rmtree
from typing import Callable
from .notifier import DoneNote, ErrorNote
from .package import Package, PackageManager
from .queuethread import QueueThread
@dataclass
class PendingItem:
    """
    Notice that a job has been submitted, together with the handlers
    registered for it.

    Only ``queueitem`` and ``handlers`` are constructor arguments; the job id,
    job specification and existing package are extracted from the queue item
    dict after initialisation.
    """
    queueitem: dict
    handlers: list
    # Derived from queueitem in __post_init__, not passed to the constructor.
    jobid: str = field(init=False)
    jobspec: dict = field(init=False)
    package: Package = field(init=False)

    def __post_init__(self):
        source = self.queueitem
        # 'existing' holds the package object for this job.
        self.jobid, self.jobspec, self.package = (source['jobid'],
                                                  source['jobspec'],
                                                  source['existing'])
@dataclass
class DoneItem:
    """
    Notice that a handler has finished its work on a job.

    ``apply_func`` is a callable that the collector invokes with the job's
    package in order to apply the handler's result.
    """
    jobid: str
    # Really a Handler instance; annotated loosely because importing the
    # Handler type here would create a circular import.
    handler: object
    apply_func: Callable
@dataclass
class ErrorItem:
    """
    Notice that a handler failed while working on a job.

    ``exception`` is the error that was raised; the collector forwards it to
    the notifier and marks the job as failed.
    """
    jobid: str
    # Really a Handler instance; annotated loosely because importing the
    # Handler type here would create a circular import.
    handler: object
    exception: Exception
@QueueThread.register
class Collector(QueueThread):
    """
    This class keeps track of pending actions and is responsible
    for cleaning up once all pending actions are done.

    A job is registered with a PendingItem listing the handler classes that
    must report back. Each handler then reports a DoneItem (carrying a
    function to apply to the job's package) or an ErrorItem. Once every
    registered handler has finished, the job's tracker is discarded and the
    job's file in the package id cache directory is removed.
    """

    def __init__(self, notifier, packagedir, cachedir):
        """
        :param notifier: object accepting DoneNote/ErrorNote via ``put()``
        :param packagedir: base directory; a package lives at
                           ``packagedir / pkg_id``
        :param cachedir: directory holding a file named after each jobid,
                         removed when the job is all done
        """
        super().__init__()
        self.packages = Path(packagedir)
        self.pkgid_cache = Path(cachedir)
        self.notifier = notifier
        # jobid -> Tracker
        self.trackers = {}
        # Dispatch table used by _process, keyed on the exact item type.
        self.actions = {PendingItem: self._process_pending,
                        DoneItem: self._process_done,
                        ErrorItem: self._process_error}

    def _process_pending(self, item):
        """
        Handle a pending notice by starting a Tracker for the job.

        Throws an exception if the jobid is already known.
        """
        jobid = item.jobid
        if jobid in self.trackers:
            raise Exception(f"Job {jobid} has already been submitted")
        self.logger.info("%s - registering pending handlers %s",
                         jobid, pformat(item.handlers))
        self.trackers[jobid] = Tracker(item)

    def _process_done(self, item):
        """
        Handle a done notice. Submits a notification to the notifier. Discards
        the pending object if no more actions are pending.

        If the job is already marked as failed, the handler's result is not
        applied, but the handler is still marked as finished.

        If applying the result raises, an ErrorNote is submitted, the job is
        marked as failed and the handler is still marked as finished, so the
        job's tracker can eventually be discarded.

        If another pending handler must be applied first, the item is put
        back on this thread's queue and nothing else happens.

        Throws an exception on unknown jobids.
        """
        jobid = item.jobid
        if jobid not in self.trackers:
            raise Exception(f"Job {jobid} not found in tracker")

        pending = self.trackers[jobid]
        if pending.failed:
            self.logger.info("%s - job has been marked as failed, "
                             "will not apply %s",
                             jobid,
                             pformat(item.handler))
            all_done = pending.finish(item.handler)
        else:
            if any(item.handler.apply_after(handler)
                   for handler in pending.handlers):
                # Defer processing if there are pending handlers that must
                # finish before this one.
                # Not logging this due to unbelievable spam.
                return self.put(item)

            self.logger.info("%s - applying %s",
                             jobid,
                             item.handler.__class__.__name__)
            url = pending.jobspec.get('notification_url', None)
            package_id = pending.jobspec['pkg_id']
            try:
                with PackageManager(package_id,
                                    self.packages / package_id) as package:
                    item.apply_func(package)
            except Exception as e:
                self.notifier.put(ErrorNote(jobid,
                                            url,
                                            self.__class__.__name__,
                                            e))
                pending.failed = True
                # This handler's DoneItem has now been consumed, so nothing
                # else will ever finish it. Finish it here (after marking the
                # job as failed, so uploaded files are left for manual
                # cleanup). Otherwise all_done would be unbound below and the
                # tracker could never empty.
                all_done = pending.finish(item.handler)
            else:
                all_done = pending.finish(item.handler)
                pending_handlers = [handlerclass.__name__
                                    for handlerclass in pending.handlers]
                notification = DoneNote(jobid,
                                        url,
                                        item.handler.__class__.__name__,
                                        package.asdict(),
                                        pending_handlers)
                self.notifier.put(notification)

        if all_done:
            self.trackers.pop(jobid)
            (self.pkgid_cache / jobid).unlink(missing_ok=True)
            self.logger.info("%s - all done", jobid)

    def _process_error(self, item):
        """
        Handle an error notice: mark the job as failed and submit an ErrorNote
        naming the reporting handler's class.

        The reporting handler is not removed from the tracker here.

        Throws an exception on unknown jobids.
        """
        jobid = item.jobid
        origin = item.handler
        exception = item.exception
        if jobid not in self.trackers:
            raise Exception(f"Job {jobid} not found in tracker")
        pending = self.trackers[jobid]
        pending.failed = True
        url = pending.jobspec.get('notification_url', None)
        self.notifier.put(ErrorNote(jobid,
                                    url,
                                    origin.__class__.__name__,
                                    exception))

    def _process(self, item):
        """
        Work through the queue.

        Dispatches on the exact type of ``item`` (subclasses are not matched).
        Throws an exception on invalid action values.
        """
        if type(item) not in self.actions:
            raise Exception(f"Invalid action {type(item)}")
        # Call the appropriate handler method
        self.actions[type(item)](item)
class Tracker:
    """
    Utility class to represent pending actions.

    Tracks which handler classes still have to report back for a job, and
    cleans up the job's files once the last of them has finished.
    """

    def __init__(self, pending_item):
        """
        :param pending_item: a PendingItem, or any object with ``jobid``,
                             ``jobspec``, ``package`` and ``handlers``
                             attributes
        """
        self.logger = logging.getLogger(
            f'play-daemon.{self.__class__.__name__}')
        self.jobid = pending_item.jobid
        self.jobspec = pending_item.jobspec
        # Temporary copy of the previous package state, removed once all
        # handlers are done. NOTE(review): presumably None when there is no
        # previous state — confirm against the producer of 'existing'.
        self.temp_copy = pending_item.package
        # Handler classes still pending. This is the same list object as
        # pending_item.handlers; finish() mutates it in place.
        self.handlers = pending_item.handlers
        self.failed = False

    def finish(self, handler):
        """
        Finish a pending action. Returns True if no more actions are pending,
        otherwise False.

        If no more actions are pending, cleans up files uploaded for the job
        (unless the job has failed) and the temporary package copy before
        returning.

        Throws an exception if the given action is missing from the
        pending list.
        """
        handlerclass = handler.__class__
        if handlerclass not in self.handlers:
            raise Exception(f"{handler} missing from {self.handlers}")
        self.handlers.remove(handlerclass)
        if self.handlers:
            return False

        # Clean up uploaded files once all handlers are done with the job.
        # Failed jobs get cleaned manually for safety.
        if 'upload_dir' in self.jobspec and not self.failed:
            self.logger.info("Deleting incoming files for job %s (%s)",
                             self.jobid, self.jobspec['upload_dir'])
            rmtree(self.jobspec['upload_dir'])
        # Also remove the temporary copy of the previous package state, if
        # there is one.
        if (self.temp_copy is not None
                and path.exists(self.temp_copy.basedir)):
            self.logger.info("Deleting temporary files for job %s (%s)",
                             self.jobid, self.temp_copy.basedir)
            rmtree(self.temp_copy.basedir)
        return True