89 lines
3.0 KiB
Python
89 lines
3.0 KiB
Python
import json
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import re
|
|
import traceback
|
|
from queue import Queue
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
from pipeline import Pipeline
|
|
|
|
class Daemon:
    """Feed job files from a watched queue directory to a ``Pipeline``.

    On construction the daemon configures the ``play-daemon`` logger, builds
    the pipeline, starts a watchdog observer on the queue directory and
    enqueues every job file already present there. ``do_processing`` then
    consumes the queue until interrupted.

    A job file is a regular file whose name consists only of the characters
    ``0-9``, ``a-f`` and ``-``; its basename is handed to the pipeline as the
    job id and its contents are parsed as JSON.
    """

    # Accepted job file names (same pattern the queue watcher uses).
    _JOB_NAME = re.compile(r'^[0-9a-f-]+$')

    def __init__(self, config):
        """Set up logging, the pipeline, the directory watcher and the queue.

        Args:
            config: Mapping with a ``'daemon'`` section providing ``'queue'``
                and ``'error'`` directories, an optional ``'log_level'``
                (default ``'ERROR'``) and, if ``'mail_level'`` is present,
                ``'mail_from'``, ``'mail_to'`` and ``'mail_subject'``.
                The whole mapping is also passed on to ``Pipeline``.
        """
        daemon_conf = config['daemon']
        self.queue_dir = daemon_conf['queue']
        # Stored for completeness; nothing in this class reads it.
        self.error_dir = daemon_conf['error']
        self.logger = logging.getLogger('play-daemon')
        self._configure_logging(daemon_conf)

        self.pipeline = Pipeline(config)
        self.queue = Queue()
        self.watcher = Observer()
        self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir)
        # The watcher is started before the directory is scanned, presumably
        # so that no file arriving during the scan is missed.
        self.watcher.start()

        self._enqueue_existing()

    def _configure_logging(self, daemon_conf):
        """Attach the optional mail handler and the stream handler."""
        if 'mail_level' in daemon_conf:
            maillog = logging.handlers.SMTPHandler(
                'localhost',
                daemon_conf['mail_from'],
                [daemon_conf['mail_to']],
                daemon_conf['mail_subject'],
            )
            maillog.setLevel(daemon_conf['mail_level'])
            self.logger.addHandler(maillog)

        loglevel = daemon_conf.get('log_level', 'ERROR')
        self.logger.setLevel(loglevel)
        # Despite the historical name "syslog", this is a plain stream handler.
        stream_log = logging.StreamHandler()
        stream_log.setLevel(loglevel)
        self.logger.addHandler(stream_log)

    def _enqueue_existing(self):
        """Queue the job files already in the queue directory, oldest ctime first."""
        candidates = (os.path.join(self.queue_dir, name)
                      for name in os.listdir(self.queue_dir))
        pending = [path for path in candidates
                   if self._JOB_NAME.match(os.path.basename(path))
                   and os.path.isfile(path)]
        for path in sorted(pending, key=os.path.getctime):
            self.queue.put(path)

    def do_processing(self):
        """Process queued jobs forever.

        Each queue entry is a path to a JSON job file. The file's basename
        and its parsed contents are passed to ``self.pipeline.process``.
        Any ``Exception`` is logged together with its traceback and the loop
        moves on to the next job; ``KeyboardInterrupt`` ends the loop.
        """
        while True:
            # Reset on every pass so the error log can never refer to an
            # unbound name or to the id of a previously processed job.
            pres_id = '<unknown>'
            try:
                next_job = self.queue.get()
                pres_id = os.path.basename(next_job)
                # JSON text is UTF-8; do not depend on the process locale.
                with open(next_job, encoding='utf-8') as job_file:
                    queue_item = json.load(job_file)
                self.pipeline.process(pres_id, queue_item)
            except KeyboardInterrupt:
                return
            except Exception:
                self.logger.error('%s - Exception:\n%s',
                                  pres_id, traceback.format_exc())
|
|
|
|
|
|
class QueueUpdater(FileSystemEventHandler):
    """Watchdog handler that puts newly appearing job files on a queue.

    A path is queued only if it is an existing regular file whose name
    consists solely of the characters ``0-9``, ``a-f`` and ``-``.
    """

    # Accepted job file names.
    _JOB_NAME = re.compile(r'^[0-9a-f-]+$')

    def __init__(self, queue):
        """Remember the queue that accepted paths are put on.

        Args:
            queue: Object with a ``put(path)`` method, e.g. ``queue.Queue``.
        """
        super().__init__()
        self.queue = queue

    def _enqueue(self, path):
        """Put ``path`` on the queue if it is a regular file with a job name."""
        if not os.path.isfile(path):
            return
        if not self._JOB_NAME.match(os.path.basename(path)):
            return
        self.queue.put(path)

    def _handle(self, event):
        """Queue the file named by ``event.src_path`` if it qualifies."""
        self._enqueue(event.src_path)

    def on_created(self, event):
        """Queue a file that was just created in the watched directory."""
        self._handle(event)

    def on_moved(self, event):
        """Queue a file that was just moved or renamed.

        For a move event ``src_path`` is the old location, which no longer
        exists once the move has happened, so checking it would always
        reject the file. The file now lives at ``dest_path``.
        """
        self._enqueue(event.dest_path)
|