42 lines
1.1 KiB
Python
42 lines
1.1 KiB
Python
import json
|
|
|
|
from dataclasses import asdict
|
|
from queue import Queue
|
|
|
|
import requests
|
|
|
|
from .job import Job
|
|
from .queuethread import QueueThread
|
|
|
|
|
|
@QueueThread.register
class NotifierThread(QueueThread):
    """Queue worker that reports a job's status to the job's callback URL.

    A job whose notification raises is marked as failed and placed on a
    separate retry queue; otherwise the job is marked as notified.
    """

    def __init__(self, inqueue: Queue, retryqueue: Queue) -> None:
        """Set up the worker.

        Args:
            inqueue: Forwarded unchanged to ``QueueThread.__init__``
                (presumably the queue of finished jobs -- confirm against
                QueueThread).
            retryqueue: Queue that receives every job whose notification
                attempt raised an exception.
        """
        super().__init__(inqueue)
        self.retryqueue = retryqueue

    def _process(self, item: Job) -> None:
        """Send the status of *item* to its callback and record the outcome.

        The status is serialized to JSON, logged at INFO level and handed
        to ``notify`` together with ``item.callback``.  On success
        ``item.succeed_notification()`` is called; on any exception the
        failure is logged at DEBUG level, ``item.fail_notification`` is
        given the exception and the job is put on ``self.retryqueue``.

        Args:
            item: The job to notify about.  ``item.status`` has to be a
                dataclass instance, since it is passed to ``asdict``.
        """
        # default=str: values json cannot encode natively are stringified.
        payload = json.dumps(asdict(item.status), default=str)
        self.logger.info('%s finished, %s', item.jobid, payload)

        try:
            notify(item.callback, payload)
        except Exception as exc:
            # Every failure is handled the same way: record it on the job
            # and hand the job over to the retry queue.
            self.logger.debug('Notification failed (%s) for %s',
                              str(exc), item.jobid)
            item.fail_notification(exc)
            self.retryqueue.put(item)
        else:
            item.succeed_notification()
|
|
|
|
|
|
def notify(callback_url, notification, timeout=30):
    """POST *notification* to *callback_url* with a JSON content type.

    Args:
        callback_url: URL the notification is sent to.
        notification: Request body; sent as-is via ``data=`` with a
            ``Content-Type: application/json`` header, so it should
            already be a serialized JSON document.
        timeout: Seconds to wait for the server before giving up, passed
            straight to ``requests.post``.  May be a single number or a
            ``(connect, read)`` tuple.  ``None`` disables the timeout and
            waits indefinitely.

    Raises:
        requests.exceptions.HTTPError: The server answered with a 4xx or
            5xx status (raised by ``raise_for_status``).
        requests.exceptions.RequestException: Any other request failure,
            including ``Timeout`` when *timeout* is exceeded.
    """
    headers = {'Content-Type': 'application/json'}
    # requests never times out by default; without an explicit timeout an
    # unresponsive callback server would block the calling thread forever.
    response = requests.post(callback_url,
                             headers=headers,
                             data=notification,
                             timeout=timeout)
    response.raise_for_status()
|