# play-daemon/daemon/__init__.py
# (98 lines, 3.5 KiB, Python)

import json
import logging
import logging.handlers
import os
import re
import traceback
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pipeline import Pipeline
class Daemon:
    """Feed job files from a watched queue directory through the pipeline.

    On construction the daemon configures the ``play-daemon`` logger, starts
    a filesystem watcher on the queue directory and enqueues the job files
    that are already present. ``do_processing`` then consumes the queue.
    """

    def __init__(self, config):
        """Set up logging, the pipeline, the watcher and the initial queue.

        Args:
            config: Mapping with a ``'daemon'`` section. Required keys there
                are ``'queue'`` (directory to watch) and ``'error'``.
                Optional keys are ``'log_level'`` (defaults to ``'ERROR'``)
                and ``'mail_level'``; when ``'mail_level'`` is present,
                ``'mail_from'``, ``'mail_to'`` and ``'mail_subject'`` are
                required as well. The whole mapping is also handed to
                ``Pipeline``.

        Raises:
            KeyError: If a required configuration key is missing.
        """
        daemon_conf = config['daemon']
        self.queue_dir = daemon_conf['queue']
        # Stored for users of the instance; nothing in this class reads it.
        self.error_dir = daemon_conf['error']

        self.logger = logging.getLogger('play-daemon')
        if 'mail_level' in daemon_conf:
            # Optional e-mail reporting through the local SMTP server.
            maillog = logging.handlers.SMTPHandler(
                'localhost',
                daemon_conf['mail_from'],
                [daemon_conf['mail_to']],
                daemon_conf['mail_subject'])
            maillog.setLevel(daemon_conf['mail_level'])
            self.logger.addHandler(maillog)

        loglevel = daemon_conf.get('log_level', 'ERROR')
        self.logger.setLevel(loglevel)
        # A StreamHandler without arguments writes to sys.stderr.
        console = logging.StreamHandler()
        console.setLevel(loglevel)
        self.logger.addHandler(console)

        self.pipeline = Pipeline(config)

        self.queue = Queue()
        # The watcher is started before the directory scan below so that no
        # file arriving in between is missed. The price is that such a file
        # may be queued twice; presumably the pipeline consumes the queue
        # file so the duplicate is skipped as "not found" -- TODO confirm.
        self.watcher = Observer()
        self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir)
        self.watcher.start()

        for path in self._existing_jobs():
            self.queue.put(path)

    def _existing_jobs(self):
        """Return the job files already in the queue dir, oldest ctime first.

        Only regular files whose name consists of hex digits and dashes are
        considered. Files that vanish while being inspected are skipped
        instead of aborting start-up.
        """
        stamped = []
        for item in os.listdir(self.queue_dir):
            if not re.match(r'^[0-9a-f-]+$', item):
                continue
            path = os.path.join(self.queue_dir, item)
            if not os.path.isfile(path):
                continue
            try:
                created = os.path.getctime(path)
            except OSError:
                # Removed between the listing and the stat call.
                continue
            stamped.append((created, path))
        # Sort on the timestamp only; the stable sort keeps listing order
        # for files with identical ctime.
        stamped.sort(key=lambda entry: entry[0])
        return [path for _, path in stamped]

    def do_processing(self):
        """Process queued jobs forever; return on ``KeyboardInterrupt``.

        Each queue entry is the path of a JSON file. Its basename is used as
        the presentation id and its decoded content is passed to
        ``self.pipeline.process``. A queue file that no longer exists is
        skipped with an info message. Any other exception is logged with
        its traceback at error level and processing continues with the
        next job.
        """
        while True:
            # Reset per iteration so the error handler never reports the id
            # of a previous job.
            pres_id = '<unknown>'
            try:
                next_job = self.queue.get()
                pres_id = os.path.basename(next_job)
                # Only the read of the queue file is guarded here: a
                # FileNotFoundError raised inside the pipeline is a real
                # failure and must reach the generic handler below.
                try:
                    with open(next_job, encoding='utf-8') as f:
                        queue_item = json.load(f)
                except FileNotFoundError:
                    self.logger.info('%s - Queue file not found, skipping',
                                     pres_id)
                    continue
                self.pipeline.process(pres_id, queue_item)
            except KeyboardInterrupt:
                return
            except Exception as e:
                stack = traceback.format_exception(type(e),
                                                   e,
                                                   e.__traceback__)
                self.logger.error('%s - Exception:\n%s',
                                  pres_id, ''.join(stack))
class QueueUpdater(FileSystemEventHandler):
    """Watchdog event handler that pushes new job files onto a queue."""

    def __init__(self, queue):
        """Remember the target queue and pick up the daemon's logger.

        Args:
            queue: Object with a ``put`` method that receives the path of
                every accepted file.
        """
        self.queue = queue
        self.logger = logging.getLogger('play-daemon')

    def _handle(self, path):
        """Enqueue ``path`` if it is a regular file with a job-like name.

        A name qualifies when it consists solely of hex digits and dashes.
        Anything else is ignored silently.
        """
        # The file check comes first, exactly as before, so the name is only
        # inspected for paths that are regular files.
        if os.path.isfile(path) and re.match(r'^[0-9a-f-]+$',
                                             os.path.basename(path)):
            self.queue.put(path)
            self.logger.debug('Watchdog: added %s to queue', path)

    def on_created(self, event):
        """Handle a creation event using the event's source path."""
        self.logger.debug('Watchdog: creation event triggered for %s',
                          event.src_path)
        created_path = event.src_path
        self._handle(created_path)

    def on_moved(self, event):
        """Handle a move event using the event's destination path."""
        self.logger.debug('Watchdog: move event triggered for %s',
                          event.dest_path)
        moved_to = event.dest_path
        self._handle(moved_to)