Erik Thuning 87da30ab93 Moved the responsibility for storing job state to disk into Job update methods.
As a consequence, the storage module no longer provides a singleton.
2024-06-03 14:27:31 +02:00

188 lines
5.7 KiB
Python

import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
import whisper.utils
from .exceptions import JobSpecificationException
@dataclass
class Result:
    """Base class for job and notification status records.

    ``result`` is not an ``__init__`` argument: ``__post_init__`` always sets
    it to the concrete class name, which is the tag ``Job.from_dict``
    dispatches on when rebuilding a status from its serialized form.
    """
    result: str = field(init=False)

    @classmethod
    def from_dict(cls, **kwargs):
        """Rebuild a status record from its serialized keyword form.

        A ``result`` key, if present, is discarded: it cannot be passed to
        ``__init__`` (the field is ``init=False``) and ``__post_init__``
        regenerates it anyway. This mirrors the subclasses' overrides.
        """
        # kwargs is a fresh dict built for this call, so popping is safe.
        kwargs.pop('result', None)
        return cls(**kwargs)

    def __post_init__(self):
        # Tag the instance with its concrete class name (e.g. 'Success').
        self.result = self.__class__.__name__
@dataclass
class Pending(Result):
    """Status of a job that has neither finished nor failed yet.

    Adds no fields of its own; ``result`` is filled in as 'Pending' by the
    base class.
    """
@dataclass
class Success(Result):
    """Status of a job that finished and produced a result file."""
    resultfile: Path  # path to the produced result file

    @classmethod
    def from_dict(cls, result: str, resultfile: str):
        """Rebuild a Success from its serialized form.

        ``result`` is accepted only so the stored dict can be splatted in;
        its value is discarded because ``__post_init__`` regenerates it.
        """
        path = Path(resultfile)
        return cls(resultfile=path)
@dataclass
class Failure(Result):
    """Status of a job whose processing failed."""
    # Human-readable reason; Job.fail stores str() of the raised exception.
    errormessage: str

    @classmethod
    def from_dict(cls, result: str, errormessage: str):
        """Rebuild a Failure from its serialized form.

        The stored ``result`` tag is not used; ``__post_init__`` sets it.
        """
        return cls(errormessage=errormessage)
@dataclass
class SuccessfulNotification(Result):
    """Notification status recording when the callback notification succeeded."""
    # Moment of the successful notification; __post_init__ fills in "now"
    # when it is omitted.
    success_time: datetime = None

    @classmethod
    def from_dict(cls, result: str, success_time: 'int | float | str'):
        """Rebuild a SuccessfulNotification from its serialized form.

        ``success_time`` may be a POSIX timestamp (int/float), as before, or
        an ISO-8601 string, the format FailedNotification uses for its own
        timestamps. The stored ``result`` tag is ignored; ``__post_init__``
        regenerates it.
        """
        if isinstance(success_time, str):
            when = datetime.fromisoformat(success_time)
        else:
            when = datetime.fromtimestamp(success_time)
        return cls(when)

    def __post_init__(self):
        super().__post_init__()
        if self.success_time is None:
            self.success_time = datetime.now()

    def get_cleanup_time(self, retention: timedelta = timedelta(days=30)):
        """Return the cleanup time: ``success_time`` plus ``retention``.

        :param retention: how long after the successful notification the
            cleanup time lies; defaults to 30 days.
        """
        return self.success_time + retention
@dataclass
class FailedNotification(Result):
    """Notification status tracking failed callback attempts for retry backoff."""
    # Maps attempt number (1-based) to (time of failure, error message).
    failures: dict[int, tuple[datetime, str]] = None

    @classmethod
    def from_dict(cls, result: str, failures: dict[str, tuple[str, str]]):
        """Rebuild a FailedNotification from its serialized form.

        Attempt numbers arrive as string keys and are converted back to
        ints; failure times are parsed from ISO-8601 strings. Each value may
        be any 2-item sequence (presumably a list when loaded from JSON).
        The stored ``result`` tag is ignored; ``__post_init__`` regenerates it.
        """
        parsed = {int(index): (datetime.fromisoformat(stamp), error)
                  for index, (stamp, error) in failures.items()}
        return cls(parsed)

    def __post_init__(self):
        super().__post_init__()
        if self.failures is None:
            self.failures = {}

    def fail_attempt(self, e: Exception) -> None:
        """Record a failed notification attempt at the current time."""
        # max()+1 rather than len()+1 so a non-contiguous set of loaded
        # indices can never cause an existing entry to be overwritten.
        next_index = max(self.failures, default=0) + 1
        self.failures[next_index] = (datetime.now(), str(e))

    def get_next_attempt(self) -> float:
        """Return the POSIX timestamp at which the next retry is due.

        Backoff algorithm: retry 15 minutes after the first failure, doubling
        the wait on each subsequent failure, maxing out at 8 hours
        (15, 30, 60, 120, 240, 480, 480, ... minutes), measured from the
        most recent failure.

        :raises ValueError: if no failed attempt has been recorded.
        """
        if not self.failures:
            raise ValueError('No failed notification attempts recorded')
        fail_count = len(self.failures)
        last_failure, _ = self.failures[max(self.failures)]
        # Exponent is fail_count - 1 so the first failure waits 15 minutes,
        # as documented, rather than 30.
        proposed_wait_time = (2 ** (fail_count - 1)) * 15
        wait_time = min(proposed_wait_time, 480)
        next_time = last_failure + timedelta(minutes=wait_time)
        return next_time.timestamp()
