import json import logging import logging.handlers import os import re import traceback from queue import Queue from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from pipeline import Pipeline class Daemon: def __init__(self, config): self.queue_dir = config['daemon']['queue'] self.error_dir = config['daemon']['error'] self.logger = logging.getLogger('play-daemon') if 'mail_level' in config['daemon']: from_addr = config['daemon']['mail_from'] to_addr = config['daemon']['mail_to'] subject = config['daemon']['mail_subject'] maillog = logging.handlers.SMTPHandler('localhost', from_addr, [to_addr], subject) maillog.setLevel(config['daemon']['mail_level']) self.logger.addHandler(maillog) loglevel = config['daemon'].get('log_level', 'ERROR') self.logger.setLevel(loglevel) syslog = logging.StreamHandler() syslog.setLevel(loglevel) self.logger.addHandler(syslog) self.pipeline = Pipeline(config) self.queue = Queue() self.watcher = Observer() self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir) self.watcher.start() def path_filter(path): if not re.match(r'^[0-9a-f-]+$', os.path.basename(path)): return False if not os.path.isfile(path): return False return True paths = filter(path_filter, [os.path.join(self.queue_dir, item) for item in os.listdir(self.queue_dir)]) for path in sorted(paths, key=os.path.getctime): self.queue.put(path) def do_processing(self): while True: try: next_job = self.queue.get() pres_id = os.path.basename(next_job) queue_item = None with open(next_job) as f: queue_item = json.load(f) self.pipeline.process(pres_id, queue_item) except KeyboardInterrupt: return except Exception as e: stack = traceback.format_exception(type(e), e, e.__traceback__) self.logger.error('%s - Exception:\n%s', pres_id, ''.join(stack)) continue class QueueUpdater(FileSystemEventHandler): def __init__(self, queue): self.queue = queue def _handle(self, event): path = event.src_path if not os.path.isfile(path): return if not re.match(r'^[0-9a-f-]+$', os.path.basename(path)): return self.queue.put(path) def on_created(self, event): self._handle(event) def on_moved(self, event): self._handle(event)