188 lines
5.7 KiB
Python
188 lines
5.7 KiB
Python
import json
|
|
import logging
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import whisper.utils
|
|
|
|
from .exceptions import JobSpecificationException
|
|
|
|
|
|
@dataclass
class Result:
    """Base class for outcome records.

    The ``result`` field is not a constructor argument; ``__post_init__``
    sets it to the name of the concrete class.
    """

    # Name of the concrete class; populated by __post_init__, never by callers.
    result: str = field(init=False)

    @classmethod
    def from_dict(cls, **kwargs):
        """Build an instance from keyword arguments.

        A ``result`` key is dropped instead of being forwarded: the field is
        declared with ``init=False``, so handing it to the constructor would
        raise ``TypeError``, and the value is auto-populated anyway.
        """
        kwargs.pop('result', None)
        return cls(**kwargs)

    def __post_init__(self):
        """Store the concrete class name in ``result``."""
        self.result = type(self).__name__
|
|
|
|
|
|
@dataclass
class Pending(Result):
    """Result subclass that declares no fields or methods of its own."""
|
|
|
|
|
|
@dataclass
class Success(Result):
    """Result carrying the path of a result file."""

    # Stored exactly as passed to the constructor.
    resultfile: Path

    @classmethod
    def from_dict(cls, result: str, resultfile: str):
        """Build a ``Success`` from its dictionary form.

        ``result`` is accepted but not used, because that field is
        auto-populated; ``resultfile`` is wrapped in a ``Path``.
        """
        location = Path(resultfile)
        return cls(location)
|
|
|
|
|
|
@dataclass
class Failure(Result):
    """Result carrying an error message."""

    # Human-readable description of what went wrong.
    errormessage: str

    @classmethod
    def from_dict(cls, result: str, errormessage: str):
        """Build a ``Failure`` from its dictionary form.

        ``result`` is accepted but not used, because that field is
        auto-populated.
        """
        message = errormessage
        return cls(message)
|
|
|
|
|
|
@dataclass
class SuccessfulNotification(Result):
    """Result recording the time at which a notification succeeded."""

    # None is replaced with the current time in __post_init__.
    success_time: datetime = None

    @classmethod
    def from_dict(cls, result: str, success_time: int):
        """Build an instance from its dictionary form.

        ``result`` is accepted but not used, because that field is
        auto-populated; ``success_time`` is passed through
        ``datetime.fromtimestamp``.
        """
        moment = datetime.fromtimestamp(success_time)
        return cls(moment)

    def __post_init__(self):
        """Populate ``result`` and default ``success_time`` to now."""
        super().__post_init__()
        if self.success_time is not None:
            return
        self.success_time = datetime.now()

    def get_cleanup_time(self):
        """Return ``success_time`` plus 30 days."""
        retention = timedelta(days=30)
        return self.success_time + retention
|
|
|
|
|
|
@dataclass
class FailedNotification(Result):
    """Result recording every failed notification attempt.

    ``failures`` maps an attempt number to a ``(time, error message)`` pair.
    """

    # None is replaced with a fresh dict in __post_init__ so instances never
    # share one mutable default.
    failures: dict[int, tuple[datetime, str]] = None

    @classmethod
    def from_dict(cls, result: str, failures: dict[str, tuple[str, str]]):
        """Build an instance from its dictionary form.

        ``result`` is accepted but not used, because that field is
        auto-populated. Keys of ``failures`` are converted with ``int`` and
        the time of each entry with ``datetime.fromisoformat``.
        """
        parsed = {
            int(index): (datetime.fromisoformat(time), error)
            for index, (time, error) in failures.items()
        }
        return cls(parsed)

    def __post_init__(self):
        """Populate ``result`` and default ``failures`` to an empty dict."""
        super().__post_init__()
        if self.failures is None:
            self.failures = {}

    def fail_attempt(self, e: Exception) -> None:
        """Record a failed attempt at the current time with ``str(e)``."""
        # Number from the highest existing key rather than the entry count,
        # so a map with gaps in its keys never has an entry overwritten.
        next_index = max(self.failures, default=0) + 1
        self.failures[next_index] = (datetime.now(), str(e))

    def get_next_attempt(self) -> float:
        """Return the POSIX timestamp at which to retry.

        Backoff algorithm: retry 15 minutes after the first failure, doubling
        the wait on each subsequent failure, maxing out at 8 hours. The wait
        is counted from the most recently numbered failure.

        Raises:
            ValueError: if no failure has been recorded.
        """
        if not self.failures:
            raise ValueError('No failed attempts recorded')

        base_minutes = 15
        max_minutes = 480  # 8 hours

        fail_count = len(self.failures)
        last_failure, _ = self.failures[max(self.failures)]
        # Exponent is fail_count - 1 so the first failure waits exactly the
        # base interval.
        wait_minutes = min(base_minutes * 2 ** (fail_count - 1), max_minutes)
        next_time = last_failure + timedelta(minutes=wait_minutes)
        return next_time.timestamp()
