Added a preprocessor for arec recorders.

Also moved the code for creating a basic jobspec and pulling information from
the relevant daisy booking into the preprocessor superclass so it can be
called by both the cattura and arec preprocessors.
This commit is contained in:
Erik Thuning 2024-06-04 15:49:07 +02:00
parent 49d1da804b
commit dec2da4310
3 changed files with 138 additions and 31 deletions
pipeline/preprocessors

@ -0,0 +1,97 @@
import json
import os
import re
from datetime import datetime
from pathlib import Path
from .preprocessor import Preprocessor
@Preprocessor.register
class ArecProcessor(Preprocessor):
    '''
    Preprocessing for arec jobs.

    Requires that at least CH1 be present in the upload. This is not checked
    by the validation function - it must be configured properly when the
    recorder is installed.

    CH1 is used for audio playback and subtitles generation, so should be
    the camera input.
    '''

    def __init__(self, *args, **kwargs):
        '''
        Set up the preprocessor and compile the channel file name pattern.

        All arguments are passed unchanged to the superclass constructor.
        '''
        super().__init__(*args, **kwargs)

        # This regex matches the name format for the individual channel files
        # in a capture, which is this format: 1970_12_31_23_45_00_CH?_Name.mp4
        # The above ? represents a number between 1 and 4.
        # Capture group 1 is only there to shorten the regex.
        # Capture group 2 captures the channel number,
        # capture group 3 captures the channel name.
        # The pattern is a raw string so \d reaches the regex engine intact,
        # and the dot before the extension is escaped so it only matches
        # a literal '.'.
        self.name_regex = re.compile(
            r'^\d{4}(_\d{2}){5}_CH([1-4])_([^.]+)\.mp4$')

    def validate(self, queueitem):
        '''
        Check that a queue item describes a usable arec upload.

        Returns True when the item is valid.

        Raises KeyError if upload_dir is missing from the queue item or if
        a required field is missing from the arec json file. Raises
        ValueError if upload_dir does not exist or if a timestamp cannot be
        parsed by datetime.fromisoformat. Errors from reading or parsing
        information.json propagate unchanged.
        '''
        if 'upload_dir' not in queueitem:
            raise KeyError('upload_dir missing from job specification.')

        upload_dir = Path(queueitem['upload_dir'])
        if not upload_dir.exists():
            raise ValueError('Specified upload_dir does not exist.')

        data = self._parse_arec_json(upload_dir)
        required_strings = ['Title',
                            'Device_hostname']
        required_datetimes = ['Start_time',
                              'End_time']
        for key in required_strings + required_datetimes:
            if key not in data:
                raise KeyError(f'{key} missing from arec json file.')
        for key in required_datetimes:
            # Will throw an exception if the format is invalid
            datetime.fromisoformat(data[key])

        return True

    def _preprocess(self, job):
        '''
        Build a jobspec from an arec upload.

        Reads the recording metadata from information.json in the upload
        directory, fills in booking information from daisy and registers
        every channel file found in the upload as a source.

        Returns a dict with the keys 'jobid' and 'jobspec'.
        '''
        jobid = job['jobid']
        queueitem = job['queueitem']
        upload_dir = Path(queueitem['upload_dir'])
        arec_data = self._parse_arec_json(upload_dir)

        raw_title = arec_data['Title']
        recorder = arec_data['Device_hostname']
        room_id = self.config[recorder]
        start = datetime.fromisoformat(arec_data['Start_time'])
        end = datetime.fromisoformat(arec_data['End_time'])

        # The jobspec stores the creation time as a unix timestamp...
        outspec = self._init_jobspec(upload_dir,
                                     start.timestamp(),
                                     raw_title)
        # ...while the booking lookup is handed datetime objects, which is
        # what the cattura preprocessor passes to the same helper.
        self._fill_jobspec_from_daisy(start, end, room_id, outspec)

        # Populate the sources
        sources = {}
        for item in upload_dir.iterdir():
            match = self.name_regex.match(item.name)
            if not match:
                # Not a channel file (e.g. information.json)
                continue
            channel_no = match.group(2)
            # Only channel 1 provides the audio track
            sources[f'Channel {channel_no}'] = {'video': item.name,
                                                'poster': '',
                                                'playAudio': channel_no == '1'}
        outspec['sources'] = sources

        # Configure subtitle generation settings
        outspec['generate_subtitles'] = {'Generated': {'type': 'whisper',
                                                       'source': 'Channel 1'}}

        return {'jobid': jobid,
                'jobspec': outspec}

    def _parse_arec_json(self, upload_dir: Path) -> dict:
        '''
        Read information.json from upload_dir and flatten it to a dict.

        The file is expected to hold a list of objects that each have a
        'name' and a 'value' key; the result maps each name to its value.
        '''
        # JSON interchange is UTF-8; don't depend on the locale's encoding.
        with open(upload_dir / 'information.json', encoding='utf-8') as f:
            data = json.load(f)
        return {item['name']: item['value'] for item in data}

