Stefan Nenzén
30361abdd9
Create a VisibilityHandler Co-authored-by: nenzen <stefan@nenzen.com> Reviewed-on: #3 Reviewed-by: erth9960 <thuning@dsv.su.se> Co-authored-by: Stefan Nenzén <nenzen@dsv.su.se> Co-committed-by: Stefan Nenzén <nenzen@dsv.su.se>
137 lines
3.9 KiB
Python
137 lines
3.9 KiB
Python
import json
|
|
|
|
from dataclasses import asdict, dataclass
|
|
from pprint import pformat
|
|
from os import path, remove
|
|
|
|
import requests
|
|
|
|
from .queuethread import QueueThread
|
|
|
|
|
|
'''
Note classes are used by other classes submitting information
to the notifier.

Notification classes are used internally in the notifier class to format the
actual messages sent out.
'''
|
|
|
|
@dataclass
class DoneNote:
    """
    Note submitted to the notifier to report a successfully handled job.

    The notifier turns this into a DoneNotification (every field except
    ``url``) and delivers it to ``url``.
    """
    # Job identifier; also the name of the job's file in the queue directory.
    jobid: str
    # Delivery target: an http(s) URL, a directory path, or None to use the
    # notifier's configured default destination.
    url: str
    # Name of the component reporting the result (presumably the handler
    # that produced it; verify against callers).
    origin: str
    # Result payload, forwarded as-is in the notification.
    package: dict
    # Work still outstanding for the job; when empty, the notifier removes
    # the job's queue file after notifying.
    pending: list
|
|
|
|
|
|
@dataclass
class DoneNotification:
    """
    Outgoing message describing a successfully handled job.

    Built by the notifier from a DoneNote and serialized with ``asdict``
    before being posted or written to file.
    """
    # Job identifier.
    jobid: str
    # Name of the component that reported the result.
    origin: str
    # Result payload copied from the note.
    package: dict
    # Work still outstanding for the job, copied from the note.
    pending: list
    # Discriminator included in the serialized message.
    type: str = 'success'
|
|
|
|
|
|
@dataclass
class ErrorNote:
    """
    Note submitted to the notifier to report a failed job.

    The notifier logs the exception and delivers an ErrorNotification
    carrying ``str(exception)`` to ``url``.
    """
    # Job identifier.
    jobid: str
    # Delivery target: an http(s) URL, a directory path, or None to use the
    # notifier's configured default destination.
    url: str
    # Name of the component in which the error occurred.
    origin: str
    # The exception that caused the failure.
    exception: Exception
|
|
|
|
|
|
@dataclass
class ErrorNotification:
    """
    Outgoing message describing a failed job.

    Built by the notifier from an ErrorNote and serialized with ``asdict``
    before being posted or written to file.
    """
    # Job identifier.
    jobid: str
    # Name of the component in which the error occurred.
    origin: str
    # Text of the exception that caused the failure.
    message: str
    # Discriminator included in the serialized message.
    type: str = 'error'
|
|
|
|
|
|
class Notifier(QueueThread):
    """
    This class handles notifications to a third party regarding job
    processing status.

    Items handled are DoneNote and ErrorNote instances. Each one is
    converted to the matching notification dataclass and delivered either
    as a JSON body in an HTTP POST or as a JSON file, depending on the
    destination.
    """
    def __init__(self, queuedir, config):
        """
        Set up the notifier.

        queuedir -- directory holding one file per job, named by job id.
                    The file is removed once a job is reported done with
                    nothing pending.
        config   -- mapping with a mandatory 'url' (the default
                    destination: an http(s) URL or a directory path) and
                    an optional 'token', sent as a bearer token to the
                    default HTTP endpoint.
        """
        super().__init__()
        self.queuedir = queuedir
        self.notify_url = config['url']
        # Dispatch table keyed on the exact note class.
        self.actions = {DoneNote: self._process_done,
                        ErrorNote: self._process_error}

        self.token = config.get('token')

    def _process(self, item):
        """
        Dispatch a note to the handler registered for its exact type.

        Presumably invoked by QueueThread for each queued item; verify
        against the base class.

        Raises Exception if no handler is registered for the item's type.
        """
        handler = self.actions.get(type(item))
        if handler is None:
            raise Exception(f"Invalid action {type(item)}")
        handler(item)

    def _process_error(self, item):
        """
        Log a failed job and send an error notification for it.
        """
        self.logger.error("%s - error in %s",
                          item.jobid,
                          item.origin,
                          exc_info=item.exception)
        notification = ErrorNotification(item.jobid,
                                         item.origin,
                                         str(item.exception))
        self._notify(notification, item.url)
        # The queue file of a failed job is deliberately left in place
        # for debuggability.

    def _process_done(self, item):
        """
        Send a success notification and clean up finished jobs.

        The job's queue file is removed only when nothing is pending.
        """
        notification = DoneNotification(item.jobid,
                                        item.origin,
                                        item.package,
                                        item.pending)
        self._notify(notification, item.url)
        self.logger.info("%s - Notification sent:\n%s",
                         item.jobid, pformat(notification))
        if not item.pending:
            remove(path.join(self.queuedir, item.jobid))

    def _notify(self, item, url):
        """
        Deliver a notification to the given destination.

        url -- None selects the configured default destination, an
               http(s) URL is posted to, and anything else is treated as
               a directory to write the notification file into.
        """
        if url is None:
            self._notify_fallback(item)
        elif url.startswith(('http://', 'https://')):
            self.logger.debug("Notifying to HTTP endpoint %s",
                              url)
            self._notify_http(item, url)
        else:
            self.logger.debug("Notifying to file %s", url)
            self._notify_file(item, url)

    def _notify_fallback(self, item):
        """
        Deliver a notification to the configured default destination.

        The bearer token, if configured, is only sent here and never to
        destinations supplied in a note.
        """
        if self.notify_url.startswith(('http://', 'https://')):
            self.logger.debug("Notifying to default HTTP endpoint")
            headers = {}
            if self.token is not None:
                headers['Authorization'] = f"Bearer {self.token}"
            self._notify_http(item, self.notify_url, headers)
        else:
            self.logger.debug("Notifying to file")
            self._notify_file(item, self.notify_url)

    def _notify_http(self, item, url, headers=None):
        """
        Send a notification to a HTTP endpoint

        item    -- notification dataclass, sent as a JSON body.
        url     -- endpoint to POST to.
        headers -- optional extra request headers. The mapping is copied
                   and left unmodified; Content-Type is always set to
                   application/json.
        """
        # Build a fresh dict for every call so that neither a shared
        # default nor the caller's mapping is mutated.
        request_headers = dict(headers) if headers else {}
        request_headers['Content-Type'] = 'application/json'
        # NOTE(review): no timeout is passed and the response status is
        # not inspected, so a hanging or failing endpoint goes unnoticed.
        requests.post(url,
                      headers=request_headers,
                      data=json.dumps(asdict(item)))

    def _notify_file(self, item, notification_dir):
        """
        Save a notification to file

        The file is named "<jobid>.<origin>" and holds the notification
        as JSON.

        notification_dir -- directory to write into; None selects the
                            configured default destination.
        """
        if notification_dir is None:
            notification_dir = self.notify_url

        resultfile = f"{item.jobid}.{item.origin}"
        target = path.join(notification_dir, resultfile)
        with open(target, 'w', encoding='utf-8') as f:
            json.dump(asdict(item), f)
|