play-daemon-threaded/pipeline/distributor.py

80 lines
3.2 KiB
Python

from pathlib import Path
from pprint import pformat
from uuid import uuid4
from .collector import PendingItem
from .exceptions import ValidationException
from .notifier import ErrorNote
from .package import PackageManager
from .queuethread import QueueThread
class Distributor(QueueThread):
"""
This queue accepts all items and passes them along to registered handlers
by asking each handler if it wants the item.
"""
def __init__(self, collector, notifier, handler_queue_map,
packagedir, tempdir, cachedir):
super().__init__()
self.collector = collector
self.notifier = notifier
self.handlers = handler_queue_map
self.packages = Path(packagedir)
self.tempdir = Path(tempdir)
self.pkgid_cache = Path(cachedir)
def _process(self, queueitem):
"""
If the item doesn't specify an ID, create one so a new package
can be created.
Walk through all registered handlers, check if the handler wants the
item and enqueue it with the handler(s) that do.
If the item fails validation for a handler that wants the item, the
item is not added to any queue and an error notification is sent,
discarding the item.
"""
self.logger.info('Processing queue item: %s', pformat(queueitem))
jobid = queueitem['jobid']
if 'pkg_id' not in queueitem['jobspec']:
queueitem['jobspec']['pkg_id'] = self.get_pkgid(jobid)
package_id = queueitem['jobspec']['pkg_id']
package_manager = PackageManager(package_id,
self.packages / package_id)
package_tempcopy_dir = self.tempdir / f'{jobid}.package'
package = package_manager.make_temporary_copy(package_tempcopy_dir)
queueitem['existing'] = package
self.logger.info('%s - Determining tasks for package %s',
jobid, package_id)
wants = {}
for handlerclass, handlerqueue in self.handlers.items():
try:
if handlerclass.wants(queueitem['jobspec'],
queueitem['existing']):
self.logger.info('%s - wanted by %s',
jobid,
handlerclass.__name__)
wants[handlerclass] = handlerqueue
except ValidationException as e:
url = queueitem.get('notification_url', None)
return self.notifier.put(ErrorNote(jobid,
url,
handlerclass.__name__,
e))
self.collector.put(PendingItem(queueitem, list(wants.keys())))
for handlerqueue in wants.values():
handlerqueue.put(queueitem)
def get_pkgid(self, jobid):
cachefile = self.pkgid_cache / jobid
if cachefile.exists():
return cachefile.read_text()
new_id = str(uuid4())
cachefile.write_text(new_id)
self.logger.info('%s - Created new package ID %s',
jobid, new_id)
return new_id