Erik Thuning 398f59e66b Fixes
2022-10-17 13:56:54 +02:00

58 lines
1.6 KiB
Python

import json
from os import path, remove
from .queuethread import QueueThread
from .package import NotificationEncoder
'''
This class handles notifications to a third party regarding job
processing status.
'''
class Notifier(QueueThread):
    """
    Report job processing status to a third party.

    Notifications are plain dicts placed on ``self.queue`` by
    :meth:`put_done` and :meth:`put_error`, and delivered one at a time
    through :meth:`_handle` (presumably invoked by the ``QueueThread``
    base class for each queued item -- confirm against that class).

    The delivery method is picked once, at construction time, from the
    shape of ``notify_url``:

    * starts with ``http://`` or ``https://`` -> :meth:`_notify_http`
      (currently a no-op placeholder)
    * anything else -> :meth:`_notify_file`, which treats ``notify_url``
      as a directory path
    """

    def __init__(self, queuedir, notify_url):
        """
        Set up the notifier and choose its delivery method.

        queuedir   -- stored on the instance; not otherwise used by the
                      code in this class
        notify_url -- notification destination: an http(s) URL, or a
                      directory path for file-based notifications
        """
        super().__init__()
        self.queuedir = queuedir
        self.notify_url = notify_url

        # Select notification function based on the destination
        wants_http = self.notify_url.startswith(('http://', 'https://'))
        self._notify = self._notify_http if wants_http else self._notify_file

    def put_done(self, handler, jobid, package, pending):
        """
        Queue a 'success' notification for a job.

        handler -- recorded under the 'origin' key
        jobid   -- identifier of the job being reported
        package -- recorded as-is under the 'package' key
        pending -- recorded as-is under the 'pending' key
        """
        notification = {
            'type': 'success',
            'jobid': jobid,
            'origin': handler,
            'package': package,
            'pending': pending,
        }
        self.queue.put(notification)

    def put_error(self, jobid, originator, exception):
        """
        Queue an 'error' notification for a job.

        jobid      -- identifier of the job being reported
        originator -- recorded under the 'origin' key
        exception  -- its str() form becomes the 'message' value
        """
        notification = {
            'type': 'error',
            'jobid': jobid,
            'origin': originator,
            'message': str(exception),
        }
        self.queue.put(notification)

    def _handle(self, item):
        """
        Process one item from the notification queue by passing it to
        the delivery method selected in __init__.
        """
        deliver = self._notify
        deliver(item)

    def _notify_http(self, item):
        """
        Send a notification to a HTTP endpoint.

        Not implemented yet: the item is accepted and nothing is sent.
        """
        #TODO: Implement http notification
        return None

    def _notify_file(self, item):
        """
        Save a notification to file.

        The item is serialized as JSON (using NotificationEncoder) into
        a file named '<jobid>.<origin>' inside the directory given by
        notify_url. The file is opened in 'w' mode, so an existing file
        with the same name is overwritten.
        """
        filename = '{}.{}'.format(item['jobid'], item['origin'])
        destination = path.join(self.notify_url, filename)
        with open(destination, 'w') as outfile:
            json.dump(item, outfile, cls=NotificationEncoder)