185 lines
7.1 KiB
Python
185 lines
7.1 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import requests
|
|
import shutil
|
|
|
|
import ffmpeg
|
|
|
|
import packager
|
|
from transcoder import Transcoder
|
|
|
|
class Pipeline:
    """Take one queued presentation through the processing pipeline.

    The steps are: origin-specific packaging, working directory creation,
    transcoding, stashing of the original files, platform notification and
    finally moving the result into storage.
    """

    def __init__(self, config):
        """Read the daemon settings from *config* and create the workers.

        The 'daemon' section of *config* must provide the keys 'queue',
        'storage', 'error', 'processing', 'notify_url' and 'notify_token'.
        """
        self.logger = logging.getLogger('play-daemon')
        daemon = config['daemon']
        self.queue = daemon['queue']
        self.storage = daemon['storage']
        self.error = daemon['error']
        self.processing = daemon['processing']
        self.notify_url = daemon['notify_url']
        token = daemon['notify_token']
        self.headers = {'Authorization': 'Bearer {}'.format(token),
                        'Accept': 'application/json',
                        'Content-Type': 'application/json'}
        # One packager per supported value of pres_data['type'].
        self.packagers = {'cattura': packager.Cattura(config),
                          'mediasite': packager.Mediasite(config),
                          'manual': packager.Manual(),
                          'update': packager.Update()}
        self.transcoder = Transcoder(config)

    # Main processing entry point
    def process(self, pres_id, pres_data):
        """Run the whole pipeline for the queue item *pres_id*.

        *pres_data* must contain a 'type' key. The type 'cleanup' removes
        the files of a failed upload; any other type must be a key of
        self.packagers. Failures in packing, transcoding or notification
        are logged and abort processing; nothing is raised for them.
        """
        ptype = pres_data['type']
        if ptype == 'cleanup':
            # Not a real package but a cleanup job
            data = pres_data['data']
            # Use the daemon logger like every other message in this class.
            self.logger.info("%s - Cleaning up due to error in upload: %s",
                             pres_id, data['message'])
            os.remove(data['base'])
            os.remove(os.path.join(self.queue, pres_id))
            return
        if ptype not in self.packagers:
            # Bail on invalid package types
            self.logger.error('%s - Unknown presentation format: %s',
                              pres_id, ptype)
            return

        try:
            # 1. Origin-specific handling
            # Create initial package
            package = self.packagers[ptype].pack(pres_id, pres_data)
        except packager.PackageException as e:
            self.logger.error('%s - Error when packing: %s',
                              pres_id,
                              json.dumps(pres_data, sort_keys=True, indent=2),
                              exc_info=e)
            return
        self.logger.info("%s - Package creation successful", pres_id)

        # 2. Create id dir in processing
        # Add workbase to package
        package = self.mkworkdir(package)

        try:
            # 3. transcoding and thumb generation
            # Put new files in processing
            # Update package with new relative names
            package = self.transcoder.convert(package)
        except ffmpeg.Error as e:
            stderr = e.stderr
            if isinstance(stderr, bytes):
                # Log readable multi-line text rather than a bytes repr.
                stderr = stderr.decode('utf-8', errors='replace')
            self.logger.error('%s - Error when transcoding: %s\n%s',
                              pres_id,
                              json.dumps(package, sort_keys=True, indent=2),
                              stderr)
            return
        except Exception as e:
            self.logger.error('%s - Error when transcoding: %s',
                              pres_id,
                              json.dumps(package, sort_keys=True, indent=2),
                              exc_info=e)
            return

        # 4. stash originals
        # Copy original files and the queue item into the working dir
        package = self.stash(package)

        try:
            # 5. platform notification
            package = self.notify(package)
        except NotificationException as e:
            message = '%s - Error when notifying platform: %s %s\n' \
                      + 'Package contents: %s'
            self.logger.error(message,
                              pres_id,
                              e.result.status_code, e.result.text,
                              json.dumps(package, sort_keys=True, indent=2),
                              exc_info=e)
            return

        # 6. finalize processing
        # Save a minimal package to disk
        # Move processing dir to storage
        # Delete queue file
        # Delete base files
        self.finalize(package)

    # Create a working directory
    def mkworkdir(self, package):
        """Create an empty working directory for *package*.

        The directory is <processing>/<package id>; an existing one is
        deleted first. Its path is stored in package['workbase'] and the
        same package object is returned.
        """
        workbase = os.path.join(self.processing, package['id'])
        if os.path.exists(workbase):
            shutil.rmtree(workbase)
        os.mkdir(workbase)
        package['workbase'] = workbase
        return package

    # Stash original files and queue file inside presentation directory
    def stash(self, package):
        """Copy the originals and the queue item into the working dir.

        The tree at package['base'] is copied to <workbase>/originals and
        the queue file <queue>/<package id> to <workbase>/queueitem.json.
        Returns *package* unchanged.
        """
        pres_id = package['id']
        workbase = package['workbase']
        shutil.copytree(package['base'],
                        os.path.join(workbase, 'originals'))
        shutil.copy(os.path.join(self.queue, pres_id),
                    os.path.join(workbase, 'queueitem.json'))
        return package

    # Notify platform of presentation
    def notify(self, package):
        """POST a description of *package* to the platform.

        The payload is a copy of the package without 'slides', 'base' and
        'workbase'; if 'update_id' is present it replaces 'id'. Returns
        *package* unchanged.

        Raises NotificationException if the response is not ok.
        """
        pres_id = package['id']
        note = dict(package)
        if 'update_id' in note:
            note['id'] = note.pop('update_id')
        note.pop('slides', None)
        del note['base']
        del note['workbase']
        payload = json.dumps(note)
        result = requests.post(self.notify_url,
                               headers=self.headers,
                               data=payload)
        if not result.ok:
            raise NotificationException(payload, result)
        if pres_id != note['id']:
            # Pass the target id as a log argument instead of formatting
            # it into the message, so a '%' in the id cannot break logging.
            self.logger.info("%s - Notification successful (updating %s)",
                             pres_id, note['id'])
        else:
            self.logger.info("%s - Notification successful", pres_id)
        return package

    def finalize(self, package):
        """Move the finished working directory into storage and clean up.

        Writes package.json (keys 'id', 'thumb' and 'sources') into the
        working directory and moves that directory to
        <storage>/<target id>, where the target id is package['update_id']
        if present and package['id'] otherwise. An existing target is
        first moved aside to '<target>.old', replacing any previous
        '.old' directory. Finally the queue file, any old error file and
        the tree at package['base'] are deleted.
        """
        job_id = package['id']
        target_id = package.get('update_id', job_id)

        minpackage = {'id': target_id,
                      'thumb': package['thumb'],
                      'sources': package['sources']}

        work_path = package['workbase']

        # Save the minimal package to disk
        with open(os.path.join(work_path, 'package.json'), 'x') as f:
            json.dump(minpackage, f)

        # Move files to final destination, taking updates into account
        target_path = os.path.join(self.storage, target_id)
        if os.path.exists(target_path):
            temp_path = target_path + '.old'
            if os.path.exists(temp_path):
                shutil.rmtree(temp_path)
            shutil.move(target_path, temp_path)
        shutil.move(work_path, target_path)

        # Delete queue file and upload data.
        # The queue file is named after the job id (the name stash() copies
        # it from), not after the presentation being updated.
        os.remove(os.path.join(self.queue, job_id))
        try:
            # if any old error file remove it since it worked
            os.remove(os.path.join(self.error, target_id))
        except OSError:
            # Best effort: usually there is no error file to remove.
            pass
        shutil.rmtree(package['base'])
        return
|
|
|
|
|
|
class NotificationException(Exception):
    """Raised when the platform rejects a presentation notification.

    Attributes:
        package: the payload that was sent with the notification.
        result: the response object returned by the notification request.
    """

    def __init__(self, package, result):
        # Forward the arguments to Exception so that e.args is populated
        # and the exception can be copied or pickled.
        super().__init__(package, result)
        self.package = package
        self.result = result

    def __str__(self):
        """Return a short message, with the status code when available."""
        status = getattr(self.result, 'status_code', None)
        if status is None:
            return 'Notification failed'
        return 'Notification failed with status {}'.format(status)
|