import logging
import time
import traceback
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from time import sleep

from .handler import Handler
from ..exceptions import ConfigurationException, ValidationException

import torch
import whisper
import whisper.utils


# How long (in seconds) the handler waits for a worker result before
# checking that the worker process is still alive.
_WORKER_POLL_SECONDS = 5


def _whisper_processor(inqueue, outqueue, device, modelname, modeldir,
                       loglevel):
    """
    Run whisper transcription jobs forever; intended as a Process target.

    Jobs are read from inqueue as tuples of
    (trackname, inpath, outpath, language). A language of None lets whisper
    detect the language. The result is written to outpath as WebVTT.

    For each job, exactly one message is put on outqueue:
     - ('Success', trackname) when the subtitle file has been written.
     - ('Failure', (trackname, details)) when the job raised, where details
       is the formatted traceback as a string.
    """
    logger = logging.getLogger(
        'play-daemon.SubtitlesWhisperHandler._whisper_processor')
    logger.setLevel(loglevel)
    logger.debug('Starting _whisper_processor on %s', device)

    model = whisper.load_model(modelname,
                               download_root=modeldir,
                               device=device)
    writer_options = {'max_line_width': None,
                      'max_line_count': 3,
                      'max_words_per_line': 15,
                      'highlight_words': False}

    while True:
        # Block until a job arrives instead of polling the queue.
        trackname, inpath, outpath, language = inqueue.get()

        logger.info('Starting whisper transcription job for %s.', inpath)
        start = time.time()
        try:
            result = whisper.transcribe(model,
                                        str(inpath),
                                        language=language,
                                        word_timestamps=True)
            end = time.time()
            if language is None:
                logger.info("Detected language '%s' in %s.",
                            result['language'], inpath)

            vtt_writer = whisper.utils.WriteVTT(str(outpath.parent))
            vtt_writer.always_include_hours = True
            # WebVTT is UTF-8 by definition; do not rely on the locale
            # default, which fails on non-ASCII text under a C locale.
            with open(outpath, 'w', encoding='utf-8') as f:
                vtt_writer.write_result(result, f, writer_options)

            elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
            logger.info('Finished whisper transcription job for %s in %s.',
                        inpath, elapsed)
        except Exception:
            # Send the traceback as text rather than the exception object:
            # a string always survives pickling across the process
            # boundary, whereas an exception may not, and it would lose
            # its traceback on the way in any case.
            outqueue.put(('Failure', (trackname, traceback.format_exc())))
        else:
            outqueue.put(('Success', trackname))


