Stefan Nenzén 30361abdd9 VisibilityHandler (#3)
Create a VisibilityHandler

Co-authored-by: nenzen <stefan@nenzen.com>
Reviewed-on: #3
Reviewed-by: erth9960 <thuning@dsv.su.se>
Co-authored-by: Stefan Nenzén <nenzen@dsv.su.se>
Co-committed-by: Stefan Nenzén <nenzen@dsv.su.se>
2024-03-27 14:54:33 +01:00

137 lines
3.9 KiB
Python

import json
from dataclasses import asdict, dataclass
from pprint import pformat
from os import path, remove
import requests
from .queuethread import QueueThread
'''
Note classes are used by other classes submitting information
to the notifier.
Notification classes are used internally in the notifier class to format the
actual messages sent out.
'''
@dataclass
class DoneNote:
    """
    Submitted to the notifier when a processing step has finished
    successfully.
    """
    # Job identifier; the notifier also uses it as the name of the job's
    # queue file and as part of the notification file name.
    jobid: str
    # Notification target: None selects the notifier's configured default,
    # an http(s) URL is POSTed to, anything else is treated as a directory.
    url: str
    # Copied verbatim into the outgoing notification
    # (presumably names the submitting component -- confirm with callers).
    origin: str
    # Payload forwarded unchanged to the recipient.
    package: dict
    # When this is empty the notifier removes the job's queue file.
    pending: list
@dataclass
class DoneNotification:
    """
    Outgoing message describing a successfully completed processing step.

    Serialized with dataclasses.asdict() before being sent or written.
    """
    # Identifier of the job the message is about.
    jobid: str
    # Copied from the originating DoneNote.
    origin: str
    # Payload forwarded unchanged from the originating DoneNote.
    package: dict
    # Forwarded unchanged from the originating DoneNote.
    pending: list
    # Fixed marker letting the recipient tell success from error messages.
    type: str = 'success'
@dataclass
class ErrorNote:
    """
    Submitted to the notifier when processing of a job has failed.
    """
    # Identifier of the failed job.
    jobid: str
    # Notification target: None selects the notifier's configured default,
    # an http(s) URL is POSTed to, anything else is treated as a directory.
    url: str
    # Copied verbatim into the outgoing notification and the error log.
    origin: str
    # The exception that caused the failure; the notifier logs it with
    # traceback info and sends its str() form as the message.
    exception: Exception
@dataclass
class ErrorNotification:
    """
    Outgoing message describing a failed job.

    Serialized with dataclasses.asdict() before being sent or written.
    """
    # Identifier of the failed job.
    jobid: str
    # Copied from the originating ErrorNote.
    origin: str
    # String form of the exception that caused the failure.
    message: str
    # Fixed marker letting the recipient tell error from success messages.
    type: str = 'error'
class Notifier(QueueThread):
    """
    This class handles notifications to a third party regarding job
    processing status.

    Items handed to _process() must be DoneNote or ErrorNote instances.
    Each one is converted to the matching notification dataclass and
    delivered either by HTTP POST or as a JSON file, depending on the
    target given in the note (or the configured default when the note
    carries no target).
    """

    # Seconds to wait for an HTTP endpoint before giving up. Without a
    # timeout a single unresponsive endpoint would block the notifier
    # indefinitely.
    HTTP_TIMEOUT = 30

    def __init__(self, queuedir, config):
        """
        queuedir: directory holding one queue file per job, named by jobid.
        config: mapping with a mandatory 'url' (default notification
                target) and an optional 'token' (bearer token sent to the
                default HTTP endpoint).
        """
        super().__init__()
        self.queuedir = queuedir
        self.notify_url = config['url']
        # Dispatch table: note type -> handler method
        self.actions = {DoneNote: self._process_done,
                        ErrorNote: self._process_error}
        self.token = config.get('token', None)

    def _process(self, item):
        """
        Dispatch a note to its handler based on its exact type.

        Raises Exception if the item is not a known note type.
        """
        handler = self.actions.get(type(item))
        if handler is None:
            raise Exception(f"Invalid action {type(item)}")
        handler(item)

    def _process_error(self, item):
        """
        Log a failed job and send an error notification for it.
        """
        self.logger.error("%s - error in %s",
                          item.jobid,
                          item.origin,
                          exc_info=item.exception)
        notification = ErrorNotification(item.jobid,
                                         item.origin,
                                         str(item.exception))
        self._notify(notification, item.url)
        # The queue file of a failed job is deliberately left in place
        # for debuggability.

    def _process_done(self, item):
        """
        Send a success notification and clean up the job's queue file
        once nothing more is pending for the job.
        """
        notification = DoneNotification(item.jobid,
                                        item.origin,
                                        item.package,
                                        item.pending)
        self._notify(notification, item.url)
        self.logger.info("%s - Notification sent:\n%s",
                         item.jobid, pformat(notification))
        if not item.pending:
            remove(path.join(self.queuedir, item.jobid))

    def _notify(self, item, url):
        """
        Deliver a notification to the given target.

        url: None to use the configured default target, an http(s) URL
             to POST to, or anything else to be used as a directory in
             which the notification is saved.
        """
        if url is None:
            self._notify_fallback(item)
        elif url.startswith(('http://', 'https://')):
            self.logger.debug("Notifying to HTTP endpoint %s",
                              url)
            self._notify_http(item, url)
        else:
            self.logger.debug("Notifying to file %s", url)
            self._notify_file(item, url)

    def _notify_fallback(self, item):
        """
        Deliver a notification to the configured default target.

        The bearer token, if configured, is only ever sent to the default
        HTTP endpoint.
        """
        if self.notify_url.startswith(('http://', 'https://')):
            self.logger.debug("Notifying to default HTTP endpoint")
            headers = {}
            if self.token is not None:
                headers['Authorization'] = f"Bearer {self.token}"
            self._notify_http(item, self.notify_url, headers)
        else:
            self.logger.debug("Notifying to file")
            self._notify_file(item, self.notify_url)

    def _notify_http(self, item, url, headers=None):
        """
        Send a notification to a HTTP endpoint

        item: notification dataclass instance, sent as a JSON body.
        url: endpoint to POST to.
        headers: optional extra request headers; the passed dict is not
                 modified.
        """
        # Work on a copy: a shared mutable default or the caller's dict
        # must not be altered as a side effect of sending.
        request_headers = dict(headers) if headers else {}
        request_headers['Content-Type'] = 'application/json'
        response = requests.post(url,
                                 headers=request_headers,
                                 data=json.dumps(asdict(item)),
                                 timeout=self.HTTP_TIMEOUT)
        if not response.ok:
            # Surface rejected notifications without raising, so that
            # processing continues exactly as it did before.
            self.logger.warning("%s - HTTP notification to %s returned "
                                "status %s",
                                item.jobid, url, response.status_code)

    def _notify_file(self, item, notification_dir):
        """
        Save a notification to file

        The file is named '<jobid>.<origin>' and placed in
        notification_dir, or in the configured default target when
        notification_dir is None.
        """
        if notification_dir is None:
            notification_dir = self.notify_url
        resultfile = f"{item.jobid}.{item.origin}"
        with open(path.join(notification_dir, resultfile), 'w') as f:
            json.dump(asdict(item), f)