import json
import logging
import os
import re
from pathlib import Path
from pprint import pformat

from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver

from .notifier import ErrorNote


class QueueReader(FileSystemEventHandler):
    """Watch a queue directory and hand job files to preprocessors.

    Files whose name is a UUID are parsed as JSON job specifications and
    put on the preprocessor registered for the specification's 'type'.
    Specifications that are rejected are reported to the notifier as an
    ErrorNote.
    """

    def __init__(self, preprocessors, notifier, queuedir):
        """Set up the directory watcher without starting it.

        :param preprocessors: mapping from job type to an object
            providing ``validate(item)`` and ``put(job)``.
        :param notifier: object providing ``put(note)``, used to report
            rejected jobs.
        :param queuedir: path of the directory to watch for job files.
        """
        self.preprocessors = preprocessors
        self.notifier = notifier
        self.queuedir = queuedir
        self.watcher = PollingObserver()
        self.watcher.schedule(self, queuedir)

        # The following regex matches a UUID:
        # A string consisting of hexadecimal characters split by dashes
        # 8x-4x-4x-4x-12x
        self.pattern = re.compile(
            r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}')
        self.logger = logging.getLogger('play-daemon.QueueReader')

    def start(self):
        """Process files already in the queue, then start watching.

        Existing entries are handled oldest first (by modification time)
        before the watcher is started.
        """
        for f in sorted(Path(self.queuedir).iterdir(),
                        key=os.path.getmtime):
            self._process(f)
        return self.watcher.start()

    def shutdown(self):
        """Stop the directory watcher."""
        return self.watcher.stop()

    def _process(self, path):
        """Validate the job file at *path* and queue it for preprocessing.

        Paths that are not regular files, or whose name is not a UUID,
        are logged and ignored. A file that cannot be parsed, is not a
        JSON object, lacks a 'type', names an unknown type, or fails
        validation with KeyError/ValueError is reported to the notifier
        instead of being queued.

        :param path: ``pathlib.Path`` of the candidate job file.
        """
        self.logger.info('Evaluating %s', path)
        if not path.is_file():
            self.logger.info('Ignoring, %s is not a regular file', path)
            return
        if not self.pattern.fullmatch(path.name):
            self.logger.info(
                'Ignoring, %s does not match expected name format', path)
            return

        jobid = path.name
        item = None
        try:
            # Parsing happens inside the try block: json.JSONDecodeError
            # is a ValueError, so a malformed file is reported through
            # the notifier instead of propagating out of the caller.
            with open(path) as f:
                item = json.load(f)
            if not isinstance(item, dict):
                raise ValueError(
                    'Job specification must be a JSON object.')

            job = {'jobid': jobid, 'queueitem': item}
            self.logger.info('Processing job: %s', pformat(job))

            if 'type' not in item:
                raise KeyError("Job specification missing 'type' key.")
            _type = item['type']
            if _type not in self.preprocessors:
                raise ValueError(
                    f"Invalid type '{_type}' in job specification.")
            preprocessor = self.preprocessors[_type]
            preprocessor.validate(item)
            preprocessor.put(job)
            self.logger.info('%s queued for preprocessing with %s',
                             jobid,
                             preprocessor.__class__.__name__)
        except (KeyError, ValueError) as e:
            # The job wrapper only holds 'jobid' and 'queueitem', so the
            # URL has to come from the specification itself.
            # NOTE(review): assumes 'notification_url' is a top-level key
            # of the job specification - confirm against the producer.
            url = None
            if isinstance(item, dict):
                url = item.get('notification_url', None)
            self.logger.error('Rejecting job %s: %s', jobid, e)
            self.notifier.put(ErrorNote(jobid,
                                        url,
                                        self.__class__.__name__,
                                        e))

    def on_created(self, event):
        """Process a file reported as created in the queue directory."""
        self._process(Path(event.src_path))

    def on_moved(self, event):
        """Process a file reported as moved, using its destination path."""
        self._process(Path(event.dest_path))