A whisper thread has been incorporated, not quite working yet

This commit is contained in:
Erik Thuning 2024-05-02 16:48:33 +02:00
parent fe64ae1ad8
commit 08a85a1386
7 changed files with 324 additions and 25 deletions

@ -1,17 +1,25 @@
import logging
import logging.handlers
from configparser import ConfigParser
from pathlib import Path
from queue import Queue
from .deduplogger import DedupLogHandler
from .queuereader import QueueReader
from .whisperthread import WhisperThread
class Pipeline:
def __init__(self, config):
def __init__(self, config: ConfigParser):
self.running = False
self.config = config
# Since all keys in worker are optional, also make that section
# itself optional.
if 'worker' not in config:
config.add_section('worker')
# Since all keys in logging are optional, also make that section
# itself optional.
if 'logging' not in config:
@ -52,9 +60,19 @@ class Pipeline:
def start(self):
self.logger.info('Starting')
jobqueue = Queue()
self.watcher = QueueReader(self.config['whisper-daemon']['queuedir'],
jobqueue)
queuedir = Path(self.config['whisper-daemon']['queuedir'])
modeldir = Path(self.config['whisper-daemon']['modeldir'])
model = self.config['whisper-daemon']['model']
worker_config = self.config['worker']
self.watcher = QueueReader(jobqueue, queuedir)
self.whisper = WhisperThread(jobqueue,
model,
modeldir,
worker_config)
self.whisper.start()
self.watcher.start()
self.running = True
@ -62,9 +80,10 @@ class Pipeline:
def stop(self):
if not self.running:
raise Exception("Not running!")
raise Exception('Not running!')
self.logger.info('Stopping')
self.watcher.shutdown()
self.whisper.shutdown()
self.running = False
self.logger.debug('Stopped')
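
For orientation, a config.ini shaped roughly like the one below would satisfy the keys read above. The [whisper-daemon] keys (queuedir, modeldir, model) and the optional [worker] keys (device, count) come from this commit; the concrete values and the contents of [logging] are placeholder assumptions.

[whisper-daemon]
queuedir = /var/spool/whisper-daemon/queue
modeldir = /var/lib/whisper-daemon/models
model = large-v3

[worker]
; both keys are optional, see pipeline/whisperthread.py
device = gpu
count = 2

[logging]
; all keys in this section are optional; none are shown in this commit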

2 pipeline/exceptions.py Normal file

@ -0,0 +1,2 @@
class ConfigurationException(Exception):
pass

@ -1,20 +1,53 @@
import json
import logging
from dataclasses import dataclass
from pathlib import Path
class Job:
def __init__(self, queuefile: Path):
self.queuefile = queuefile
self.jobid = queuefile.stem
@dataclass
class Pending:
pass
self.logger = logging.getLogger(f'play-daemon.Job[{jobid}]')
@dataclass
class Success:
resultfile: Path
@dataclass
class Failure:
exception: Exception
class Job:
def __init__(self, jobid: str, queuefile: Path):
self.success = None
self.jobid = jobid
self.queuefile = queuefile
self.logger = logging.getLogger(f'whisper-daemon.Job[{jobid}]')
with open(queuefile) as f:
jobdata = json.load(f)
self.callback = jobdata.callback
self.jobfile = jobdata.jobfile
self.origin = jobdata.origin
self.callback = jobdata['callback']
self.jobfile = Path(jobdata['jobfile'])
self.origin = jobdata['origin']
self.logger.info('Initialized new job')
self.logger.info('Initialized')
def finish(self, textfile: Path):
self.success = True
self.resultfile = textfile
def fail(self, e: Exception):
self.success = False
self.exception = e
def get_result(self):
if self.success is None:
return Pending()
if self.success:
return Success(self.resultfile)
else:
return Failure(self.exception)
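
As a sketch of the input this constructor expects: a queue file is a JSON document whose stem becomes the job id and whose keys match the ones read above. The file name and values below are invented for illustration, e.g. lecture-42.json:

{
    "callback": "https://example.org/api/transcription-done",
    "jobfile": "/data/uploads/lecture-42.mp4",
    "origin": "play-daemon"
}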

@ -8,40 +8,45 @@ from queue import Queue
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from .job import Job
class QueueReader(FileSystemEventHandler):
def __init__(self, queuedir: str, processing_queue: Queue):
self.queuedir = queuedir
def __init__(self, processing_queue: Queue, queuedir: str):
self.queuedir = Path(queuedir)
self.queue = processing_queue
self.watcher = PollingObserver()
self.watcher.schedule(self, queuedir)
self.pattern = re.compile(r'.+\.json')
self.logger = logging.getLogger('play-daemon.QueueReader')
self.logger = logging.getLogger('whisper-daemon.QueueReader')
def start(self):
for f in sorted(Path(self.queuedir).iterdir(), key=os.path.getmtime):
self.logger.debug('Starting')
for f in sorted(self.queuedir.iterdir(), key=os.path.getmtime):
self._process(f)
return self.watcher.start()
def shutdown(self):
self.logger.debug('Stopping')
return self.watcher.stop()
def _process(self, path: Path):
self.logger.info('Evaluating %s', path)
self.logger.debug('Evaluating %s', path)
if not path.is_file():
self.logger.info('Ignoring, %s is not a regular file', path)
self.logger.debug('Ignoring, %s is not a regular file', path)
return
if not self.pattern.fullmatch(path.name):
self.logger.info(
self.logger.debug(
'Ignoring, %s does not match expected name format', path)
return
self.logger.info('Processing queue item: %s', path)
self.logger.debug('Processing queue item: %s', path)
jobid = path.stem
try:
job = Job(path)
job = Job(jobid, path)
self.queue.put(job)
self.logger.info('Queued job: %s', job.jobid)
except Error as e:
self.logger.error('Error when processing %s: %s', job.jobid, e)
self.logger.info('Queued job: %s', jobid)
except Exception as e:
self.logger.error('Error when processing %s: %s', jobid, e)
def on_created(self, event):
self._process(Path(event.src_path))

58 pipeline/queuethread.py Normal file

@ -0,0 +1,58 @@
import logging
from abc import ABCMeta, abstractmethod
from queue import Empty, Queue
from threading import Event, Thread
from time import sleep
class QueueThread(Thread, metaclass=ABCMeta):
"""
Base class for a thread governing a queue.
"""
def __init__(self, inqueue: Queue):
super().__init__()
self._kill = Event()
self.queue = inqueue
self.logger = logging.getLogger(
f'whisper-daemon.{self.__class__.__name__}')
def run(self):
self.logger.debug('Starting')
while not self._kill.is_set():
try:
item = self.queue.get_nowait()
try:
# This can be incredibly spammy, so using an even more
# verbose level than debug.
self.logger.log(5, 'Processing %s', item)
self._process(item)
except Exception as e:
self.logger.error(
'Unexpected error when processing item %s',
item,
exc_info=e)
except Empty:
sleep(1)
self.logger.debug('Stopped')
return
def shutdown(self):
self.logger.debug('Stopping')
self._kill.set()
return
def put(self, item):
"""
Add an item to be processed by this thread.
"""
self.queue.put(item)
return item
@abstractmethod
def _process(self, item):
"""
Process an item in the queue.
To be implemented by a more specific subclass.
"""
raise NotImplementedError('_process must be implemented '
'by QueueThread subclasses')
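
As a rough illustration of the intended use (not part of this commit), a minimal QueueThread subclass only needs to implement _process. The class below is invented for the example and assumes the package is importable as pipeline.

import time
from queue import Queue

from pipeline.queuethread import QueueThread


class PrintingThread(QueueThread):
    """Toy subclass that just logs every item it is handed."""

    def _process(self, item):
        # Real subclasses would do their actual work here.
        self.logger.info('Got item: %s', item)


if __name__ == '__main__':
    thread = PrintingThread(Queue())
    thread.start()
    thread.put('hello')
    time.sleep(2)  # give the run loop a moment to pick the item up
    thread.shutdown()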

156 pipeline/whisperthread.py Normal file

@ -0,0 +1,156 @@
import logging
import os
import time
from collections.abc import MutableMapping
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
import torch
import whisper
import whisper.utils
from .exceptions import ConfigurationException
from .job import Job
from .queuethread import QueueThread
def _whisper_processor(inqueue: Queue,
outqueue: Queue,
device: str,
modelname: str,
modeldir: Path,
loglevel: int):
logger = logging.getLogger(
'whisper-daemon.WhisperThread.worker')
logger.setLevel(loglevel)
logger.debug('Starting on %s', device)
rundevice = 'cpu'
if device != 'cpu':
os.environ['CUDA_VISIBLE_DEVICES'] = device
rundevice = 'cuda'
whisperModel = whisper.load_model(modelname,
download_root=modeldir,
device=rundevice)
while True:
try:
job = inqueue.get_nowait()
except Empty:
time.sleep(1)
continue
inpath = job.jobfile
outpath = None # This needs to be determined based on requested format
language = None # No per-job language is passed in yet, so always autodetect
logger.info('Starting whisper transcription job for %s.', inpath)
start = time.time()
try:
result = whisper.transcribe(whisperModel,
str(inpath),
language=language,
word_timestamps=True)
end = time.time()
if language is None:
out_language = result['language']
logger.info(
"Detected language '%s' in %s.", out_language, inpath)
else:
out_language = language
vttWriter = whisper.utils.WriteVTT(str(outpath.parent))
vttWriter.always_include_hours = True
with open(outpath, 'w') as f:
vttWriter.write_result(result, f, {'max_line_width': None,
'max_line_count': 3,
'max_words_per_line': 15,
'highlight_words': False})
elapsed = time.strftime('%H:%M:%S', time.gmtime(end - start))
logger.info('Finished whisper transcription job for %s in %s.',
inpath, elapsed)
except Exception as e:
outqueue.put((job, 'Failure', e))
else:
outqueue.put((job, 'Success'))
@QueueThread.register
class WhisperThread(QueueThread):
def __init__(self,
inqueue: Queue,
model: str,
modeldir: Path,
workerconfig: MutableMapping):
"""
Instantiate a WhisperThread, which will contain one or more workers.
model is the whisper model to be used, and modeldir is the directory
where the model is stored.
Worker count is governed by two configuration values: device and
count. Both are optional.
If device is not specified, the device will be determined by
GPU availability.
If only device is specified and the device is GPU, a worker is
created for each available GPU. If no GPUs are available, an exception
is raised.
If only device is specified and the device is CPU, a single worker
is created.
If count is provided, that number of workers will be created. If the
device is GPU and count is greater than the number of available GPUs,
an exception is raised.
"""
super().__init__(inqueue)
self.modeldir = modeldir
self.model = model
self.outqueue = Queue()
gpu_count = torch.cuda.device_count()
if 'device' in workerconfig:
device = workerconfig['device']
else:
if gpu_count > 0:
device = 'gpu'
else:
device = 'cpu'
if device == 'cpu':
worker_count = int(workerconfig.get('count', 1))
devices = [f'{device}' for i in range(worker_count)]
else:
if gpu_count == 0:
raise ConfigurationException(
'GPU requested but none available.')
worker_count = int(workerconfig.get('count', gpu_count))
if worker_count > gpu_count:
raise ConfigurationException(
f'{worker_count} workers requested '
f'but only {gpu_count} GPUs available.')
devices = [f'cuda:{i}' for i in range(worker_count)]
self.workers = [self.spawn_worker(device) for device in devices]
def spawn_worker(self, device: str):
worker_process = Process(
target=_whisper_processor,
args=(self.queue,
self.outqueue,
device,
self.model,
self.modeldir,
self.logger.getEffectiveLevel()))
worker_process.daemon = True
worker_process.start()
return worker_process
def _process(self, item: Job):
self.logger.info('Processed %s', item.jobid)
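
To make the worker-count rules in the docstring concrete, here is how different [worker] sections map to the devices handed to spawn_worker, following the constructor logic above (the GPU counts are assumed for the sake of the example):

workerconfig (2 GPUs available)        devices spawned
{}                                     ['cuda:0', 'cuda:1']
{'device': 'cpu'}                      ['cpu']
{'device': 'cpu', 'count': '3'}        ['cpu', 'cpu', 'cpu']
{'device': 'gpu', 'count': '1'}        ['cuda:0']
{'device': 'gpu', 'count': '3'}        ConfigurationException

workerconfig (no GPUs available)       devices spawned
{}                                     ['cpu']
{'device': 'gpu'}                      ConfigurationException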

26 whisper-daemon.py Executable file

@ -0,0 +1,26 @@
#!/usr/bin/env python3
import signal
import sys
from configparser import ConfigParser
from os import path
from pipeline import Pipeline
conffile = 'config.ini'
if not path.exists(conffile):
print(f"{conffile} missing, exiting")
sys.exit(1)
config = ConfigParser(empty_lines_in_values=False)
config.read(conffile)
pipeline = Pipeline(config)
pipeline.start()
try:
signal.pause()
except KeyboardInterrupt:
pipeline.stop()
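
The daemon is started from the directory containing config.ini (./whisper-daemon.py) and stopped with Ctrl-C, which lands in the KeyboardInterrupt handler and calls pipeline.stop(). As a hedged sketch (not part of the commit) of how a client could hand work to the running daemon, one can write a <jobid>.json file with the keys shown in pipeline/job.py into the configured queue directory; the paths below are placeholders.

import json
from pathlib import Path

queuedir = Path('/var/spool/whisper-daemon/queue')  # must match config.ini

jobid = 'lecture-42'
jobdata = {'callback': 'https://example.org/api/transcription-done',
           'jobfile': '/data/uploads/lecture-42.mp4',
           'origin': 'play-daemon'}

with open(queuedir / f'{jobid}.json', 'w') as f:
    json.dump(jobdata, f)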