import json
import logging
import logging.handlers
import os
from queue import Queue

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from pipeline import Pipeline


class Daemon:
    def __init__(self, config):
        self.queue_dir = config['daemon']['queue']
        self.error_dir = config['daemon']['error']

        self.logger = logging.getLogger('play-daemon')
        self.logger.setLevel('DEBUG')

        # Optionally mail log records at or above the configured level.
        if 'mail_level' in config['daemon']:
            from_addr = config['daemon']['mail_from']
            to_addr = config['daemon']['mail_to']
            subject = config['daemon']['mail_subject']
            maillog = logging.handlers.SMTPHandler('localhost',
                                                   from_addr,
                                                   [to_addr],
                                                   subject)
            maillog.setLevel(config['daemon']['mail_level'])
            self.logger.addHandler(maillog)

        syslog = logging.StreamHandler()
        syslog.setLevel(config['daemon'].get('log_level', 'ERROR'))
        self.logger.addHandler(syslog)

        self.pipeline = Pipeline(config)

        # Watch the queue directory for new job files.
        self.queue = Queue()
        self.watcher = Observer()
        self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir)
        self.watcher.start()

        # Enqueue any job files already present at startup.
        for p in os.listdir(self.queue_dir):
            path = os.path.join(self.queue_dir, p)
            if os.path.isfile(path):
                self.queue.put(path)

    def do_processing(self):
        while True:
            pres_id = None
            queue_item = None
            try:
                next_job = self.queue.get()
                pres_id = os.path.basename(next_job)
                with open(next_job) as f:
                    queue_item = json.load(f)
                self.pipeline.process(pres_id, queue_item)
            except KeyboardInterrupt:
                return
            except Exception as e:
                msg = "Exception during processing of queue item %s: %s"
                self.logger.exception(msg, pres_id, queue_item, exc_info=e)
                # Write the queue info and error to disk for play-api to display the status.
                with open(os.path.join(self.error_dir, pres_id), 'w') as f:
                    if queue_item is None:
                        queue_item = {}
                    queue_item['error'] = str(e)
                    json.dump(queue_item, f)


class QueueUpdater(FileSystemEventHandler):
    """Pushes newly created queue files onto the daemon's work queue."""

    def __init__(self, queue):
        self.queue = queue

    def on_created(self, event):
        self.queue.put(event.src_path)