"""Processing pipeline that turns queued presentation jobs into stored packages."""

import json
import logging
import os
import shutil

import ffmpeg
import requests

import packager
from ldap import Ldap
from thumbnailer import Thumbnailer
from transcoder import Transcoder


class Pipeline:
    """Run a queued presentation job through packing, transcoding,
    notification and final storage.

    All directories and the notification endpoint are read from the
    ``daemon`` section of the configuration passed to the constructor.
    """

    def __init__(self, config):
        """Read paths and credentials from *config* and build the helpers.

        Expects ``config['daemon']`` to provide ``queue``, ``storage``,
        ``error``, ``processing``, ``notify_url`` and ``notify_token``, and
        *config* to contain ``thumbnailer`` and ``ldap`` sections.
        """
        daemon = config['daemon']
        self.logger = logging.getLogger('play-daemon')
        self.queue = daemon['queue']
        self.storage = daemon['storage']
        self.error = daemon['error']
        self.processing = daemon['processing']
        self.notify_url = daemon['notify_url']
        token = daemon['notify_token']
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        }
        # One packager per supported value of a job's 'type' field
        self.packagers = {
            'cattura': packager.Cattura(config),
            'mediasite': packager.Mediasite(config),
            'manual': packager.Manual(),
            'update': packager.Update(),
        }
        self.thumbnailer = Thumbnailer(config['thumbnailer'],
                                       Ldap(config['ldap']))
        self.transcoder = Transcoder(config)

    def log_error(self, message_tmpl, pres_id, *args, **kwargs):
        """Log an error and leave an error marker file for the job.

        *message_tmpl* is a %-style template whose first placeholder is
        filled with *pres_id* and the remaining ones with *args*. The
        formatted message is logged at error level (*kwargs* are forwarded
        to the logger, e.g. ``exc_info``) and written to a file named
        *pres_id* in the error directory, replacing any previous content.
        """
        message = message_tmpl % (pres_id, *args)
        # The message is already formatted and no args are passed, so the
        # logger will not attempt a second round of %-interpolation.
        self.logger.error(message, **kwargs)
        # Explicit encoding: the message may embed arbitrary response text,
        # which must not fail to encode under a non-UTF-8 locale.
        with open(os.path.join(self.error, pres_id), 'w',
                  encoding='utf-8') as f:
            f.write(message)

    # Main processing entry point
    def process(self, pres_id, pres_data):
        """Process one queued job.

        *pres_id* names the job (and its queue file); *pres_data* is the
        job description and must contain a ``type`` key. A job of type
        ``cleanup`` only removes uploaded data. Any other type must match
        one of the configured packagers.

        Failures in packing, transcoding or notification are reported via
        ``log_error`` and end processing of the job; they are not raised.
        """
        ptype = pres_data['type']
        if ptype == 'cleanup':
            # Not a real package but a cleanup job
            self._cleanup(pres_id, pres_data['data'])
            return
        if ptype not in self.packagers:
            # Bail on invalid package types
            self.log_error('%s - Unknown presentation format: %s',
                           pres_id, ptype)
            return

        self.logger.info('%s - Creating %s package', pres_id, ptype)
        try:
            # Origin-specific handling
            # Create initial package
            package = self.packagers[ptype].pack(pres_id, pres_data)
        except packager.PackageException as e:
            self.log_error('%s - Error when packing: %s',
                           pres_id,
                           json.dumps(pres_data, sort_keys=True, indent=2),
                           exc_info=e)
            return
        self.logger.info('%s - Package creation successful', pres_id)

        # Create id dir in processing
        # Add workbase to package
        package = self.mkworkdir(package)

        # Create thumbnail if none exists
        package = self.thumbnailer.create_if_missing(package)

        try:
            # Transcoding and thumb generation
            # Put new files in processing
            # Update package with new relative names
            package = self.transcoder.convert(package)
        except ffmpeg.Error as e:
            stderr = e.stderr
            if isinstance(stderr, bytes):
                # Decode so the log shows real lines instead of a b'...' repr
                stderr = stderr.decode('utf-8', errors='replace')
            self.log_error('%s - Error when transcoding: %s\n%s',
                           pres_id,
                           json.dumps(package, sort_keys=True, indent=2),
                           stderr)
            return
        except Exception as e:
            self.log_error('%s - Error when transcoding: %s',
                           pres_id,
                           json.dumps(package, sort_keys=True, indent=2),
                           exc_info=e)
            return

        # Pick up any passed subs
        package = self.pickup_subs(package)

        # Stash originals if we're in debugging mode
        if self.logger.getEffectiveLevel() <= logging.DEBUG:
            package = self.stash(package)

        try:
            # Platform notification
            package = self.notify(package)
        except NotificationException as e:
            message = ('%s - Error when notifying platform: %s %s\n'
                       'Package contents: %s')
            self.log_error(message,
                           pres_id,
                           e.result.status_code,
                           e.result.text,
                           json.dumps(package, sort_keys=True, indent=2),
                           exc_info=e)
            return

        # Finalize processing
        # Save package to disk
        # Move processing dir to storage
        # Delete queue file
        # Delete base files
        self.finalize(package)

    # Handle a cleanup job: drop the uploaded data and the queue file
    def _cleanup(self, pres_id, data):
        """Remove the directory ``data['base']`` and the job's queue file.

        A missing base directory is only logged; the queue file is removed
        either way.
        """
        self.logger.info('%s - Cleaning up due to error in upload: %s',
                         pres_id, data['message'])
        try:
            shutil.rmtree(data['base'])
        except FileNotFoundError:
            self.logger.info('%s - Given base attribute not found: %s',
                             pres_id, data['base'])
        os.remove(os.path.join(self.queue, pres_id))

    # Create a working directory
    def mkworkdir(self, package):
        """Create an empty working directory for the package.

        The directory is named after ``package['id']`` inside the
        processing directory; an existing one is deleted first. Its path
        is stored in ``package['workbase']`` and the package is returned.
        """
        workbase = os.path.join(self.processing, package['id'])
        if os.path.exists(workbase):
            # Leftovers from an earlier attempt must not leak into this run
            shutil.rmtree(workbase)
        os.mkdir(workbase)
        package['workbase'] = workbase
        return package

    # Move subs to working directory
    def pickup_subs(self, package):
        """Copy the subtitles file, if any, into the working directory.

        ``package['subtitles']`` is taken relative to ``package['base']``
        and is replaced by the bare file name after copying. A missing or
        empty entry leaves the package untouched.
        """
        subs = package.get('subtitles', '')
        if subs:
            subname = os.path.basename(subs)
            shutil.copy(os.path.join(package['base'], subs),
                        os.path.join(package['workbase'], subname))
            package['subtitles'] = subname
        return package

    # Stash original files and queue file inside presentation directory
    def stash(self, package):
        """Keep copies of the upload and the queue file with the result.

        The tree at ``package['base']`` is copied to ``originals`` and the
        queue file to ``queueitem.json``, both inside the working directory.
        """
        pres_id = package['id']
        workbase = package['workbase']
        shutil.copytree(package['base'],
                        os.path.join(workbase, 'originals'))
        shutil.copy(os.path.join(self.queue, pres_id),
                    os.path.join(workbase, 'queueitem.json'))
        return package

    # Notify platform of presentation
    def notify(self, package):
        """POST the package description to the notification URL.

        The payload is a copy of the package without ``slides``, ``base``
        and ``workbase``; if ``orig_id`` is present it replaces ``id``.
        Returns the unmodified package on success.

        Raises:
            NotificationException: if the response status is not OK.
        """
        pres_id = package['id']
        note = dict(package)
        if 'orig_id' in note:
            note['id'] = note.pop('orig_id')
        note.pop('slides', None)
        del note['base']
        del note['workbase']

        payload = json.dumps(note)
        # NOTE(review): no timeout is set, so an unresponsive endpoint
        # blocks here indefinitely. Adding one would surface a requests
        # exception that process() does not handle today.
        result = requests.post(self.notify_url,
                               headers=self.headers,
                               data=payload)
        if not result.ok:
            raise NotificationException(payload, result)

        # Pass the ids as logging arguments rather than splicing them into
        # the format string, so a '%' in an id cannot break formatting.
        if pres_id != note['id']:
            self.logger.info('%s - Notification successful (updating %s)',
                             pres_id, note['id'])
        else:
            self.logger.info('%s - Notification successful', pres_id)
        return package

    # Store the finished package and remove all traces of the job
    def finalize(self, package):
        """Move the finished package into storage and clean up the job.

        Writes ``package.json`` (``id``, ``thumb``, ``sources``) into the
        working directory and moves that directory to the storage
        directory under the target id, which is ``orig_id`` when present
        and ``id`` otherwise. An existing directory for the target id is
        kept as ``<target>.old``, replacing any previous ``.old``. Then the
        queue file, stale error markers and ``package['base']`` are removed.
        """
        job_id = package['id']
        target_id = package.get('orig_id', job_id)
        minpackage = {'id': target_id,
                      'thumb': package['thumb'],
                      'sources': package['sources']}
        work_path = package['workbase']

        # Save the minimal package to disk
        # Mode 'x' fails rather than overwrite an existing package.json
        with open(os.path.join(work_path, 'package.json'), 'x') as f:
            json.dump(minpackage, f)

        # Move files to final destination, taking updates into account
        target_path = os.path.join(self.storage, target_id)
        if os.path.exists(target_path):
            temp_path = target_path + '.old'
            if os.path.exists(temp_path):
                shutil.rmtree(temp_path)
            shutil.move(target_path, temp_path)
        shutil.move(work_path, target_path)

        # Delete queue file and upload data
        # The queue file is named after the job id (as in stash and the
        # cleanup job), which differs from the target id for updates.
        os.remove(os.path.join(self.queue, job_id))
        # If any old error file exists, remove it since it worked.
        # log_error names markers after the job id; the target id is
        # cleared as well. Best-effort: absence is the normal case.
        for stale_id in {job_id, target_id}:
            try:
                os.remove(os.path.join(self.error, stale_id))
            except OSError:
                pass
        shutil.rmtree(package['base'])


class NotificationException(Exception):
    """Raised when the platform answers a notification with a non-OK status.

    Attributes are set from the constructor arguments: ``package`` is the
    JSON payload that was sent and ``result`` is the response object.
    """

    def __init__(self, package, result):
        # Give the exception a message so tracebacks are not blank
        super().__init__('Platform notification was not accepted')
        self.package = package
        self.result = result