Major updates to the subtitles handler with some knock-on effects

- More than one subtitles handler can be run in parallel
 - Language can be specified in generation tasks
 - Switched to pathlib for path handling in subtitles handler (simpler)
 - Handlers are now instantiated via factory function
This commit is contained in:
Erik Thuning 2023-11-24 16:17:51 +01:00
parent 755b8e6839
commit a0bba02840
5 changed files with 184 additions and 42 deletions

@ -1,6 +1,9 @@
class ValidationException(Exception):
pass
class ConfigurationException(Exception):
pass
class FFmpegException(Exception):
def __init__(self, inpath, outpath, maxheight, errormessage):
self.inpath = inpath

@ -28,13 +28,12 @@ def init_handlers(collector, worker, config):
handlerqueue = mp.Queue()
handler_queue_map[handlerclass] = handlerqueue
handlerconf = get_section(config, handlerclass.__name__)
for i in range(int(handlerconf.get('count', 1))):
handler = handlerclass(handlerqueue,
collector,
worker,
ldap,
config['Pipeline']['tempdir'],
handlerconf)
handler_list.append(handler)
handlers = handlerclass.instantiate(handlerqueue,
collector,
worker,
ldap,
config['Pipeline']['tempdir'],
handlerconf)
handler_list += handlers
return (handler_queue_map, handler_list)

