import json
import logging
import os
import requests
import shutil

import ffmpeg

import packager
from ldap import Ldap
from transcoder import Transcoder
from thumbnailer import Thumbnailer
from updater import Updater
class Pipeline:
|
|
def __init__(self, config):
|
|
self.logger = logging.getLogger('play-daemon')
|
|
self.queue = config['daemon']['queue']
|
|
self.storage = config['daemon']['storage']
|
|
self.error = config['daemon']['error']
|
|
self.processing = config['daemon']['processing']
|
|
self.notify_url = config['daemon']['notify_url']
|
|
self.jsonfile = 'package.json'
|
|
token = config['daemon']['notify_token']
|
|
self.headers = {'Authorization': f'Bearer {token}',
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json'}
|
|
self.packagers = {'cattura': packager.Cattura(config),
|
|
'mediasite': packager.Mediasite(config),
|
|
'manual': packager.Manual(),
|
|
'update': packager.Update()}
|
|
self.thumbnailer = Thumbnailer(config['thumbnailer'],
|
|
Ldap(config['ldap']))
|
|
self.transcoder = Transcoder(config)
|
|
self.updater = Updater(self.storage, self.jsonfile)
|
|
|
|
def log_error(self, message_tmpl, pres_id, *args, **kwargs):
|
|
message = message_tmpl % (pres_id, *args)
|
|
self.logger.error(message, **kwargs)
|
|
with open(os.path.join(self.error, pres_id), 'w') as f:
|
|
f.write(message)
|
|
|
|
# Main processing entry point
|
|
def process(self, pres_id, pres_data):
|
|
package = None
|
|
ptype = pres_data['type']
|
|
if ptype == 'cleanup':
|
|
# Not a real package but a cleanup job
|
|
data = pres_data['data']
|
|
self.logger.info('%s - Cleaning up due to error in upload: %s',
|
|
pres_id, data['message'])
|
|
try:
|
|
shutil.rmtree(data['base'])
|
|
except FileNotFoundError as e:
|
|
self.logger.info('%s - Given base attribute not found: %s',
|
|
pres_id, data['base'])
|
|
os.remove(os.path.join(self.queue, pres_id))
|
|
return
|
|
elif ptype not in self.packagers:
|
|
# Bail on invalid package types
|
|
self.log_error('%s - Unknown presentation format: %s',
|
|
pres_id, ptype)
|
|
return
|
|
self.logger.info('%s - Creating %s package', pres_id, ptype)
|
|
try:
|
|
# Origin-specific handling
|
|
# Create initial package
|
|
package = self.packagers[ptype].pack(pres_id, pres_data)
|
|
except packager.PackageException as e:
|
|
self.log_error('%s - Error when packing: %s',
|
|
pres_id,
|
|
json.dumps(pres_data,
|
|
sort_keys=True,
|
|
indent=2),
|
|
exc_info=e)
|
|
return
|
|
self.logger.info('%s - Package creation successful', pres_id)
|
|
|
|
# Create id dir in processing
|
|
# Add workbase to package
|
|
package = self.mkworkdir(package)
|
|
|
|
# Copy any passed subs to workdir
|
|
package = self.pickup_subs(package)
|
|
|
|
# Copy passed thumbnail to workdir or generate one
|
|
package = self.thumbnailer.pickup(package)
|
|
|
|
try:
|
|
# Transcoding and poster generation
|
|
# Put new files in workdir
|
|
# Update package with new relative names
|
|
package = self.transcoder.convert(package)
|
|
except ffmpeg.Error as e:
|
|
self.log_error('%s - Error when transcoding: %s\n%s',
|
|
pres_id,
|
|
json.dumps(package,
|
|
sort_keys=True,
|
|
indent=2),
|
|
e.stderr)
|
|
return
|
|
except Exception as e:
|
|
self.log_error('%s - Error when transcoding: %s',
|
|
pres_id,
|
|
json.dumps(package,
|
|
sort_keys=True,
|
|
indent=2),
|
|
exc_info=e)
|
|
return
|
|
|
|
# If the job is an update, this will integrate unchanged originals
|
|
package = self.updater.integrate_update(package)
|
|
|
|
# Stash originals if we're in debugging mode
|
|
if self.logger.getEffectiveLevel() <= logging.DEBUG:
|
|
package = self.stash(package)
|
|
try:
|
|
# Platform notification
|
|
package = self.notify(package)
|
|
except NotificationException as e:
|
|
message = '%s - Error when notifying platform: %s %s\n' \
|
|
+ 'Package contents: %s'
|
|
self.log_error(message,
|
|
pres_id,
|
|
e.result.status_code,
|
|
e.result.text,
|
|
json.dumps(package,
|
|
sort_keys=True,
|
|
indent=2),
|
|
exc_info=e)
|
|
return
|
|
# Finalize processing
|
|
# Remove base, workbase, orig_id, notification_id
|
|
# Save package to workdir
|
|
# Move workdir to processed
|
|
# Delete queue file
|
|
# Delete base files
|
|
self.finalize(package)
|
|
|
|
# Create a working directory
|
|
def mkworkdir(self, package):
|
|
workbase = os.path.join(self.processing, package['id'])
|
|
if os.path.exists(workbase):
|
|
shutil.rmtree(workbase)
|
|
os.mkdir(workbase)
|
|
package['workbase'] = workbase
|
|
return package
|
|
|
|
# Move subs to working directory
|
|
def pickup_subs(self, package):
|
|
subs = package.get('subtitles', '')
|
|
if subs:
|
|
subname = os.path.basename(subs)
|
|
shutil.copy(os.path.join(package['base'], subs),
|
|
os.path.join(package['workbase'], subname))
|
|
package['subtitles'] = subname
|
|
return package
|
|
|
|
# Stash original files and queue file inside presentation directory
|
|
def stash(self, package):
|
|
pres_id = package['id']
|
|
workbase = package['workbase']
|
|
shutil.copy(os.path.join(self.queue, pres_id),
|
|
os.path.join(workbase, 'queueitem.json'))
|
|
if 'orig_id' in package:
|
|
shutil.copy(os.path.join(self.storage, package['orig_id']),
|
|
os.path.join(workbase, 'originals'))
|
|
shutil.copytree(package['base'],
|
|
os.path.join(workbase, 'incoming'))
|
|
return package
|
|
|
|
# Notify platform of presentation
|
|
def notify(self, package):
|
|
pres_id = package['id']
|
|
note = dict(package)
|
|
if 'orig_id' in note:
|
|
note['id'] = note.pop('orig_id')
|
|
if 'slides' in note:
|
|
note.pop('slides')
|
|
del(note['base'])
|
|
del(note['workbase'])
|
|
result = requests.post(self.notify_url,
|
|
headers=self.headers,
|
|
data=json.dumps(note))
|
|
if result.ok:
|
|
message = '%s - Notification successful'
|
|
if pres_id != note['id']:
|
|
message = '%s (updating %s)' % (message, note['id'])
|
|
self.logger.info(message, pres_id)
|
|
else:
|
|
raise NotificationException(json.dumps(note), result)
|
|
return package
|
|
|
|
def finalize(self, package):
|
|
target_id = package['id']
|
|
if 'orig_id' in package:
|
|
target_id = package['orig_id']
|
|
|
|
minpackage = {'id': target_id,
|
|
'thumb': package['thumb'],
|
|
'subtitles': package['subtitles'],
|
|
'sources': package['sources']}
|
|
|
|
workbase = package['workbase']
|
|
|
|
# Save the minimal package to disk
|
|
with open(os.path.join(workbase, self.jsonfile), 'x') as f:
|
|
f.write(json.dumps(minpackage))
|
|
|
|
target_path = os.path.join(self.storage, target_id)
|
|
if 'orig_id' in package:
|
|
# The package is an update; old data exists
|
|
# and is to be replaced
|
|
shutil.rmtree(target_path)
|
|
shutil.move(workbase, target_path)
|
|
|
|
# Delete queue file and upload data
|
|
os.remove(os.path.join(self.queue, target_id))
|
|
|
|
# Remove any error file since everything has worked now
|
|
errorfile = os.path.join(self.error, target_id)
|
|
if os.path.isfile(errorfile):
|
|
os.remove(errorfile)
|
|
|
|
# Remove files from incoming
|
|
shutil.rmtree(package['base'])
|
|
return
|
|
|
|
|
|
class NotificationException(Exception):
|
|
def __init__(self, package, result):
|
|
self.package = package
|
|
self.result = result
|