3da736422d
Also removed some redundant info from the startup debug line.
282 lines
11 KiB
Python
282 lines
11 KiB
Python
import logging
|
|
import os
|
|
import time
|
|
|
|
from multiprocessing import Process, Queue
|
|
from pathlib import Path
|
|
from queue import Empty
|
|
from time import sleep
|
|
|
|
from .handler import Handler
|
|
from ..exceptions import ValidationException
|
|
|
|
import whisper
|
|
import whisper.utils
|
|
|
|
|
|
def _whisper_processor(inqueue,
                       outqueue,
                       device,
                       modelname,
                       modeldir,
                       loglevel):
    """
    Whisper transcription worker loop, intended to run in its own process.

    Loads the requested model once, then loops forever pulling
    (trackname, inpath, outpath, language) tuples from inqueue,
    transcribing inpath and writing a VTT subtitle file to outpath.
    Each finished job is reported on outqueue as either
    ('Success', trackname) or ('Failure', (trackname, exception)).

    Parameters:
    inqueue   -- multiprocessing.Queue of pending transcription jobs
    outqueue  -- multiprocessing.Queue for job results
    device    -- 'cpu' or a CUDA device id string (e.g. '0')
    modelname -- whisper model name to load
    modeldir  -- directory to download/cache models in
    loglevel  -- logging level for this worker's logger
    """
    logger = logging.getLogger(
        'play-daemon.SubtitlesWhisperHandler._whisper_processor')
    logger.setLevel(loglevel)
    logger.debug('Starting on %s', device)

    rundevice = 'cpu'
    if device != 'cpu':
        # This is a hack. In order to get around a bug in the triton library
        # that makes it impossible to assign work to a gpu other that the
        # first, we only expose the desired gpu to the transcription process.
        # This has the side-effect of precluding any sort of
        # automatic gpu detection.
        os.environ['CUDA_VISIBLE_DEVICES'] = device
        rundevice = 'cuda'

    whisperModel = whisper.load_model(modelname,
                                      download_root=modeldir,
                                      device=rundevice)
    logger.debug('Model loaded, awaiting jobs.')
    while True:
        try:
            # Block for up to a second instead of spinning on
            # get_nowait() + sleep(1); same observable behavior,
            # without the manual polling loop.
            trackname, inpath, outpath, language = inqueue.get(timeout=1)
        except Empty:
            continue

        logger.info('Starting whisper transcription job for %s.', inpath)
        start = time.time()
        try:
            # language=None lets whisper auto-detect the spoken language.
            result = whisper.transcribe(whisperModel,
                                        str(inpath),
                                        language=language,
                                        word_timestamps=True)
            end = time.time()
            if language is None:
                logger.info(
                    "Detected language '%s' in %s.",
                    result['language'], inpath)

            vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
            vttWriter.always_include_hours = True
            # Explicit encoding: transcription output is arbitrary Unicode,
            # and the platform default codec may not be able to encode it.
            with open(outpath, 'w', encoding='utf-8') as f:
                vttWriter.write_result(result, f, {'max_line_width': None,
                                                   'max_line_count': 3,
                                                   'max_words_per_line': 15,
                                                   'highlight_words': False})
            elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
            logger.info('Finished whisper transcription job for %s in %s.',
                        inpath, elapsed)
        except Exception as e:
            # Report the failure to the parent instead of crashing the
            # worker; the parent logs it with the original traceback.
            outqueue.put(('Failure', (trackname, e)))
        else:
            outqueue.put(('Success', trackname))
