import json
from dataclasses import asdict
from queue import Queue

import requests

from .job import Job
from .queuethread import QueueThread

# Upper bound, in seconds, for a single callback request. Without a timeout
# ``requests`` waits indefinitely, so one unresponsive endpoint would block
# the notifier thread and every job queued behind it.
DEFAULT_NOTIFY_TIMEOUT = 30.0


@QueueThread.register
class NotifierThread(QueueThread):
    """Queue worker that posts each finished job's status to its callback.

    Jobs whose notification fails are handed to ``retryqueue`` instead of
    being dropped.
    """

    def __init__(self, inqueue: Queue, retryqueue: Queue) -> None:
        """Store the retry queue and pass ``inqueue`` to the base class.

        Args:
            inqueue: Queue forwarded to ``QueueThread.__init__``.
            retryqueue: Queue that receives jobs whose notification failed.
        """
        super().__init__(inqueue)
        self.retryqueue = retryqueue

    def _process(self, item: Job) -> None:
        """Send the status of ``item`` to its callback URL.

        On success the job is marked via ``item.succeed_notification()``.
        On any failure the error is recorded with
        ``item.fail_notification()`` and the job is put on the retry queue.

        Args:
            item: Job to report; ``item.status`` must be a dataclass
                instance because it is serialized with ``asdict``.
        """
        result = item.status
        # default=str stringifies any value json cannot encode natively.
        notification = json.dumps(asdict(result), default=str)
        self.logger.info('%s finished, %s', item.jobid, notification)
        try:
            notify(item.callback, notification)
        except Exception as e:
            # Deliberately broad: whatever went wrong while notifying, the
            # job must be recorded as failed and re-queued, not lost.
            self.logger.debug('Notification failed (%s) for %s', e, item.jobid)
            item.fail_notification(e)
            self.retryqueue.put(item)
            return
        item.succeed_notification()


def notify(
    callback_url: str,
    notification: str,
    timeout: float = DEFAULT_NOTIFY_TIMEOUT,
) -> None:
    """POST a JSON notification to ``callback_url``.

    Args:
        callback_url: URL the notification is posted to.
        notification: Already-serialized JSON document used as request body.
        timeout: Seconds to wait for the server before giving up; passed
            straight to ``requests.post``.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the server answers with a 4xx/5xx status.
    """
    headers = {'Content-Type': 'application/json'}
    result = requests.post(
        callback_url,
        headers=headers,
        data=notification,
        timeout=timeout,
    )
    result.raise_for_status()