87da30ab93
As a consequence, the storage module no longer provides a singleton.
123 lines
4.2 KiB
Python
import logging
|
|
import logging.handlers
|
|
import multiprocessing as mp
|
|
import time
|
|
|
|
from configparser import ConfigParser
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
from sched import scheduler
|
|
|
|
from .deduplogger import DedupLogHandler
|
|
from .notifierthread import NotifierThread
|
|
from .queuereader import QueueReader
|
|
from .retrythread import RetryThread
|
|
from .schedulethread import ScheduleThread
|
|
from .storage import FileHandler
|
|
from .whisperthread import WhisperThread
|
|
|
|
|
|
class Pipeline:
    """Wires together the whisper-daemon queues and worker threads.

    The pipeline owns the watcher, whisper, notifier, retrier and
    scheduler threads and provides start()/stop() lifecycle control.
    """

    def __init__(self, config: ConfigParser):
        """Store the configuration and configure daemon logging.

        Args:
            config: parsed daemon configuration.  The 'worker' and
                'logging' sections are optional and are created empty
                when absent; the 'whisper-daemon' section is required
                by start().
        """
        self.running = False
        self.config = config

        # All keys in these sections are optional, so the sections
        # themselves are optional too.
        self._ensure_section('worker')
        self._ensure_section('logging')

        self._setup_logging()

        self.logger = logging.getLogger('whisper-daemon.pipeline')

    def _ensure_section(self, name: str):
        """Add an empty config section if it is missing."""
        if name not in self.config:
            self.config.add_section(name)

    def _setup_logging(self):
        """Configure the 'whisper-daemon' base logger.

        Installs a deduplicating stderr handler and, when 'mail_level'
        is configured, an SMTP handler for mailed reports.
        """
        baselogger = logging.getLogger('whisper-daemon')
        # Drop handlers left over from a previous Pipeline so records
        # are never emitted twice.
        if baselogger.hasHandlers():
            baselogger.handlers.clear()
        log_level = self.config['logging'].get('log_level', 'ERROR')
        baselogger.setLevel(log_level)

        fmt = logging.Formatter('%(levelname)s in %(name)s: %(message)s')

        stderrlog = logging.StreamHandler()
        stderrlog.setLevel(log_level)
        stderrlog.setFormatter(fmt)
        baselogger.addHandler(DedupLogHandler(stderrlog, 3, 30))

        # Mail logging is enabled by the presence of 'mail_level'; the
        # remaining mail_* keys are then required (KeyError if absent).
        if 'mail_level' in self.config['logging']:
            from_addr = self.config['logging']['mail_from']
            to_addr = self.config['logging']['mail_to']
            subject = self.config['logging']['mail_subject']
            maillog = logging.handlers.SMTPHandler('localhost',
                                                   from_addr,
                                                   [to_addr],
                                                   subject)
            maillog.setLevel(self.config['logging']['mail_level'])
            maillog.setFormatter(fmt)
            baselogger.addHandler(maillog)

    def start(self):
        """Create the queues and threads and start them all.

        Raises:
            RuntimeError: if the pipeline is already running.
            KeyError: if required 'whisper-daemon' keys are missing.
        """
        if self.running:
            raise RuntimeError('Already started!')
        self.running = True
        self.logger.info('Starting')
        queuedir = Path(self.config['whisper-daemon']['queuedir'])
        modeldir = Path(self.config['whisper-daemon']['modeldir'])
        donedir = Path(self.config['whisper-daemon']['donedir'])
        workdir = Path(self.config['whisper-daemon']['workdir'])
        model = self.config['whisper-daemon']['model']
        worker_config = self.config['worker']

        filehandler = FileHandler(queuedir, donedir, workdir)

        # The resultqueue must be a multiprocessing queue since it will
        # be receiving items from the separate whisper process.
        # Otherwise sticking to basic queues.
        jobqueue = Queue()
        resultqueue = mp.Queue()
        retryqueue = Queue()
        filehandler.populate_queues(jobqueue,
                                    resultqueue,
                                    retryqueue)

        self.watcher = QueueReader(jobqueue, queuedir, filehandler)
        self.whisper = WhisperThread(jobqueue,
                                     resultqueue,
                                     donedir,
                                     model,
                                     modeldir,
                                     worker_config)
        self.notifier = NotifierThread(resultqueue, retryqueue)

        schedule = scheduler(time.time, time.sleep)
        self.retrier = RetryThread(retryqueue, resultqueue, schedule)
        self.scheduler = ScheduleThread(schedule)

        # Start consumers before the producer (watcher) so nothing is
        # queued without a running consumer chain.
        self.scheduler.start()
        self.retrier.start()
        self.notifier.start()
        self.whisper.start()
        self.watcher.start()

        self.logger.debug('Started')

    def stop(self):
        """Shut down all threads, producer first.

        Raises:
            RuntimeError: if the pipeline is not running.
        """
        if not self.running:
            raise RuntimeError('Not running!')
        self.running = False
        self.logger.info('Stopping')

        self.watcher.shutdown()
        # NOTE(review): non-blocking shutdown — presumably lets an
        # in-flight transcription finish; confirm against WhisperThread.
        self.whisper.shutdown(block=False)
        self.notifier.shutdown()
        self.retrier.shutdown()
        self.scheduler.shutdown()

        self.logger.debug('Stopped')