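"""File-based job storage for whisper-daemon.

FileHandler reads queued job files, persists result and notification-retry
state as JSON, and distributes jobs onto the daemon's work queues.
"""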
import json
import logging
import multiprocessing as mp
import os
import re

from dataclasses import asdict
from pathlib import Path
from queue import Queue

from .job import Job, Result, Pending, FailedNotification


class FileHandler:

    def __init__(self, queuedir: Path, donedir: Path, workdir: Path):
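        """Set up the result and retry directories under workdir and prune
        any empty subdirectories of donedir left behind by the whisper
        process."""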
        self.queuedir = queuedir
        self.resultdir = workdir / 'results'
        self.retrydir = workdir / 'retries'
        self.logger = logging.getLogger('whisper-daemon.FileHandler')
        self.pattern = re.compile(r'.+\.json')

        # Ensure the working subdirectories exist before any reads or writes.
        for path in self.resultdir, self.retrydir:
            path.mkdir(parents=True, exist_ok=True)

        # Since the whisper process creates a donedir well before it writes
        # the transcription there, we clean out any donedirs that are still
        # empty.
        for subdir in donedir.iterdir():
            if not subdir.is_dir():
                continue
            if not any(subdir.iterdir()):
                subdir.rmdir()
                self.logger.debug('Deleted empty result dir %s', subdir)

        self.logger.info('Initialized.')
        self.logger.debug('Reading queue items from %s', self.queuedir)
        self.logger.debug('Filtering queue items on %s', self.pattern.pattern)
        self.logger.debug('Reading results from %s', self.resultdir)
        self.logger.debug('Reading retries from %s', self.retrydir)

    def init_job(self, queuefile: Path) -> Job:
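        """Build a Job from a queue file, merging in any previously written
        result and retry state for the same job id."""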
        jobid = queuefile.stem
        result_data = None
        retry_data = None

        with open(queuefile) as f:
            jobdata = json.load(f)

        result_file = self.resultdir / f'{jobid}.json'
        if result_file.exists():
            with open(result_file) as f:
                result_data = json.load(f)

        retry_file = self.retrydir / f'{jobid}.json'
        if retry_file.exists():
            with open(retry_file) as f:
                retry_data = json.load(f)

        return Job.from_dict(jobid=jobid,
                             filehandler=self,
                             status=result_data,
                             notification_status=retry_data,
                             **jobdata)

    def read_jobs(self) -> list[Job]:
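        """Return jobs for all valid queue files, oldest first by mtime.
        Files that fail to parse are logged and skipped."""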
        jobs = []
        for f in sorted(self.queuedir.iterdir(), key=os.path.getmtime):
            if not self.queuefile_is_valid(f):
                continue
            try:
                job = self.init_job(f)
                jobs.append(job)
            except Exception as e:
                self.logger.error('Error when processing %s', f, exc_info=e)
        return jobs

    def queuefile_is_valid(self, queuefile: Path) -> bool:
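        """Accept only regular files whose names match the .json pattern."""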
        filename = queuefile.name

        self.logger.debug('Evaluating %s', filename)
        if not queuefile.is_file():
            self.logger.debug('Ignoring, %s is not a regular file', filename)
            return False
        if not self.pattern.fullmatch(filename):
            self.logger.debug(
                'Ignoring, %s does not match expected name format', filename)
            return False

        self.logger.debug('%s looks like a valid queue item', filename)
        return True

    def write_status_file(self, path: Path, data: Result):
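        """Serialize a Result dataclass to JSON at the given path."""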
        with open(path, 'w') as f:
            json.dump(asdict(data), f, default=str)

    def write_result_file(self, jobid: str, result: Result):
        result_file = self.resultdir / f'{jobid}.json'
        self.write_status_file(result_file, result)

    def write_retry_file(self, jobid: str, result: Result):
        retry_file = self.retrydir / f'{jobid}.json'
        self.write_status_file(retry_file, result)

    def populate_queues(self,
                        jobqueue: Queue,
                        resultqueue: mp.Queue,
                        retryqueue: Queue) -> None:
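        """Distribute jobs read from disk onto the daemon's queues: pending
        jobs go to jobqueue, finished-but-unnotified jobs to resultqueue,
        and jobs whose notification already failed to retryqueue."""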
        self.logger.debug('Populating queues')
        for job in self.read_jobs():
            jobid = job.jobid
            jobstatus = job.status.result
            if jobstatus == 'Pending':
                self.logger.debug('Queueing %s for processing', jobid)
                jobqueue.put(job)
            elif job.notification_status is None:
                self.logger.debug('Queueing %s for notification', jobid)
                resultqueue.put(job)
            else:
                self.logger.debug('Queueing %s to retry notification', jobid)
                retryqueue.put(job)
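
# Minimal usage sketch (illustrative only, not part of the module): the
# daemon's real wiring lives elsewhere, and the paths and module name below
# are placeholder assumptions.
#
#     from pathlib import Path
#     from queue import Queue
#     import multiprocessing as mp
#     from .filehandler import FileHandler  # hypothetical module name
#
#     handler = FileHandler(queuedir=Path('/srv/whisper/queue'),
#                           donedir=Path('/srv/whisper/done'),
#                           workdir=Path('/srv/whisper/work'))
#     jobqueue, retryqueue = Queue(), Queue()
#     resultqueue = mp.Queue()
#     handler.populate_queues(jobqueue, resultqueue, retryqueue)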