2022-09-28 22:30:38 +02:00

44 lines
1.3 KiB
Python

import json
from os import path, remove
from .queuethread import QueueThread
from .package import NotificationEncoder
'''
This queue handles notifications to a third party regarding queue
processing status.
'''
class Notifier(QueueThread):
    '''
    Queue of status notifications that are written out as JSON files.

    Items are enqueued with put_done() / put_error() and handed to
    _handle() one at a time; each item ends up as a file named
    "<jobid>.<handler>" inside the configured result directory.
    '''

    def __init__(self, config):
        '''
        Set up the notifier.

        config must provide the keys 'queuedir' and 'resultdir'.
        '''
        super().__init__()
        self.queuedir = config['queuedir']
        self.resultdir = config['resultdir']

    def put_done(self, handler, jobid, package, pending):
        '''
        Enqueue a 'done' notification for the given handler and job id.
        '''
        self.queue.put({'type': 'done',
                        'handler': handler,
                        'jobid': jobid,
                        'package': package,
                        'pending': pending})

    def put_error(self, job, handler, exception):
        '''
        Enqueue an 'error' notification for the given job and handler.

        The message is taken from exception.message when the exception
        defines one, and from str(exception) otherwise.
        '''
        # Built-in Python 3 exceptions have no .message attribute, so a
        # plain attribute access would raise AttributeError while we are
        # trying to report the original failure.
        message = getattr(exception, 'message', None)
        if message is None:
            message = str(exception)
        self.queue.put({'type': 'error',
                        'job': job,
                        'handler': handler,
                        'message': message})

    def _handle(self, item):
        '''
        Process one item from the notification queue.
        '''
        self._file_notify(item)

    @staticmethod
    def _jobid_for(item):
        '''
        Return the job id to use when naming the result file for item.

        Raises KeyError('jobid') when no job id can be determined.
        '''
        if 'jobid' in item:
            return item['jobid']
        # Error notifications carry the whole job instead of a bare id.
        # NOTE(review): the job is presumably a mapping or an object that
        # exposes 'jobid' -- confirm against the callers of put_error().
        job = item.get('job')
        if isinstance(job, dict):
            jobid = job.get('jobid')
        else:
            jobid = getattr(job, 'jobid', None)
        if jobid is None:
            raise KeyError('jobid')
        return jobid

    def _file_notify(self, item):
        '''
        Write the notification to "<jobid>.<handler>" in the result dir.

        Raises KeyError if the item has no handler or no resolvable job id.
        '''
        resultfile = f"{self._jobid_for(item)}.{item['handler']}"
        # Explicit encoding keeps the output independent of the host locale.
        with open(path.join(self.resultdir, resultfile),
                  'w', encoding='utf-8') as f:
            json.dump(item, f, cls=NotificationEncoder)