Inital commit

This commit is contained in:
Erik Thuning 2022-09-15 15:09:25 +02:00
commit 74bc0598e7
8 changed files with 339 additions and 0 deletions

4
.gitignore vendored Normal file

@ -0,0 +1,4 @@
*~
__pycache__/*
*.pyc
queue/*

104
collector.py Normal file

@ -0,0 +1,104 @@
from queuethread import QueueThread
'''
This class keeps track of pending actions and is responsible
for cleaning up once all pending actions are done.
'''
class Collector(QueueThread):
def __init__(self, notifier):
super().__init__()
self.notifier = notifier
self.tracker = {}
'''
Submit pending actions for a job.
'''
def put_pending(self, jobid, actions):
super().put({'jobid': jobid,
'state': 'start',
'actions': actions})
'''
Submit a done notice for a job action.
'''
def put_done(self, jobid, action, package):
super().put({'jobid': jobid,
'state': 'done',
'action': action,
'package': package})
'''
Handle a pending notice.
Throws an exception if the jobid is already known.
'''
def _handle_pending(self, item):
jobid = item['jobid']
actions = item['actions']
if jobid in self.tracker:
raise Exception(f"Job {jobid} has already been submitted")
self.tracker[jobid] = Pending(jobid, actions)
'''
Handle a done notice. Submits a notification to the notifier. Discards
the pending object if no more actions are pending.
Throws an exception on unknown jobids.
'''
def _handle_done(self, item):
jobid = item['jobid']
if jobid not in self.tracker:
raise Exception(f"Job {jobid} not found in tracker")
all_done = self.tracker[jobid].finish(item['action'])
self._notify_done(item, self.tracker[jobid])
if all_done:
del self.tracker[jobid]
'''
Work through the queue.
Throws an exception on invalid state values.
'''
def _handle(self, item):
state = item['state']
if state == 'start':
self._handle_pending(item)
elif state == 'done':
self._handle_done(item)
else:
raise Exception(f"Invalid state {state} for {item['jobid']}")
'''
Submit a done notification to the notifier.
'''
def _notify_done(self, item, pending):
self.notifier.put({'type': 'action_done',
'action': item['action'],
'jobid': item['jobid'],
'package': item['package'],
'pending': pending.actions})
'''
Utility class to represent pending actions.
'''
class Pending:
def __init__(self, jobid, actions):
self.jobid = jobid
self.actions = actions
'''
Finish a pending action. Returns True if no more actions are pending,
otherwise False.
Throws an exception if the given action is missing from the pending list.
'''
def finish(self, action):
if action not in self.actions:
raise Exception(f"Collector.pending.{self.jobid}: Cannot finish missing action {action}")
self.actions.remove(action)
if not self.actions:
return True
else:
return False

55
distributor.py Normal file

@ -0,0 +1,55 @@
from queuethread import QueueThread
'''
This queue accepts all items and passes them along to registered handlers
by asking each handler if it wants the item.
'''
class Distributor(QueueThread):
def __init__(self, collector, notifier, handlers={}):
super().__init__()
self.collector = collector
self.notifier = notifier
self.handlers = handlers
'''
Add a handler identified by the provided name. Throws an exception
if the name is taken.
'''
def addHandler(name, handler):
if name in handler:
raise Exception(
f'There is already a handler registered at {name}')
handlers[name] = handler
'''
Walk through all registered handlers, check if the handler wants the
item and enqueue it with the handler(s) that do.
If the item fails validation for a handler that wants the item, the item
is not added to any queue and an error notification is sent,
discarding the item.
'''
def _handle(self, item):
wants = {}
for name, handler in self.handlers.items():
try:
if handler.wants(item['jobspec']):
wants[name] = handler
except Exception as e:
self._notify_error(name, item, e)
return
self.collector.put_pending(item['jobid'], wants.keys())
for handler in wants.values():
handler.put(item['jobspec'])
return
'''
Queue up an error notification with the notifier.
'''
def _notify_error(self, handler, item, exception):
self.notifier.put({'type': 'error',
'source': handler,
'message': exception.message,
'jobid': item['jobid'],
'jobspec': item['jobspec']})

43
handler.py Normal file

@ -0,0 +1,43 @@
from abc import ABCMeta, abstractmethod
from queuethread import QueueThread
'''
Base class for handlers to be registered with a QueuePickup instance.
'''
@QueueThread.register
class Handler(QueueThread, metaclass=ABCMeta):
def __init__(self, collector):
super().__init__()
self.collector = collector
def _notify_done(self, item, package):
'''
Validate basic item sanity. Currently checks only for the presence
of an 'id' field.
If validation fails, an exception is thrown.
Subclasses should overload this method with their own sanity checks,
but should also call this method first.
'''
@abstractmethod
def _validate(self, jobspec):
if 'id' not in jobspec:
raise Exception('ID is missing!')
return
'''
Abstract method to check if this handler is interested in a given queue
item.
To be implemented by subclasses.
Should return True if the item is wanted, otherwise False.
Should validate the item according to its own requirements as part of
the process. Exceptions thrown by validation should not be caught here.
'''
@abstractmethod
def wants(self, jobspec):
pass

16
notifier.py Normal file

@ -0,0 +1,16 @@
from queuethread import QueueThread
'''
This queue handles notifications to a third party regarding queue
processing status.
'''
class Notifier(QueueThread):
def __init__(self):
super().__init__()
'''
Process notification queue
'''
def _handle(self, item):
print(item)

23
queuereader.py Normal file

@ -0,0 +1,23 @@
import json
import os
from watchdog.events import FileSystemEventHandler
class QueueReader(FileSystemEventHandler):
def __init__(self, queue):
self.queue = queue
def _handle(self, path):
if not os.path.isfile(path):
return
with open(path) as f:
data = json.load(f)
self.queue.put({'jobid': os.path.basename(path),
'jobspec': data})
def on_created(self, event):
self._handle(event.src_path)
def on_moved(self, event):
self._handle(event.dest_path)

33
queuethread.py Normal file

@ -0,0 +1,33 @@
from abc import ABCMeta, abstractmethod
from queue import Queue
from threading import Thread
'''
Base class for a thread governing a queue.
'''
class QueueThread(Thread, metaclass=ABCMeta):
def __init__(self):
super().__init__()
self.queue = Queue()
def run(self):
while True:
item = self.queue.get()
self._handle(item)
return
'''
Add an item to be processed by this thread.
'''
def put(self, item):
self.queue.put(item)
return
'''
Handle an item in the queue.
To be implemented by a more specific subclass.
'''
@abstractmethod
def _handle(self, item):
pass

61
test.py Normal file

@ -0,0 +1,61 @@
from watchdog.observers import Observer
from collector import Collector
from distributor import Distributor
from handler import Handler
from notifier import Notifier
@Handler.register
class FooHandler(Handler):
def wants(self, item):
if 'foo' in item:
self._validate(item)
return True
return False
def _validate(self, item):
super()._validate(item)
for key in 'foodep', 'baz':
if key not in item:
raise Exception(f"{item['id']}: {key} missing!")
def _handle(self, item):
print(f"FooHandler: {item['id']}-{item['foo']}-{item['foodep']}-{item['baz']}")
self.collector.put_done(item['id'], 'foo', 'boguspackage')
@Handler.register
class BarHandler(Handler):
def wants(self, item):
if 'bar' in item:
self._validate(item)
return True
def _validate(self, item):
super()._validate(item)
if 'baz' not in item:
raise Exception(f"{item['id']}: baz missing!")
def _handle(self, item):
print(f"BarHandler: {item['id']}@{item['bar']}@{item['baz']}")
self.collector.put_done(item['id'], 'bar', 'boguspackage')
if __name__ == '__main__':
notifier = Notifier()
collector = Collector(notifier)
handlers = {'foo': FooHandler(collector),
'bar': BarHandler(collector)}
distrib = Distributor(collector, notifier, handlers)
watcher = Observer()
watcher.schedule(QueueUpdater(distrib), './queue/')
notifier.start()
collector.start()
for handler in handlers.values():
handler.start()
distrib.start()
watcher.start()