60 lines
1.8 KiB
Python
60 lines
1.8 KiB
Python
import logging
|
|
|
|
from abc import ABCMeta, abstractmethod
|
|
from queue import Empty, Queue
|
|
from threading import Event, Thread
|
|
from time import sleep
|
|
|
|
class QueueThread(Thread, metaclass=ABCMeta):
    """
    Base class for a thread governing a queue.

    Items handed over with put() are taken off the queue one at a time by
    run() and passed to _process(), which concrete subclasses must implement.
    An exception raised by _process() is logged and does not stop the thread.
    """

    # Maximum number of seconds run() blocks waiting for an item before it
    # re-checks the kill flag. Subclasses may override this.
    poll_interval = 1

    def __init__(self, inqueue=None):
        """
        Set up the kill flag, the work queue and the logger.

        :param inqueue: queue to consume from; a new Queue is created
                        when this is None.
        """
        super().__init__()
        self._kill = Event()
        self.queue = inqueue if inqueue is not None else Queue()
        self.logger = logging.getLogger(
            f'play-daemon.{self.__class__.__name__}')

    def run(self):
        """
        Consume and process queue items until shutdown() is called.

        Items still on the queue when the kill flag is seen are left
        unprocessed.
        """
        self.logger.debug('Starting')
        while not self._kill.is_set():
            # Block for at most poll_interval seconds: an arriving item is
            # picked up immediately, while the kill flag is still re-checked
            # regularly when the queue stays empty.
            try:
                item = self.queue.get(timeout=self.poll_interval)
            except Empty:
                continue
            try:
                # This can be incredibly spammy, so using an even more
                # verbose level than debug.
                self.logger.log(5, 'Processing %s', item)
                self._process(item)
            except Exception:
                # A failing item must not take the whole thread down;
                # log it with its traceback and carry on with the next one.
                self.logger.exception(
                    'Unexpected error in %s when processing item %s',
                    self.__class__.__name__,
                    item)
        self.logger.debug('Stopped')

    def shutdown(self):
        """
        Ask the thread to stop.

        Only sets the kill flag; run() exits after finishing the item it is
        currently processing, or once its current wait on the queue ends.
        """
        self.logger.debug('Stopping')
        self._kill.set()

    def put(self, item):
        """
        Add an item to be processed by this thread.

        :param item: the object to enqueue.
        :return: the name of this thread's class.
        """
        self.queue.put(item)
        return self.__class__.__name__

    @abstractmethod
    def _process(self, item):
        """
        Process an item in the queue.

        To be implemented by a more specific subclass.
        """
        raise NotImplementedError('_process must be implemented '
                                  'by QueueThread subclasses')
|