play-daemon/daemon/__init__.py

89 lines
3.0 KiB
Python

import json
import logging
import logging.handlers
import os
import re
import traceback
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pipeline import Pipeline
class Daemon:
def __init__(self, config):
self.queue_dir = config['daemon']['queue']
self.error_dir = config['daemon']['error']
self.logger = logging.getLogger('play-daemon')
if 'mail_level' in config['daemon']:
from_addr = config['daemon']['mail_from']
to_addr = config['daemon']['mail_to']
subject = config['daemon']['mail_subject']
maillog = logging.handlers.SMTPHandler('localhost',
from_addr,
[to_addr],
subject)
maillog.setLevel(config['daemon']['mail_level'])
self.logger.addHandler(maillog)
loglevel = config['daemon'].get('log_level', 'ERROR')
self.logger.setLevel(loglevel)
syslog = logging.StreamHandler()
syslog.setLevel(loglevel)
self.logger.addHandler(syslog)
self.pipeline = Pipeline(config)
self.queue = Queue()
self.watcher = Observer()
self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir)
self.watcher.start()
def path_filter(path):
if not re.match(r'^[0-9a-f-]+$', os.path.basename(path)):
return False
if not os.path.isfile(path):
return False
return True
paths = filter(path_filter, [os.path.join(self.queue_dir, item)
for item in os.listdir(self.queue_dir)])
for path in sorted(paths, key=os.path.getctime):
self.queue.put(path)
def do_processing(self):
while True:
try:
next_job = self.queue.get()
pres_id = os.path.basename(next_job)
queue_item = None
with open(next_job) as f:
queue_item = json.load(f)
self.pipeline.process(pres_id, queue_item)
except KeyboardInterrupt:
return
except Exception as e:
stack = traceback.format_exception(type(e),
e,
e.__traceback__)
self.logger.error('%s - Exception:\n%s',
pres_id, ''.join(stack))
continue
class QueueUpdater(FileSystemEventHandler):
def __init__(self, queue):
self.queue = queue
def _handle(self, event):
path = event.src_path
if not os.path.isfile(path):
return
if not re.match(r'^[0-9a-f-]+$', os.path.basename(path)):
return
self.queue.put(path)
def on_created(self, event):
self._handle(event)
def on_moved(self, event):
self._handle(event)