import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path

import whisper.utils

from .exceptions import JobSpecificationException


@dataclass
class Result:
    """Base class for job outcomes and notification outcomes.

    ``result`` is not an ``__init__`` argument; ``__post_init__`` sets it to
    the concrete subclass name (e.g. ``'Success'``). ``Job.from_dict``
    compares a serialized ``'result'`` value against these names.
    """

    result: str = field(init=False)

    @classmethod
    def from_dict(cls, **kwargs):
        """Build an instance from keyword arguments.

        A ``result`` key is ignored, matching the subclasses' ``from_dict``:
        it is an ``init=False`` field that ``__post_init__`` regenerates, so
        passing it to ``__init__`` would raise TypeError.
        """
        kwargs.pop('result', None)
        return cls(**kwargs)

    def __post_init__(self):
        self.result = self.__class__.__name__


@dataclass
class Pending(Result):
    """Outcome of a job that has neither finished nor failed yet."""


@dataclass
class Success(Result):
    """Outcome of a finished job; ``resultfile`` is the produced output file."""

    resultfile: Path

    @classmethod
    def from_dict(cls, result: str, resultfile: str):
        # ``result`` is ignored: __post_init__ regenerates it from the class.
        return cls(Path(resultfile))


@dataclass
class Failure(Result):
    """Outcome of a failed job; ``errormessage`` is the stringified error."""

    errormessage: str

    @classmethod
    def from_dict(cls, result: str, errormessage: str):
        # ``result`` is ignored: __post_init__ regenerates it from the class.
        return cls(errormessage)


@dataclass
class SuccessfulNotification(Result):
    """Records that the job's notification succeeded, and when."""

    # Filled with datetime.now() by __post_init__ when left as None.
    success_time: datetime = None

    @classmethod
    def from_dict(cls, result: str, success_time: int):
        # success_time is a POSIX timestamp; fromtimestamp yields a naive
        # local-time datetime, consistent with datetime.now() below.
        return cls(datetime.fromtimestamp(success_time))

    def __post_init__(self):
        super().__post_init__()
        if self.success_time is None:
            self.success_time = datetime.now()

    def get_cleanup_time(self):
        """Return the moment 30 days after the successful notification."""
        return self.success_time + timedelta(days=30)


@dataclass
class FailedNotification(Result):
    """Records failed notification attempts for a job.

    ``failures`` maps a 1-based attempt index to a
    ``(failure_time, error_message)`` pair.
    """

    # Replaced by an empty dict in __post_init__ when left as None.
    failures: dict[int, tuple[datetime, str]] = None

    @classmethod
    def from_dict(cls, result: str, failures: dict[str, tuple[str, str]]):
        # ``result`` is ignored: __post_init__ regenerates it from the class.
        # Keys are stringified indices and times are ISO-8601 strings
        # (presumably after a JSON round trip); each value only has to
        # unpack into two items, so a 2-element list works as well.
        parsed = {
            int(index): (datetime.fromisoformat(time), error)
            for index, (time, error) in failures.items()
        }
        return cls(parsed)

    def __post_init__(self):
        super().__post_init__()
        if self.failures is None:
            self.failures = {}

    def fail_attempt(self, e: Exception) -> None:
        """Record a failed attempt, timestamped now, under the next index."""
        # Base the new index on the highest existing one rather than the
        # count, so a restored mapping with gaps never overwrites an entry.
        next_index = max(self.failures, default=0) + 1
        self.failures[next_index] = (datetime.now(), str(e))

    def get_next_attempt(self) -> float:
        """Return the POSIX timestamp at which the next attempt may run.

        Backoff: retry 15 minutes after the first failure, doubling the wait
        on each subsequent failure, capped at 8 hours (480 minutes). The wait
        is measured from the most recent (highest-indexed) failure.

        Raises:
            ValueError: if no failed attempt has been recorded.
        """
        if not self.failures:
            raise ValueError('No failed attempts recorded')
        fail_count = len(self.failures)
        last_failure, _ = self.failures[max(self.failures)]
        # Exponent is fail_count - 1 so the first failure waits the base
        # 15 minutes: 15, 30, 60, 120, 240, 480, 480, ...
        wait_minutes = min(15 * 2 ** (fail_count - 1), 480)
        return (last_failure + timedelta(minutes=wait_minutes)).timestamp()


