Testing out a new approach to dispatching subtitle generation jobs.
Hope to solve gpu assignment issues
This commit is contained in:
parent
c34ed2cc62
commit
51991d327b
275
pipeline/handlers/subtitles_whisper_serial.py
Normal file
275
pipeline/handlers/subtitles_whisper_serial.py
Normal file
@ -0,0 +1,275 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from multiprocessing import Process, Queue
|
||||
from pathlib import Path
|
||||
from queue import Empty
|
||||
from time import sleep
|
||||
from threading import Event
|
||||
|
||||
from .handler import Handler
|
||||
from ..exceptions import ConfigurationException, ValidationException
|
||||
|
||||
import torch
|
||||
import whisper
|
||||
import whisper.utils
|
||||
|
||||
|
||||
def _whisper_processor(inqueue,
|
||||
outqueue,
|
||||
device,
|
||||
modelname,
|
||||
modeldir,
|
||||
loglevel):
|
||||
logger = logging.getLogger(
|
||||
'play-daemon.SubtitlesWhisperHandler._whisper_processor')
|
||||
logger.setLevel(loglevel)
|
||||
logger.debug(f'Starting _whisper_processor on {device}')
|
||||
whisperModel = whisper.load_model(modelname,
|
||||
download_root=modeldir,
|
||||
device=device)
|
||||
while True:
|
||||
try:
|
||||
trackname, inpath, outpath, language = inqueue.get_nowait()
|
||||
except Empty:
|
||||
sleep(1)
|
||||
continue
|
||||
|
||||
logger.info(f'Starting whisper transcription job for {inpath}.')
|
||||
start = time.time()
|
||||
try:
|
||||
result = whisper.transcribe(whisperModel,
|
||||
str(inpath),
|
||||
language=language,
|
||||
word_timestamps=True)
|
||||
end = time.time()
|
||||
if language is None:
|
||||
out_language = result['language']
|
||||
logger.info(f"Detected language '{out_language}' in {inpath}.")
|
||||
else:
|
||||
out_language = language
|
||||
|
||||
vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
|
||||
vttWriter.always_include_hours = True
|
||||
with open(outpath, 'w') as f:
|
||||
vttWriter.write_result(result, f, {'max_line_width': None,
|
||||
'max_line_count': 3,
|
||||
'max_words_per_line': 15,
|
||||
'highlight_words': False})
|
||||
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
|
||||
logger.info('Finished whisper transcription job '
|
||||
f'for {inpath} in {elapsed}.')
|
||||
except Exception as e:
|
||||
outqueue.put(('Failure', (trackname, e)))
|
||||
else:
|
||||
outqueue.put(('Success', trackname))
|
||||
|
||||
|
||||
@Handler.register
|
||||
class SubtitlesWhisperHandler(Handler):
|
||||
"""
|
||||
This class handles subtitle generation with Whisper.
|
||||
"""
|
||||
def __init__(self,
|
||||
handlerqueue,
|
||||
collector,
|
||||
worker,
|
||||
ldap,
|
||||
tempdir,
|
||||
config,
|
||||
device='cpu'):
|
||||
super().__init__(handlerqueue,
|
||||
collector,
|
||||
worker,
|
||||
ldap,
|
||||
tempdir,
|
||||
config)
|
||||
self.subprocess_inqueue = Queue()
|
||||
self.subprocess_outqueue = Queue()
|
||||
self.transcribe_process = Process(target=_whisper_processor,
|
||||
args=(self.subprocess_inqueue,
|
||||
self.subprocess_outqueue,
|
||||
device,
|
||||
config['whispermodel'],
|
||||
config['modeldir'],
|
||||
self.logger.getEffectiveLevel()))
|
||||
self.transcribe_process.daemon = True
|
||||
self.transcribe_process.start()
|
||||
|
||||
@classmethod
|
||||
def instantiate(cls,
|
||||
handlerqueue,
|
||||
collector,
|
||||
worker,
|
||||
ldap,
|
||||
tempdir,
|
||||
config):
|
||||
"""
|
||||
Returns a list of SubtitlesWhisperHandlers.
|
||||
|
||||
Instantiation behaviour is governed by two configuration values:
|
||||
device and count. Both are optional.
|
||||
|
||||
If device is not specified, the device will be determined by
|
||||
GPU availability.
|
||||
|
||||
If only device is specified and the device is GPU, a handler is
|
||||
created for each avilable GPU. If no GPUs are available, an exception
|
||||
is raised.
|
||||
|
||||
If only device is provided and the device is CPU, a single handler
|
||||
is created.
|
||||
|
||||
If count is provided, that number of handlers will be created. If the
|
||||
device is GPU, and count is greater than the number of available GPUs,
|
||||
an exception is raised.
|
||||
"""
|
||||
|
||||
gpu_count = torch.cuda.device_count()
|
||||
if 'device' in config:
|
||||
device = config['device']
|
||||
else:
|
||||
if gpu_count > 0:
|
||||
device = 'gpu'
|
||||
else:
|
||||
device = 'cpu'
|
||||
|
||||
if device == 'cpu':
|
||||
devices = [device for i in range(int(config.get('count', 1)))]
|
||||
else:
|
||||
if gpu_count == 0:
|
||||
raise ConfigurationException('GPU requested '
|
||||
'but none available.')
|
||||
|
||||
count = int(config.get('count', gpu_count))
|
||||
if count > gpu_count:
|
||||
raise ConfigurationException(f'{count} workers requested '
|
||||
f'but only {gpu_count} '
|
||||
'GPUs available.')
|
||||
devices = [f'cuda:{i}' for i in range(count)]
|
||||
|
||||
return [cls(handlerqueue,
|
||||
collector,
|
||||
worker,
|
||||
ldap,
|
||||
tempdir,
|
||||
config,
|
||||
dev)
|
||||
for dev in devices]
|
||||
|
||||
@classmethod
|
||||
def wants(cls, jobspec, existing_package):
|
||||
"""
|
||||
Return True if this handler wants to process this jobspec.
|
||||
Raises an exception if the job is wanted but doesn't pass validation.
|
||||
|
||||
A job is wanted if the job specification contains
|
||||
a 'generate_subtitles' key.
|
||||
"""
|
||||
if 'generate_subtitles' in jobspec:
|
||||
return cls._validate(jobspec, existing_package)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def _validate(cls, jobspec, existing_package):
|
||||
"""
|
||||
Return True if the job is valid for this handler.
|
||||
|
||||
Validity requirements are:
|
||||
- Keys in 'subtitles' and 'generate_subtitles' must be
|
||||
mututally unique.
|
||||
- All 'source' values in subtitle generation specifications must be a
|
||||
valid source name, either one that already exists or one provided
|
||||
under 'sources' in this job.
|
||||
- The 'type' value in subtitle generation specifications must be a
|
||||
supported generator (currently only 'whisper').
|
||||
"""
|
||||
super()._validate(jobspec, existing_package)
|
||||
# Check for duplicate track names
|
||||
generate_names = jobspec.get('generate_subtitles', {}).keys()
|
||||
store_names = jobspec.get('subtitles', {}).keys()
|
||||
common_names = generate_names & store_names
|
||||
if common_names:
|
||||
names_string = ', '.join(common_names)
|
||||
raise ValidationException(
|
||||
f"Duplicate subtitle track name(s): {names_string}")
|
||||
|
||||
# Validate generation tasks
|
||||
for name, genspec in jobspec.get('generate_subtitles', {}).items():
|
||||
if genspec['type'] != 'whisper':
|
||||
raise ValidationException(
|
||||
"Unsupported subtitle generation job type: "
|
||||
f"{genspec['type']}")
|
||||
expected_source = genspec['source']
|
||||
jobsource = jobspec.get('sources', {}).get(expected_source, {})
|
||||
existing_source = existing_package['sources'].get(expected_source,
|
||||
{})
|
||||
if 'video' not in jobsource and 'video' not in existing_source:
|
||||
raise ValidationException(f"Subtitle track '{name}' refers "
|
||||
"to a missing source: "
|
||||
f"{expected_source}")
|
||||
return True
|
||||
|
||||
def _handle(self, jobspec, existing_package, tempdir):
|
||||
"""
|
||||
Return a function to apply changes to the stored package.
|
||||
|
||||
All subtitle generation tasks are run before apply_func is returned.
|
||||
The returned function moves generated subtitle files into the
|
||||
package's basedir and updates the package metadata.
|
||||
Replaced subtitle tracks are deleted.
|
||||
"""
|
||||
|
||||
transcribes = {}
|
||||
resultfiles = {}
|
||||
|
||||
for trackname, item in jobspec.get('generate_subtitles', {}).items():
|
||||
sourcename = item['source']
|
||||
generated_name = sourcename.replace('_', '__').replace(' ', '_')
|
||||
language = item.get('language', None)
|
||||
sourcepath = None
|
||||
source_from_job = jobspec.get('sources', {}).get(sourcename, {})
|
||||
file_from_job = source_from_job.get('video', None)
|
||||
if file_from_job:
|
||||
sourcepath = Path(jobspec['upload_dir']) / file_from_job
|
||||
else:
|
||||
existing_source = existing_package['sources'][sourcename]
|
||||
# Sorting the available resolutions in ascending numeric order
|
||||
resolutions_sorted = sorted(existing_source['video'].keys(),
|
||||
key=int)
|
||||
# Picking the smallest resolution,
|
||||
# since the sound quality is always the same
|
||||
basedir = Path(existing_package.basedir)
|
||||
sourcefile = existing_source['video'][resolutions_sorted[0]]
|
||||
sourcepath = basedir / sourcefile
|
||||
|
||||
generated_name = sourcename.replace('_', '__').replace(' ', '_')
|
||||
outpath = tempdir / f"{generated_name}.vtt"
|
||||
|
||||
self.subprocess_inqueue.put((trackname,
|
||||
sourcepath,
|
||||
outpath,
|
||||
language))
|
||||
transcribes[trackname] = 'pending'
|
||||
resultfiles[trackname] = outpath
|
||||
|
||||
# Wait for transcribes to finish
|
||||
self.logger.info("Waiting for transcribes")
|
||||
while 'pending' in transcribes.values():
|
||||
result, contents = self.subprocess_outqueue.get()
|
||||
if result == 'Success':
|
||||
trackname = contents
|
||||
transcribes[trackname] = 'done'
|
||||
else:
|
||||
trackname, e = contents
|
||||
transcribes[trackname] = 'failed'
|
||||
self.logger.error(f'Generation of {trackname} failed.',
|
||||
exc_info=e)
|
||||
self.logger.info("Done, making apply_func")
|
||||
|
||||
def apply_func(package):
|
||||
for name, subspath in resultfiles.items():
|
||||
if transcribes[name] == 'done':
|
||||
package.set_subtitle_track(name, subspath)
|
||||
|
||||
return apply_func
|
Loading…
x
Reference in New Issue
Block a user