whisper-daemon/pipeline/notifierthread.py

42 lines
1.1 KiB
Python

import json
from dataclasses import asdict
from queue import Queue
import requests
from .job import Job
from .queuethread import QueueThread
@QueueThread.register
class NotifierThread(QueueThread):
    """Queue worker that reports a job's status to the job's callback URL.

    A job whose notification cannot be delivered is marked as failed and
    placed on the retry queue supplied at construction time.
    """

    def __init__(self, inqueue: Queue, retryqueue: Queue):
        """Hand ``inqueue`` to ``QueueThread`` and remember the retry queue.

        Args:
            inqueue: Queue forwarded to ``QueueThread.__init__``.
            retryqueue: Queue that receives jobs whose notification failed.
        """
        super().__init__(inqueue)
        self.retryqueue = retryqueue

    def _process(self, item: Job):
        """Serialise ``item.status`` to JSON and send it to ``item.callback``.

        On success ``item.succeed_notification()`` is called.  If sending
        raises, the error is logged at debug level, recorded on the job via
        ``item.fail_notification`` and the job is put on the retry queue.

        Args:
            item: The job to report; its ``status`` must be a dataclass
                instance because it is converted with ``asdict``.
        """
        # default=str turns values json cannot encode natively into strings.
        payload = json.dumps(asdict(item.status), default=str)
        # NOTE(review): self.logger is presumably provided by QueueThread.
        self.logger.info('%s finished, %s', item.jobid, payload)

        try:
            notify(item.callback, payload)
        except Exception as exc:
            # Any delivery failure is treated as retryable: record it on the
            # job and hand the job over to the retry queue.
            self.logger.debug('Notification failed (%s) for %s',
                              str(exc), item.jobid)
            item.fail_notification(exc)
            self.retryqueue.put(item)
        else:
            item.succeed_notification()
def notify(callback_url, notification, timeout=30):
    """POST ``notification`` to ``callback_url`` as a JSON request body.

    Args:
        callback_url: URL that receives the notification.
        notification: Already-serialised JSON document used as the body.
        timeout: Seconds to wait for the server before giving up; passed
            straight to ``requests.post``.  Without a timeout ``requests``
            waits indefinitely, which would stall the calling thread on an
            unresponsive endpoint.

    Raises:
        requests.exceptions.HTTPError: The server answered with a 4xx or
            5xx status code.
        requests.exceptions.RequestException: The request could not be
            completed (connection error, timeout, ...).
    """
    headers = {'Content-Type': 'application/json'}
    response = requests.post(callback_url,
                             headers=headers,
                             data=notification,
                             timeout=timeout)
    # Turn HTTP error statuses into exceptions so callers see the failure.
    response.raise_for_status()