67 lines
2.5 KiB
Python
67 lines
2.5 KiB
Python
import json
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
from queue import Queue
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
from pipeline import Pipeline
|
|
|
|
class Daemon:
|
|
def __init__(self, config):
|
|
self.queue_dir = config['daemon']['queue']
|
|
self.error_dir = config['daemon']['error']
|
|
self.logger = logging.getLogger('play-daemon')
|
|
self.logger.setLevel('DEBUG')
|
|
if 'mail_level' in config['daemon']:
|
|
from_addr = config['daemon']['mail_from']
|
|
to_addr = config['daemon']['mail_to']
|
|
subject = config['daemon']['mail_subject']
|
|
maillog = logging.handlers.SMTPHandler('localhost',
|
|
from_addr,
|
|
[to_addr],
|
|
subject)
|
|
maillog.setLevel(config['daemon']['mail_level'])
|
|
self.logger.addHandler(maillog)
|
|
syslog = logging.StreamHandler()
|
|
syslog.setLevel(config['daemon'].get('log_level', 'ERROR'))
|
|
self.logger.addHandler(syslog)
|
|
|
|
self.pipeline = Pipeline(config)
|
|
self.queue = Queue()
|
|
self.watcher = Observer()
|
|
self.watcher.schedule(QueueUpdater(self.queue), self.queue_dir)
|
|
self.watcher.start()
|
|
for p in os.listdir(self.queue_dir):
|
|
path = os.path.join(self.queue_dir, p)
|
|
if os.path.isfile(path):
|
|
self.queue.put(path)
|
|
|
|
def do_processing(self):
|
|
while True:
|
|
try:
|
|
next_job = self.queue.get()
|
|
pres_id = os.path.basename(next_job)
|
|
queue_item = None
|
|
with open(next_job) as f:
|
|
queue_item = json.load(f)
|
|
self.pipeline.process(pres_id, queue_item)
|
|
except KeyboardInterrupt:
|
|
return
|
|
except Exception as e:
|
|
msg = "Exception during processing of queue item %s: %s"
|
|
self.logger.exception(msg, pres_id, queue_item, exc_info=e)
|
|
# write the que info and error to disk for play-api to display the status
|
|
with open(os.path.join(self.error_dir, pres_id), 'w') as f:
|
|
queue_item['error'] = e
|
|
json.dump(queue_item, f)
|
|
|
|
|
|
class QueueUpdater(FileSystemEventHandler):
|
|
def __init__(self, queue):
|
|
self.queue = queue
|
|
|
|
def on_created(self, event):
|
|
self.queue.put(event.src_path)
|