Erik Thuning 87da30ab93 Moved the responsibility for storing job state to disk into Job update methods.
As a consequence, the storage module no longer provides a singleton.
2024-06-03 14:27:31 +02:00


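"""Filesystem-backed job state handling for the whisper daemon.

A FileHandler reads queued jobs from a queue directory, persists result
and notification-retry state as JSON files under a work directory, and
sorts stored jobs onto the daemon's processing, notification and retry
queues at startup.
"""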
import json
import logging
import multiprocessing as mp
import os
import re
from dataclasses import asdict
from pathlib import Path
from queue import Queue, Empty
from .job import Job, Result, Pending, FailedNotification


class FileHandler:
def __init__(self, queuedir: Path, donedir: Path, workdir: Path):
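        """Prepare on-disk state directories.

        queuedir holds incoming job files, donedir is where the whisper
        process writes finished transcriptions, and workdir holds this
        handler's result and retry state.
        """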
self.queuedir = queuedir
self.resultdir = workdir / 'results'
self.retrydir = workdir / 'retries'
self.logger = logging.getLogger('whisper-daemon.FileHandler')
self.pattern = re.compile(r'.+\.json')
        for path in self.resultdir, self.retrydir:
            path.mkdir(exist_ok=True)
# Since the whisper process creates a donedir way before it writes
# the transcription there, we clean out any donedirs that are empty.
for subdir in donedir.iterdir():
if not subdir.is_dir():
continue
            if not any(subdir.iterdir()):
                subdir.rmdir()
                self.logger.debug('Deleted empty done dir %s', subdir)
self.logger.info('Initialized.')
self.logger.debug('Reading queue items from %s', self.queuedir)
self.logger.debug('Filtering queue items on %s', self.pattern.pattern)
self.logger.debug('Reading results from %s', self.resultdir)
self.logger.debug('Reading retries from %s', self.retrydir)

    def init_job(self, queuefile: Path) -> Job:
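        """Reconstruct a single Job from its on-disk state.

        The queue file provides the job definition; a result file and a
        retry file, if present, restore the job's processing status and
        notification status respectively.
        """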
jobid = queuefile.stem
job = None
result_data = None
retry_data = None
with open(queuefile) as f:
jobdata = json.load(f)
result_file = self.resultdir / f'{jobid}.json'
if result_file.exists():
with open(result_file) as f:
result_data = json.load(f)
retry_file = self.retrydir / f'{jobid}.json'
if retry_file.exists():
with open(retry_file) as f:
retry_data = json.load(f)
return Job.from_dict(jobid=jobid,
filehandler=self,
status=result_data,
notification_status=retry_data,
**jobdata)

    def read_jobs(self) -> list[Job]:
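        """Load all valid queue files as Jobs, oldest first.

        A file that fails to load is logged and skipped, so one broken
        queue item cannot block the rest.
        """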
jobs = []
for f in sorted(self.queuedir.iterdir(), key=os.path.getmtime):
if not self.queuefile_is_valid(f):
continue
try:
job = self.init_job(f)
jobs.append(job)
except Exception as e:
self.logger.error('Error when processing %s', f, exc_info=e)
return jobs

    def queuefile_is_valid(self, queuefile: Path) -> bool:
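        """Check that a queue entry is a regular file with a *.json name."""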
filename = queuefile.name
self.logger.debug('Evaluating %s', filename)
if not queuefile.is_file():
self.logger.debug('Ignoring, %s is not a regular file', filename)
return False
if not self.pattern.fullmatch(filename):
self.logger.debug(
'Ignoring, %s does not match expected name format', filename)
return False
self.logger.debug('%s looks like a valid queue item', filename)
return True

    def write_status_file(self, path: Path, data: Result):
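        """Serialize a Result dataclass to JSON at the given path."""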
with open(path, 'w') as f:
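            # default=str stringifies values json can't encode natively,
            # e.g. datetime objects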
json.dump(asdict(data), f, default=str)

    def write_result_file(self, jobid: str, result: Result):
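        """Persist a job's processing result."""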
result_file = self.resultdir / f'{jobid}.json'
self.write_status_file(result_file, result)

    def write_retry_file(self, jobid: str, result: Result):
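        """Persist a job's notification retry state."""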
retry_file = self.retrydir / f'{jobid}.json'
self.write_status_file(retry_file, result)

    def populate_queues(self,
jobqueue: Queue,
resultqueue: mp.Queue,
retryqueue: Queue) -> None:
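        """Distribute stored jobs onto the daemon's queues.

        Pending jobs go to the processing queue, finished jobs with no
        notification status yet go to the notification queue, and jobs
        whose notification needs to be retried go to the retry queue.
        """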
self.logger.debug('Populating queues')
for job in self.read_jobs():
jobid = job.jobid
jobstatus = job.status.result
if jobstatus == 'Pending':
self.logger.debug('Queueing %s for processing', jobid)
jobqueue.put(job)
elif job.notification_status is None:
self.logger.debug('Queueing %s for notification', jobid)
resultqueue.put(job)
else:
self.logger.debug('Queueing %s to retry notification', jobid)
retryqueue.put(job)
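
# Illustrative wiring only: the directory paths below are placeholders and
# the real daemon startup code may construct these objects differently.
#
#     handler = FileHandler(queuedir=Path('/var/spool/whisper/queue'),
#                           donedir=Path('/var/spool/whisper/done'),
#                           workdir=Path('/var/spool/whisper/work'))
#     jobqueue: Queue = Queue()
#     resultqueue = mp.Queue()
#     retryqueue: Queue = Queue()
#     handler.populate_queues(jobqueue, resultqueue, retryqueue)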