class Job:
    """A transcription job together with its processing and notification state.

    ``status`` starts as Pending and is replaced by finish()/fail();
    ``notification_status`` starts as None and is updated by
    succeed_notification()/fail_notification(). finish(), fail() and
    fail_notification() persist the new state through ``filehandler``.
    """

    # Output formats a job may request; anything else is rejected in __init__.
    VALID_OUTPUT_FORMATS = ('txt', 'vtt', 'srt', 'tsv', 'json')

    @classmethod
    def from_dict(cls,
                  jobid: str,
                  filehandler: 'FileHandler',
                  callback: str,
                  jobfile: str,
                  origin: str,
                  outputformat: str,
                  language: str = None,
                  status: dict = None,
                  notification_status: dict = None):
        """Rebuild a Job from its serialized keyword form.

        ``status`` and ``notification_status`` are the serialized dicts of
        the corresponding Result subclasses and are dispatched on their
        ``result`` tag. A missing or 'Pending' status leaves the fresh
        Pending status in place; unrecognized tags are logged and ignored.

        :raises JobSpecificationException: if ``outputformat`` is invalid.
        """
        job = cls(jobid,
                  filehandler,
                  callback,
                  # finish() is meant to clear jobfile to None (currently
                  # disabled), so a stored None must not crash Path().
                  Path(jobfile) if jobfile is not None else None,
                  origin,
                  outputformat,
                  language)
        if status is not None:
            result = status['result']
            if result == 'Success':
                job.status = Success.from_dict(**status)
            elif result == 'Failure':
                job.status = Failure.from_dict(**status)
            elif result != 'Pending':
                job.logger.warning('Ignoring unknown status %r', result)
        if notification_status is not None:
            result = notification_status['result']
            if result == 'SuccessfulNotification':
                job.notification_status = SuccessfulNotification.from_dict(
                    **notification_status)
            elif result == 'FailedNotification':
                job.notification_status = FailedNotification.from_dict(
                    **notification_status)
            else:
                job.logger.warning('Ignoring unknown notification status %r',
                                   result)
        return job

    def __init__(self,
                 jobid: str,
                 filehandler: 'FileHandler',
                 callback: str,
                 jobfile: Path,
                 origin: str,
                 outputformat: str,
                 language: str = None):
        """Create a new job in the Pending state.

        :raises JobSpecificationException: if ``outputformat`` is not one of
            VALID_OUTPUT_FORMATS.
        """
        self.jobid = jobid
        self.filehandler = filehandler
        self.callback = callback
        self.jobfile = jobfile
        self.origin = origin
        self.outputformat = outputformat
        self.language = language
        self.status = Pending()
        self.notification_status = None
        self.logger = logging.getLogger(f'whisper-daemon.Job[{jobid}]')
        if self.outputformat not in self.VALID_OUTPUT_FORMATS:
            raise JobSpecificationException(
                f'Invalid output format: {self.outputformat}')
        self.logger.info('Initialized')

    def finish(self, textfile: Path) -> None:
        """Mark the job successful and persist the result.

        :param textfile: the produced result file; stored as an absolute path.
        """
        self.status = Success(textfile.absolute())
        # TODO: re-enable once testing no longer needs the job file kept.
        # Temporarily disabling for ease of testing
        #self.jobfile.unlink()
        #self.jobfile = None
        self.filehandler.write_result_file(self.jobid, self.status)

    def fail(self, e: Exception) -> None:
        """Mark the job failed with ``str(e)`` and persist the result."""
        self.status = Failure(str(e))
        self.filehandler.write_result_file(self.jobid, self.status)

    def succeed_notification(self):
        """Record that the callback notification succeeded, at the current time."""
        # NOTE(review): unlike finish()/fail()/fail_notification(), this does
        # not write anything through self.filehandler — confirm whether the
        # success should be persisted (or a retry file removed) here.
        self.notification_status = SuccessfulNotification()

    def fail_notification(self, e: Exception) -> None:
        """Record a failed callback notification attempt and persist it.

        :raises RuntimeError: if the notification has already succeeded.
        """
        if isinstance(self.notification_status, SuccessfulNotification):
            # RuntimeError subclasses Exception, so existing
            # ``except Exception`` handlers still catch this.
            raise RuntimeError("Can't fail after succeeding")
        if self.notification_status is None:
            self.notification_status = FailedNotification()
        self.notification_status.fail_attempt(e)
        self.filehandler.write_retry_file(self.jobid,
                                          self.notification_status)