whisper-daemon/pipeline/queuereader.py
Erik Thuning 87da30ab93 Moved the responsibility for storing job state to disk into Job update methods.
As a consequence, the storage module no longer provides a singleton.
2024-06-03 14:27:31 +02:00

53 lines
1.5 KiB
Python

import logging
import os
import re
from pathlib import Path
from queue import Queue
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from .storage import FileHandler
from .job import Job
class QueueReader(FileSystemEventHandler):
    """Watches a queue directory and feeds valid queue files into a
    processing queue as Job objects.

    Files that appear in (or are moved into) the watched directory are
    validated by the file handler; valid ones are turned into jobs and
    put on the processing queue. Invalid files are silently skipped.
    """

    def __init__(self,
                 processing_queue: Queue,
                 queuedir: Path,
                 filehandler: FileHandler):
        """
        @param processing_queue: queue that receives initialized jobs
        @param queuedir:         directory to watch for queue files
        @param filehandler:      storage helper used to validate queue
                                 files and construct Job objects
        """
        self.logger = logging.getLogger('whisper-daemon.QueueReader')
        self.queue = processing_queue
        self.queuedir = queuedir
        self.filehandler = filehandler
        # NOTE(review): a polling observer is used rather than a native
        # one — presumably so the queue dir can live on a network mount;
        # confirm before switching to inotify-based watching.
        self.watcher = PollingObserver()
        self.watcher.schedule(self, self.queuedir)

    def start(self):
        """Start watching the queue directory."""
        self.logger.debug('Starting')
        self.logger.info("Reading queue items from '%s'", self.queuedir)
        return self.watcher.start()

    def shutdown(self):
        """Stop watching the queue directory."""
        self.logger.debug('Stopping')
        return self.watcher.stop()

    def _process(self, path: Path):
        """Validate one candidate queue file and enqueue its job.

        Any exception raised while building or enqueueing the job is
        logged (with traceback) instead of propagating into the
        watchdog observer thread.
        """
        if not self.filehandler.queuefile_is_valid(path):
            return
        job_id = path.stem
        try:
            new_job = self.filehandler.init_job(path)
            self.queue.put(new_job)
            self.logger.info('Queued job %s', job_id)
        except Exception as err:
            self.logger.error('Error when processing %s', job_id,
                              exc_info=err)

    def on_created(self, event):
        """Watchdog callback: a file was created in the queue dir."""
        self._process(Path(event.src_path))

    def on_moved(self, event):
        """Watchdog callback: a file was moved into the queue dir."""
        self._process(Path(event.dest_path))