class Job:
    """A transcription job together with its result and notification state."""

    # Values accepted for ``outputformat``.
    _OUTPUT_FORMATS = ('txt', 'vtt', 'srt', 'tsv', 'json')

    @classmethod
    def from_dict(cls, jobid: str, filehandler: 'FileHandler', callback: str,
                  jobfile: str, origin: str, outputformat: str,
                  language: str = None, status: dict = None,
                  notification_status: dict = None):
        """Rebuild a job from its serialized fields.

        ``status`` and ``notification_status`` are dicts whose ``'result'``
        key names a Result subclass. If ``status`` names neither ``'Success'``
        nor ``'Failure'``, the job stays Pending. If ``notification_status``
        is not recognized, the notification status stays None.

        Raises:
            JobSpecificationException: if ``outputformat`` is unsupported.
        """
        job = cls(jobid, filehandler, callback, Path(jobfile), origin,
                  outputformat, language)
        if status is not None:
            result = status['result']
            if result == 'Success':
                job.status = Success.from_dict(**status)
            elif result == 'Failure':
                job.status = Failure.from_dict(**status)
        if notification_status is not None:
            result = notification_status['result']
            if result == 'SuccessfulNotification':
                job.notification_status = SuccessfulNotification.from_dict(
                    **notification_status)
            elif result == 'FailedNotification':
                job.notification_status = FailedNotification.from_dict(
                    **notification_status)
        return job

    def __init__(self, jobid: str, filehandler: 'FileHandler', callback: str,
                 jobfile: Path, origin: str, outputformat: str,
                 language: str = None):
        """Create a Pending job with no notification status.

        Raises:
            JobSpecificationException: if ``outputformat`` is not one of
                txt, vtt, srt, tsv or json.
        """
        self.jobid = jobid
        self.filehandler = filehandler
        self.callback = callback
        self.jobfile = jobfile
        self.origin = origin
        self.outputformat = outputformat
        self.language = language
        self.status = Pending()
        self.notification_status = None
        self.logger = logging.getLogger(f'whisper-daemon.Job[{jobid}]')
        if self.outputformat not in self._OUTPUT_FORMATS:
            raise JobSpecificationException(
                f'Invalid output format: {self.outputformat}')
        self.logger.info('Initialized')

    def finish(self, textfile: Path) -> None:
        """Mark the job successful and hand the result to the filehandler."""
        self.status = Success(textfile.absolute())
        # TODO: re-enable job-file removal; temporarily disabled for ease of
        # testing:
        # self.jobfile.unlink()
        # self.jobfile = None
        self.filehandler.write_result_file(self.jobid, self.status)

    def fail(self, e: Exception) -> None:
        """Mark the job failed with ``str(e)`` and hand it to the filehandler."""
        self.status = Failure(str(e))
        self.filehandler.write_result_file(self.jobid, self.status)

    def succeed_notification(self):
        """Mark the notification as delivered now.

        NOTE(review): unlike fail_notification this writes nothing via the
        filehandler; confirm that is intentional.
        """
        self.notification_status = SuccessfulNotification()

    def fail_notification(self, e: Exception) -> None:
        """Record a failed notification attempt and persist the retry state.

        Raises:
            RuntimeError: if the notification has already succeeded.
        """
        if isinstance(self.notification_status, SuccessfulNotification):
            raise RuntimeError("Can't fail after succeeding")
        if self.notification_status is None:
            self.notification_status = FailedNotification()
        self.notification_status.fail_attempt(e)
        self.filehandler.write_retry_file(self.jobid, self.notification_status)