import json
import logging
import multiprocessing as mp
import os
import re
from dataclasses import asdict
from pathlib import Path
from queue import Queue, Empty

from .job import Job, Result, Pending, FailedNotification


class FileHandler:
    """Persist and restore transcription jobs via JSON files on disk.

    Queue items are read from ``queuedir``; per-job processing status is
    kept under ``workdir/results`` and notification-retry status under
    ``workdir/retries``. Empty subdirectories of ``donedir`` are cleaned
    up on startup.
    """

    def __init__(self, queuedir: Path, donedir: Path, workdir: Path):
        self.queuedir = queuedir
        self.resultdir = workdir / 'results'
        self.retrydir = workdir / 'retries'
        self.logger = logging.getLogger('whisper-daemon.FileHandler')
        # Queue items must be JSON files; checked with fullmatch() in
        # queuefile_is_valid().
        self.pattern = re.compile(r'.+\.json')
        for path in self.resultdir, self.retrydir:
            # exist_ok avoids the TOCTOU race of exists()-then-mkdir();
            # parents=True also covers a missing workdir.
            path.mkdir(parents=True, exist_ok=True)
        # Since the whisper process creates a donedir way before it writes
        # the transcription there, we clean out any donedirs that are empty.
        for subdir in donedir.iterdir():
            if not subdir.is_dir():
                continue
            if not any(subdir.iterdir()):
                # Remove first, then log, so the log reflects reality even
                # if rmdir() raises.
                subdir.rmdir()
                self.logger.debug('Deleted empty result dir %s', subdir)
        self.logger.info('Initialized.')
        self.logger.debug('Reading queue items from %s', self.queuedir)
        self.logger.debug('Filtering queue items on %s', self.pattern.pattern)
        self.logger.debug('Reading results from %s', self.resultdir)
        self.logger.debug('Reading retries from %s', self.retrydir)

    def _load_optional_json(self, path: Path):
        """Return the parsed JSON content of *path*, or None if absent."""
        if not path.exists():
            return None
        with open(path) as f:
            return json.load(f)

    def init_job(self, queuefile: Path) -> Job:
        """Build a Job from a queue file plus any saved result/retry state.

        The job id is the queue file's stem; matching status files are
        looked up as ``<jobid>.json`` in the result and retry dirs.
        """
        jobid = queuefile.stem
        with open(queuefile) as f:
            jobdata = json.load(f)
        result_data = self._load_optional_json(self.resultdir / f'{jobid}.json')
        retry_data = self._load_optional_json(self.retrydir / f'{jobid}.json')
        return Job.from_dict(jobid=jobid,
                             filehandler=self,
                             status=result_data,
                             notification_status=retry_data,
                             **jobdata)

    def read_jobs(self) -> list[Job]:
        """Read all valid queue files, oldest first, and return their Jobs.

        A file that fails to parse is logged and skipped so one corrupt
        item cannot block the rest of the queue.
        """
        jobs = []
        for f in sorted(self.queuedir.iterdir(), key=os.path.getmtime):
            if not self.queuefile_is_valid(f):
                continue
            try:
                jobs.append(self.init_job(f))
            except Exception as e:
                self.logger.error('Error when processing %s', f, exc_info=e)
        return jobs

    def queuefile_is_valid(self, queuefile: Path) -> bool:
        """Return True if *queuefile* is a regular file named like a queue item."""
        filename = queuefile.name
        self.logger.debug('Evaluating %s', filename)
        if not queuefile.is_file():
            self.logger.debug('Ignoring, %s is not a regular file', filename)
            return False
        if not self.pattern.fullmatch(filename):
            self.logger.debug(
                'Ignoring, %s does not match expected name format', filename)
            return False
        self.logger.debug('%s looks like a valid queue item', filename)
        return True

    def write_status_file(self, path: Path, data: Result):
        """Serialize *data* as JSON to *path*.

        default=str stringifies values (e.g. datetimes) that JSON cannot
        encode natively.
        """
        with open(path, 'w') as f:
            json.dump(asdict(data), f, default=str)

    def write_result_file(self, jobid: str, result: Result):
        """Persist processing status for *jobid* under the results dir."""
        self.write_status_file(self.resultdir / f'{jobid}.json', result)

    def write_retry_file(self, jobid: str, result: Result):
        """Persist notification-retry status for *jobid* under the retries dir."""
        self.write_status_file(self.retrydir / f'{jobid}.json', result)

    def populate_queues(self, jobqueue: Queue,
                        resultqueue: mp.Queue,
                        retryqueue: Queue) -> None:
        """Distribute stored jobs onto the daemon's work queues by state.

        Pending jobs go to *jobqueue* for processing; finished jobs with
        no notification status yet go to *resultqueue*; the rest go to
        *retryqueue* for notification retries.
        """
        self.logger.debug('Populating queues')
        for job in self.read_jobs():
            jobid = job.jobid
            if job.status.result == 'Pending':
                self.logger.debug('Queueing %s for processing', jobid)
                jobqueue.put(job)
            elif job.notification_status is None:
                self.logger.debug('Queueing %s for notification', jobid)
                resultqueue.put(job)
            else:
                self.logger.debug('Queueing %s to retry notification', jobid)
                retryqueue.put(job)