play-daemon-threaded/pipeline/queuethread.py
2023-11-30 09:53:09 +01:00

60 lines
1.8 KiB
Python

import logging
from abc import ABCMeta, abstractmethod
from queue import Empty, Queue
from threading import Event, Thread
from time import sleep
class QueueThread(Thread, metaclass=ABCMeta):
"""
Base class for a thread governing a queue.
"""
def __init__(self, inqueue=None):
super().__init__()
self._kill = Event()
self.queue = inqueue if inqueue is not None else Queue()
self.logger = logging.getLogger(
f'play-daemon.{self.__class__.__name__}')
def run(self):
self.logger.debug('Starting')
while not self._kill.is_set():
try:
item = self.queue.get_nowait()
try:
# This can be incredibly spammy, so using an even more
# verbose level than debug.
self.logger.log(5, 'Processing %s', item)
self._process(item)
except Exception as e:
self.logger.error(
'Unexpected error in %s when processing item %s',
self.__class__.__name__,
item,
exc_info=e)
except Empty:
sleep(1)
self.logger.debug('Stopped')
return
def shutdown(self):
self.logger.debug('Stopping')
self._kill.set()
return
def put(self, item):
"""
Add an item to be processed by this thread.
"""
self.queue.put(item)
return self.__class__.__name__
@abstractmethod
def _process(self, item):
"""
Process an item in the queue.
To be implemented by a more specific subclass.
"""
raise NotImplementedError('_process must be implemented '
'by QueueThread subclasses')