@ -29,6 +29,7 @@ class CatturaProcessor(Preprocessor):
jobid = job['jobid']
queueitem = job['queueitem']
recorder = queueitem['recorder']
room_id = self.config[recorder]
data = queueitem['data']
info = self._find_packageinfo(data['publishedOutputs'])
presname = info['name']
@ -38,16 +39,8 @@ class CatturaProcessor(Preprocessor):
with open(os.path.join(path, pkgfile)) as f:
mediapackage = json.load(f)
outspec = {'upload_dir': path,
'created': 0,
'title': {'sv': presname,
'en': presname},
'description': '',
'presenters': [],
'courses': [],
'thumb': '',
'tags': [],
'sources': {}}
outspec = self._init_jobspec(path, 0, presname)
duration = None
outputs = mediapackage['outputs']
@ -80,29 +73,9 @@ class CatturaProcessor(Preprocessor):
outspec['sources'][srcname] = source
outspec['tags'].append(
self.daisy.get_room_name(self.config[recorder]))
start = datetime.fromtimestamp(outspec['created'])
end = start + timedelta(seconds=duration)
booking = self.daisy.get_booking(start, end,
self.config[recorder])
if booking is not None:
title = {'sv': booking['displayStringLong']['swedish'],
'en': booking['displayStringLong']['english']}
outspec['title'] = title
outspec['presenters'] = booking['teachers']
if booking['description']:
outspec['description'] = booking['description']
if not outspec['presenters'] and booking['bookedBy']:
outspec['presenters'].append(booking['bookedBy'])
outspec['courses'] = booking['courseSegmentInstances']
if booking['educationalType']:
for i in booking['educationalType'].values():
# Add both english and swedish name;
# sometimes they're identical so don't add twice
if i not in outspec['tags']:
outspec['tags'].append(i)
self._fill_jobspec_from_daisy(start, end, room_id, outspec)
return {'jobid': jobid,
'jobspec': outspec}

@ -21,6 +21,43 @@ class Preprocessor(QueueThread, metaclass=ABCMeta):
"""
self.distributor.put(self._preprocess(job))
def _init_jobspec(self, upload_dir=None, created=0, title=''):
    """
    Create a jobspec skeleton with every standard field present.

    The same title is used for both the swedish and the english entry.
    The 'upload_dir' key is only added when upload_dir is given, and is
    always stored as a string.
    """
    localized_title = dict.fromkeys(('sv', 'en'), title)
    spec = dict(created=created,
                title=localized_title,
                description='',
                presenters=[],
                courses=[],
                thumb='',
                tags=[],
                sources={})
    if upload_dir is None:
        return spec

    spec['upload_dir'] = str(upload_dir)
    return spec
def _fill_jobspec_from_daisy(self, starttime, endtime, room_id, outspec):
    """
    Update outspec in place with data from the matching daisy booking.

    starttime, endtime and room_id are passed straight to
    self.daisy.get_booking. If a booking is returned, its title,
    presenters, description, courses and educational type tags are
    copied into outspec. The room name is added as a tag unless it is
    already present.

    Returns None; outspec is modified in place.
    """
    booking = self.daisy.get_booking(starttime, endtime, room_id)
    tags = outspec['tags']

    if booking is not None:
        display = booking['displayStringLong']
        outspec['title'] = {'sv': display['swedish'],
                            'en': display['english']}

        # Copy the list so the fallback append below can't modify the
        # booking object that daisy handed out.
        outspec['presenters'] = list(booking['teachers'] or [])
        if booking['description']:
            outspec['description'] = booking['description']
        if not outspec['presenters'] and booking['bookedBy']:
            outspec['presenters'].append(booking['bookedBy'])
        outspec['courses'] = booking['courseSegmentInstances']

        if booking['educationalType']:
            for name in booking['educationalType'].values():
                # Add both english and swedish name;
                # sometimes they're identical so don't add twice
                if name not in tags:
                    tags.append(name)

    # NOTE(review): the room tag is assumed to be independent of whether
    # a booking was found - confirm against the intended indentation.
    # Callers may already have tagged the room, so don't add it twice.
    room_name = self.daisy.get_room_name(room_id)
    if room_name not in tags:
        tags.append(room_name)
@abstractmethod
def validate(self, queueitem):
"""