play-daemon-threaded/pipeline/queuereader.py

75 lines
2.6 KiB
Python

import json
import logging
import os
import re
from pathlib import Path
from pprint import pformat
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from .notifier import ErrorNote
class QueueReader(FileSystemEventHandler):
    """Hand job files found in a queue directory to a preprocessor.

    Files whose name is a lowercase UUID are parsed as JSON job
    specifications and routed to the preprocessor registered for the
    specification's 'type'. Jobs that cannot be read or fail validation
    are reported to the notifier as an ErrorNote instead of being queued.
    """

    def __init__(self, preprocessors, notifier, queuedir):
        """Prepare the directory watcher; watching begins in start().

        Args:
            preprocessors: Mapping from job type to an object providing
                validate(item) and put(job).
            notifier: Object providing put(note), used for error reports.
            queuedir: Directory to scan and watch for job files.
        """
        self.preprocessors = preprocessors
        self.notifier = notifier
        self.queuedir = queuedir
        self.watcher = PollingObserver()
        self.watcher.schedule(self, queuedir)
        # The following regex matches a UUID:
        # A string consisting of lowercase hexadecimal characters split by
        # dashes: 8x-4x-4x-4x-12x
        self.pattern = re.compile(
            r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}')
        self.logger = logging.getLogger('play-daemon.QueueReader')

    def start(self):
        """Process files already queued, oldest first, then start watching.

        Returns:
            Whatever the underlying observer's start() returns.
        """
        for f in sorted(Path(self.queuedir).iterdir(), key=os.path.getmtime):
            self._process(f)
        return self.watcher.start()

    def shutdown(self):
        """Ask the underlying observer to stop and return its result."""
        return self.watcher.stop()

    def _report_error(self, jobid, url, error):
        """Log a rejected job and pass an ErrorNote to the notifier."""
        self.logger.warning('Rejecting job %s: %s', jobid, error)
        self.notifier.put(ErrorNote(jobid,
                                    url,
                                    self.__class__.__name__,
                                    error))

    def _process(self, path):
        """Evaluate one path and queue it for preprocessing if it is a job.

        Paths that are not regular files, or whose name is not a UUID,
        are ignored. Unreadable files, invalid JSON, a missing or unknown
        'type', and KeyError/ValueError raised while validating or queuing
        are reported through the notifier rather than raised.

        Args:
            path: pathlib.Path of the candidate job file.
        """
        self.logger.info('Evaluating %s', path)
        if not path.is_file():
            self.logger.info('Ignoring, %s is not a regular file', path)
            return
        if not self.pattern.fullmatch(path.name):
            self.logger.info(
                'Ignoring, %s does not match expected name format', path)
            return

        # Reading can fail if the file disappears, is unreadable, or holds
        # malformed/partially written JSON. json.JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses. Letting these
        # escape would abort start() or the watcher's event dispatch.
        try:
            with open(path, encoding='utf-8') as f:
                item = json.load(f)
        except (OSError, ValueError) as e:
            self._report_error(path.name, None, e)
            return

        job = {'jobid': path.name,
               'queueitem': item}
        self.logger.info('Processing job: %s', pformat(job))
        try:
            if not isinstance(item, dict):
                raise ValueError('Job specification must be a JSON object.')
            if 'type' not in item:
                raise KeyError("Job specification missing 'type' key.")
            _type = item['type']
            if _type not in self.preprocessors:
                raise ValueError(
                    f"Invalid type '{_type}' in job specification.")
            preprocessor = self.preprocessors[_type]
            preprocessor.validate(item)
            preprocessor.put(job)
            self.logger.info('%s queued for preprocessing with %s',
                             job['jobid'],
                             preprocessor.__class__.__name__)
        except (KeyError, ValueError) as e:
            # The job wrapper only has 'jobid' and 'queueitem' keys, so a
            # notification URL can only come from the queue item itself.
            # NOTE(review): assumes the job specification carries it as a
            # top-level 'notification_url' key - confirm against producers.
            url = None
            if isinstance(item, dict):
                url = item.get('notification_url')
            self._report_error(job['jobid'], url, e)

    def on_created(self, event):
        """Process the path of a newly created filesystem entry."""
        self._process(Path(event.src_path))

    def on_moved(self, event):
        """Process the destination path of a moved filesystem entry."""
        self._process(Path(event.dest_path))