199 lines
7.1 KiB
Python
199 lines
7.1 KiB
Python
import logging
|
|
|
|
from dataclasses import dataclass, field
|
|
from os import path
|
|
from pathlib import Path
|
|
from pprint import pformat
|
|
from shutil import rmtree
|
|
from typing import Callable
|
|
|
|
from .notifier import DoneNote, ErrorNote
|
|
from .package import Package, PackageManager
|
|
from .queuethread import QueueThread
|
|
|
|
|
|
@dataclass
class PendingItem:
    """
    Queue item announcing a new job and the handlers it is waiting for.

    Only queueitem and handlers are constructor arguments; the remaining
    fields are filled in from queueitem after initialisation.
    """
    # Raw queue entry; must provide the keys 'jobid', 'jobspec' and 'existing'
    queueitem: dict
    # Handlers that are pending for this job
    handlers: list
    # Derived fields, populated by __post_init__
    jobid: str = field(init=False)
    jobspec: dict = field(init=False)
    package: Package = field(init=False)

    def __post_init__(self):
        """
        Populate the derived fields from the raw queue item.

        Raises KeyError if any of the expected keys is missing.
        """
        source = self.queueitem
        self.jobid, self.jobspec, self.package = (
            source['jobid'],
            source['jobspec'],
            source['existing'],
        )
|
|
|
|
|
|
@dataclass
class DoneItem:
    """
    Queue item reporting that a handler has finished its work for a job.
    """
    # Id of the job the handler worked on
    jobid: str
    # The handler instance that finished
    handler: object  # Handler, but circular import prevents it
    # Callable that the collector invokes with the job's package
    apply_func: Callable
|
|
|
|
|
|
@dataclass
class ErrorItem:
    """
    Queue item reporting that a handler failed while working on a job.
    """
    # Id of the job the handler worked on
    jobid: str
    # The handler instance that reported the failure
    handler: object  # Handler, but circular import prevents it
    # The exception describing the failure
    exception: Exception
|
|
|
|
|
|
@QueueThread.register
class Collector(QueueThread):
    """
    This class keeps track of pending actions and is responsible
    for cleaning up once all pending actions are done.

    Queue items are dispatched on their exact type: a PendingItem registers
    a job, a DoneItem applies a handler's result to the job's package, and
    an ErrorItem marks the job as failed.
    """
    def __init__(self, notifier, packagedir, cachedir):
        """
        Set up an empty collector.

        notifier receives DoneNote and ErrorNote objects through put().
        packagedir is the base path; packagedir / pkg_id is what gets
        handed to PackageManager.
        cachedir is the path from which the file named after a jobid is
        unlinked once that job is all done.
        """
        super().__init__()
        self.packages = Path(packagedir)
        self.pkgid_cache = Path(cachedir)
        self.notifier = notifier
        # Maps jobid to the Tracker holding that job's pending handlers
        self.trackers = {}

        self.actions = {PendingItem: self._process_pending,
                        DoneItem: self._process_done,
                        ErrorItem: self._process_error}

    def _process_pending(self, item):
        """
        Handle a pending notice.

        Throws an exception if the jobid is already known.
        """
        jobid = item.jobid
        if jobid in self.trackers:
            raise Exception(f"Job {jobid} has already been submitted")
        self.logger.info("%s - registering pending handlers %s",
                         jobid, pformat(item.handlers))
        self.trackers[jobid] = Tracker(item)

    def _process_done(self, item):
        """
        Handle a done notice. Submits a notification to the notifier. Discards
        the pending object if no more actions are pending.

        If the job has already been marked as failed, the handler's result
        is not applied; the handler is only removed from the pending list.

        If applying the result raises, an error notification is submitted
        and the job is marked as failed. The handler is left in the pending
        list in that case.

        Throws an exception on unknown jobids.
        """
        jobid = item.jobid
        if jobid not in self.trackers:
            raise Exception(f"Job {jobid} not found in tracker")
        pending = self.trackers[jobid]

        # Default for the path where applying the result fails: that path
        # never calls finish(), and without a default the check at the end
        # of this method would hit an unbound name.
        all_done = False

        if pending.failed:
            self.logger.info("%s - job has been marked as failed, "
                             "will not apply %s",
                             jobid,
                             pformat(item.handler))
            all_done = pending.finish(item.handler)
        else:
            if any(item.handler.apply_after(handler)
                   for handler in pending.handlers):
                # Defer processing if there are pending handlers that must
                # finish before this one
                # Not logging this due to unbelievable spam
                return self.put(item)

            self.logger.info("%s - applying %s",
                             jobid,
                             item.handler.__class__.__name__)
            package_id = pending.jobspec['pkg_id']
            try:
                with PackageManager(package_id,
                                    self.packages / package_id) as package:
                    item.apply_func(package)
            except Exception as e:
                url = pending.jobspec.get('notification_url', None)
                self.notifier.put(ErrorNote(jobid,
                                            url,
                                            self.__class__.__name__,
                                            e))
                pending.failed = True
            else:
                # finish() must run before the list of remaining handlers
                # is built so that this handler is no longer part of it
                all_done = pending.finish(item.handler)
                url = pending.jobspec.get('notification_url', None)
                pending_handlers = [handlerclass.__name__
                                    for handlerclass in pending.handlers]
                notification = DoneNote(jobid,
                                        url,
                                        item.handler.__class__.__name__,
                                        package.asdict(),
                                        pending_handlers)
                self.notifier.put(notification)

        if all_done:
            self.trackers.pop(jobid)
            (self.pkgid_cache / jobid).unlink(missing_ok=True)
            self.logger.info("%s - all done", jobid)

    def _process_error(self, item):
        """
        Handle an error notice. Marks the job as failed and submits an
        error notification naming the reporting handler's class.

        The reporting handler is not removed from the pending list.

        Throws an exception on unknown jobids.
        """
        jobid = item.jobid
        origin = item.handler
        exception = item.exception
        if jobid not in self.trackers:
            raise Exception(f"Job {jobid} not found in tracker")
        pending = self.trackers[jobid]
        pending.failed = True
        url = pending.jobspec.get('notification_url', None)
        self.notifier.put(ErrorNote(jobid,
                                    url,
                                    origin.__class__.__name__,
                                    exception))

    def _process(self, item):
        """
        Work through the queue.

        Throws an exception on invalid action values.
        """
        # Dispatch on the exact type; subclasses of the item classes are
        # deliberately not matched
        action = self.actions.get(type(item))
        if action is None:
            raise Exception(f"Invalid action {type(item)}")
        # Call the appropriate handler method
        action(item)
