Implementing a unified job pool for transcodes and subtitles never panned out,
so the code was just adding unnecessary complexity. The pipeline now uses
mp.Pool to manage ffmpeg jobs as before.

This reverts commit f91109fb3e4795ce652b966ba0563ba4e100b3d1 and deletes the
WorkThread class and its associated tests.
Erik Thuning 2024-10-17 11:30:35 +02:00
parent 3da736422d
commit cbf29c4962
7 changed files with 17 additions and 264 deletions
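
In outline, the revert returns the pipeline to the stock multiprocessing
pattern sketched below. This is a minimal sketch, not pipeline code:
run_ffmpeg is a hypothetical stand-in for the real transcode function, and
the actual call sites appear in the diffs that follow.

import multiprocessing as mp

def run_ffmpeg(inpath, outpath, maxheight):
    # Hypothetical stand-in for the real ffmpeg invocation.
    return outpath

if __name__ == '__main__':
    pool = mp.Pool(4)  # sized by FFmpeg.workers in the config change below
    pending = pool.apply_async(run_ffmpeg, ('in.mp4', 'out.mp4', 720))
    print(pending.get())  # blocks until the job finishes
    pool.close()
    pool.join()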

@@ -47,17 +47,9 @@ url = https://example.com/notify
token = A70keN
[Pool]
# The capacity to set for the worker pool.
# This is arbitrary and needs dialling in based on the capabilities
# of the hardware. The capacity factor is the maximum allowed sum of the sizes
# of all jobs running in parallel.
#
# Capacity must be large enough to accommodate at least one job of the largest
# size and one other job of the next smaller size. Otherwise no jobs will run.
#
# See also TranscodeHandler.jobsize and SubtitlesHandler.jobsize
capacity = 3
[FFmpeg]
# The maximum number of ffmpeg jobs to run in parallel
workers = 4
[Daisy]
@@ -103,14 +95,6 @@ baseimage = /path/to/template.png
textcolor = white
[TranscodeHandler]
# See also Pool.capacity.
# The amount of resources a single transcode job will consume
# in the worker pool. This should be set so that the server can handle the
# pool getting completely filled with jobs of this type.
jobsize = 1
[MediasiteProcessor]
# Credentials to use when downloading files from mediasite
user = someuser
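
For reference, a minimal sketch of how the new [FFmpeg] setting would be
consumed via configparser; this is a hypothetical snippet, not code from the
repository, though the int() conversion mirrors the Pipeline change below.

from configparser import ConfigParser
import multiprocessing as mp

config = ConfigParser(empty_lines_in_values=False)
config.read('config.ini')
# configparser values are always strings, so the worker count
# must be converted before handing it to mp.Pool.
pool = mp.Pool(int(config['FFmpeg']['workers']))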

@@ -1,5 +1,6 @@
import logging
import logging.handlers
import multiprocessing as mp
import os
import shutil
@@ -10,7 +11,6 @@ from .handlers import init_handlers
from .notifier import Notifier
from .preprocessors import init_preprocessors
from .queuereader import QueueReader
from .workthread import WorkThread
class Pipeline:
@@ -63,7 +63,7 @@ class Pipeline:
for item in os.scandir(self.config['Pipeline']['tempdir']):
shutil.rmtree(item)
self.workthread = WorkThread(int(self.config['Pool']['capacity']))
self.workerpool = mp.Pool(int(self.config['FFmpeg']['workers']))
self.notifier = Notifier(self.config['Pipeline']['queuedir'],
self.config['Notifier'])
@@ -71,7 +71,7 @@ class Pipeline:
self.config['Pipeline']['packagedir'],
self.config['Pipeline']['cachedir'])
handler_queue_map, self.handlers = init_handlers(self.collector,
self.workthread,
self.workerpool,
self.config)
self.distributor = Distributor(self.collector,
self.notifier,
@@ -87,7 +87,6 @@ class Pipeline:
self.notifier.start()
self.collector.start()
self.workthread.start()
for handler in self.handlers:
handler.start()
self.distributor.start()
@@ -108,9 +107,10 @@ class Pipeline:
self.distributor.shutdown()
for handler in self.handlers:
handler.shutdown()
self.workthread.shutdown()
self.collector.shutdown()
self.notifier.shutdown()
self.workerpool.close()
self.workerpool.terminate()
self.logger.debug('Stopped')
self.running = False
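
The shutdown sequence above follows the standard mp.Pool lifecycle. A minimal
standalone sketch of those semantics (illustrative code, not taken from the
pipeline):

import multiprocessing as mp

def task(n):
    return n * n

if __name__ == '__main__':
    pool = mp.Pool(2)
    jobs = [pool.apply_async(task, (i,)) for i in range(4)]
    pool.close()      # refuse any new submissions
    pool.terminate()  # stop workers immediately, discarding unfinished jobs
    pool.join()       # reap the worker processes

Using close() followed by join() would instead drain outstanding jobs;
close() plus terminate() favours a fast exit, which matches what the deleted
WorkThread.shutdown() did.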

@@ -23,7 +23,7 @@ allHandlers = [AudioHandler,
VisibilityHandler,
]
def init_handlers(collector, worker, config):
def init_handlers(collector, pool, config):
ldap = Ldap(config['Ldap'])
handler_queue_map = {}
handler_list = []
@@ -33,7 +33,7 @@ def init_handlers(collector, worker, config):
handlerconf = get_section(config, handlerclass.__name__)
handlers = handlerclass.instantiate(handlerqueue,
collector,
worker,
pool,
ldap,
config['Pipeline']['tempdir'],
handlerconf)

@@ -16,13 +16,13 @@ class Handler(QueueThread, metaclass=ABCMeta):
def __init__(self,
handlerqueue,
collector,
worker,
pool,
ldap,
tempdir,
config):
super().__init__(handlerqueue)
self.collector = collector
self.workerthread = worker
self.workerpool = pool
self.ldap = ldap
self.tempfiles_parent = Path(tempdir)
self.config = config
@@ -73,13 +73,13 @@ class Handler(QueueThread, metaclass=ABCMeta):
except Exception as e:
self.collector.put(ErrorItem(queueitem['jobid'], self, e))
def asyncjob(self, size, func, args=(), kwargs={}):
def asyncjob(self, func, args):
"""
Queue up an async job in the worker thread.
Queue up an async job in the worker pool.
Returns a PendingJob object to be polled for completion.
Returns an AsyncResult object to be polled for completion.
"""
return self.workerthread.submit(size, func, args, kwargs)
return self.workerpool.apply_async(func, args)
@classmethod
def apply_after(cls, handlerclass):
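
A usage sketch of the new asyncjob() contract: apply_async() returns a
multiprocessing AsyncResult, which callers poll with ready() or block on with
get(). The job function here is hypothetical; TranscodeHandler below shows
the real call site.

import multiprocessing as mp

def do_transcode(inpath, outpath, maxheight):
    return outpath  # hypothetical stand-in for real work

if __name__ == '__main__':
    pool = mp.Pool(2)
    jobs = [pool.apply_async(do_transcode, (f'in{i}.mp4', f'out{i}.mp4', 720))
            for i in range(3)]
    for job in jobs:
        job.wait()          # block until this job finishes
        assert job.ready()  # non-blocking completion check
        print(job.get())    # the result, or re-raises the worker's exception
    pool.close()
    pool.join()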

@@ -172,9 +172,7 @@ class TranscodeHandler(Handler):
args = (inpath,
outpath,
int(maxheight))
transcode = self.asyncjob(int(self.config['jobsize']),
_do_transcode,
args)
transcode = self.asyncjob(_do_transcode, args)
transcodes.append(transcode)
resultfiles[sourcename][maxheight] = outfile

@@ -1,193 +0,0 @@
import multiprocessing as mp
from collections import deque
from collections.abc import Iterable
from dataclasses import dataclass
from pprint import pformat
from threading import Event
from time import sleep
from typing import Callable
from .queuethread import QueueThread
class PendingJob:
'''
Basically a reimplementation of multiprocessing.AsyncResult
'''
def __init__(self):
self._event = Event()
def ready(self):
return self._event.is_set()
def wait(self, timeout=None):
self._event.wait(timeout)
def get(self, timeout=None):
self.wait(timeout)
if not self.ready():
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
def _finish(self, obj):
self._value = obj
self._success = True
self._event.set()
def _fail(self, obj):
self._value = obj
self._success = False
self._event.set()
@dataclass
class WorkUnit:
size: int
job: PendingJob
func: Callable
args: Iterable
kwargs: dict
class WorkThread(QueueThread):
def __init__(self, capacity):
super().__init__()
self.queue = {1: deque()}
self.pool = mp.Pool(capacity)
self.capacity = capacity
self.running = []
def run(self):
self.logger.debug('Starting')
while not self._kill.is_set():
# Is there even anything to do?
if self._queue_length() == 0:
# Let's not loop too often
sleep(1)
continue
# First, we update the list of running jobs.
still_running = [pending for pending in self.running
if not pending.job.ready()]
self.running = still_running
self.logger.debug('Currently running jobs in the pool: '
+ pformat(still_running))
# Next, we tally free capacity.
free = self.capacity - sum([pending.size
for pending in still_running])
# Get the biggest job that has been waiting the longest.
# We check against `self.capacity` instead of `free` in order to
# be able to reserve space.
candidate = self._peek(self.capacity)
# Get the size of the smallest job in the queue
# If the queue only has one item, this check is short-circuited so
# we don't reserve capacity for nothing.
smallest_queued_size = 0
if self._queue_length() > 1:
smallest_queued_size = self._smallest_in_queue()
self.logger.debug(
f'{free=}, {smallest_queued_size=}, {candidate.size=}')
# We check against the free space minus the smallest item in order
# to make sure the queue doesn't get completely filled with
# only big jobs.
if free - smallest_queued_size >= candidate.size:
# There is plenty of space in the queue, just submit.
self.logger.debug(
f'Processing job with size {candidate.size}')
self._process(self._pop(free))
else:
self.logger.debug(
f'Insufficient capacity for size {candidate.size}')
# There isn't enough free capacity for the biggest job.
# We now need to find a suitable job to process without
# risking locking out the biggest job indefinitely.
# How much capacity to reserve so the next big job can run.
# We reserve at least the amount of capacity matching the
# smallest queued job.
largest_running_size = 0
if len(still_running) != 0:
largest_running_size = max([job.size
for job in still_running])
reserved_capacity = max(smallest_queued_size,
candidate.size - largest_running_size)
self.logger.debug(
f'{largest_running_size=}, {reserved_capacity=}')
# Get a new candidate given the reserved capacity
candidate = self._pop(free - reserved_capacity)
if candidate:
self.logger.debug(
f'Processing job with size {candidate.size}')
# There was a valid candidate job, submit it.
self._process(candidate)
else:
# There is no suitable queue item, wait for a bit.
sleep(1)
self.logger.debug('Stopped')
return
def shutdown(self):
super().shutdown()
self.pool.close()
self.pool.terminate()
def submit(self, size, func, args=(), kwargs={}):
job = PendingJob()
self.put(WorkUnit(size, job, func, args, kwargs))
return job
def put(self, item):
"""
Add an item to be processed by this thread.
"""
if item.size not in self.queue:
self.queue[item.size] = deque()
self.queue[item.size].append(item)
return self.__class__.__name__
def _queue_length(self):
return sum([len(subqueue) for subqueue in self.queue.values()])
def _smallest_in_queue(self):
sizes = [size for size, subqueue in self.queue.items()
if len(subqueue) > 0]
if len(sizes) > 0:
return min(sizes)
return 0
def _peek(self, maxsize):
sizes = [size for size, subqueue in self.queue.items()
if size <= maxsize
and len(subqueue) > 0]
if len(sizes) == 0:
return None
return self.queue[max(sizes)][-1]
def _pop(self, maxsize):
sizes = [size for size, subqueue in self.queue.items()
if size <= maxsize
and len(subqueue) > 0]
if len(sizes) == 0:
return None
return self.queue[max(sizes)].pop()
def _process(self, item):
self.logger.debug('Processing %s', item)
def success(out):
item.job._finish(out)
def failure(out):
item.job._fail(out)
self.pool.apply_async(item.func, item.args, item.kwargs,
callback=success, error_callback=failure)
self.running.append(item)
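
For context on the deleted code: _process() bridged pool results back into
PendingJob through mp.Pool's callback hooks. A minimal standalone sketch of
that standard-library pattern, with illustrative names:

import multiprocessing as mp

def work(n):
    if n < 0:
        raise ValueError(n)
    return n * 2

def on_success(value):
    print('finished:', value)

def on_failure(exc):
    print('failed:', exc)

if __name__ == '__main__':
    pool = mp.Pool(2)
    pool.apply_async(work, (21,), callback=on_success,
                     error_callback=on_failure)
    pool.apply_async(work, (-1,), callback=on_success,
                     error_callback=on_failure)
    pool.close()
    pool.join()  # both callbacks have run by the time join() returns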

@@ -14,7 +14,6 @@ import uuid
from pipeline import Pipeline
from pipeline.package import PackageManager
from pipeline.utils import raise_for_structure, canonical_manifest
from pipeline.workthread import WorkThread
filesdir = 'testfiles'
@@ -27,41 +26,6 @@ videoglob = path.join(filesdir, videosubdir, '*')
imageglob = path.join(filesdir, imagesubdir, '*')
def now():
return strftime('%H:%M:%S')
def dummyjob(name, wait):
print(f'{now()}: {name} will now wait for {wait} seconds.\n')
sleep(wait)
print(f'{now()}: {name} is done waiting.')
return name
#@unittest.skip
class WorkThreadTest(unittest.TestCase):
def setUp(self):
self.worker = WorkThread(20)
self.worker.start()
def tearDown(self):
self.worker.shutdown()
def test_workthread_queue(self):
jobs = []
print(f'{now()}: Submitting jobs.')
for i in range(9):
jobs.append(self.worker.submit(5, dummyjob,
(f'bigjob{i}', 10)))
for i in range(15):
jobs.append(self.worker.submit(1, dummyjob,
(f'smalljob{i}', 6)))
print(f'{now()}: Done submitting, now waiting.')
sleep(40)
for job in jobs:
self.assertTrue(job.ready())
class DaemonTest(unittest.TestCase):
def setUp(self):
config = ConfigParser(empty_lines_in_values=False)