diff --git a/config.ini.example b/config.ini.example
index 8ede91d..33dc734 100644
--- a/config.ini.example
+++ b/config.ini.example
@@ -47,17 +47,9 @@ url = https://example.com/notify
 token = A70keN
 
 
-[Pool]
-# The capacity to set for the worker pool.
-# This is arbitrary and needs dialling in based on the capabilities
-# of the hardware. The capacity factor is the maximum allowed sum of the sizes
-# of all jobs running in parallel.
-#
-# Capacity must be large enough to accomodate at least one job of the largest
-# size and one other job of the next smaller size. Otherwise no jobs will run.
-#
-# See also TranscodeHandler.jobsize and SubtitlesHandler.jobsize
-capacity = 3
+[FFmpeg]
+# The maximum number of ffmpeg jobs to run in parallel
+workers = 4
 
 
 [Daisy]
@@ -103,14 +95,6 @@ baseimage = /path/to/template.png
 textcolor = white
 
 
-[TranscodeHandler]
-# See also Pool.capacity.
-# The amount of resources a single transcode job will consume
-# in the worker pool. This should be set so that the server can handle the
-# pool getting completely filled with jobs of this type.
-jobsize = 1
-
-
 [MediasiteProcessor]
 # Credentials to use when downloading files from mediasite
 user = someuser
diff --git a/pipeline/__init__.py b/pipeline/__init__.py
index 75b87e6..702eb6b 100644
--- a/pipeline/__init__.py
+++ b/pipeline/__init__.py
@@ -1,5 +1,6 @@
 import logging
 import logging.handlers
+import multiprocessing as mp
 import os
 import shutil
 
@@ -10,7 +11,6 @@ from .handlers import init_handlers
 from .notifier import Notifier
 from .preprocessors import init_preprocessors
 from .queuereader import QueueReader
-from .workthread import WorkThread
 
 
 class Pipeline:
@@ -63,7 +63,7 @@ class Pipeline:
         for item in os.scandir(self.config['Pipeline']['tempdir']):
             shutil.rmtree(item)
 
-        self.workthread = WorkThread(int(self.config['Pool']['capacity']))
+        self.workerpool = mp.Pool(int(self.config['FFmpeg']['workers']))
 
         self.notifier = Notifier(self.config['Pipeline']['queuedir'],
                                  self.config['Notifier'])
@@ -71,7 +71,7 @@
                                   self.config['Pipeline']['packagedir'],
                                   self.config['Pipeline']['cachedir'])
         handler_queue_map, self.handlers = init_handlers(self.collector,
-                                                         self.workthread,
+                                                         self.workerpool,
                                                          self.config)
         self.distributor = Distributor(self.collector,
                                        self.notifier,
@@ -87,7 +87,6 @@
 
         self.notifier.start()
         self.collector.start()
-        self.workthread.start()
         for handler in self.handlers:
             handler.start()
         self.distributor.start()
@@ -108,9 +107,10 @@
         self.distributor.shutdown()
         for handler in self.handlers:
             handler.shutdown()
-        self.workthread.shutdown()
         self.collector.shutdown()
         self.notifier.shutdown()
+        self.workerpool.close()
+        self.workerpool.terminate()
 
         self.logger.debug('Stopped')
         self.running = False
diff --git a/pipeline/handlers/__init__.py b/pipeline/handlers/__init__.py
index 09f11e1..3d6a735 100644
--- a/pipeline/handlers/__init__.py
+++ b/pipeline/handlers/__init__.py
@@ -23,7 +23,7 @@ allHandlers = [AudioHandler,
                VisibilityHandler,
                ]
 
-def init_handlers(collector, worker, config):
+def init_handlers(collector, pool, config):
     ldap = Ldap(config['Ldap'])
     handler_queue_map = {}
     handler_list = []
@@ -33,7 +33,7 @@
         handlerconf = get_section(config, handlerclass.__name__)
         handlers = handlerclass.instantiate(handlerqueue,
                                             collector,
-                                            worker,
+                                            pool,
                                             ldap,
                                             config['Pipeline']['tempdir'],
                                             handlerconf)
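The `[Pool]` capacity knob and the per-handler `jobsize` weights are gone; the new `[FFmpeg] workers` value is simply the process count handed to `multiprocessing.Pool`. A minimal, self-contained sketch of the new lifecycle (the inline config text and the `pow` job are placeholders for illustration, not part of the patch):

```python
import multiprocessing as mp
from configparser import ConfigParser

CONF = """\
[FFmpeg]
# The maximum number of ffmpeg jobs to run in parallel
workers = 4
"""

if __name__ == '__main__':
    config = ConfigParser()
    config.read_string(CONF)
    # The pipeline builds its pool the same way from config
    pool = mp.Pool(int(config['FFmpeg']['workers']))

    result = pool.apply_async(pow, (2, 10))  # stand-in for a real job
    print(result.get(timeout=5))             # -> 1024

    # Mirrors the shutdown order in pipeline/__init__.py above
    pool.close()
    pool.terminate()
```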
diff --git a/pipeline/handlers/handler.py b/pipeline/handlers/handler.py
index 63a4737..945ccda 100644
--- a/pipeline/handlers/handler.py
+++ b/pipeline/handlers/handler.py
@@ -16,13 +16,13 @@ class Handler(QueueThread, metaclass=ABCMeta):
     def __init__(self,
                  handlerqueue,
                  collector,
-                 worker,
+                 pool,
                  ldap,
                  tempdir,
                  config):
         super().__init__(handlerqueue)
         self.collector = collector
-        self.workerthread = worker
+        self.workerpool = pool
         self.ldap = ldap
         self.tempfiles_parent = Path(tempdir)
         self.config = config
@@ -73,13 +73,13 @@ class Handler(QueueThread, metaclass=ABCMeta):
         except Exception as e:
             self.collector.put(ErrorItem(queueitem['jobid'], self, e))
 
-    def asyncjob(self, size, func, args=(), kwargs={}):
+    def asyncjob(self, func, args):
         """
-        Queue up an async job in the worker thread.
+        Queue up an async job in the worker pool.
 
-        Returns a PendingJob object to be polled for completion.
+        Returns an AsyncResult object to be polled for completion.
         """
-        return self.workerthread.submit(size, func, args, kwargs)
+        return self.workerpool.apply_async(func, args)
 
     @classmethod
     def apply_after(cls, handlerclass):
diff --git a/pipeline/handlers/transcode.py b/pipeline/handlers/transcode.py
index cde875c..c98999c 100644
--- a/pipeline/handlers/transcode.py
+++ b/pipeline/handlers/transcode.py
@@ -172,9 +172,7 @@
 
 
             args = (inpath, outpath, int(maxheight))
-            transcode = self.asyncjob(int(self.config['jobsize']),
-                                      _do_transcode,
-                                      args)
+            transcode = self.asyncjob(_do_transcode, args)
             transcodes.append(transcode)
 
             resultfiles[sourcename][maxheight] = outfile
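With the size bookkeeping gone, `Handler.asyncjob()` reduces to a thin wrapper over `Pool.apply_async`, and callers hold on to the returned `AsyncResult` exactly as `TranscodeHandler` does with `transcodes` above. A hedged sketch of that contract (the `_do_transcode` body here is a dummy stand-in, not the real ffmpeg invocation):

```python
import multiprocessing as mp
import time

def _do_transcode(inpath, outpath, maxheight):
    # Dummy stand-in for the real ffmpeg invocation
    time.sleep(0.2)
    return outpath

if __name__ == '__main__':
    pool = mp.Pool(4)
    transcodes = [pool.apply_async(_do_transcode,
                                   (f'in{n}.mp4', f'out{n}.mp4', 720))
                  for n in range(6)]
    # Poll the AsyncResults for completion, as a handler would
    while not all(job.ready() for job in transcodes):
        time.sleep(0.5)
    print([job.get() for job in transcodes])
    pool.close()
    pool.join()
```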
diff --git a/pipeline/workthread.py b/pipeline/workthread.py
deleted file mode 100644
index 4efb351..0000000
--- a/pipeline/workthread.py
+++ /dev/null
@@ -1,193 +0,0 @@
-import multiprocessing as mp
-
-from collections import deque
-from collections.abc import Iterable
-from dataclasses import dataclass
-from pprint import pformat
-from threading import Event
-from time import sleep
-from typing import Callable
-
-from .queuethread import QueueThread
-
-
-class PendingJob:
-    '''
-    Basically a reimplementation of multiprocessing.AsyncResult
-    '''
-    def __init__(self):
-        self._event = Event()
-
-    def ready(self):
-        return self._event.is_set()
-
-    def wait(self, timeout=None):
-        self._event.wait(timeout)
-
-    def get(self, timeout=None):
-        self.wait(timeout)
-        if not self.ready():
-            raise TimeoutError
-        if self._success:
-            return self._value
-        else:
-            raise self._value
-
-    def _finish(self, obj):
-        self._value = obj
-        self._success = True
-        self._event.set()
-
-    def _fail(self, obj):
-        self._value = obj
-        self._success = False
-        self._event.set()
-
-
-@dataclass
-class WorkUnit:
-    size: int
-    job: PendingJob
-    func: Callable
-    args: Iterable
-    kwargs: dict
-
-
-class WorkThread(QueueThread):
-    def __init__(self, capacity):
-        super().__init__()
-        self.queue = {1: deque()}
-        self.pool = mp.Pool(capacity)
-        self.capacity = capacity
-        self.running = []
-
-    def run(self):
-        self.logger.debug('Starting')
-        while not self._kill.is_set():
-            # Is there even anything to do?
-            if self._queue_length() == 0:
-                # Let's not loop too often
-                sleep(1)
-                continue
-
-            # First, we update the list of running jobs.
-            still_running = [pending for pending in self.running
-                             if not pending.job.ready()]
-            self.running = still_running
-            self.logger.debug('Currently running jobs in the pool: '
-                              + pformat(still_running))
-
-            # Next, we tally free capacity.
-            free = self.capacity - sum([pending.size
-                                        for pending in still_running])
-
-            # Get the biggest job that has been waiting the longest.
-            # We check against `self.capacity` instead of `free` in order to
-            # be able to reserve space.
-            candidate = self._peek(self.capacity)
-
-            # Get the size of the smallest job in the queue
-            # If the queue only has one item, this check is short-circuited so
-            # we don't reserve capacity for nothing.
-            smallest_queued_size = 0
-            if self._queue_length() > 1:
-                smallest_queued_size = self._smallest_in_queue()
-            self.logger.debug(
-                f'{free=}, {smallest_queued_size=}, {candidate.size=}')
-
-            # We check against the free space minus the smallest item in order
-            # to make sure the queue doesn't get completely filled with
-            # only big jobs.
-            if free - smallest_queued_size >= candidate.size:
-                # There is plenty of space in the queue, just submit.
-                self.logger.debug(
-                    f'Processing job with size {candidate.size}')
-                self._process(self._pop(free))
-            else:
-                self.logger.debug(
-                    f'Insufficient capacity for size {candidate.size}')
-                # There isn't enough free capacity for the biggest job.
-                # We now need to find a suitable job to process without
-                # risking locking out the biggest job indefinitely.
-
-                # How much capacity to reserve so the next big job can run.
-                # We reserve at least the amount of capacity matching the
-                # smallest queued job.
-                largest_running_size = 0
-                if len(still_running) != 0:
-                    largest_running_size = max([job.size
-                                                for job in still_running])
-                reserved_capacity = max(smallest_queued_size,
-                                        candidate.size - largest_running_size)
-                self.logger.debug(
-                    f'{largest_running_size=}, {reserved_capacity=}')
-
-                # Get a new candidate given the reserved capacity
-                candidate = self._pop(free - reserved_capacity)
-
-                if candidate:
-                    self.logger.debug(
-                        f'Processing job with size {candidate.size}')
-                    # There was a valid canidate job, submit it.
-                    self._process(candidate)
-                else:
-                    # There is no suitable queue item, wait for a bit.
-                    sleep(1)
-
-        self.logger.debug('Stopped')
-        return
-
-    def shutdown(self):
-        super().shutdown()
-        self.pool.close()
-        self.pool.terminate()
-
-    def submit(self, size, func, args=(), kwargs={}):
-        job = PendingJob()
-        self.put(WorkUnit(size, job, func, args, kwargs))
-        return job
-
-    def put(self, item):
-        """
-        Add an item to be processed by this thread.
-        """
-        if item.size not in self.queue:
-            self.queue[item.size] = deque()
-        self.queue[item.size].append(item)
-        return self.__class__.__name__
-
-    def _queue_length(self):
-        return sum([len(subqueue) for subqueue in self.queue.values()])
-
-    def _smallest_in_queue(self):
-        sizes = [size for size, subqueue in self.queue.items()
-                 if len(subqueue) > 0]
-        if len(sizes) > 0:
-            return min(sizes)
-        return 0
-
-    def _peek(self, maxsize):
-        sizes = [size for size, subqueue in self.queue.items()
-                 if size <= maxsize
-                 and len(subqueue) > 0]
-        if len(sizes) == 0:
-            return None
-        return self.queue[max(sizes)][-1]
-
-    def _pop(self, maxsize):
-        sizes = [size for size, subqueue in self.queue.items()
-                 if size <= maxsize
-                 and len(subqueue) > 0]
-        if len(sizes) == 0:
-            return None
-        return self.queue[max(sizes)].pop()
-
-    def _process(self, item):
-        self.logger.debug('Processing %s', item)
-        def success(out):
-            item.job._finish(out)
-        def failure(out):
-            item.job._fail(out)
-        self.pool.apply_async(item.func, item.args, item.kwargs,
-                              callback=success, error_callback=failure)
-        self.running.append(item)
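API-wise the deletion is safe: `multiprocessing`'s `AsyncResult` already offers the `ready()`/`wait()`/`get()` surface that `PendingJob` reimplemented, so handler code polls the same way. What the patch does give up is the size-aware scheduling and capacity reservation above; with a plain pool, jobs run strictly in submission order, `workers` at a time. A quick parity check (illustrative only):

```python
import multiprocessing as mp

if __name__ == '__main__':
    with mp.Pool(1) as pool:
        res = pool.apply_async(divmod, (7, 3))
        res.wait(timeout=5)  # PendingJob.wait() equivalent
        assert res.ready()   # PendingJob.ready() equivalent
        print(res.get())     # (2, 1); get() re-raises a worker exception on failure
```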
- """ - if item.size not in self.queue: - self.queue[item.size] = deque() - self.queue[item.size].append(item) - return self.__class__.__name__ - - def _queue_length(self): - return sum([len(subqueue) for subqueue in self.queue.values()]) - - def _smallest_in_queue(self): - sizes = [size for size, subqueue in self.queue.items() - if len(subqueue) > 0] - if len(sizes) > 0: - return min(sizes) - return 0 - - def _peek(self, maxsize): - sizes = [size for size, subqueue in self.queue.items() - if size <= maxsize - and len(subqueue) > 0] - if len(sizes) == 0: - return None - return self.queue[max(sizes)][-1] - - def _pop(self, maxsize): - sizes = [size for size, subqueue in self.queue.items() - if size <= maxsize - and len(subqueue) > 0] - if len(sizes) == 0: - return None - return self.queue[max(sizes)].pop() - - def _process(self, item): - self.logger.debug('Processing %s', item) - def success(out): - item.job._finish(out) - def failure(out): - item.job._fail(out) - self.pool.apply_async(item.func, item.args, item.kwargs, - callback=success, error_callback=failure) - self.running.append(item) diff --git a/test.py b/test.py index ff95249..44b7466 100755 --- a/test.py +++ b/test.py @@ -14,7 +14,6 @@ import uuid from pipeline import Pipeline from pipeline.package import PackageManager from pipeline.utils import raise_for_structure, canonical_manifest -from pipeline.workthread import WorkThread filesdir = 'testfiles' @@ -27,41 +26,6 @@ videoglob = path.join(filesdir, videosubdir, '*') imageglob = path.join(filesdir, imagesubdir, '*') -def now(): - return strftime('%H:%M:%S') - - -def dummyjob(name, wait): - print(f'{now()}: {name} will now wait for {wait} seconds.\n') - sleep(wait) - print(f'{now()}: {name} is done waiting.') - return name - - -#@unittest.skip -class WorkThreadTest(unittest.TestCase): - def setUp(self): - self.worker = WorkThread(20) - self.worker.start() - - def tearDown(self): - self.worker.shutdown() - - def test_workthread_queue(self): - jobs = [] - print(f'{now()}: Submitting jobs.') - for i in range(9): - jobs.append(self.worker.submit(5, dummyjob, - (f'bigjob{i}', 10))) - for i in range(15): - jobs.append(self.worker.submit(1, dummyjob, - (f'smalljob{i}', 6))) - print(f'{now()}: Done submitting, now waiting.') - sleep(40) - for job in jobs: - self.assertTrue(job.ready()) - - class DaemonTest(unittest.TestCase): def setUp(self): config = ConfigParser(empty_lines_in_values=False)