|
|
|
|
|
|
class Tracker:
    """
    Utility class to represent pending actions.

    Holds the handler classes a job is still waiting for, together with
    the job's spec, the temporary copy of the previous package state and
    a flag telling whether the job has failed.
    """
    def __init__(self, pending_item):
        """
        Create a tracker from a pending item.

        Reads jobid, jobspec, package and handlers from pending_item.
        """
        self.logger = logging.getLogger(
            f'play-daemon.{self.__class__.__name__}')
        self.jobid = pending_item.jobid
        self.jobspec = pending_item.jobspec
        self.temp_copy = pending_item.package
        # Keep a private copy: finish() removes entries, and that must not
        # alter the list owned by whoever submitted the pending item
        self.handlers = list(pending_item.handlers)
        self.failed = False

    def finish(self, handler):
        """
        Finish a pending action. Returns True if no more actions are pending,
        otherwise False.

        If no more actions are pending, cleans up before returning: the
        job's upload directory is deleted unless the job has failed, and
        the temporary copy of the previous package state is deleted if it
        exists.

        Throws an exception if the given action is missing from the
        pending list.
        """
        # Pending handlers are tracked by class, not by instance
        handlerclass = handler.__class__
        if handlerclass not in self.handlers:
            raise Exception(f"{handler} missing from {self.handlers}")
        self.handlers.remove(handlerclass)

        if self.handlers:
            return False

        # Clean up uploaded files once all handlers are done with the job
        # Failed jobs get cleaned manually for safety
        if 'upload_dir' in self.jobspec and not self.failed:
            self.logger.info("Deleting incoming files for job %s (%s)",
                             self.jobid, self.jobspec['upload_dir'])
            rmtree(self.jobspec['upload_dir'])
        # Also remove the temporary copy of the previous package state
        if path.exists(self.temp_copy.basedir):
            self.logger.info("Deleting temporary files for job %s (%s)",
                             self.jobid, self.temp_copy.basedir)
            rmtree(self.temp_copy.basedir)
        return True
|