import json from dataclasses import asdict, dataclass from pprint import pformat from os import path, remove import requests from .queuethread import QueueThread ''' Note classes are used by other classes submitting information to the notifier. Notification classes are used internally in the notifier class to format the actual messages sent out. ''' @dataclass class DoneNote: jobid: str url: str origin: str package: dict pending: list @dataclass class DoneNotification: jobid: str origin: str package: dict pending: list type: str = 'success' @dataclass class ErrorNote: jobid: str url: str origin: str exception: Exception @dataclass class ErrorNotification: jobid: str origin: str message: str type: str = 'error' class Notifier(QueueThread): """ This class handles notifications to a third party regarding job processing status. """ def __init__(self, queuedir, config): super().__init__() self.queuedir = queuedir self.notify_url = config['url'] self.actions = {DoneNote: self._process_done, ErrorNote: self._process_error} self.token = config.get('token', None) def _process(self, item): if type(item) not in self.actions: raise Exception(f"Invalid action {type(item)}") self.actions[type(item)](item) def _process_error(self, item): self.logger.error("%s - error in %s", item.jobid, item.origin, exc_info=item.exception) notification = ErrorNotification(item.jobid, item.origin, str(item.exception)) self._notify(notification, item.url) # Avoid removing failed jobs' queue files for debuggability #remove(path.join(self.queuedir, item.jobid)) def _process_done(self, item): notification = DoneNotification(item.jobid, item.origin, item.package, item.pending) self._notify(notification, item.url) self.logger.info("%s - Notification sent:\n%s", item.jobid, pformat(notification)) if not item.pending: remove(path.join(self.queuedir, item.jobid)) def _notify(self, item, url): if url is None: self._notify_fallback(item) elif url.startswith(('http://', 'https://')): self.logger.debug("Notifying to HTTP endpoint %s", url) self._notify_http(item, url) else: self.logger.debug("Notifying to file %s", url) self._notify_file(item, url) def _notify_fallback(self, item): if self.notify_url.startswith(('http://', 'https://')): self.logger.debug("Notifying to default HTTP endpoint") headers = {} if self.token is not None: headers['Authorization'] = f"Bearer {self.token}" self._notify_http(item, self.notify_url, headers) else: self.logger.debug("Notifying to file") self._notify_file(item, self.notify_url) def _notify_http(self, item, url, headers={}): """ Send a notification to a HTTP endpoint """ headers['Content-Type'] = 'application/json' requests.post(url, headers=headers, data=json.dumps(asdict(item))) def _notify_file(self, item, notification_dir): """ Save a notification to file """ if notification_dir is None: notification_dir = self.notify_url resultfile = f"{item.jobid}.{item.origin}" with open(path.join(notification_dir, resultfile), 'w') as f: json.dump(asdict(item), f)