80 lines
3.2 KiB
Python
80 lines
3.2 KiB
Python
from pathlib import Path
|
|
from pprint import pformat
|
|
from uuid import uuid4
|
|
|
|
from .collector import PendingItem
|
|
from .exceptions import ValidationException
|
|
from .notifier import ErrorNote
|
|
from .package import PackageManager
|
|
from .queuethread import QueueThread
|
|
|
|
|
|
class Distributor(QueueThread):
    """
    This queue accepts all items and passes them along to registered handlers
    by asking each handler if it wants the item.
    """

    def __init__(self, collector, notifier, handler_queue_map,
                 packagedir, tempdir, cachedir):
        """
        Store the collaborating queues and convert directories to paths.

        :param collector: queue that receives a PendingItem for each item
            that was accepted for distribution
        :param notifier: queue that receives an ErrorNote when a handler
            raises ValidationException for an item
        :param handler_queue_map: mapping of handler class to the queue
            that items wanted by that handler are put on
        :param packagedir: base path; the package with a given ID is
            looked up at ``packagedir / <package id>``
        :param tempdir: base path for the per-job temporary package copy
            (``tempdir / '<jobid>.package'``)
        :param cachedir: base path of the job ID -> package ID cache, one
            file per job ID
        """
        super().__init__()
        self.collector = collector
        self.notifier = notifier
        self.handlers = handler_queue_map
        self.packages = Path(packagedir)
        self.tempdir = Path(tempdir)
        self.pkgid_cache = Path(cachedir)

    def _process(self, queueitem):
        """
        If the item doesn't specify an ID, create one so a new package
        can be created.

        Walk through all registered handlers, check if the handler wants the
        item and enqueue it with the handler(s) that do.

        If the item fails validation for a handler that wants the item, the
        item is not added to any queue and an error notification is sent,
        discarding the item.

        :param queueitem: mapping with at least the keys ``jobid`` and
            ``jobspec``; ``notification_url`` is optional. The item is
            modified in place: ``jobspec['pkg_id']`` is filled in when
            missing and ``existing`` is set to the temporary package copy.
        :return: the result of ``notifier.put()`` when validation failed,
            otherwise None
        """
        self.logger.info('Processing queue item: %s', pformat(queueitem))
        jobid = queueitem['jobid']
        jobspec = queueitem['jobspec']
        if 'pkg_id' not in jobspec:
            jobspec['pkg_id'] = self.get_pkgid(jobid)
        package_id = jobspec['pkg_id']

        package_manager = PackageManager(package_id,
                                         self.packages / package_id)
        package_tempcopy_dir = self.tempdir / f'{jobid}.package'
        package = package_manager.make_temporary_copy(package_tempcopy_dir)
        queueitem['existing'] = package

        self.logger.info('%s - Determining tasks for package %s',
                         jobid, package_id)
        wants = {}
        for handlerclass, handlerqueue in self.handlers.items():
            # Only the wants() call is expected to raise the validation
            # error, so keep the try body limited to it.
            try:
                wanted = handlerclass.wants(jobspec, package)
            except ValidationException as e:
                # NOTE(review): the temporary package copy created above is
                # not removed on this path - confirm cleanup happens
                # elsewhere.
                url = queueitem.get('notification_url')
                return self.notifier.put(ErrorNote(jobid,
                                                   url,
                                                   handlerclass.__name__,
                                                   e))
            if wanted:
                self.logger.info('%s - wanted by %s',
                                 jobid,
                                 handlerclass.__name__)
                wants[handlerclass] = handlerqueue

        # Register the item with the collector before any handler can see
        # it, then hand it to every handler that asked for it.
        self.collector.put(PendingItem(queueitem, list(wants)))
        for handlerqueue in wants.values():
            handlerqueue.put(queueitem)

    def get_pkgid(self, jobid):
        """
        Return the package ID belonging to a job, creating one if needed.

        The ID is cached in a file named after the job ID so that the same
        job always maps to the same package. A missing, empty or
        whitespace-only cache file is treated as "no ID yet": a fresh UUID
        is generated and written to the cache. An empty ID must never be
        returned, because joining it onto the package base path would
        yield the base path itself.

        :param jobid: job identifier, used as the cache file name
        :return: the cached or newly created package ID as a string
        """
        cachefile = self.pkgid_cache / jobid
        # Read directly instead of checking exists() first; this avoids the
        # window between the check and the read.
        try:
            cached_id = cachefile.read_text().strip()
        except FileNotFoundError:
            cached_id = ''
        if cached_id:
            return cached_id

        new_id = str(uuid4())
        cachefile.write_text(new_id)
        self.logger.info('%s - Created new package ID %s',
                         jobid, new_id)
        return new_id
|