@ -1,7 +1,7 @@
import logging
from abc import ABCMeta, abstractmethod
from os import mkdir, path
from pathlib import Path
from shutil import rmtree
from ..collector import DoneItem, ErrorItem
@ -12,7 +12,7 @@ from ..queuethread import QueueThread
@QueueThread.register
class Handler(QueueThread, metaclass=ABCMeta):
"""
Base class for handlers to be registered with a QueuePickup instance.
Base class for handlers.
"""
def __init__(self,
handlerqueue,
@ -25,17 +25,43 @@ class Handler(QueueThread, metaclass=ABCMeta):
self.collector = collector
self.workerthread = worker
self.ldap = ldap
self.tempfiles_parent = tempdir
self.tempfiles_parent = Path(tempdir)
self.config = config
@classmethod
def instantiate(cls,
handlerqueue,
collector,
worker,
ldap,
tempdir,
config):
"""
Factory method to create a number of Handler instances based on the
count given in the config for a specific handler.
Only intended to be called on a subclass.
Returns a list of at least one instance of the Handler in question.
"""
instances = []
for i in range(config.get('count', 1)):
instances.append(cls(handlerqueue,
collector,
worker,
ldap,
tempdir,
config))
return instances
def _process(self, queueitem):
"""
Process a pending job. Calls _handle and passes the result
to the collector.
"""
tempdir = path.join(self.tempfiles_parent,
f"{queueitem['jobid']}.{self.__class__.__name__}")
mkdir(tempdir)
tempdir = (self.tempfiles_parent
/ f"{queueitem['jobid']}.{self.__class__.__name__}")
tempdir.mkdir()
try:
apply_func = self._handle(queueitem['jobspec'],
queueitem['existing'],

@ -1,15 +1,22 @@
import logging
import time
from os import path
from pathlib import Path
from .handler import Handler
from ..exceptions import ValidationException
from ..exceptions import ConfigurationException, ValidationException
import torch
import whisper
import whisper.utils
def _do_whisper_transcribe(inpath, outpath, modelname, modeldir):
def _do_whisper_transcribe(inpath,
outpath,
device,
modelname,
modeldir,
language=None):
"""
Transcribe the given file at 'inpath' to a VTT file at 'outpath'
using the Whisper engine.
@ -17,19 +24,39 @@ def _do_whisper_transcribe(inpath, outpath, modelname, modeldir):
Should be called asynchronously.
"""
logger = logging.getLogger('play-daemon.SubtitlesHandler.Whisper')
logger.info(f"Starting whisper transcription job for {inpath}")
whisperModel = whisper.load_model(
modelname,
download_root=modeldir)
result = whisper.transcribe(whisperModel, inpath)
language = result['language']
logger.info(f"Detected language '{language}' in '{inpath}'")
vttWriter = whisper.utils.WriteVTT(path.dirname(outpath))
vttWriter.always_include_hours = True
logger = logging.getLogger(
'play-daemon.SubtitlesHandler._do_whisper_transcribe')
logger.info(f'Starting whisper transcription job for {inpath}.')
try:
whisperModel = whisper.load_model(
modelname,
download_root=modeldir,
device=device)
start = time.time()
result = whisper.transcribe(whisperModel,
str(inpath),
language=language,
word_timestamps=True)
end = time.time()
if language is None:
read_language = result['language']
logger.info(f"Detected language '{read_language}' in {inpath}.")
vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
vttWriter.always_include_hours = True
except Exception as e:
logger.error(f'Transcription of {inpath} failed.')
logger.error(e, exc_info=e)
raise e
with open(outpath, 'w') as f:
vttWriter.write_result(result, f)
logger.info(f"Finished whisper transcription job for {inpath}")
vttWriter.write_result(result, f, {'max_line_width': None,
'max_line_count': 3,
'max_words_per_line': 15,
'highlight_words': False})
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
logger.info('Finished whisper transcription job '
f'for {inpath} in {elapsed}.')
return outpath
@Handler.register
@ -37,6 +64,85 @@ class SubtitlesHandler(Handler):
"""
This class handles package subtitles.
"""
def __init__(self,
handlerqueue,
collector,
worker,
ldap,
tempdir,
config,
device='cpu'):
super().__init__(handlerqueue,
collector,
worker,
ldap,
tempdir,
config)
self.jobsize = int(config['jobsize'])
self.whispermodel = config['whispermodel']
self.modeldir = config['modeldir']
self.device = device
@classmethod
def instantiate(cls,
handlerqueue,
collector,
worker,
ldap,
tempdir,
config):
"""
Returns a list SubtitlesHandlers.
Instantiation behaviour is governed by two configuration values:
device and count. Both are optional.
If device is not specified, the device will be determined by
GPU availability.
If only device is specified and the device is GPU, a handler is
created for each avilable GPU. If no GPUs are available, an exception
is raised.
If only device is provided and the device is CPU, a single handler
is created.
If count is provided, that number of handlers will be created. If the
device is GPU, and count is greater than the number of available GPUs,
an exception is raised.
"""
gpu_count = torch.cuda.device_count()
if 'device' in config:
device = config['device']
else:
if gpu_count > 0:
device = 'gpu'
else:
device = 'cpu'
if device == 'cpu':
devices = [device for i in range(int(config.get('count', 1)))]
else:
if gpu_count == 0:
raise ConfigurationException('GPU requested '
'but none available.')
count = int(config.get('count', gpu_count))
if count > gpu_count:
raise ConfigurationException(f'{count} workers requested '
f'but only {gpu_count} '
'GPUs available.')
devices = [f'cuda:{i}' for i in range(count)]
return [cls(handlerqueue,
collector,
worker,
ldap,
tempdir,
config,
dev)
for dev in devices]
@classmethod
def wants(cls, jobspec, existing_package):
@ -101,8 +207,8 @@ class SubtitlesHandler(Handler):
continue
if 'upload_dir' not in jobspec:
raise ValidationException("upload_dir missing")
subspath = path.join(jobspec['upload_dir'], subsfile)
if not path.isfile(subspath):
subspath = Path(jobspec['upload_dir']) / subsfile
if not subspath.is_file():
raise ValidationException(
f"Error for subtitle track {name}: "
f"{subspath} is not a valid file")
@ -120,14 +226,16 @@ class SubtitlesHandler(Handler):
transcribes = []
resultfiles = {}
loglevel = self.logger.getEffectiveLevel()
for trackname, item in jobspec.get('generate_subtitles', {}).items():
sourcename = item['source']
language = item.get('language', None)
sourcepath = None
source_from_job = jobspec.get('sources', {}).get(sourcename, {})
file_from_job = source_from_job.get('video', None)
if file_from_job:
sourcepath = path.join(jobspec['upload_dir'], file_from_job)
sourcepath = Path(jobspec['upload_dir']) / file_from_job
else:
existing_source = existing_package['sources'][sourcename]
# Sorting the available resolutions in ascending numeric order
@ -135,32 +243,37 @@ class SubtitlesHandler(Handler):
key=int)
# Picking the smallest resolution,
# since the sound quality is always the same
sourcepath = path.join(
existing_package.basedir,
existing_source['video'][resolutions_sorted[0]])
basedir = Path(existing_package.basedir)
sourcefile = existing_source['video'][resolutions_sorted[0]]
sourcepath = basedir / sourcefile
outpath = path.join(tempdir, f"{sourcename}.vtt")
outpath = tempdir / f"{sourcename}.vtt"
transcribe = self.asyncjob(int(self.config['jobsize']),
transcribe = self.asyncjob(self.jobsize,
_do_whisper_transcribe,
(sourcepath,
outpath,
self.config['whispermodel'],
self.config['modeldir']))
self.device,
self.whispermodel,
self.modeldir),
{'language': language})
transcribes.append(transcribe)
resultfiles[trackname] = outpath
self.logger.debug("Waiting for transcribes")
# Wait for transcribes to finish
for item in transcribes:
item.wait()
try:
for item in transcribes:
item.wait()
except Exception as e:
self.logger.error(e, exc_info=e)
self.logger.debug("Done, making apply_func")
def apply_func(package):
for name, subsfile in jobspec.get('subtitles', {}).items():
subspath = None
if subsfile:
subspath = path.join(jobspec['upload_dir'], subsfile)
subspath = Path(jobspec['upload_dir']) / subsfile
package.set_subtitle_track(name, subspath)
for name, subspath in resultfiles.items():

@ -19,6 +19,7 @@ canonical_jobspec = {
'thumb': str,
'subtitles': {str: str},
'generate_subtitles': {str: {'type': str,
'language': str,
'source': str}},
'sources': {str: {'poster': str,
'playAudio': bool,