Added optional support for user-specified notification endpoints in packages

Erik Thuning 2024-03-26 15:47:39 +01:00
parent 19498e64f9
commit 6c11d5578f
7 changed files with 104 additions and 65 deletions

@@ -34,40 +34,6 @@ The unit file (with any necessary modifications made) should be copied to
`/etc/systemd/system` and activated as normal.
## Changes from the previous daemon
* Keys are no longer mandatory by default. Omitted keys are ignored, and an
empty/null value generally means that any existing content for that key
should be deleted. The exception is thumbnails/posters, which are
(re)generated if an empty/null value is passed for them.
* The `update` job type has been deprecated. Updates are now identified by
the presence of a `pkg_id` key in the job specification.
* The `base` key in the old package format has been renamed to `upload_dir`.
* `duration` may no longer be specified in the job. Presentation duration is
instead computed from the provided video files directly.
* The value format for the `sources` key has changed. Instead of accepting a
list of source objects, each with a `name` key, it now expects an object
mapping source names to source objects. Source objects should no longer
contain a `name` key (see the example after this list).
* The value format for the `subtitles` key has changed. Multiple subtitle
tracks are now supported; see the heading [Subtitles](#subtitles)
for details.
* The new key `generate_subtitles` has been added. See the heading
[Subtitles](#subtitles) for details.
* The `slides` key is no longer unique to mediasite jobs; any job can contain
a single slide stream.
* Notifications have been completely overhauled. See the heading
[Notifications](#notifications) for details.
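A rough sketch of the new `sources` value, assuming two hypothetical source
names (`camera` and `screen`); the `video` and `playAudio` fields are taken
from the canonical jobspec shown later in this commit, and other fields may
apply:
```json
{
  "sources": {
    "camera": {
      "video": "camera.mp4",
      "playAudio": true
    },
    "screen": {
      "video": "screen.mp4",
      "playAudio": false
    }
  }
}
```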
## Workflow overview
A job is submitted by first uploading any required files to a unique
@@ -161,6 +127,13 @@ Valid top-level keys and their expected values are:
The directory where any files relevant to this job have been uploaded. Must
be a subdirectory of `uploaddir` as specified in `config.ini`.
* `notification_url`: `string`
The remote endpoint where notifications about this job should be sent via
an HTTP POST request. It must not require any authentication. If omitted, the
default endpoint configured in `config.ini` is used instead.
For testing purposes this can also be a local directory on the server; the
directory must exist and be writable by the user the daemon runs as.
* `title`: `JSON`
The title in the supported languages. Its format is:
```json

@@ -105,18 +105,23 @@ class Collector(QueueThread):
as package):
item.apply_func(package)
except Exception as e:
url = pending.jobspec.get('notification_url', None)
self.notifier.put(ErrorNote(jobid,
url,
self.__class__.__name__,
e))
pending.failed = True
else:
all_done = pending.finish(item.handler)
self.notifier.put(
DoneNote(jobid,
item.handler.__class__.__name__,
package.asdict(),
[handlerclass.__name__
for handlerclass in pending.handlers]))
url = pending.jobspec.get('notification_url', None)
pending_handlers = [handlerclass.__name__
for handlerclass in pending.handlers]
notification = DoneNote(jobid,
url,
item.handler.__class__.__name__,
package.asdict(),
pending_handlers)
self.notifier.put(notification)
if all_done:
self.trackers.pop(jobid)
(self.pkgid_cache / jobid).unlink(missing_ok=True)
@@ -130,7 +135,9 @@ class Collector(QueueThread):
raise Exception(f"Job {jobid} not found in tracker")
pending = self.trackers[jobid]
pending.failed = True
url = pending.jobspec.get('notification_url', None)
self.notifier.put(ErrorNote(jobid,
url,
origin.__class__.__name__,
exception))

@@ -59,7 +59,9 @@ class Distributor(QueueThread):
handlerclass.__name__)
wants[handlerclass] = handlerqueue
except ValidationException as e:
url = queueitem.get('notification_url', None)
return self.notifier.put(ErrorNote(jobid,
url,
handlerclass.__name__,
e))
self.collector.put(PendingItem(queueitem, list(wants.keys())))

@@ -10,8 +10,25 @@ import requests
from .queuethread import QueueThread
'''
Note classes are used by other classes submitting information
to the notifier.
Notification classes are used internally in the notifier class to format the
actual messages sent out.
'''
@dataclass
class DoneNote:
jobid: str
url: str
origin: str
package: dict
pending: list
@dataclass
class DoneNotification:
jobid: str
origin: str
package: dict
@@ -22,6 +39,7 @@ class DoneNote:
@dataclass
class ErrorNote:
jobid: str
url: str
origin: str
exception: Exception
@@ -46,16 +64,7 @@ class Notifier(QueueThread):
self.actions = {DoneNote: self._process_done,
ErrorNote: self._process_error}
# Select notification function based on the destination
if self.notify_url.startswith(('http://', 'https://')):
self.logger.info("Notifying to HTTP endpoint")
self.headers = {'Authorization': f"Bearer {config['token']}",
'Accept': 'application/json',
'Content-Type': 'application/json'}
self._notify = self._notify_http
else:
self.logger.info("Notifying to file")
self._notify = self._notify_file
self.token = config.get('token', None)
def _process(self, item):
if type(item) not in self.actions:
@@ -67,31 +76,62 @@ class Notifier(QueueThread):
item.jobid,
item.origin,
exc_info=item.exception)
self._notify(ErrorNotification(item.jobid,
item.origin,
str(item.exception)))
notification = ErrorNotification(item.jobid,
item.origin,
str(item.exception))
self._notify(notification, item.url)
# Avoid removing failed jobs' queue files for debuggability
#remove(path.join(self.queuedir, item.jobid))
def _process_done(self, item):
self._notify(item)
notification = DoneNotification(item.jobid,
item.origin,
item.package,
item.pending)
self._notify(notification, item.url)
self.logger.info("%s - Notification sent:\n%s",
item.jobid, pformat(item))
item.jobid, pformat(notification))
if not item.pending:
remove(path.join(self.queuedir, item.jobid))
def _notify_http(self, item):
def _notify(self, item, url):
if url is None:
self._notify_fallback(item)
elif url.startswith(('http://', 'https://')):
self.logger.debug("Notifying to HTTP endpoint %s",
url)
self._notify_http(item, url)
else:
self.logger.debug("Notifying to file %s", url)
self._notify_file(item, url)
def _notify_fallback(self, item):
if self.notify_url.startswith(('http://', 'https://')):
self.logger.debug("Notifying to default HTTP endpoint")
headers = {}
if self.token is not None:
headers['Authorization'] = f"Bearer {self.token}"
self._notify_http(item, self.notify_url, headers)
else:
self.logger.debug("Notifying to file")
self._notify_file(item, self.notify_url)
def _notify_http(self, item, url, headers={}):
"""
Send a notification to an HTTP endpoint
"""
requests.post(self.notify_url,
headers=self.headers,
headers['Content-Type'] = 'application/json'
requests.post(url,
headers=headers,
data=json.dumps(asdict(item)))
def _notify_file(self, item):
def _notify_file(self, item, notification_dir):
"""
Save a notification to file
"""
if notification_dir is None:
notification_dir = self.notify_url
resultfile = f"{item.jobid}.{item.origin}"
with open(path.join(self.notify_url, resultfile), 'w') as f:
with open(path.join(notification_dir, resultfile), 'w') as f:
json.dump(asdict(item), f)
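The docstring at the top of this file distinguishes producer-facing Note
classes, which now carry the per-job URL, from the Notification classes that
are actually serialised and sent out. A condensed sketch of that flow, using
hypothetical simplified classes rather than the daemon's real ones and
omitting the bearer token sent to the default HTTP endpoint:
```python
# Hypothetical sketch of the Note -> Notification flow: the note carries the
# per-job URL, the outgoing notification does not, and the destination is
# chosen per item. Names and fields are illustrative, not the daemon's own.
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class ErrorNote:
    jobid: str
    url: Optional[str]  # per-job notification_url, or None to use the default
    origin: str
    exception: Exception


@dataclass
class ErrorNotification:
    jobid: str
    origin: str
    error: str


def dispatch(note: ErrorNote, default_url: str) -> None:
    # Strip the URL and exception object; only serialisable fields go out
    notification = ErrorNotification(note.jobid, note.origin, str(note.exception))
    payload = json.dumps(asdict(notification))
    target = note.url if note.url is not None else default_url
    if target.startswith(('http://', 'https://')):
        # The daemon posts the payload with requests.post(...) here
        print(f"POST {target}: {payload}")
    else:
        # File destination: one result file per job and origin
        (Path(target) / f"{note.jobid}.{note.origin}").write_text(payload)
```
The real `_notify` above falls back to `self.notify_url` from `config.ini` in
the same way when no per-job URL is given.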

@@ -61,7 +61,9 @@ class QueueReader(FileSystemEventHandler):
job['jobid'],
self.preprocessors[_type].__class__.__name__)
except (KeyError, ValueError) as e:
url = job.get('notification_url', None)
self.notifier.put(ErrorNote(job['jobid'],
url,
self.__class__.__name__,
e))

@@ -25,6 +25,7 @@ canonical_jobspec = {
'playAudio': bool,
'video': str}},
'slides': str,
'notification_url': str,
}
"""

test.py

@@ -243,11 +243,15 @@ class PipelineTest(DaemonTest):
json.dump(queuedata, f)
return jobid
def wait_for_result(self, jobid, handlers, timeout=10):
def wait_for_result(self, jobid, handlers,
notification_url=None, timeout=10):
resultfiles = {}
final_result = None
if notification_url is None:
notification_url = self.notify_url
for handler in handlers:
resultfiles = {path.join(self.notify_url,
resultfiles = {path.join(notification_url,
f"{jobid}.{handler}"): False
for handler in handlers}
for _try in range(1, timeout+1):
@@ -295,10 +299,17 @@ class PipelineTest(DaemonTest):
return final_result
def init_job(self, pkgid=False, subs=False, thumb=False,
def init_job(self, pkgid=False, url=False, subs=False, thumb=False,
source_count=0, poster_count=0):
jobspec = {}
if url == True:
notification_dir = path.join(self.testbase,
'results',
str(uuid.uuid4()))
makedirs(notification_dir)
jobspec['notification_url'] = notification_dir
# Set pkg_id
if pkgid == True:
# Generate new
@@ -392,10 +403,13 @@ class PipelineTest(DaemonTest):
#@unittest.skip
def test_subtitles(self):
jobspec = self.init_job(subs=True)
jobspec = self.init_job(subs=True, url=True)
jobid = self.submit_default_job(jobspec)
result = self.wait_for_result(jobid, ['SubtitlesImportHandler'])
url = jobspec['notification_url']
result = self.wait_for_result(jobid,
['SubtitlesImportHandler'],
notification_url=url)
package_id = result['package']['pkg_id']
with PackageManager(package_id, path.join(self.pipeconf['packagedir'],