play-daemon/daemon/pipeline.py
2022-04-01 16:34:47 +02:00

222 lines
8.3 KiB
Python

import json
import logging
import os
import requests
import shutil
import ffmpeg
import packager
from ldap import Ldap
from transcoder import Transcoder
from thumbnailer import Thumbnailer
class Pipeline:
    """Drives a queued presentation through packaging, transcoding,
    thumbnail generation, platform notification and final storage.

    Each stage either returns an updated package dict or aborts the run
    by logging through log_error(), which also persists the message to
    an error file so operators can inspect failed jobs.
    """

    def __init__(self, config):
        """Initialize directories, notification headers and workers.

        Args:
            config: parsed configuration mapping with 'daemon',
                'thumbnailer' and 'ldap' sections.
        """
        self.logger = logging.getLogger('play-daemon')
        # Stage directories: incoming queue items, final storage,
        # persisted error reports and the in-flight working area.
        self.queue = config['daemon']['queue']
        self.storage = config['daemon']['storage']
        self.error = config['daemon']['error']
        self.processing = config['daemon']['processing']
        # Platform notification endpoint plus bearer-token auth headers.
        self.notify_url = config['daemon']['notify_url']
        token = config['daemon']['notify_token']
        self.headers = {'Authorization': f'Bearer {token}',
                        'Accept': 'application/json',
                        'Content-Type': 'application/json'}
        # One packager per supported presentation origin/type.
        self.packagers = {'cattura': packager.Cattura(config),
                          'mediasite': packager.Mediasite(config),
                          'manual': packager.Manual(),
                          'update': packager.Update()}
        self.thumbnailer = Thumbnailer(config['thumbnailer'],
                                       Ldap(config['ldap']))
        self.transcoder = Transcoder(config)

    def log_error(self, message_tmpl, pres_id, *args, **kwargs):
        """Log an error and persist it to an error file named pres_id.

        Args:
            message_tmpl: %-style template whose first placeholder is
                filled with pres_id, the rest from *args.
            pres_id: presentation id; also names the error file.
            **kwargs: passed through to logging (e.g. exc_info=...).
        """
        message = message_tmpl % (pres_id, *args)
        self.logger.error(message, **kwargs)
        with open(os.path.join(self.error, pres_id), 'w') as f:
            f.write(message)

    # Main processing entry point
    def process(self, pres_id, pres_data):
        """Run the full pipeline for one queued presentation.

        Args:
            pres_id: id of the queue item (also the queue file name).
            pres_data: decoded queue item; 'type' selects the packager,
                or the special value 'cleanup'.

        On failure the error is recorded via log_error() and the method
        returns early, leaving the queue file in place for inspection.
        """
        package = None
        ptype = pres_data['type']
        if ptype == 'cleanup':
            # Not a real package but a cleanup job
            data = pres_data['data']
            self.logger.info('%s - Cleaning up due to error in upload: %s',
                             pres_id, data['message'])
            try:
                shutil.rmtree(data['base'])
            except FileNotFoundError:
                # Best effort: the upload directory may already be gone.
                self.logger.info('%s - Given base attribute not found: %s',
                                 pres_id, data['base'])
            os.remove(os.path.join(self.queue, pres_id))
            return
        elif ptype not in self.packagers:
            # Bail on invalid package types
            self.log_error('%s - Unknown presentation format: %s',
                           pres_id, ptype)
            return
        self.logger.info('%s - Creating %s package', pres_id, ptype)
        try:
            # Origin-specific handling
            # Create initial package
            package = self.packagers[ptype].pack(pres_id, pres_data)
        except packager.PackageException as e:
            self.log_error('%s - Error when packing: %s',
                           pres_id,
                           json.dumps(pres_data,
                                      sort_keys=True,
                                      indent=2),
                           exc_info=e)
            return
        self.logger.info('%s - Package creation successful', pres_id)
        # Create id dir in processing
        # Add workbase to package
        package = self.mkworkdir(package)
        # Create thumbnail if none exists
        package = self.thumbnailer.create_if_missing(package)
        try:
            # Transcoding and thumb generation
            # Put new files in processing
            # Update package with new relative names
            package = self.transcoder.convert(package)
        except ffmpeg.Error as e:
            # ffmpeg errors carry the tool's stderr; include it verbatim.
            self.log_error('%s - Error when transcoding: %s\n%s',
                           pres_id,
                           json.dumps(package,
                                      sort_keys=True,
                                      indent=2),
                           e.stderr)
            return
        except Exception as e:
            self.log_error('%s - Error when transcoding: %s',
                           pres_id,
                           json.dumps(package,
                                      sort_keys=True,
                                      indent=2),
                           exc_info=e)
            return
        # Pick up any passed subs
        package = self.pickup_subs(package)
        # Stash originals if we're in debugging mode
        if self.logger.getEffectiveLevel() <= logging.DEBUG:
            package = self.stash(package)
        try:
            # Platform notification
            package = self.notify(package)
        except NotificationException as e:
            message = '%s - Error when notifying platform: %s %s\n' \
                      + 'Package contents: %s'
            self.log_error(message,
                           pres_id,
                           e.result.status_code,
                           e.result.text,
                           json.dumps(package,
                                      sort_keys=True,
                                      indent=2),
                           exc_info=e)
            return
        # Finalize processing
        # Remove base, workbase, orig_id, notification_id
        # Save package to disk
        # Move processing dir to processed
        # Delete queue file
        # Delete base files
        self.finalize(package)

    # Create a working directory
    def mkworkdir(self, package):
        """Create a fresh working directory under self.processing.

        Any leftover directory from a previous attempt is removed first.
        Stores the path in package['workbase'] and returns the package.
        """
        workbase = os.path.join(self.processing, package['id'])
        if os.path.exists(workbase):
            shutil.rmtree(workbase)
        os.mkdir(workbase)
        package['workbase'] = workbase
        return package

    # Move subs to working directory
    def pickup_subs(self, package):
        """Copy an optional subtitle file into the working directory.

        Rewrites package['subtitles'] to the bare file name so it is
        relative to the final presentation directory. No-op when the
        package carries no subtitles.
        """
        subs = package.get('subtitles', '')
        if subs:
            subname = os.path.basename(subs)
            shutil.copy(os.path.join(package['base'], subs),
                        os.path.join(package['workbase'], subname))
            package['subtitles'] = subname
        return package

    # Stash original files and queue file inside presentation directory
    def stash(self, package):
        """Preserve the upload originals and queue item for debugging.

        Copies package['base'] to <workbase>/originals and the queue
        file to <workbase>/queueitem.json; returns the package.
        """
        pres_id = package['id']
        shutil.copytree(package['base'], os.path.join(package['workbase'],
                                                      'originals'))
        shutil.copy(os.path.join(self.queue, pres_id),
                    os.path.join(package['workbase'], 'queueitem.json'))
        return package

    # Notify platform of presentation
    def notify(self, package):
        """POST the package to the platform's notification endpoint.

        Internal-only keys (base, workbase) are stripped from the copy
        that is sent; for updates, orig_id replaces id so the platform
        updates the existing presentation.

        Raises:
            NotificationException: when the platform responds non-2xx.
        """
        pres_id = package['id']
        note = dict(package)
        if 'orig_id' in note:
            note['id'] = note.pop('orig_id')
        if 'slides' in note:
            note.pop('slides')
        del note['base']
        del note['workbase']
        result = requests.post(self.notify_url,
                               headers=self.headers,
                               data=json.dumps(note))
        if result.ok:
            message = '%s - Notification successful'
            if pres_id != note['id']:
                # Keep the leading %s placeholder for logger interpolation.
                message = '%s (updating %s)' % (message, note['id'])
            self.logger.info(message, pres_id)
        else:
            raise NotificationException(json.dumps(note), result)
        return package

    def finalize(self, package):
        """Move the finished presentation into storage and clean up.

        Writes a minimal package.json into the working directory, moves
        it to <storage>/<target_id> (keeping any replaced version as
        '<target_id>.old'), then removes the queue file, any stale error
        file and the upload base directory.
        """
        target_id = package['id']
        if 'orig_id' in package:
            # Updates overwrite the original presentation's directory.
            target_id = package['orig_id']
        minpackage = {'id': target_id,
                      'thumb': package['thumb'],
                      'sources': package['sources']}
        work_path = package['workbase']
        # Save the minimal package to disk ('x': workdir is always fresh)
        with open(os.path.join(work_path, 'package.json'), 'x') as f:
            f.write(json.dumps(minpackage))
        # Move files to final destination, taking updates into account
        target_path = os.path.join(self.storage, target_id)
        if os.path.exists(target_path):
            temp_path = target_path + '.old'
            if os.path.exists(temp_path):
                shutil.rmtree(temp_path)
            shutil.move(target_path, temp_path)
        shutil.move(work_path, target_path)
        # Delete queue file and upload data. Both are keyed on the
        # incoming presentation id (package['id']), which differs from
        # target_id for updates — removing by target_id would raise
        # FileNotFoundError and leave the queue item behind.
        os.remove(os.path.join(self.queue, package['id']))
        try:
            # if any old error file remove it since it worked
            os.remove(os.path.join(self.error, package['id']))
        except OSError:
            pass
        shutil.rmtree(package['base'])
        return
class NotificationException(Exception):
def __init__(self, package, result):
self.package = package
self.result = result