# play-daemon/daemon/pipeline.py

import json
import logging
import os
import shutil

import ffmpeg
import requests

import packager
from transcoder import Transcoder


class Pipeline:
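    """Run queued presentations through packaging, transcoding, platform
    notification and final storage; process() is the entry point."""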
def __init__(self, config):
self.logger = logging.getLogger('play-daemon')
self.queue = config['daemon']['queue']
self.storage = config['daemon']['storage']
self.error = config['daemon']['error']
self.processing = config['daemon']['processing']
self.notify_url = config['daemon']['notify_url']
token = config['daemon']['notify_token']
self.headers = {'Authorization': 'Bearer {}'.format(token),
'Accept': 'application/json',
'Content-Type': 'application/json'}
self.packagers = {'cattura': packager.Cattura(config),
'mediasite': packager.Mediasite(config),
'manual': packager.Manual(),
'update': packager.Update()}
self.transcoder = Transcoder(config)
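
    # Expected config layout, sketched from the keys read above; the paths
    # and URL are illustrative only, and the sections consumed by the
    # packagers and the Transcoder are omitted:
    #
    # {"daemon": {"queue":      "/var/spool/play/queue",
    #             "storage":    "/srv/play/storage",
    #             "error":      "/var/spool/play/error",
    #             "processing": "/var/spool/play/processing",
    #             "notify_url": "https://play.example.com/api/notify",
    #             "notify_token": "..."}}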
# Main processing entry point
def process(self, pres_id, pres_data):
        ptype = pres_data['type']
if ptype == 'cleanup':
# Not a real package but a cleanup job
data = pres_data['data']
            self.logger.info("%s - Cleaning up due to error in upload: %s",
                             pres_id, data['message'])
            # base points at the upload directory (cf. stash/finalize)
            shutil.rmtree(data['base'])
os.remove(os.path.join(self.queue, pres_id))
return
elif ptype not in self.packagers:
# Bail on invalid package types
self.logger.error('%s - Unknown presentation format: %s',
pres_id, ptype)
return
try:
# 1. Origin-specific handling
# Create initial package
package = self.packagers[ptype].pack(pres_id, pres_data)
except packager.PackageException as e:
self.logger.error('%s - Error when packing: %s',
pres_id, json.dumps(pres_data,
sort_keys=True,
indent=2),
exc_info=e)
return
self.logger.info("%s - Package creation successful", pres_id)
# 2. Create id dir in processing
# Add workbase to package
package = self.mkworkdir(package)
try:
# 3. transcoding and thumb generation
# Put new files in processing
# Update package with new relative names
package = self.transcoder.convert(package)
except ffmpeg.Error as e:
self.logger.error('%s - Error when transcoding: %s\n%s',
pres_id, json.dumps(package,
sort_keys=True,
indent=2), e.stderr)
return
except Exception as e:
self.logger.error('%s - Error when transcoding: %s',
pres_id, json.dumps(package,
sort_keys=True,
indent=2),
exc_info=e)
return
# 4. stash originals
# Move original files to processing subdir
package = self.stash(package)
try:
# 5. platform notification
package = self.notify(package)
except NotificationException as e:
            message = ('%s - Error when notifying platform: %s %s\n'
                       'Package contents: %s')
self.logger.error(message,
pres_id,
e.result.status_code, e.result.text,
json.dumps(package,
sort_keys=True,
indent=2),
exc_info=e)
return
        # 6. finalize processing
        # Save a minimal package (id, thumb, sources) to disk
        # Move the processing dir into permanent storage
        # Delete the queue file and the original upload
self.finalize(package)
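
    # A queue item is a JSON object with at least a 'type' key. The cleanup
    # payload handled above looks like the sketch below; the payloads for
    # the packager types are defined by the respective packagers:
    #
    # {"type": "cleanup",
    #  "data": {"base": "/path/to/upload", "message": "upload failed"}}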
# Create a working directory
def mkworkdir(self, package):
workbase = os.path.join(self.processing, package['id'])
if os.path.exists(workbase):
shutil.rmtree(workbase)
os.mkdir(workbase)
package['workbase'] = workbase
return package

# Stash original files and queue file inside presentation directory
def stash(self, package):
pres_id = package['id']
shutil.copytree(package['base'], os.path.join(package['workbase'],
'originals'))
shutil.copy(os.path.join(self.queue, pres_id),
os.path.join(package['workbase'], 'queueitem.json'))
return package

# Notify platform of presentation
def notify(self, package):
pres_id = package['id']
note = dict(package)
if 'update_id' in note:
note['id'] = note.pop('update_id')
if 'slides' in note:
note.pop('slides')
        del note['base']
        del note['workbase']
result = requests.post(self.notify_url,
headers=self.headers,
data=json.dumps(note))
if result.ok:
message = "%s - Notification successful"
if pres_id != note['id']:
message = message + (' (updating %s)' % note['id'])
self.logger.info(message, pres_id)
else:
raise NotificationException(json.dumps(note), result)
return package
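
    # The notification body is the package itself minus the local-only
    # 'base' and 'workbase' paths and any 'slides' list, with 'id' swapped
    # for 'update_id' when an existing presentation is updated; apart from
    # 'id', 'thumb' and 'sources' (consumed by finalize below), its fields
    # are whatever the packager produced.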
def finalize(self, package):
target_id = package['id']
if 'update_id' in package:
target_id = package['update_id']
minpackage = {'id': target_id,
'thumb': package['thumb'],
'sources': package['sources']}
work_path = package['workbase']
# Save the minimal package to disk
with open(os.path.join(work_path, 'package.json'), 'x') as f:
f.write(json.dumps(minpackage))
# Move files to final destination, taking updates into account
target_path = os.path.join(self.storage, target_id)
if os.path.exists(target_path):
temp_path = target_path + '.old'
if os.path.exists(temp_path):
shutil.rmtree(temp_path)
shutil.move(target_path, temp_path)
shutil.move(work_path, target_path)
        # Delete queue file; it is named after the original queue item id,
        # which differs from target_id when this is an update
        os.remove(os.path.join(self.queue, package['id']))
try:
# if any old error file remove it since it worked
os.remove(os.path.join(self.error, target_id))
except OSError:
pass
shutil.rmtree(package['base'])
return


class NotificationException(Exception):
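    """Raised when the platform answers a notification with a non-OK status;
    carries the JSON-encoded notification body and the requests response."""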
def __init__(self, package, result):
self.package = package
self.result = result
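

# Minimal usage sketch, not part of the daemon proper: it assumes the config
# is a JSON file given as the first argument and that queue items are JSON
# files named by presentation id (matching how stash() copies them); the
# real daemon's event loop may differ.
if __name__ == '__main__':
    import sys
    logging.basicConfig(level=logging.INFO)
    with open(sys.argv[1]) as conf_file:
        config = json.load(conf_file)
    pipeline = Pipeline(config)
    queue_dir = config['daemon']['queue']
    for pres_id in sorted(os.listdir(queue_dir)):
        with open(os.path.join(queue_dir, pres_id)) as item:
            pipeline.process(pres_id, json.load(item))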