import logging
from abc import ABCMeta, abstractmethod
from queue import Empty, Queue
from threading import Event, Thread, current_thread
from time import sleep  # noqa: F401 -- no longer used here; kept so the name stays importable


class QueueThread(Thread, metaclass=ABCMeta):
    """
    Base class for a thread governing a queue.

    The thread repeatedly takes items from ``queue`` and hands each one to
    :meth:`_process`, which concrete subclasses must implement. It keeps
    running until :meth:`shutdown` is called.
    """

    def __init__(self, inqueue: Queue):
        """
        Set up the worker around an existing queue.

        :param inqueue: queue the thread reads its work items from.
        """
        super().__init__()
        # True only while run() is executing its loop.
        self._running = False
        # Set by shutdown() to ask the run() loop to exit.
        self._kill = Event()
        self.queue = inqueue
        self.logger = logging.getLogger(
            f'whisper-daemon.{self.__class__.__name__}')

    def run(self) -> None:
        """
        Process queued items until the kill event is set.

        Exceptions raised while handling an item are logged and the loop
        moves on to the next item. When the queue is empty the thread
        waits on the kill event for up to one second before looking again,
        so a shutdown request wakes it immediately.
        """
        self.logger.debug('Starting')
        self._running = True
        try:
            while not self._kill.is_set():
                try:
                    item = self.queue.get_nowait()
                except Empty:
                    self._kill.wait(1)
                    continue

                try:
                    # This can be incredibly spammy, so using an even more
                    # verbose level than debug.
                    self.logger.log(5, 'Processing %s', item)
                    self._process(item)
                except Exception:
                    self.logger.exception(
                        'Unexpected error when processing item %s', item)
        finally:
            # Reset the flag even if something other than Exception
            # (e.g. SystemExit) escapes the loop.
            self._running = False
        self.logger.debug('Stopped')

    def shutdown(self, block: bool = True) -> None:
        """
        Ask the thread to stop.

        :param block: when true, wait until the thread has finished. The
            wait is skipped if the thread is not alive (never started or
            already stopped) or if this method is called from the worker
            thread itself, since a thread cannot join itself.
        """
        self.logger.debug('Stopping')
        self._kill.set()
        if block and self.is_alive() and current_thread() is not self:
            self.join()

    def put(self, item) -> None:
        """
        Add an item to be processed by this thread.
        """
        self.queue.put(item)

    @abstractmethod
    def _process(self, item):
        """
        Process an item in the queue. To be implemented by a more specific
        subclass.
        """
        raise NotImplementedError('_process must be implemented '
                                  'by QueueThread subclasses')