|
|
|
|
|
|
class Job:
    """A job identified by ``jobid`` with a status and a notification status.

    ``status`` starts as ``Pending`` and is replaced by ``Success`` or
    ``Failure``; ``notification_status`` starts as ``None`` and is replaced by
    ``SuccessfulNotification`` or ``FailedNotification``.
    """

    @classmethod
    def from_dict(cls,
                  jobid: str,
                  filehandler: 'FileHandler',
                  callback: str,
                  jobfile: str,
                  origin: str,
                  outputformat: str,
                  language: str = None,
                  status: dict = None,
                  notification_status: dict = None):
        """Build a job from its dictionary form.

        ``jobfile`` is wrapped in a ``Path``. When ``status`` or
        ``notification_status`` is given, its ``'result'`` entry selects the
        class used to rebuild it; unrecognised values leave the job's
        freshly-initialised value in place.

        Raises:
            JobSpecificationException: if ``outputformat`` is not supported.
        """
        # Instantiate through cls so subclasses get an instance of their own
        # type back.
        job = cls(jobid,
                  filehandler,
                  callback,
                  Path(jobfile),
                  origin,
                  outputformat,
                  language)

        if status is not None:
            status_types = {'Success': Success, 'Failure': Failure}
            status_type = status_types.get(status['result'])
            if status_type is not None:
                job.status = status_type.from_dict(**status)

        if notification_status is not None:
            notification_types = {
                'SuccessfulNotification': SuccessfulNotification,
                'FailedNotification': FailedNotification,
            }
            notification_type = notification_types.get(
                notification_status['result'])
            if notification_type is not None:
                job.notification_status = notification_type.from_dict(
                    **notification_status)
        return job

    def __init__(self,
                 jobid: str,
                 filehandler: 'FileHandler',
                 callback: str,
                 jobfile: Path,
                 origin: str,
                 outputformat: str,
                 language: str = None):
        """Store the job parameters and validate ``outputformat``.

        Raises:
            JobSpecificationException: if ``outputformat`` is not one of
                ``txt``, ``vtt``, ``srt``, ``tsv`` or ``json``.
        """
        self.jobid = jobid
        self.filehandler = filehandler
        self.callback = callback
        self.jobfile = jobfile
        self.origin = origin
        self.outputformat = outputformat
        self.language = language
        self.status = Pending()
        self.notification_status = None

        self.logger = logging.getLogger(f'whisper-daemon.Job[{jobid}]')

        if self.outputformat not in ('txt', 'vtt', 'srt', 'tsv', 'json'):
            raise JobSpecificationException(
                f'Invalid output format: {self.outputformat}')

        self.logger.info('Initialized')

    def finish(self, textfile: Path) -> None:
        """Mark the job successful and write the result via the file handler.

        The absolute form of ``textfile`` is stored in the ``Success`` status.
        """
        self.status = Success(textfile.absolute())
        # Temporarily disabling for ease of testing
        #self.jobfile.unlink()
        #self.jobfile = None
        self.filehandler.write_result_file(self.jobid, self.status)

    def fail(self, e: Exception) -> None:
        """Mark the job failed with ``str(e)`` and write the result."""
        self.status = Failure(str(e))
        self.filehandler.write_result_file(self.jobid, self.status)

    def succeed_notification(self):
        """Set the notification status to a new ``SuccessfulNotification``."""
        self.notification_status = SuccessfulNotification()

    def fail_notification(self, e: Exception) -> None:
        """Record a failed notification attempt and write the retry file.

        Raises:
            Exception: if the notification has already succeeded.
        """
        if isinstance(self.notification_status, SuccessfulNotification):
            raise Exception("Can't fail after succeeding")
        if self.notification_status is None:
            self.notification_status = FailedNotification()

        self.notification_status.fail_attempt(e)
        self.filehandler.write_retry_file(self.jobid,
                                          self.notification_status)
|