Getting close to MVP, just need the callback handling

Erik Thuning 2024-05-03 14:58:32 +02:00
parent 08a85a1386
commit cd7901880d
9 changed files with 164 additions and 96 deletions

@@ -1,5 +1,6 @@
import logging
import logging.handlers
import multiprocessing as mp
from configparser import ConfigParser
from pathlib import Path
@@ -7,6 +8,7 @@ from queue import Queue
from .deduplogger import DedupLogHandler
from .queuereader import QueueReader
from .notifierthread import NotifierThread
from .whisperthread import WhisperThread
@@ -59,19 +61,28 @@ class Pipeline:
def start(self):
self.logger.info('Starting')
jobqueue = Queue()
queuedir = Path(self.config['whisper-daemon']['queuedir'])
modeldir = Path(self.config['whisper-daemon']['modeldir'])
donedir = Path(self.config['whisper-daemon']['donedir'])
model = self.config['whisper-daemon']['model']
worker_config = self.config['worker']
# The resultqueue must be a multiprocessing queue since it will be
# receiving items from the separate whisper process.
# Everywhere else a plain queue.Queue suffices.
jobqueue = Queue()
resultqueue = mp.Queue()
self.watcher = QueueReader(jobqueue, queuedir)
self.whisper = WhisperThread(jobqueue,
resultqueue,
donedir,
model,
modeldir,
worker_config)
self.notifier = NotifierThread(resultqueue)
self.notifier.start()
self.whisper.start()
self.watcher.start()
@@ -85,5 +96,7 @@ class Pipeline:
self.watcher.shutdown()
self.whisper.shutdown()
self.notifier.shutdown()
self.running = False
self.logger.debug('Stopped')
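For context, the reworked Pipeline.start() pulls four values from the [whisper-daemon] section and hands the whole [worker] section to WhisperThread. A minimal config sketch, assuming ConfigParser INI syntax (the [whisper-daemon] keys and the worker 'device' key appear in this commit; all values here are illustrative assumptions):

from configparser import ConfigParser

# Sketch of a daemon config; paths and model name are made up for
# illustration. 'device' is either 'cpu' or a GPU selector consumed
# by the worker process.
config = ConfigParser()
config.read_string('''
[whisper-daemon]
queuedir = /var/spool/whisper/queue
modeldir = /var/lib/whisper/models
donedir = /var/spool/whisper/done
model = large-v3

[worker]
device = cpu
''')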

@@ -1,2 +1,5 @@
class ConfigurationException(Exception):
pass
class JobSpecificationException(Exception):
pass

@@ -4,6 +4,10 @@ import logging
from dataclasses import dataclass
from pathlib import Path
import whisper.utils
from .exceptions import JobSpecificationException
@dataclass
class Pending:
@@ -25,6 +29,7 @@ class Job:
self.success = None
self.jobid = jobid
self.queuefile = queuefile
self.language = None
self.logger = logging.getLogger(f'whisper-daemon.Job[{jobid}]')
@@ -33,6 +38,13 @@ class Job:
self.callback = jobdata['callback']
self.jobfile = Path(jobdata['jobfile'])
self.origin = jobdata['origin']
if 'language' in jobdata:
self.language = jobdata['language']
self.outputformat = jobdata['outputformat']
if self.outputformat not in ['txt', 'vtt', 'srt', 'tsv', 'json']:
raise JobSpecificationException(
f'Invalid output format: {self.outputformat}')
self.logger.info('Initialized')
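To make the job format concrete, here is a hypothetical queue item covering every field Job now reads. The on-disk encoding is not shown in this commit, so JSON and the file name are assumptions; QueueReader additionally filters names against a pattern defined elsewhere:

import json
from pathlib import Path

# Hypothetical job specification; all values are illustrative.
jobspec = {
    'callback': 'https://example.org/done',  # where to report the result
    'jobfile': '/srv/media/lecture.mp4',     # media file to transcribe
    'origin': 'uploader',                    # hypothetical origin tag
    'language': 'sv',                        # optional; omit to autodetect
    'outputformat': 'vtt',                   # txt, vtt, srt, tsv or json
}
Path('/var/spool/whisper/queue/job-0001.json').write_text(json.dumps(jobspec))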

@@ -0,0 +1,15 @@
from queue import Queue
from .job import Job
from .queuethread import QueueThread
@QueueThread.register
class NotifierThread(QueueThread):
def __init__(self, inqueue: Queue):
super().__init__(inqueue)
def _process(self, item: Job):
from pprint import pformat
self.logger.info('%s finished, %s',
item.jobid, pformat(item.get_result()))
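Per the commit message, callback handling is the piece still missing; the pformat logging above is a stand-in. A minimal sketch of what _process could grow into, assuming the callback URL should receive Job.get_result() as a JSON POST (requests ships in the requirements.txt added below; the payload shape and timeout are assumptions):

import requests

def _process(self, item: Job):
    # Sketch only: deliver the outcome to the callback URL from the job spec.
    try:
        response = requests.post(item.callback,
                                 json=item.get_result(),
                                 timeout=30)
        response.raise_for_status()
        self.logger.info('%s - callback delivered', item.jobid)
    except requests.RequestException as e:
        self.logger.error('%s - callback failed', item.jobid, exc_info=e)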

@@ -22,6 +22,8 @@ class QueueReader(FileSystemEventHandler):
def start(self):
self.logger.debug('Starting')
self.logger.info("Reading queue items from '%s', filtering on r'%s'",
self.queuedir, self.pattern.pattern)
for f in sorted(self.queuedir.iterdir(), key=os.path.getmtime):
self._process(f)
return self.watcher.start()
@@ -31,22 +33,24 @@ class QueueReader(FileSystemEventHandler):
return self.watcher.stop()
def _process(self, path: Path):
self.logger.debug('Evaluating %s', path)
if not path.is_file():
self.logger.debug('Ignoring, %s is not a regular file', path)
return
if not self.pattern.fullmatch(path.name):
self.logger.debug(
'Ignoring, %s does not match expected name format', path)
return
self.logger.debug('Processing queue item: %s', path)
filename = path.name
jobid = path.stem
self.logger.debug('Evaluating %s', filename)
if not path.is_file():
self.logger.debug('Ignoring, %s is not a regular file', filename)
return
if not self.pattern.fullmatch(filename):
self.logger.debug(
'Ignoring, %s does not match expected name format', filename)
return
self.logger.debug('Processing queue item %s', jobid)
try:
job = Job(jobid, path)
self.queue.put(job)
self.logger.info('Queued job: %s', jobid)
self.logger.info('Queued job %s', jobid)
except Exception as e:
self.logger.error('Error when processing %s: %s', jobid, e)
self.logger.error('Error when processing %s', jobid, exc_info=e)
def on_created(self, event):
self._process(Path(event.src_path))

@@ -34,19 +34,16 @@ class QueueThread(Thread, metaclass=ABCMeta):
except Empty:
sleep(1)
self.logger.debug('Stopped')
return
def shutdown(self):
self.logger.debug('Stopping')
self._kill.set()
return
def put(self, item):
"""
Add an item to be processed by this thread.
"""
self.queue.put(item)
return item
@abstractmethod
def _process(self, item):

@@ -0,0 +1,81 @@
import logging
import os
import time
from pathlib import Path
from queue import Empty, Queue
import whisper
import whisper.utils
def whisper_process(inqueue: Queue,
outqueue: Queue,
output_base_dir: Path,
modelname: str,
modeldir: Path,
loglevel: int,
device: str):
logger = logging.getLogger(
'whisper-daemon.WhisperThread.worker')
logger.setLevel(loglevel)
logger.debug('Starting on %s', device)
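# Masking CUDA_VISIBLE_DEVICES before loading the model limits this worker
# process to the selected GPU, so the generic 'cuda' device below binds to it.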
rundevice = 'cpu'
if device != 'cpu':
os.environ['CUDA_VISIBLE_DEVICES'] = device
rundevice = 'cuda'
whisperModel = whisper.load_model(modelname,
download_root=modeldir,
device=rundevice)
logger.debug('Model loaded, now processing jobs')
while True:
try:
job = inqueue.get_nowait()
except Empty:
time.sleep(1)
continue
jobid = job.jobid
logger.debug('%s - Picked up job', jobid)
language = job.language
inpath = job.jobfile
outputdir = output_base_dir / jobid
outputdir.mkdir()
writer = whisper.utils.get_writer(job.outputformat, outputdir)
if job.outputformat == 'vtt':
writer.always_include_hours = True
outpath = outputdir / inpath.with_suffix('.' + writer.extension).name
logger.debug('%s - inpath=%s, outpath=%s', jobid, inpath, outpath)
logger.info('%s - Starting whisper transcription', jobid)
start = time.time()
try:
result = whisper.transcribe(whisperModel,
str(inpath),
language=language,
word_timestamps=True)
end = time.time()
if language is None:
out_language = result['language']
logger.info("%s - Detected language '%s'",
jobid, out_language)
else:
out_language = language
with open(outpath, 'w') as f:
writer.write_result(result, f, {'max_line_width': None,
'max_line_count': 3,
'max_words_per_line': 15,
'highlight_words': False})
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
logger.info('%s - Transcription finished in %s', jobid, elapsed)
job.finish(outpath)
except Exception as e:
job.fail(e)
outqueue.put(job)

@@ -1,85 +1,22 @@
import logging
import os
import time
from collections.abc import MutableMapping
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from queue import Queue
import multiprocessing as mp
import torch
import whisper
import whisper.utils
from .exceptions import ConfigurationException
from .job import Job
from .queuethread import QueueThread
def _whisper_processor(inqueue: Queue,
outqueue: Queue,
device: str,
modelname: str,
modeldir: Path,
loglevel: int):
logger = logging.getLogger(
'whisper-daemon.WhisperThread.worker')
logger.setLevel(loglevel)
logger.debug('Starting on %s', device)
rundevice = 'cpu'
if device != 'cpu':
os.environ['CUDA_VISIBLE_DEVICES'] = device
rundevice = 'cuda'
whisperModel = whisper.load_model(modelname,
download_root=modeldir,
device=rundevice)
while True:
try:
job = inqueue.get_nowait()
except Empty:
time.sleep(1)
continue
inpath = job.jobfile
outpath = None # This needs to be determined based on requested format
logger.info('Starting whisper transcription job for %s.', inpath)
start = time.time()
try:
result = whisper.transcribe(whisperModel,
str(inpath),
language=None,
word_timestamps=True)
end = time.time()
if language is None:
out_language = result['language']
logger.info(
"Detected language '%s' in %s.", out_language, inpath)
else:
out_language = language
vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
vttWriter.always_include_hours = True
with open(outpath, 'w') as f:
vttWriter.write_result(result, f, {'max_line_width': None,
'max_line_count': 3,
'max_words_per_line': 15,
'highlight_words': False})
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
logger.info('Finished whisper transcription job for %s in %s.',
inpath, elapsed)
except Exception as e:
outqueue.put((job, 'Failure', e))
else:
outqueue.put((job, 'Success'))
from .whisperprocess import whisper_process
@QueueThread.register
class WhisperThread(QueueThread):
def __init__(self,
inqueue: Queue,
outqueue: mp.Queue,
donedir: Path,
model: str,
modeldir: Path,
workerconfig: MutableMapping):
@@ -108,9 +45,11 @@ class WhisperThread(QueueThread):
an exception is raised.
"""
super().__init__(inqueue)
self.outqueue = outqueue
self.donedir = donedir
self.modeldir = modeldir
self.model = model
self.outqueue = Queue()
self.workerqueue = mp.Queue()
gpu_count = torch.cuda.device_count()
if 'device' in workerconfig:
@@ -136,21 +75,22 @@ class WhisperThread(QueueThread):
f'but only {gpu_count} GPUs available.')
devices = [f'cuda:{i}' for i in range(worker_count)]
self.workers = [self.spawn_worker(device) for device in devices]
self.logger.debug('Spawning %s workers on %s', worker_count, device)
self.workers = [self.spawn_worker(dev) for dev in devices]
def spawn_worker(self, device: str):
worker_process = Process(
target=_whisper_processor,
args=(self.queue,
self.outqueue,
device,
self.model,
self.modeldir,
self.logger.getEffectiveLevel()))
worker_process = mp.Process(target=whisper_process,
args=(self.workerqueue,
self.outqueue,
self.donedir,
self.model,
self.modeldir,
self.logger.getEffectiveLevel(),
device))
worker_process.daemon = True
worker_process.start()
return worker_process
def _process(self, item: Job):
self.workerqueue.put(item)
self.logger.info('Processed %s', item.jobid)
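Taken together, the flow after this commit is: QueueReader turns files in queuedir into Job objects on the in-process jobqueue, WhisperThread forwards each Job to the multiprocessing workerqueue feeding the whisper_process workers, and finished or failed Jobs come back on the shared resultqueue that NotifierThread drains, with callback delivery as the remaining MVP gap.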

requirements.txt Normal file

@@ -0,0 +1,3 @@
openai-whisper
requests
watchdog