@Handler.register
class SubtitlesWhisperHandler(Handler):
    """
    This class handles subtitle generation with Whisper.

    Each instance owns one daemonized worker process running
    _whisper_processor on a single device, and communicates with it through
    a pair of multiprocessing queues.
    """
    def __init__(self,
                 handlerqueue,
                 collector,
                 worker,
                 ldap,
                 tempdir,
                 config,
                 device='cpu'):
        super().__init__(handlerqueue,
                         collector,
                         worker,
                         ldap,
                         tempdir,
                         config)
        self.subprocess_inqueue = Queue()
        self.subprocess_outqueue = Queue()
        self.transcribe_process = Process(
            target=_whisper_processor,
            args=(self.subprocess_inqueue,
                  self.subprocess_outqueue,
                  device,
                  config['whispermodel'],
                  config['modeldir'],
                  self.logger.getEffectiveLevel()))
        # Daemonize so the worker never outlives the daemon itself.
        self.transcribe_process.daemon = True
        self.transcribe_process.start()

    @classmethod
    def instantiate(cls,
                    handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config):
        """
        Returns a list of SubtitlesWhisperHandlers.

        Instantiation behaviour is governed by two configuration values:
        device and count. Both are optional.

        If device is not specified, the device will be determined by
        GPU availability.

        If only device is specified and the device is GPU, a handler is
        created for each available GPU. If no GPUs are available, an
        exception is raised.

        If only device is provided and the device is CPU, a single handler
        is created.

        If count is provided, that number of handlers will be created. If the
        device is GPU, and count is greater than the number of available GPUs,
        an exception is raised.
        """
        gpu_count = torch.cuda.device_count()

        if 'device' in config:
            device = config['device']
        elif gpu_count > 0:
            device = 'gpu'
        else:
            device = 'cpu'

        if device == 'cpu':
            devices = ['cpu'] * int(config.get('count', 1))
        else:
            if gpu_count == 0:
                raise ConfigurationException('GPU requested '
                                             'but none available.')
            count = int(config.get('count', gpu_count))
            if count > gpu_count:
                raise ConfigurationException(f'{count} workers requested '
                                             f'but only {gpu_count} '
                                             'GPUs available.')
            devices = [f'cuda:{i}' for i in range(count)]

        return [cls(handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config,
                    dev)
                for dev in devices]

    @classmethod
    def wants(cls, jobspec, existing_package):
        """
        Return True if this handler wants to process this jobspec.
        Raises an exception if the job is wanted but doesn't pass validation.

        A job is wanted if the job specification contains
        a 'generate_subtitles' key.
        """
        if 'generate_subtitles' in jobspec:
            return cls._validate(jobspec, existing_package)
        return False

    @classmethod
    def _validate(cls, jobspec, existing_package):
        """
        Return True if the job is valid for this handler.

        Validity requirements are:
         - Keys in 'subtitles' and 'generate_subtitles' must be
           mutually unique.
         - All 'source' values in subtitle generation specifications must be a
           valid source name, either one that already exists or one provided
           under 'sources' in this job.
         - The 'type' value in subtitle generation specifications must be a
           supported generator (currently only 'whisper').

        Raises ValidationException when a requirement is not met, including
        when a generation specification lacks 'type' or 'source'.
        """
        super()._validate(jobspec, existing_package)

        generation_specs = jobspec.get('generate_subtitles', {})

        # Check for duplicate track names
        common_names = generation_specs.keys() \
            & jobspec.get('subtitles', {}).keys()
        if common_names:
            # Sorted so the message does not depend on set iteration order.
            names_string = ', '.join(sorted(common_names))
            raise ValidationException(
                f"Duplicate subtitle track name(s): {names_string}")

        # Validate generation tasks
        for name, genspec in generation_specs.items():
            # .get() so that a missing key is reported as a validation
            # failure instead of escaping as a bare KeyError.
            gentype = genspec.get('type')
            if gentype != 'whisper':
                raise ValidationException(
                    "Unsupported subtitle generation job type: "
                    f"{gentype}")
            expected_source = genspec.get('source')
            jobsource = jobspec.get('sources', {}).get(expected_source, {})
            existing_source = existing_package['sources'].get(expected_source,
                                                              {})
            if 'video' not in jobsource and 'video' not in existing_source:
                raise ValidationException(f"Subtitle track '{name}' refers "
                                          "to a missing source: "
                                          f"{expected_source}")

        return True

    def _handle(self, jobspec, existing_package, tempdir):
        """
        Return a function to apply changes to the stored package.

        All subtitle generation tasks are run before apply_func is returned.
        The returned function moves generated subtitle files into the
        package's basedir and updates the package metadata.
        Replaced subtitle tracks are deleted.

        Tracks whose generation fails are logged and left out of the
        result. If the worker process dies while tracks are outstanding,
        those tracks are treated as failed instead of waiting forever.
        """
        transcribes = {}
        resultfiles = {}

        for trackname, item in jobspec.get('generate_subtitles', {}).items():
            sourcename = item['source']
            language = item.get('language', None)
            source_from_job = jobspec.get('sources', {}).get(sourcename, {})
            file_from_job = source_from_job.get('video', None)
            if file_from_job:
                sourcepath = Path(jobspec['upload_dir']) / file_from_job
            else:
                existing_source = existing_package['sources'][sourcename]
                # Sorting the available resolutions in ascending numeric order
                resolutions_sorted = sorted(existing_source['video'].keys(),
                                            key=int)
                # Picking the smallest resolution,
                # since the sound quality is always the same
                basedir = Path(existing_package.basedir)
                sourcefile = existing_source['video'][resolutions_sorted[0]]
                sourcepath = basedir / sourcefile

            generated_name = trackname.replace('_', '__').replace(' ', '_')
            outpath = tempdir / f"{generated_name}.vtt"

            self.subprocess_inqueue.put((trackname,
                                         sourcepath,
                                         outpath,
                                         language))
            transcribes[trackname] = 'pending'
            resultfiles[trackname] = outpath

        # Wait for transcribes to finish
        self.logger.info("Waiting for transcribes")
        while 'pending' in transcribes.values():
            # Sample liveness *before* waiting: if the worker was already
            # dead and the wait still times out, no further results can
            # arrive, so there is no race with a late message.
            worker_alive = self.transcribe_process.is_alive()
            try:
                result, contents = self.subprocess_outqueue.get(
                    timeout=_WORKER_POLL_SECONDS)
            except Empty:
                if worker_alive:
                    continue
                for name, state in transcribes.items():
                    if state == 'pending':
                        transcribes[name] = 'failed'
                        self.logger.error(
                            'Generation of %s failed: '
                            'whisper worker process is not running.',
                            name)
                break

            if result == 'Success':
                trackname = contents
                transcribes[trackname] = 'done'
            else:
                trackname, details = contents
                transcribes[trackname] = 'failed'
                self.logger.error('Generation of %s failed.\n%s',
                                  trackname, details)

        self.logger.info("Done, making apply_func")

        def apply_func(package):
            for name, subspath in resultfiles.items():
                if transcribes[name] == 'done':
                    package.set_subtitle_track(name, subspath)

        return apply_func