import logging
import os
import time

from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from time import sleep

from .handler import Handler
from ..exceptions import ValidationException

import whisper
import whisper.utils


def _whisper_processor(inqueue, outqueue, device, modelname, modeldir,
                       loglevel):
    logger = logging.getLogger(
        'play-daemon.SubtitlesWhisperHandler._whisper_processor')
    logger.setLevel(loglevel)
    logger.debug('Starting on %s', device)
    rundevice = 'cpu'
    if device != 'cpu':
        # This is a hack. In order to get around a bug in the triton library
        # that makes it impossible to assign work to a GPU other than the
        # first, we only expose the desired GPU to the transcription process.
        # This has the side effect of precluding any sort of
        # automatic GPU detection.
        os.environ['CUDA_VISIBLE_DEVICES'] = device
        rundevice = 'cuda'
    whisperModel = whisper.load_model(modelname,
                                      download_root=modeldir,
                                      device=rundevice)
    logger.debug('Model loaded, awaiting jobs.')
    while True:
        try:
            trackname, inpath, outpath, language = inqueue.get_nowait()
        except Empty:
            sleep(1)
            continue

        logger.info('Starting whisper transcription job for %s.', inpath)
        start = time.time()
        try:
            result = whisper.transcribe(whisperModel,
                                        str(inpath),
                                        language=language,
                                        word_timestamps=True)
            end = time.time()
            if language is None:
                out_language = result['language']
                logger.info(
                    "Detected language '%s' in %s.", out_language, inpath)
            else:
                out_language = language
            vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
            vttWriter.always_include_hours = True
            with open(outpath, 'w') as f:
                vttWriter.write_result(result, f,
                                       {'max_line_width': None,
                                        'max_line_count': 3,
                                        'max_words_per_line': 15,
                                        'highlight_words': False})
            elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
            logger.info('Finished whisper transcription job for %s in %s.',
                        inpath, elapsed)
        except Exception as e:
            outqueue.put(('Failure', (trackname, e)))
        else:
            outqueue.put(('Success', trackname))


@Handler.register
class SubtitlesWhisperHandler(Handler):
    """
    This class handles subtitle generation with Whisper.
    """

    def __init__(self, handlerqueue, collector, worker, ldap, tempdir,
                 config, device):
        super().__init__(handlerqueue, collector, worker, ldap, tempdir,
                         config)
        self.subprocess_inqueue = Queue()
        self.subprocess_outqueue = Queue()
        self.transcribe_process = Process(
            target=_whisper_processor,
            args=(self.subprocess_inqueue,
                  self.subprocess_outqueue,
                  device,
                  config['whispermodel'],
                  config['modeldir'],
                  self.logger.getEffectiveLevel()))
        self.transcribe_process.daemon = True
        self.transcribe_process.start()

    @classmethod
    def instantiate(cls, handlerqueue, collector, worker, ldap, tempdir,
                    config):
        """
        Return a list of SubtitlesWhisperHandler instances.

        Instantiation is governed by the configuration values 'device',
        'count', 'gpu_ids' and 'threads_per_gpu'. 'device' is mandatory;
        if it is missing, an exception is raised.

        If device is 'cpu', 'count' handlers are created (default 1).

        For any other device value, 'gpu_ids' is read as a comma-separated
        list of GPU ids (default '0'), and 'threads_per_gpu' handlers
        (default 1) are created for each listed GPU.
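
        For illustration only (the values below are hypothetical, not
        taken from a real deployment), a configuration like the following
        would create two handlers on each of GPUs 0 and 1; any 'device'
        value other than 'cpu' selects the GPU path:

            {
                'whispermodel': 'large-v3',
                'modeldir': '/var/lib/play-daemon/models',
                'device': 'gpu',
                'gpu_ids': '0, 1',
                'threads_per_gpu': 2,
            }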
""" if 'device' in config: device = config['device'] else: raise Exception("'device' is a mandatory configuration " "setting for SubtitlesWhisperHandler.") if device == 'cpu': devices = [device for i in range(int(config.get('count', 1)))] else: id_config_string = config.get('gpu_ids', '0') ids = [s.strip() for s in id_config_string.split(',')] per_gpu = int(config.get('threads_per_gpu', 1)) # 'for j in range(per_gpu)' repeats each gpu id # the appropriate number of times devices = [i for i in ids for j in range(per_gpu)] return [cls(handlerqueue, collector, worker, ldap, tempdir, config, dev) for dev in devices] @classmethod def wants(cls, jobspec, existing_package): """ Return True if this handler wants to process this jobspec. Raises an exception if the job is wanted but doesn't pass validation. A job is wanted if the job specification contains a 'generate_subtitles' key. """ if 'generate_subtitles' in jobspec: return cls._validate(jobspec, existing_package) return False @classmethod def _validate(cls, jobspec, existing_package): """ Return True if the job is valid for this handler. Validity requirements are: - Keys in 'subtitles' and 'generate_subtitles' must be mututally unique. - All 'source' values in subtitle generation specifications must be a valid source name, either one that already exists or one provided under 'sources' in this job. - The 'type' value in subtitle generation specifications must be a supported generator (currently only 'whisper'). """ super()._validate(jobspec, existing_package) # Check for duplicate track names generate_names = jobspec.get('generate_subtitles', {}).keys() store_names = jobspec.get('subtitles', {}).keys() common_names = generate_names & store_names if common_names: names_string = ', '.join(common_names) raise ValidationException( f"Duplicate subtitle track name(s): {names_string}") # Validate generation tasks for name, genspec in jobspec.get('generate_subtitles', {}).items(): if genspec['type'] != 'whisper': raise ValidationException( "Unsupported subtitle generation job type: " f"{genspec['type']}") expected_source = genspec['source'] jobsource = jobspec.get('sources', {}).get(expected_source, {}) existing_source = existing_package['sources'].get(expected_source, {}) if 'video' not in jobsource and 'video' not in existing_source: raise ValidationException(f"Subtitle track '{name}' refers " "to a missing source: " f"{expected_source}") return True def _handle(self, jobspec, existing_package, tempdir): """ Return a function to apply changes to the stored package. All subtitle generation tasks are run before apply_func is returned. The returned function moves generated subtitle files into the package's basedir and updates the package metadata. Replaced subtitle tracks are deleted. 
""" transcribes = {} resultfiles = {} for trackname, item in jobspec.get('generate_subtitles', {}).items(): sourcename = item['source'] language = item.get('language', None) sourcepath = None source_from_job = jobspec.get('sources', {}).get(sourcename, {}) file_from_job = source_from_job.get('video', None) if file_from_job: sourcepath = Path(jobspec['upload_dir']) / file_from_job else: existing_source = existing_package['sources'][sourcename] # Sorting the available resolutions in ascending numeric order resolutions_sorted = sorted(existing_source['video'].keys(), key=int) # Picking the smallest resolution, # since the sound quality is always the same basedir = Path(existing_package.basedir) sourcefile = existing_source['video'][resolutions_sorted[0]] sourcepath = basedir / sourcefile generated_name = trackname.replace('_', '__').replace(' ', '_') outpath = tempdir / f"{generated_name}.vtt" self.subprocess_inqueue.put((trackname, sourcepath, outpath, language)) transcribes[trackname] = 'pending' resultfiles[trackname] = outpath # Wait for transcribes to finish self.logger.info("Waiting for transcribes") while 'pending' in transcribes.values(): result, contents = self.subprocess_outqueue.get() if result == 'Success': trackname = contents transcribes[trackname] = 'done' else: trackname, e = contents transcribes[trackname] = 'failed' self.logger.error('Generation of %s failed.', trackname, exc_info=e) self.logger.info("Done, making apply_func") def apply_func(package): for name, subspath in resultfiles.items(): if transcribes[name] == 'done': package.set_subtitle_track(name, subspath) return apply_func