|
|
|
|
|
|
@Handler.register
class SubtitlesWhisperHandler(Handler):

    """
    This class handles subtitle generation with Whisper.

    Each instance owns one background transcription process (started in
    __init__) and communicates with it over a pair of multiprocessing
    queues: jobs go out on subprocess_inqueue as
    (trackname, inpath, outpath, language) tuples, and results come back
    on subprocess_outqueue as ('Success', trackname) or
    ('Failure', (trackname, exception)).
    """

    def __init__(self,
                 handlerqueue,
                 collector,
                 worker,
                 ldap,
                 tempdir,
                 config,
                 device):
        """
        Initialize the handler and start its transcription subprocess.

        Parameters beyond the base Handler ones:
        config -- must contain 'whispermodel' (model name) and
                  'modeldir' (model download/cache directory)
        device -- 'cpu' or a CUDA device id string passed to the worker
        """
        super().__init__(handlerqueue,
                         collector,
                         worker,
                         ldap,
                         tempdir,
                         config)
        # Queues for handing jobs to, and collecting results from,
        # the transcription subprocess.
        self.subprocess_inqueue = Queue()
        self.subprocess_outqueue = Queue()
        # NOTE(review): self.logger is presumably set up by the Handler
        # base class — confirm against handler.py.
        self.transcribe_process = Process(
            target=_whisper_processor,
            args=(self.subprocess_inqueue,
                  self.subprocess_outqueue,
                  device,
                  config['whispermodel'],
                  config['modeldir'],
                  self.logger.getEffectiveLevel()))
        # Daemonized so the worker is killed when this process exits.
        self.transcribe_process.daemon = True
        self.transcribe_process.start()

    @classmethod
    def instantiate(cls,
                    handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config):
        """
        Return a list of SubtitlesWhisperHandlers.

        The 'device' configuration setting is mandatory; an exception is
        raised if it is missing.

        If device is 'cpu', one handler is created per requested thread,
        governed by the optional 'count' setting (default 1).

        For any other device value, handlers are created for GPUs: one
        per id listed in the optional comma-separated 'gpu_ids' setting
        (default '0'), each repeated 'threads_per_gpu' times (default 1).
        """

        if 'device' in config:
            device = config['device']
        else:
            raise Exception("'device' is a mandatory configuration "
                            "setting for SubtitlesWhisperHandler.")

        if device == 'cpu':
            # One 'cpu' entry per requested handler.
            devices = [device for i in range(int(config.get('count', 1)))]
        else:
            id_config_string = config.get('gpu_ids', '0')
            ids = [s.strip() for s in id_config_string.split(',')]
            per_gpu = int(config.get('threads_per_gpu', 1))

            # 'for j in range(per_gpu)' repeats each gpu id
            # the appropriate number of times
            devices = [i for i in ids for j in range(per_gpu)]

        return [cls(handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config,
                    dev)
                for dev in devices]

    @classmethod
    def wants(cls, jobspec, existing_package):
        """
        Return True if this handler wants to process this jobspec.
        Raises an exception if the job is wanted but doesn't pass validation.

        A job is wanted if the job specification contains
        a 'generate_subtitles' key.
        """
        if 'generate_subtitles' in jobspec:
            return cls._validate(jobspec, existing_package)
        return False

    @classmethod
    def _validate(cls, jobspec, existing_package):
        """
        Return True if the job is valid for this handler.

        Raises ValidationException on any violation.

        Validity requirements are:
        - Keys in 'subtitles' and 'generate_subtitles' must be
          mututally unique.
        - All 'source' values in subtitle generation specifications must be a
          valid source name, either one that already exists or one provided
          under 'sources' in this job.
        - The 'type' value in subtitle generation specifications must be a
          supported generator (currently only 'whisper').
        """
        super()._validate(jobspec, existing_package)
        # Check for duplicate track names
        generate_names = jobspec.get('generate_subtitles', {}).keys()
        store_names = jobspec.get('subtitles', {}).keys()
        # dict key views support set operations, so & yields the overlap.
        common_names = generate_names & store_names
        if common_names:
            names_string = ', '.join(common_names)
            raise ValidationException(
                f"Duplicate subtitle track name(s): {names_string}")

        # Validate generation tasks
        for name, genspec in jobspec.get('generate_subtitles', {}).items():
            if genspec['type'] != 'whisper':
                raise ValidationException(
                    "Unsupported subtitle generation job type: "
                    f"{genspec['type']}")
            # A source is acceptable if either this job uploads a video
            # for it or the stored package already has one.
            expected_source = genspec['source']
            jobsource = jobspec.get('sources', {}).get(expected_source, {})
            existing_source = existing_package['sources'].get(expected_source,
                                                              {})
            if 'video' not in jobsource and 'video' not in existing_source:
                raise ValidationException(f"Subtitle track '{name}' refers "
                                          "to a missing source: "
                                          f"{expected_source}")
        return True

    def _handle(self, jobspec, existing_package, tempdir):
        """
        Return a function to apply changes to the stored package.

        All subtitle generation tasks are run before apply_func is returned.
        The returned function moves generated subtitle files into the
        package's basedir and updates the package metadata.
        Replaced subtitle tracks are deleted.
        """

        # trackname -> 'pending' | 'done' | 'failed'
        transcribes = {}
        # trackname -> path of the generated VTT file under tempdir
        resultfiles = {}

        for trackname, item in jobspec.get('generate_subtitles', {}).items():
            sourcename = item['source']
            # None means "let whisper auto-detect the language".
            language = item.get('language', None)
            sourcepath = None
            source_from_job = jobspec.get('sources', {}).get(sourcename, {})
            file_from_job = source_from_job.get('video', None)
            if file_from_job:
                # The job uploads a fresh video file for this source.
                sourcepath = Path(jobspec['upload_dir']) / file_from_job
            else:
                existing_source = existing_package['sources'][sourcename]
                # Sorting the available resolutions in ascending numeric order
                resolutions_sorted = sorted(existing_source['video'].keys(),
                                            key=int)
                # Picking the smallest resolution,
                # since the sound quality is always the same
                basedir = Path(existing_package.basedir)
                sourcefile = existing_source['video'][resolutions_sorted[0]]
                sourcepath = basedir / sourcefile

            # Escape the track name for use as a file name: double any
            # existing underscores, then turn spaces into underscores.
            generated_name = trackname.replace('_', '__').replace(' ', '_')
            outpath = tempdir / f"{generated_name}.vtt"

            self.subprocess_inqueue.put((trackname,
                                         sourcepath,
                                         outpath,
                                         language))
            transcribes[trackname] = 'pending'
            resultfiles[trackname] = outpath

        # Wait for transcribes to finish
        self.logger.info("Waiting for transcribes")
        while 'pending' in transcribes.values():
            # Blocks until the worker reports another finished job.
            result, contents = self.subprocess_outqueue.get()
            if result == 'Success':
                trackname = contents
                transcribes[trackname] = 'done'
            else:
                trackname, e = contents
                transcribes[trackname] = 'failed'
                # exc_info=e reproduces the worker-side traceback here.
                self.logger.error('Generation of %s failed.',
                                  trackname, exc_info=e)
        self.logger.info("Done, making apply_func")

        def apply_func(package):
            # Only successfully generated tracks are stored; failed ones
            # were logged above and are silently skipped here.
            for name, subspath in resultfiles.items():
                if transcribes[name] == 'done':
                    package.set_subtitle_track(name, subspath)

        return apply_func
|