Promoted subtitles_whisper_hack from hack status.

- Overwrote subtitles_whisper with subtitles_whisper_hack
 - Moved the comment about it being a hack to the more specific hack spot
 - Updated __init__.py to import from subtitles_whisper

The work invested in _hack is now significant enough that, if automatic GPU
detection becomes viable again, the only meaningful starting point is
the _hack implementation. It's probably a good idea to remove _serial as well,
but I'm leaving it around for now.
This commit is contained in:
Erik Thuning 2024-10-14 16:34:46 +02:00
parent d6206f1e6b
commit 27598d4401
3 changed files with 104 additions and 367 deletions

@ -4,7 +4,7 @@ from .audio import AudioHandler
from .metadata import MetadataHandler
from .poster import PosterHandler
from .slides import SlidesHandler
from .subtitles_whisper_hack import SubtitlesWhisperHandler
from .subtitles_whisper import SubtitlesWhisperHandler
from .subtitles_import import SubtitlesImportHandler
from .thumbnail import ThumbnailHandler
from .transcode import TranscodeHandler

@ -1,65 +1,79 @@
import logging
import os
import time
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from time import sleep
from .handler import Handler
from ..exceptions import ConfigurationException, ValidationException
from ..exceptions import ValidationException
import torch
import whisper
import whisper.utils
def _do_whisper_transcribe(inpath,
outpath,
device,
modelname,
modeldir,
loglevel,
language=None,):
"""
Transcribe the given file at 'inpath' to a VTT file at 'outpath'
using the Whisper engine.
Should be called asynchronously.
"""
def _whisper_processor(inqueue,
outqueue,
device,
modelname,
modeldir,
loglevel):
logger = logging.getLogger(
'play-daemon.SubtitlesWhisperHandler._do_transcribe')
'play-daemon.SubtitlesWhisperHandler._whisper_processor')
logger.setLevel(loglevel)
logger.info('Starting whisper transcription job for %s.', inpath)
try:
whisperModel = whisper.load_model(
modelname,
download_root=modeldir,
device=device)
start = time.time()
result = whisper.transcribe(whisperModel,
str(inpath),
language=language,
word_timestamps=True)
end = time.time()
if language is None:
out_language = result['language']
logger.info("Detected language '%s' in %s.", out_language, inpath)
else:
out_language = language
vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
vttWriter.always_include_hours = True
except Exception as e:
logger.error('Transcription of %s failed.', inpath, exc_info=e)
raise e
logger.debug('Starting _whisper_processor on %s', device)
with open(outpath, 'w') as f:
vttWriter.write_result(result, f, {'max_line_width': None,
'max_line_count': 3,
'max_words_per_line': 15,
'highlight_words': False})
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
logger.info('Finished whisper transcription job for %s in %s.',
inpath, elapsed)
return (outpath, out_language)
rundevice = 'cpu'
if device != 'cpu':
# This is a hack. In order to get around a bug in the triton library
# that makes it impossible to assign work to a gpu other than the
# first, we only expose the desired gpu to the transcription process.
# This has the side-effect of precluding any sort of
# automatic gpu detection.
os.environ['CUDA_VISIBLE_DEVICES'] = device
rundevice = 'cuda'
whisperModel = whisper.load_model(modelname,
download_root=modeldir,
device=rundevice)
while True:
try:
trackname, inpath, outpath, language = inqueue.get_nowait()
except Empty:
sleep(1)
continue
logger.info('Starting whisper transcription job for %s.', inpath)
start = time.time()
try:
result = whisper.transcribe(whisperModel,
str(inpath),
language=language,
word_timestamps=True)
end = time.time()
if language is None:
out_language = result['language']
logger.info(
"Detected language '%s' in %s.", out_language, inpath)
else:
out_language = language
vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
vttWriter.always_include_hours = True
with open(outpath, 'w') as f:
vttWriter.write_result(result, f, {'max_line_width': None,
'max_line_count': 3,
'max_words_per_line': 15,
'highlight_words': False})
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
logger.info('Finished whisper transcription job for %s in %s.',
inpath, elapsed)
except Exception as e:
outqueue.put(('Failure', (trackname, e)))
else:
outqueue.put(('Success', trackname))
@Handler.register
@ -74,18 +88,25 @@ class SubtitlesWhisperHandler(Handler):
ldap,
tempdir,
config,
device='cpu'):
device):
super().__init__(handlerqueue,
collector,
worker,
ldap,
tempdir,
config)
self.jobsize = int(config['jobsize'])
self.whispermodel = config['whispermodel']
self.modeldir = config['modeldir']
self.device = device
self.logger.debug('Created SubtitlesWhisperHandler on %s', device)
self.subprocess_inqueue = Queue()
self.subprocess_outqueue = Queue()
self.transcribe_process = Process(
target=_whisper_processor,
args=(self.subprocess_inqueue,
self.subprocess_outqueue,
device,
config['whispermodel'],
config['modeldir'],
self.logger.getEffectiveLevel()))
self.transcribe_process.daemon = True
self.transcribe_process.start()
@classmethod
def instantiate(cls,
@ -116,28 +137,22 @@ class SubtitlesWhisperHandler(Handler):
an exception is raised.
"""
gpu_count = torch.cuda.device_count()
if 'device' in config:
device = config['device']
else:
if gpu_count > 0:
device = 'gpu'
else:
device = 'cpu'
raise Exception("'device' is a mandatory configuration "
"setting for SubtitlesWhisperHandler.")
if device == 'cpu':
devices = [device for i in range(int(config.get('count', 1)))]
else:
if gpu_count == 0:
raise ConfigurationException('GPU requested '
'but none available.')
id_config_string = config.get('gpu_ids', '0')
ids = [s.strip() for s in id_config_string.split(',')]
per_gpu = int(config.get('threads_per_gpu', 1))
count = int(config.get('count', gpu_count))
if count > gpu_count:
raise ConfigurationException(f'{count} workers requested '
f'but only {gpu_count} '
'GPUs available.')
devices = [f'cuda:{i}' for i in range(count)]
# 'for j in range(per_gpu)' repeats each gpu id
# the appropriate number of times
devices = [i for i in ids for j in range(per_gpu)]
return [cls(handlerqueue,
collector,
@ -211,13 +226,11 @@ class SubtitlesWhisperHandler(Handler):
Replaced subtitle tracks are deleted.
"""
transcribes = []
transcribes = {}
resultfiles = {}
loglevel = self.logger.getEffectiveLevel()
for trackname, item in jobspec.get('generate_subtitles', {}).items():
sourcename = item['source']
generated_name = sourcename.replace('_', '__').replace(' ', '_')
language = item.get('language', None)
sourcepath = None
source_from_job = jobspec.get('sources', {}).get(sourcename, {})
@ -235,29 +248,33 @@ class SubtitlesWhisperHandler(Handler):
sourcefile = existing_source['video'][resolutions_sorted[0]]
sourcepath = basedir / sourcefile
generated_name = sourcename.replace('_', '__').replace(' ', '_')
generated_name = trackname.replace('_', '__').replace(' ', '_')
outpath = tempdir / f"{generated_name}.vtt"
transcribe = self.asyncjob(self.jobsize,
_do_whisper_transcribe,
(sourcepath,
outpath,
self.device,
self.whispermodel,
self.modeldir,
loglevel),
{'language': language})
transcribes.append(transcribe)
self.subprocess_inqueue.put((trackname,
sourcepath,
outpath,
language))
transcribes[trackname] = 'pending'
resultfiles[trackname] = outpath
self.logger.info("Waiting for transcribes")
# Wait for transcribes to finish
for item in transcribes:
item.wait()
self.logger.info("Waiting for transcribes")
while 'pending' in transcribes.values():
result, contents = self.subprocess_outqueue.get()
if result == 'Success':
trackname = contents
transcribes[trackname] = 'done'
else:
trackname, e = contents
transcribes[trackname] = 'failed'
self.logger.error('Generation of %s failed.',
trackname, exc_info=e)
self.logger.info("Done, making apply_func")
def apply_func(package):
for name, subspath in resultfiles.items():
package.set_subtitle_track(name, subspath)
if transcribes[name] == 'done':
package.set_subtitle_track(name, subspath)
return apply_func

@ -1,280 +0,0 @@
import logging
import os
import time
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from time import sleep
from .handler import Handler
from ..exceptions import ValidationException
import whisper
import whisper.utils
# This is a hack. In order to get around a bug in the triton library that
# makes it impossible to assign work to a gpu other than the first, we only
# expose the desired gpu to the transcription process. This has the
# side-effect of precluding automatic gpu detection, so the number of gpus
# must be explicitly configured.
def _transportable_exception(exc):
    """
    Return an exception that is safe to send through a multiprocessing queue.

    If 'exc' survives a pickle round-trip it is returned unchanged.
    Otherwise a RuntimeError carrying the original type name and message
    is returned in its place, so the receiving side always gets an
    exception instance.
    """
    import pickle
    try:
        pickle.loads(pickle.dumps(exc))
    except Exception:
        return RuntimeError(f'{type(exc).__name__}: {exc}')
    return exc


def _whisper_processor(inqueue,
                       outqueue,
                       device,
                       modelname,
                       modeldir,
                       loglevel):
    """
    Worker loop that transcribes queued files to VTT using Whisper.

    Intended to run as the target of a separate process. The model is
    loaded once, after which the function loops forever and never returns.

    Parameters:
    - inqueue: queue yielding (trackname, inpath, outpath, language)
      tuples. A language of None lets Whisper detect the language.
    - outqueue: queue receiving one message per processed item, either
      ('Success', trackname) or ('Failure', (trackname, exception)).
    - device: 'cpu', or a value to assign to CUDA_VISIBLE_DEVICES.
    - modelname: the name of the Whisper model to load.
    - modeldir: the directory passed to Whisper as download_root.
    - loglevel: the level to set on this function's logger.

    An exception raised while loading the model is not caught here.
    """
    logger = logging.getLogger(
        'play-daemon.SubtitlesWhisperHandler._whisper_processor')
    logger.setLevel(loglevel)
    logger.debug('Starting _whisper_processor on %s', device)

    rundevice = 'cpu'
    if device != 'cpu':
        # Only the requested gpu is made visible to this process, so it
        # is always addressed as plain 'cuda' from here on.
        os.environ['CUDA_VISIBLE_DEVICES'] = device
        rundevice = 'cuda'

    whisperModel = whisper.load_model(modelname,
                                      download_root=modeldir,
                                      device=rundevice)

    while True:
        # Block until there is work instead of polling the queue.
        trackname, inpath, outpath, language = inqueue.get()

        logger.info('Starting whisper transcription job for %s.', inpath)
        start = time.time()
        try:
            result = whisper.transcribe(whisperModel,
                                        str(inpath),
                                        language=language,
                                        word_timestamps=True)
            end = time.time()
            if language is None:
                logger.info("Detected language '%s' in %s.",
                            result['language'], inpath)

            vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
            vttWriter.always_include_hours = True
            # WebVTT files must be UTF-8, so don't rely on the
            # locale's default encoding.
            with open(outpath, 'w', encoding='utf-8') as f:
                vttWriter.write_result(result, f, {'max_line_width': None,
                                                   'max_line_count': 3,
                                                   'max_words_per_line': 15,
                                                   'highlight_words': False})
            elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
            logger.info('Finished whisper transcription job for %s in %s.',
                        inpath, elapsed)
        except Exception as e:
            # The traceback does not survive the trip through the queue,
            # so it is logged here where it is still available.
            logger.exception('Whisper transcription job for %s failed.',
                             inpath)
            outqueue.put(('Failure', (trackname,
                                      _transportable_exception(e))))
        else:
            outqueue.put(('Success', trackname))
@Handler.register
class SubtitlesWhisperHandler(Handler):
    """
    This class handles subtitle generation with Whisper.

    Each instance starts its own daemon worker process running
    _whisper_processor, and communicates with it through a pair of queues.
    """
    # Seconds to wait for a result from the worker process before
    # checking whether the worker is still running.
    _RESULT_POLL_SECONDS = 5

    def __init__(self,
                 handlerqueue,
                 collector,
                 worker,
                 ldap,
                 tempdir,
                 config,
                 device):
        """
        Set up the handler and start its transcription worker process.

        'device' is passed on to the worker process unchanged. The
        'whispermodel' and 'modeldir' configuration values are mandatory.
        All other parameters are passed on to the Handler constructor.
        """
        super().__init__(handlerqueue,
                         collector,
                         worker,
                         ldap,
                         tempdir,
                         config)
        self.subprocess_inqueue = Queue()
        self.subprocess_outqueue = Queue()
        self.transcribe_process = Process(
            target=_whisper_processor,
            args=(self.subprocess_inqueue,
                  self.subprocess_outqueue,
                  device,
                  config['whispermodel'],
                  config['modeldir'],
                  self.logger.getEffectiveLevel()))
        self.transcribe_process.daemon = True
        self.transcribe_process.start()

    @classmethod
    def instantiate(cls,
                    handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config):
        """
        Returns a list of SubtitlesWhisperHandlers.

        Instantiation behaviour is governed by these configuration values:

        - device: Mandatory. Either 'cpu' or any other value, which is
          treated as a request for GPU processing.
        - count: Optional, only used when the device is 'cpu'. The number
          of handlers to create. Defaults to 1.
        - gpu_ids: Optional, only used when the device is not 'cpu'.
          A comma-separated list of GPU ids. Defaults to '0'.
        - threads_per_gpu: Optional, only used when the device is
          not 'cpu'. The number of handlers to create for each id
          in gpu_ids. Defaults to 1.

        GPUs are not detected automatically, so the ids in gpu_ids are
        used as given.

        An exception is raised if device is missing, or if GPU processing
        is requested and gpu_ids doesn't contain any ids.
        """
        if 'device' not in config:
            raise Exception("'device' is a mandatory configuration "
                            "setting for SubtitlesWhisperHandler.")
        device = config['device']

        if device == 'cpu':
            devices = [device] * int(config.get('count', 1))
        else:
            id_config_string = config.get('gpu_ids', '0')
            # Empty entries (from a trailing comma, for instance) are
            # dropped, since an empty id would hide every gpu from
            # the worker process.
            ids = [s.strip()
                   for s in id_config_string.split(',')
                   if s.strip()]
            if not ids:
                raise Exception("'gpu_ids' must contain at least one "
                                "gpu id for SubtitlesWhisperHandler.")
            per_gpu = int(config.get('threads_per_gpu', 1))
            # 'for j in range(per_gpu)' repeats each gpu id
            # the appropriate number of times
            devices = [i for i in ids for j in range(per_gpu)]

        return [cls(handlerqueue,
                    collector,
                    worker,
                    ldap,
                    tempdir,
                    config,
                    dev)
                for dev in devices]

    @classmethod
    def wants(cls, jobspec, existing_package):
        """
        Return True if this handler wants to process this jobspec.
        Raises an exception if the job is wanted but doesn't pass validation.

        A job is wanted if the job specification contains
        a 'generate_subtitles' key.
        """
        if 'generate_subtitles' in jobspec:
            return cls._validate(jobspec, existing_package)
        return False

    @classmethod
    def _validate(cls, jobspec, existing_package):
        """
        Return True if the job is valid for this handler.

        The validation of the parent class is run first.
        The validity requirements checked here are:
        - Keys in 'subtitles' and 'generate_subtitles' must be
          mutually unique.
        - All 'source' values in subtitle generation specifications must be a
          valid source name, either one that already exists or one provided
          under 'sources' in this job. A source is only valid if it
          has a 'video' key.
        - The 'type' value in subtitle generation specifications must be a
          supported generator (currently only 'whisper').

        A ValidationException is raised if any requirement is not met.
        """
        super()._validate(jobspec, existing_package)

        # Check for duplicate track names
        generate_names = jobspec.get('generate_subtitles', {}).keys()
        store_names = jobspec.get('subtitles', {}).keys()
        common_names = generate_names & store_names
        if common_names:
            names_string = ', '.join(common_names)
            raise ValidationException(
                f"Duplicate subtitle track name(s): {names_string}")

        # Validate generation tasks
        for name, genspec in jobspec.get('generate_subtitles', {}).items():
            if genspec['type'] != 'whisper':
                raise ValidationException(
                    "Unsupported subtitle generation job type: "
                    f"{genspec['type']}")
            expected_source = genspec['source']
            jobsource = jobspec.get('sources', {}).get(expected_source, {})
            existing_source = existing_package['sources'].get(expected_source,
                                                              {})
            if 'video' not in jobsource and 'video' not in existing_source:
                raise ValidationException(f"Subtitle track '{name}' refers "
                                          "to a missing source: "
                                          f"{expected_source}")
        return True

    def _handle(self, jobspec, existing_package, tempdir):
        """
        Return a function to apply changes to the stored package.

        All subtitle generation tasks are run before apply_func is returned.
        The generated files are written to tempdir.

        The returned function calls set_subtitle_track on the package for
        each successfully generated track. Tracks that failed to generate
        are logged and left out.

        If the worker process stops running before all results have
        arrived, the remaining tracks are treated as failed rather than
        waited for indefinitely.
        """
        transcribes = {}
        resultfiles = {}

        for trackname, item in jobspec.get('generate_subtitles', {}).items():
            sourcename = item['source']
            language = item.get('language', None)
            sourcepath = None
            source_from_job = jobspec.get('sources', {}).get(sourcename, {})
            file_from_job = source_from_job.get('video', None)
            if file_from_job:
                sourcepath = Path(jobspec['upload_dir']) / file_from_job
            else:
                existing_source = existing_package['sources'][sourcename]
                # Sorting the available resolutions in ascending numeric order
                resolutions_sorted = sorted(existing_source['video'].keys(),
                                            key=int)
                # Picking the smallest resolution,
                # since the sound quality is always the same
                basedir = Path(existing_package.basedir)
                sourcefile = existing_source['video'][resolutions_sorted[0]]
                sourcepath = basedir / sourcefile

            generated_name = trackname.replace('_', '__').replace(' ', '_')
            outpath = tempdir / f"{generated_name}.vtt"

            self.subprocess_inqueue.put((trackname,
                                         sourcepath,
                                         outpath,
                                         language))
            transcribes[trackname] = 'pending'
            resultfiles[trackname] = outpath

        # Wait for transcribes to finish
        self.logger.info("Waiting for transcribes")
        worker_seen_dead = False
        while 'pending' in transcribes.values():
            try:
                result, contents = self.subprocess_outqueue.get(
                    timeout=self._RESULT_POLL_SECONDS)
            except Empty:
                if self.transcribe_process.is_alive():
                    continue
                if not worker_seen_dead:
                    # Check the queue one more time, in case a result
                    # was queued just before the worker stopped.
                    worker_seen_dead = True
                    continue
                # No worker and no results, so nothing more will arrive.
                for name, state in list(transcribes.items()):
                    if state == 'pending':
                        transcribes[name] = 'failed'
                        self.logger.error(
                            'Generation of %s failed, the transcription '
                            'process is no longer running.', name)
                break

            if result == 'Success':
                trackname = contents
                transcribes[trackname] = 'done'
            else:
                trackname, e = contents
                transcribes[trackname] = 'failed'
                self.logger.error('Generation of %s failed.',
                                  trackname, exc_info=e)

        self.logger.info("Done, making apply_func")

        def apply_func(package):
            for name, subspath in resultfiles.items():
                if transcribes[name] == 'done':
                    package.set_subtitle_track(name, subspath)

        return apply_func