53 lines
1.5 KiB
Python
53 lines
1.5 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
|
|
from watchdog.events import FileSystemEventHandler
|
|
from watchdog.observers.polling import PollingObserver
|
|
|
|
from .storage import FileHandler
|
|
from .job import Job
|
|
|
|
|
|
class QueueReader(FileSystemEventHandler):
    """Watch a queue directory and enqueue a job for each new queue file.

    The instance registers itself as the event handler of a
    ``PollingObserver`` scheduled on ``queuedir``. Paths reported by
    "created" and "moved" events are handed to the file handler, and the
    jobs it builds are put on the processing queue.
    """

    def __init__(self,
                 processing_queue: Queue,
                 queuedir: Path,
                 filehandler: FileHandler):
        """Set up the observer; watching does not begin until ``start()``.

        Args:
            processing_queue: Queue that receives the jobs built from
                queue files.
            queuedir: Directory scheduled on the observer.
            filehandler: Object used to validate queue files
                (``queuefile_is_valid``) and to build jobs (``init_job``).
        """
        super().__init__()
        self.queuedir = queuedir
        self.queue = processing_queue
        self.filehandler = filehandler
        self.watcher = PollingObserver()
        self.watcher.schedule(self, self.queuedir)
        self.logger = logging.getLogger('whisper-daemon.QueueReader')

    def start(self):
        """Start the observer and return whatever its ``start()`` returns."""
        self.logger.debug('Starting')
        self.logger.info("Reading queue items from '%s'", self.queuedir)
        return self.watcher.start()

    def shutdown(self):
        """Stop the observer and return whatever its ``stop()`` returns."""
        self.logger.debug('Stopping')
        return self.watcher.stop()

    def _process(self, path: Path) -> None:
        """Validate ``path`` and, if accepted, enqueue the job built from it.

        Paths rejected by ``filehandler.queuefile_is_valid`` are ignored
        silently. Any exception raised while validating, building or
        queueing is logged with its traceback and swallowed.

        Args:
            path: Path taken from a watchdog event.
        """
        jobid = path.stem
        try:
            # Validation sits inside the try on purpose: this method is
            # called from the event callbacks, and an exception escaping
            # here would propagate into the code that dispatched the event.
            if not self.filehandler.queuefile_is_valid(path):
                return
            job = self.filehandler.init_job(path)
            self.queue.put(job)
            self.logger.info('Queued job %s', jobid)
        except Exception:
            # Catch-all at the callback boundary: one bad queue file must
            # not prevent later events from being handled.
            self.logger.exception('Error when processing %s', jobid)

    def on_created(self, event) -> None:
        """Handle a "created" event by processing its source path."""
        self._process(Path(event.src_path))

    def on_moved(self, event) -> None:
        """Handle a "moved" event by processing its destination path."""
        self._process(Path(event.dest_path))
|