play-daemon/daemon/pipeline.py
2022-09-06 10:40:20 +02:00

234 lines
8.8 KiB
Python

import json
import logging
import os
import requests
import shutil
import ffmpeg
import packager
from ldap import Ldap
from transcoder import Transcoder
from thumbnailer import Thumbnailer
from updater import Updater
class Pipeline:
    """Take a queued presentation through packing, transcoding,
    platform notification and the final move into storage.

    The directories used (queue, storage, error, processing) and the
    notification endpoint are read from the 'daemon' section of the
    configuration passed to the constructor.
    """

    def __init__(self, config):
        """Read the daemon settings and build the processing stages.

        config is indexed like a nested mapping. The keys read here are
        config['daemon'] (queue, storage, error, processing, notify_url,
        notify_token), config['thumbnailer'] and config['ldap']. The
        whole config is also handed to the Cattura and Mediasite
        packagers and to the Transcoder.
        """
        self.logger = logging.getLogger('play-daemon')
        self.queue = config['daemon']['queue']
        self.storage = config['daemon']['storage']
        self.error = config['daemon']['error']
        self.processing = config['daemon']['processing']
        self.notify_url = config['daemon']['notify_url']
        # Name of the package description saved inside each presentation
        self.jsonfile = 'package.json'
        token = config['daemon']['notify_token']
        self.headers = {'Authorization': f'Bearer {token}',
                        'Accept': 'application/json',
                        'Content-Type': 'application/json'}
        # One packager per accepted value of the job's 'type' field
        self.packagers = {'cattura': packager.Cattura(config),
                          'mediasite': packager.Mediasite(config),
                          'manual': packager.Manual(),
                          'update': packager.Update()}
        self.thumbnailer = Thumbnailer(config['thumbnailer'],
                                       Ldap(config['ldap']))
        self.transcoder = Transcoder(config)
        self.updater = Updater(self.storage, self.jsonfile)

    @staticmethod
    def _pretty(data):
        """Return data as indented JSON with sorted keys, for error logs."""
        return json.dumps(data, sort_keys=True, indent=2)

    def log_error(self, message_tmpl, pres_id, *args, **kwargs):
        """Log an error and record it in the error directory.

        message_tmpl is a %-style template; its first placeholder
        receives pres_id and the remaining ones receive args. kwargs are
        passed through to logger.error (for instance exc_info). The
        formatted message is also written to a file named pres_id inside
        self.error, replacing any earlier content.
        """
        message = message_tmpl % (pres_id, *args)
        self.logger.error(message, **kwargs)
        # Explicit encoding: the message may carry non-ASCII text (platform
        # responses, tool output) and reporting an error must not itself
        # fail because of the process locale.
        error_path = os.path.join(self.error, pres_id)
        with open(error_path, 'w', encoding='utf-8') as f:
            f.write(message)

    # Main processing entry point
    def process(self, pres_id, pres_data):
        """Process one queue item.

        pres_data is the job description; its 'type' selects either a
        cleanup job or one of the configured packagers. Failures while
        packing, transcoding or notifying are reported through
        log_error and end processing of this item; the method returns
        None in every case.
        """
        # A job without a type is reported like any other unknown type
        ptype = pres_data.get('type')
        if ptype == 'cleanup':
            # Not a real package but a cleanup job
            data = pres_data['data']
            self.logger.info('%s - Cleaning up due to error in upload: %s',
                             pres_id, data['message'])
            try:
                shutil.rmtree(data['base'])
            except FileNotFoundError:
                self.logger.info('%s - Given base attribute not found: %s',
                                 pres_id, data['base'])
            os.remove(os.path.join(self.queue, pres_id))
            return
        if ptype not in self.packagers:
            # Bail on invalid package types
            self.log_error('%s - Unknown presentation format: %s',
                           pres_id, ptype)
            return

        self.logger.info('%s - Creating %s package', pres_id, ptype)
        try:
            # Origin-specific handling
            # Create initial package
            package = self.packagers[ptype].pack(pres_id, pres_data)
        except packager.PackageException as e:
            self.log_error('%s - Error when packing: %s',
                           pres_id,
                           self._pretty(pres_data),
                           exc_info=e)
            return
        self.logger.info('%s - Package creation successful', pres_id)

        # Create id dir in processing
        # Add workbase to package
        package = self.mkworkdir(package)

        # Copy any passed subs to workdir
        package = self.pickup_subs(package)

        # Copy passed thumbnail to workdir or generate one
        package = self.thumbnailer.pickup(package)

        try:
            # Transcoding and poster generation
            # Put new files in workdir
            # Update package with new relative names
            package = self.transcoder.convert(package)
        except ffmpeg.Error as e:
            # The captured stderr arrives as bytes; decode it so the log
            # shows readable lines instead of a b'...' repr.
            stderr = e.stderr
            if isinstance(stderr, bytes):
                stderr = stderr.decode('utf-8', errors='replace')
            self.log_error('%s - Error when transcoding: %s\n%s',
                           pres_id,
                           self._pretty(package),
                           stderr)
            return
        except Exception as e:
            self.log_error('%s - Error when transcoding: %s',
                           pres_id,
                           self._pretty(package),
                           exc_info=e)
            return

        # If the job is an update, this will integrate unchanged originals
        package = self.updater.integrate_update(package)

        # Stash originals if we're in debugging mode
        if self.logger.getEffectiveLevel() <= logging.DEBUG:
            package = self.stash(package)

        try:
            # Platform notification
            package = self.notify(package)
        except NotificationException as e:
            message = ('%s - Error when notifying platform: %s %s\n'
                       'Package contents: %s')
            self.log_error(message,
                           pres_id,
                           e.result.status_code,
                           e.result.text,
                           self._pretty(package),
                           exc_info=e)
            return

        # Finalize processing
        # Remove base, workbase, orig_id, notification_id
        # Save package to workdir
        # Move workdir to processed
        # Delete queue file
        # Delete base files
        self.finalize(package)

    # Create a working directory
    def mkworkdir(self, package):
        """Create an empty working directory for the package.

        The directory is self.processing/<package id>; an existing one
        is removed first. Its path is stored under package['workbase']
        and the package is returned.
        """
        workbase = os.path.join(self.processing, package['id'])
        if os.path.exists(workbase):
            shutil.rmtree(workbase)
        os.mkdir(workbase)
        package['workbase'] = workbase
        return package

    # Move subs to working directory
    def pickup_subs(self, package):
        """Copy a supplied subtitle file into the working directory.

        When package['subtitles'] is present and non-empty it is taken
        as a path relative to package['base']; the file is copied to
        the working directory and the entry is replaced by the bare
        file name. Otherwise the package is returned unchanged.
        """
        subs = package.get('subtitles', '')
        if subs:
            subname = os.path.basename(subs)
            shutil.copy(os.path.join(package['base'], subs),
                        os.path.join(package['workbase'], subname))
            package['subtitles'] = subname
        return package

    # Stash original files and queue file inside presentation directory
    def stash(self, package):
        """Keep copies of the job's inputs inside the working directory.

        Copies the queue file to 'queueitem.json', the stored
        presentation being updated (if any) to 'originals', and the
        uploaded files to 'incoming'. Returns the package.
        """
        pres_id = package['id']
        workbase = package['workbase']
        shutil.copy(os.path.join(self.queue, pres_id),
                    os.path.join(workbase, 'queueitem.json'))
        if 'orig_id' in package:
            original = os.path.join(self.storage, package['orig_id'])
            stashed = os.path.join(workbase, 'originals')
            # finalize() stores each presentation as a directory, and
            # shutil.copy refuses directories, so copy the whole tree.
            if os.path.isdir(original):
                shutil.copytree(original, stashed)
            else:
                shutil.copy(original, stashed)
        shutil.copytree(package['base'],
                        os.path.join(workbase, 'incoming'))
        return package

    # Notify platform of presentation
    def notify(self, package):
        """Post the package description to the platform.

        The posted copy uses orig_id as its id when present and omits
        'slides', 'base' and 'workbase'. Returns the unmodified package
        on success and raises NotificationException when the response
        is not ok.
        """
        pres_id = package['id']
        # Shallow copy so the caller's package keeps all of its keys
        note = dict(package)
        if 'orig_id' in note:
            note['id'] = note.pop('orig_id')
        note.pop('slides', None)
        del note['base']
        del note['workbase']
        # NOTE(review): no timeout is passed, so an unresponsive platform
        # would block here indefinitely -- confirm whether that is intended.
        result = requests.post(self.notify_url,
                               headers=self.headers,
                               data=json.dumps(note))
        if not result.ok:
            raise NotificationException(json.dumps(note), result)
        message = '%s - Notification successful'
        if pres_id != note['id']:
            message = '%s (updating %s)' % (message, note['id'])
        self.logger.info(message, pres_id)
        return package

    def finalize(self, package):
        """Store the finished presentation and clean up after the job.

        Writes a minimal package description (id, thumb, subtitles,
        sources) to the working directory, moves that directory to
        self.storage/<target id> (replacing the existing one for
        updates), then removes the queue file, any error file and the
        uploaded base directory. The target id is orig_id when present,
        otherwise the package id.
        """
        target_id = package.get('orig_id', package['id'])
        minpackage = {'id': target_id,
                      'thumb': package['thumb'],
                      # Subtitles are optional (see pickup_subs); default
                      # to the same empty value instead of failing here.
                      'subtitles': package.get('subtitles', ''),
                      'sources': package['sources']}
        workbase = package['workbase']

        # Save the minimal package to disk
        # ('x' fails if the file is already present in the workdir)
        with open(os.path.join(workbase, self.jsonfile), 'x') as f:
            f.write(json.dumps(minpackage))

        target_path = os.path.join(self.storage, target_id)
        if 'orig_id' in package:
            # The package is an update; old data exists
            # and is to be replaced
            shutil.rmtree(target_path)
        shutil.move(workbase, target_path)

        # Delete queue file and upload data
        # NOTE(review): stash() looks the queue file up by package['id']
        # while this uses target_id (orig_id for updates) -- confirm which
        # name update jobs are queued under.
        os.remove(os.path.join(self.queue, target_id))

        # Remove any error file since everything has worked now
        errorfile = os.path.join(self.error, target_id)
        if os.path.isfile(errorfile):
            os.remove(errorfile)

        # Remove files from incoming
        shutil.rmtree(package['base'])
class NotificationException(Exception):
    """Raised when the platform rejects a presentation notification.

    Attributes:
        package: the JSON text that was posted to the platform.
        result: the response object returned by the failed request.
    """

    def __init__(self, package, result):
        # Give the exception a message so str(e) and logged tracebacks are
        # not empty; the status is read defensively in case result does
        # not carry one.
        status = getattr(result, 'status_code', None)
        super().__init__(f'Notification failed with status {status}')
        self.package = package
        self.result = result