play-daemon-threaded/pipeline/handlers/subtitles_whisper.py
Erik Thuning 3da736422d Added a debug log line when the model has finished loading.
Also removed some redundant info from the startup debug line.
2024-10-16 14:03:20 +02:00

282 lines
11 KiB
Python

import logging
import os
import time
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from time import sleep
from .handler import Handler
from ..exceptions import ValidationException
import whisper
import whisper.utils
def _whisper_processor(inqueue,
                       outqueue,
                       device,
                       modelname,
                       modeldir,
                       loglevel):
    """
    Worker loop run in a separate process: load a whisper model once,
    then transcribe jobs from inqueue forever.

    Jobs arrive on inqueue as (trackname, inpath, outpath, language)
    tuples; language may be None to request auto-detection. For each job
    a VTT subtitle file is written to outpath, and either
    ('Success', trackname) or ('Failure', (trackname, exception)) is put
    on outqueue.
    """
    logger = logging.getLogger(
        'play-daemon.SubtitlesWhisperHandler._whisper_processor')
    logger.setLevel(loglevel)
    logger.debug('Starting on %s', device)
    rundevice = 'cpu'
    if device != 'cpu':
        # This is a hack. In order to get around a bug in the triton library
        # that makes it impossible to assign work to a gpu other than the
        # first, we only expose the desired gpu to the transcription process.
        # This has the side-effect of precluding any sort of
        # automatic gpu detection.
        os.environ['CUDA_VISIBLE_DEVICES'] = device
        rundevice = 'cuda'
    whisperModel = whisper.load_model(modelname,
                                      download_root=modeldir,
                                      device=rundevice)
    logger.debug('Model loaded, awaiting jobs.')
    while True:
        # Block until a job arrives. This replaces the previous
        # get_nowait() + sleep(1) polling loop, which woke up once a
        # second for no benefit; Queue.get() waits efficiently.
        trackname, inpath, outpath, language = inqueue.get()
        logger.info('Starting whisper transcription job for %s.', inpath)
        start = time.time()
        try:
            result = whisper.transcribe(whisperModel,
                                        str(inpath),
                                        language=language,
                                        word_timestamps=True)
            end = time.time()
            if language is None:
                # Only auto-detection is worth reporting; an explicitly
                # requested language is already known to the caller.
                logger.info(
                    "Detected language '%s' in %s.",
                    result['language'], inpath)
            vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
            vttWriter.always_include_hours = True
            with open(outpath, 'w') as f:
                vttWriter.write_result(result, f, {'max_line_width': None,
                                                   'max_line_count': 3,
                                                   'max_words_per_line': 15,
                                                   'highlight_words': False})
            elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
            logger.info('Finished whisper transcription job for %s in %s.',
                        inpath, elapsed)
        except Exception as e:
            # Report the failure to the parent instead of dying; the
            # handler decides how to surface it.
            outqueue.put(('Failure', (trackname, e)))
        else:
            outqueue.put(('Success', trackname))
