play-daemon/daemon/__init__.py
2021-10-26 15:38:43 +02:00

67 lines
2.5 KiB
Python

import json
import logging
import logging.handlers
import os
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pipeline import Pipeline
class Daemon:
    """Feed queue files from a watched directory through the pipeline.

    On construction the daemon configures logging, creates the pipeline,
    starts watching the queue directory for newly created entries and
    enqueues the files that are already present. `do_processing` then
    consumes the queue until interrupted.
    """

    def __init__(self, config):
        """Set up logging, the pipeline and the queue watcher.

        Args:
            config: Mapping with a 'daemon' section. 'queue' and 'error'
                (directory paths) are required. 'log_level' is optional and
                defaults to 'ERROR'. If 'mail_level' is present, then
                'mail_from', 'mail_to' and 'mail_subject' must be present
                too. The whole mapping is also handed to Pipeline.

        Raises:
            KeyError: If a required configuration key is missing.
        """
        daemon_conf = config['daemon']
        self.queue_dir = daemon_conf['queue']
        self.error_dir = daemon_conf['error']

        # The logger itself passes everything on; each handler applies its
        # own threshold.
        self.logger = logging.getLogger('play-daemon')
        self.logger.setLevel('DEBUG')

        if 'mail_level' in daemon_conf:
            from_addr = daemon_conf['mail_from']
            to_addr = daemon_conf['mail_to']
            subject = daemon_conf['mail_subject']
            maillog = logging.handlers.SMTPHandler('localhost',
                                                   from_addr,
                                                   [to_addr],
                                                   subject)
            maillog.setLevel(daemon_conf['mail_level'])
            self.logger.addHandler(maillog)

        # StreamHandler() without arguments writes to sys.stderr.
        syslog = logging.StreamHandler()
        syslog.setLevel(daemon_conf.get('log_level', 'ERROR'))
        self.logger.addHandler(syslog)

        self.pipeline = Pipeline(config)
        self.queue = Queue()

        self.watcher = Observer()
        self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir)
        self.watcher.start()

        # Enqueue files that were already waiting before the watcher existed.
        # NOTE: the watcher is already running at this point, so a file
        # created during this scan can end up in the queue twice.
        for p in os.listdir(self.queue_dir):
            path = os.path.join(self.queue_dir, p)
            if os.path.isfile(path):
                self.queue.put(path)

    def do_processing(self):
        """Process queued jobs until a KeyboardInterrupt is received.

        Each queue entry is the path of a JSON file. Its basename is used as
        the presentation id, and the decoded content is passed to the
        pipeline. A failure while handling one job is logged and written to
        the error directory; the loop then moves on to the next job.
        """
        while True:
            # Reset per iteration so the error handler never sees values
            # left over from a previous job.
            pres_id = None
            queue_item = None
            try:
                next_job = self.queue.get()
                pres_id = os.path.basename(next_job)
                with open(next_job) as f:
                    queue_item = json.load(f)
                self.pipeline.process(pres_id, queue_item)
            except KeyboardInterrupt:
                return
            except Exception as e:
                msg = "Exception during processing of queue item %s: %s"
                self.logger.exception(msg, pres_id, queue_item, exc_info=e)
                self._record_error(pres_id, queue_item, e)

    def _record_error(self, pres_id, queue_item, error):
        """Write the queue info and the error to disk for play-api to display.

        Args:
            pres_id: Name of the file to create in the error directory, or
                None if the job never got far enough to have an id.
            queue_item: Decoded queue file content. May be None (the file
                could not be read or parsed) or a non-dict JSON value; in
                both cases only the error is recorded.
            error: The exception that aborted the job.
        """
        if pres_id is None:
            # Nothing to name the report after.
            return

        report = dict(queue_item) if isinstance(queue_item, dict) else {}
        # Exceptions are not JSON-serializable; store a readable string.
        report['error'] = repr(error)

        try:
            with open(os.path.join(self.error_dir, pres_id), 'w') as f:
                json.dump(report, f)
        except OSError:
            # A failing error report must not take the whole daemon down.
            self.logger.exception(
                "Could not write error report for queue item %s", pres_id)
class QueueUpdater(FileSystemEventHandler):
    """Watchdog handler that enqueues the paths of newly created files."""

    def __init__(self, queue):
        """Remember the queue that created paths are put on.

        Args:
            queue: Object with a `put(path)` method.
        """
        self.queue = queue

    def on_created(self, event):
        """Enqueue the path of a newly created file.

        Directory creation events are ignored: queue entries are opened as
        JSON files by the consumer, so a directory could never be processed.

        Args:
            event: Watchdog event providing `src_path` and `is_directory`.
        """
        if event.is_directory:
            return
        self.queue.put(event.src_path)