import logging
from abc import ABCMeta, abstractmethod
from queue import Empty, Queue
from threading import Event, Thread
from time import sleep  # noqa: F401 -- no longer used here; kept for importers


class QueueThread(Thread, metaclass=ABCMeta):
    """
    Base class for a thread governing a queue.

    Items are taken from ``self.queue`` one at a time and handed to
    ``_process``, which concrete subclasses must implement. The loop keeps
    running until ``shutdown`` is called.
    """

    # Upper bound, in seconds, on how long ``run`` blocks waiting for an
    # item before it re-checks whether ``shutdown`` has been requested.
    _POLL_INTERVAL = 1

    def __init__(self, inqueue=None):
        """
        Set up the thread.

        :param inqueue: Queue to consume from. A fresh ``Queue`` is created
                        when this is None.
        """
        super().__init__()
        self._kill = Event()
        self.queue = inqueue if inqueue is not None else Queue()
        self.logger = logging.getLogger(
            f'play-daemon.{self.__class__.__name__}')

    def run(self) -> None:
        """
        Consume and process queue items until ``shutdown`` is called.

        An exception raised by ``_process`` is logged and the loop carries
        on with the next item, so one bad item does not stop the thread.
        """
        self.logger.debug('Starting')
        while not self._kill.is_set():
            # Block on the queue rather than polling and sleeping: an item
            # is picked up as soon as it arrives, while the timeout still
            # lets the kill flag be checked at regular intervals.
            try:
                item = self.queue.get(timeout=self._POLL_INTERVAL)
            except Empty:
                continue

            try:
                # This can be incredibly spammy, so using an even more
                # verbose level than debug.
                self.logger.log(5, 'Processing %s', item)
                self._process(item)
            except Exception:
                # Top-level boundary of the worker loop: log with traceback
                # and keep the thread alive.
                self.logger.exception(
                    'Unexpected error in %s when processing item %s',
                    self.__class__.__name__,
                    item)
        self.logger.debug('Stopped')

    def shutdown(self) -> None:
        """
        Ask the thread to stop.

        Only sets the kill flag; ``run`` exits the next time it checks the
        flag, i.e. after the current item (if any) has been processed or
        the current wait on the queue has ended.
        """
        self.logger.debug('Stopping')
        self._kill.set()

    def put(self, item) -> str:
        """
        Add an item to be processed by this thread.

        :param item: The item to enqueue.
        :return: The class name of this thread.
        """
        self.queue.put(item)
        return self.__class__.__name__

    @abstractmethod
    def _process(self, item):
        """
        Process an item in the queue. To be implemented by a more specific
        subclass.
        """
        raise NotImplementedError('_process must be implemented '
                                  'by QueueThread subclasses')