@Handler.register
class SubtitlesWhisperHandler(Handler):
    """
    This class handles subtitle generation with Whisper.

    Transcription itself runs in a separate daemon process
    (_whisper_processor); jobs and results are exchanged with it over a
    pair of multiprocessing queues, so the model is loaded only once per
    handler instance.
    """
    def __init__(self,
                 handlerqueue,
                 collector,
                 worker,
                 ldap,
                 tempdir,
                 config,
                 device):
        """
        Create a handler bound to a single transcription device.

        device is either 'cpu' or a CUDA gpu id string; it is passed
        verbatim to the transcription subprocess.
        """
        super().__init__(handlerqueue,
                         collector,
                         worker,
                         ldap,
                         tempdir,
                         config)
        # Job tuples go in on subprocess_inqueue; ('Success'/'Failure',
        # payload) results come back on subprocess_outqueue.
        self.subprocess_inqueue = Queue()
        self.subprocess_outqueue = Queue()
        self.transcribe_process = Process(
            target=_whisper_processor,
            args=(self.subprocess_inqueue,
                  self.subprocess_outqueue,
                  device,
                  config['whispermodel'],
                  config['modeldir'],
                  self.logger.getEffectiveLevel()))
        # Daemonized so the subprocess is terminated when the daemon
        # itself exits.
        self.transcribe_process.daemon = True
        self.transcribe_process.start()

    @classmethod
    def instantiate(cls,
                    handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config):
        """
        Return a list of SubtitlesWhisperHandlers.

        The mandatory configuration value 'device' governs how many
        handlers are created:

        - device == 'cpu': 'count' handlers are created (default 1),
          all transcribing on the cpu.
        - any other value: one handler per gpu id listed in 'gpu_ids'
          (a comma-separated string, default '0'), with each id
          repeated 'threads_per_gpu' times (default 1).

        Raises an Exception if 'device' is missing from config.
        """
        if 'device' in config:
            device = config['device']
        else:
            raise Exception("'device' is a mandatory configuration "
                            "setting for SubtitlesWhisperHandler.")
        if device == 'cpu':
            devices = [device for i in range(int(config.get('count', 1)))]
        else:
            id_config_string = config.get('gpu_ids', '0')
            ids = [s.strip() for s in id_config_string.split(',')]
            per_gpu = int(config.get('threads_per_gpu', 1))
            # 'for j in range(per_gpu)' repeats each gpu id
            # the appropriate number of times
            devices = [i for i in ids for j in range(per_gpu)]
        return [cls(handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config,
                    dev)
                for dev in devices]

    @classmethod
    def wants(cls, jobspec, existing_package):
        """
        Return True if this handler wants to process this jobspec.
        Raises an exception if the job is wanted but doesn't pass validation.

        A job is wanted if the job specification contains
        a 'generate_subtitles' key.
        """
        if 'generate_subtitles' in jobspec:
            return cls._validate(jobspec, existing_package)
        return False

    @classmethod
    def _validate(cls, jobspec, existing_package):
        """
        Return True if the job is valid for this handler.

        Validity requirements are:
        - Keys in 'subtitles' and 'generate_subtitles' must be
          mutually unique.
        - All 'source' values in subtitle generation specifications must be a
          valid source name, either one that already exists or one provided
          under 'sources' in this job.
        - The 'type' value in subtitle generation specifications must be a
          supported generator (currently only 'whisper').

        Raises ValidationException on any violation.
        """
        super()._validate(jobspec, existing_package)
        # Check for duplicate track names
        generate_names = jobspec.get('generate_subtitles', {}).keys()
        store_names = jobspec.get('subtitles', {}).keys()
        common_names = generate_names & store_names
        if common_names:
            names_string = ', '.join(common_names)
            raise ValidationException(
                f"Duplicate subtitle track name(s): {names_string}")
        # Validate generation tasks
        for name, genspec in jobspec.get('generate_subtitles', {}).items():
            if genspec['type'] != 'whisper':
                raise ValidationException(
                    "Unsupported subtitle generation job type: "
                    f"{genspec['type']}")
            # The referenced source must have a video file either in this
            # job's uploads or in the already-stored package.
            expected_source = genspec['source']
            jobsource = jobspec.get('sources', {}).get(expected_source, {})
            existing_source = existing_package['sources'].get(expected_source,
                                                             {})
            if 'video' not in jobsource and 'video' not in existing_source:
                raise ValidationException(f"Subtitle track '{name}' refers "
                                          "to a missing source: "
                                          f"{expected_source}")
        return True

    def _handle(self, jobspec, existing_package, tempdir):
        """
        Return a function to apply changes to the stored package.
        All subtitle generation tasks are run before apply_func is returned.

        The returned function moves generated subtitle files into the
        package's basedir and updates the package metadata.
        Replaced subtitle tracks are deleted.
        """
        # trackname -> 'pending' | 'done' | 'failed'
        transcribes = {}
        # trackname -> path of the generated vtt file in tempdir
        resultfiles = {}
        for trackname, item in jobspec.get('generate_subtitles', {}).items():
            sourcename = item['source']
            language = item.get('language', None)
            sourcepath = None
            # Prefer a video file uploaded with this job; otherwise fall
            # back to the copy already stored in the package.
            source_from_job = jobspec.get('sources', {}).get(sourcename, {})
            file_from_job = source_from_job.get('video', None)
            if file_from_job:
                sourcepath = Path(jobspec['upload_dir']) / file_from_job
            else:
                existing_source = existing_package['sources'][sourcename]
                # Sorting the available resolutions in ascending numeric order
                resolutions_sorted = sorted(existing_source['video'].keys(),
                                            key=int)
                # Picking the smallest resolution,
                # since the sound quality is always the same
                basedir = Path(existing_package.basedir)
                sourcefile = existing_source['video'][resolutions_sorted[0]]
                sourcepath = basedir / sourcefile
            # Build a filesystem-friendly file name: escape existing
            # underscores, then turn spaces into underscores.
            generated_name = trackname.replace('_', '__').replace(' ', '_')
            outpath = tempdir / f"{generated_name}.vtt"
            self.subprocess_inqueue.put((trackname,
                                         sourcepath,
                                         outpath,
                                         language))
            transcribes[trackname] = 'pending'
            resultfiles[trackname] = outpath

        # Wait for transcribes to finish
        self.logger.info("Waiting for transcribes")
        while 'pending' in transcribes.values():
            # Blocks until the subprocess reports one finished job.
            result, contents = self.subprocess_outqueue.get()
            if result == 'Success':
                trackname = contents
                transcribes[trackname] = 'done'
            else:
                trackname, e = contents
                transcribes[trackname] = 'failed'
                # A failed track is logged and skipped; the rest of the
                # job still goes through.
                self.logger.error('Generation of %s failed.',
                                  trackname, exc_info=e)
        self.logger.info("Done, making apply_func")

        def apply_func(package):
            # Only attach tracks whose transcription succeeded.
            for name, subspath in resultfiles.items():
                if transcribes[name] == 'done':
                    package.set_subtitle_track(name, subspath)
        return apply_func