75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
from pathlib import Path
|
|
from pprint import pformat
|
|
|
|
from watchdog.events import FileSystemEventHandler
|
|
from watchdog.observers.polling import PollingObserver
|
|
|
|
from .notifier import ErrorNote
|
|
|
|
|
|
class QueueReader(FileSystemEventHandler):
|
|
def __init__(self, preprocessors, notifier, queuedir):
|
|
self.preprocessors = preprocessors
|
|
self.notifier = notifier
|
|
self.queuedir = queuedir
|
|
self.watcher = PollingObserver()
|
|
self.watcher.schedule(self, queuedir)
|
|
# The following regex matches a UUID:
|
|
# A string consisting of hexadecimal characters split by dashes
|
|
# 8x-4x-4x-4x-12x
|
|
self.pattern = re.compile(
|
|
r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}')
|
|
self.logger = logging.getLogger('play-daemon.QueueReader')
|
|
|
|
def start(self):
|
|
for f in sorted(Path(self.queuedir).iterdir(), key=os.path.getmtime):
|
|
self._process(f)
|
|
return self.watcher.start()
|
|
|
|
def shutdown(self):
|
|
return self.watcher.stop()
|
|
|
|
def _process(self, path):
|
|
self.logger.info('Evaluating %s', path)
|
|
if not path.is_file():
|
|
self.logger.info('Ignoring, %s is not a regular file', path)
|
|
return
|
|
if not self.pattern.fullmatch(path.name):
|
|
self.logger.info(
|
|
'Ignoring, %s does not match expected name format', path)
|
|
return
|
|
with open(path) as f:
|
|
item = json.load(f)
|
|
job = {'jobid': path.name,
|
|
'queueitem': item}
|
|
self.logger.info('Processing job: %s', pformat(job))
|
|
try:
|
|
if 'type' not in item:
|
|
raise KeyError("Job specification missing 'type' key.")
|
|
_type = item['type']
|
|
if _type not in self.preprocessors:
|
|
raise ValueError(
|
|
f"Invalid type '{_type}' in job specification.")
|
|
self.preprocessors[_type].validate(item)
|
|
self.preprocessors[_type].put(job)
|
|
self.logger.info('%s queued for preprocessing with %s',
|
|
job['jobid'],
|
|
self.preprocessors[_type].__class__.__name__)
|
|
except (KeyError, ValueError) as e:
|
|
url = job.get('notification_url', None)
|
|
self.notifier.put(ErrorNote(job['jobid'],
|
|
url,
|
|
self.__class__.__name__,
|
|
e))
|
|
|
|
def on_created(self, event):
|
|
self._process(Path(event.src_path))
|
|
|
|
def on_moved(self, event):
|
|
self._process(Path(event.dest_path))
|