Erik Thuning 87da30ab93 Moved the responsibility for storing job state to disk into Job update methods.
As a consequence, the storage module no longer provides a singleton.
2024-06-03 14:27:31 +02:00

123 lines
4.2 KiB
Python

import logging
import logging.handlers
import multiprocessing as mp
import time
from configparser import ConfigParser
from pathlib import Path
from queue import Queue
from sched import scheduler
from .deduplogger import DedupLogHandler
from .notifierthread import NotifierThread
from .queuereader import QueueReader
from .retrythread import RetryThread
from .schedulethread import ScheduleThread
from .storage import FileHandler
from .whisperthread import WhisperThread
class Pipeline:
    """Top-level coordinator for the whisper-daemon processing pipeline.

    Wires together the queue reader, whisper worker, notifier, retry and
    schedule threads, and manages their combined lifecycle through
    start() and stop().
    """

    def __init__(self, config: ConfigParser):
        """Prepare the pipeline without starting any threads.

        Normalizes optional configuration sections and configures the
        'whisper-daemon' logger hierarchy. Threads are only created and
        started by start().

        @param config: parsed daemon configuration
        """
        self.running = False
        self.config = config

        # Since all keys in worker are optional, also make that section
        # itself optional.
        if 'worker' not in config:
            config.add_section('worker')

        # Since all keys in logging are optional, also make that section
        # itself optional.
        if 'logging' not in config:
            config.add_section('logging')

        self._init_logging(config)
        self.logger = logging.getLogger('whisper-daemon.pipeline')

    @staticmethod
    def _init_logging(config: ConfigParser):
        """Configure handlers on the base 'whisper-daemon' logger.

        Always installs a deduplicating stderr handler; additionally
        installs an SMTP handler when 'mail_level' is configured.
        """
        baselogger = logging.getLogger('whisper-daemon')
        # Drop any handlers left over from a previous instantiation so
        # messages are not emitted twice.
        if baselogger.hasHandlers():
            baselogger.handlers = []

        log_level = config['logging'].get('log_level', 'ERROR')
        baselogger.setLevel(log_level)
        fmt = logging.Formatter('%(levelname)s in %(name)s: %(message)s')

        # Basic logging settings
        stderrlog = logging.StreamHandler()
        stderrlog.setLevel(log_level)
        stderrlog.setFormatter(fmt)
        baselogger.addHandler(DedupLogHandler(stderrlog, 3, 30))

        # Mail logging settings
        if 'mail_level' in config['logging']:
            from_addr = config['logging']['mail_from']
            to_addr = config['logging']['mail_to']
            subject = config['logging']['mail_subject']
            maillog = logging.handlers.SMTPHandler('localhost',
                                                   from_addr,
                                                   [to_addr],
                                                   subject)
            maillog.setLevel(config['logging']['mail_level'])
            maillog.setFormatter(fmt)
            baselogger.addHandler(maillog)

    def start(self):
        """Create and start all pipeline threads.

        Raises:
            RuntimeError: if the pipeline is already running.
        """
        if self.running:
            raise RuntimeError('Already started!')
        self.running = True
        self.logger.info('Starting')

        daemon_conf = self.config['whisper-daemon']
        queuedir = Path(daemon_conf['queuedir'])
        modeldir = Path(daemon_conf['modeldir'])
        donedir = Path(daemon_conf['donedir'])
        workdir = Path(daemon_conf['workdir'])
        model = daemon_conf['model']
        worker_config = self.config['worker']

        filehandler = FileHandler(queuedir, donedir, workdir)

        # The resultqueue must be a multiprocessing queue since it will be
        # receiving items from the separate whisper process.
        # Otherwise sticking to basic queue.
        jobqueue = Queue()
        resultqueue = mp.Queue()
        retryqueue = Queue()

        filehandler.populate_queues(jobqueue,
                                    resultqueue,
                                    retryqueue)

        self.watcher = QueueReader(jobqueue, queuedir, filehandler)
        self.whisper = WhisperThread(jobqueue,
                                     resultqueue,
                                     donedir,
                                     model,
                                     modeldir,
                                     worker_config)
        self.notifier = NotifierThread(resultqueue, retryqueue)
        schedule = scheduler(time.time, time.sleep)
        self.retrier = RetryThread(retryqueue, resultqueue, schedule)
        self.scheduler = ScheduleThread(schedule)

        self.scheduler.start()
        self.retrier.start()
        self.notifier.start()
        self.whisper.start()
        self.watcher.start()
        self.logger.debug('Started')

    def stop(self):
        """Shut down all pipeline threads.

        Raises:
            RuntimeError: if the pipeline is not running.
        """
        if not self.running:
            raise RuntimeError('Not running!')
        self.running = False
        self.logger.info('Stopping')
        self.watcher.shutdown()
        # block=False: don't wait on the whisper worker here —
        # presumably it may be mid-transcription; confirm with
        # WhisperThread.shutdown.
        self.whisper.shutdown(block=False)
        self.notifier.shutdown()
        self.retrier.shutdown()
        self.scheduler.shutdown()
        self.logger.debug('Stopped')