import json import logging import os import requests import shutil import ffmpeg import packager from transcoder import Transcoder class Pipeline: def __init__(self, config): self.logger = logging.getLogger('play-daemon') self.queue = config['daemon']['queue'] self.storage = config['daemon']['storage'] self.error = config['daemon']['error'] self.processing = config['daemon']['processing'] self.notify_url = config['daemon']['notify_url'] token = config['daemon']['notify_token'] self.headers = {'Authorization': 'Bearer {}'.format(token), 'Accept': 'application/json', 'Content-Type': 'application/json'} self.packagers = {'cattura': packager.Cattura(config), 'mediasite': packager.Mediasite(config), 'manual': packager.Manual(), 'update': packager.Update()} self.transcoder = Transcoder(config) # Main processing entry point def process(self, pres_id, pres_data): package = None ptype = pres_data['type'] if ptype == 'cleanup': # Not a real package but a cleanup job data = pres_data['data'] logging.info("%s - Cleaning up due to error in upload: %s", pres_id, data['message']) os.remove(data['base']) os.remove(os.path.join(self.queue, pres_id)) return elif ptype not in self.packagers: # Bail on invalid package types self.logger.error('%s - Unknown presentation format: %s', pres_id, ptype) return try: # 1. Origin-specific handling # Create initial package package = self.packagers[ptype].pack(pres_id, pres_data) except packager.PackageException as e: self.logger.error('%s - Error when packing: %s', pres_id, json.dumps(pres_data, sort_keys=True, indent=2), exc_info=e) return self.logger.info("%s - Package creation successful", pres_id) # 2. Create id dir in processing # Add workbase to package package = self.mkworkdir(package) try: # 3. transcoding and thumb generation # Put new files in processing # Update package with new relative names package = self.transcoder.convert(package) except ffmpeg.Error as e: self.logger.error('%s - Error when transcoding: %s\n%s', pres_id, json.dumps(package, sort_keys=True, indent=2), e.stderr) return except Exception as e: self.logger.error('%s - Error when transcoding: %s', pres_id, json.dumps(package, sort_keys=True, indent=2), exc_info=e) return # 4. stash originals # Move original files to processing subdir package = self.stash(package) try: # 5. platform notification package = self.notify(package) except NotificationException as e: message = '%s - Error when notifying platform: %s %s\n' \ + 'Package contents: %s' self.logger.error(message, pres_id, e.result.status_code, e.result.text, json.dumps(package, sort_keys=True, indent=2), exc_info=e) return # 6. finalize processing # Remove base, workbase, update_id, notification_id # Save package to disk # Move processing dir to processed # Delete queue file # Delete base files self.finalize(package) # Create a working directory def mkworkdir(self, package): workbase = os.path.join(self.processing, package['id']) if os.path.exists(workbase): shutil.rmtree(workbase) os.mkdir(workbase) package['workbase'] = workbase return package # Stash original files and queue file inside presentation directory def stash(self, package): pres_id = package['id'] shutil.copytree(package['base'], os.path.join(package['workbase'], 'originals')) shutil.copy(os.path.join(self.queue, pres_id), os.path.join(package['workbase'], 'queueitem.json')) return package # Notify platform of presentation def notify(self, package): pres_id = package['id'] note = dict(package) if 'update_id' in note: note['id'] = note.pop('update_id') if 'slides' in note: note.pop('slides') del(note['base']) del(note['workbase']) result = requests.post(self.notify_url, headers=self.headers, data=json.dumps(note)) if result.ok: message = "%s - Notification successful" if pres_id != note['id']: message = message + (' (updating %s)' % note['id']) self.logger.info(message, pres_id) else: raise NotificationException(json.dumps(note), result) return package def finalize(self, package): target_id = package['id'] if 'update_id' in package: target_id = package['update_id'] minpackage = {'id': target_id, 'thumb': package['thumb'], 'sources': package['sources']} work_path = package['workbase'] # Save the minimal package to disk with open(os.path.join(work_path, 'package.json'), 'x') as f: f.write(json.dumps(minpackage)) # Move files to final destination, taking updates into account target_path = os.path.join(self.storage, target_id) if os.path.exists(target_path): temp_path = target_path + '.old' if os.path.exists(temp_path): shutil.rmtree(temp_path) shutil.move(target_path, temp_path) shutil.move(work_path, target_path) # Delete queue file and upload data os.remove(os.path.join(self.queue, target_id)) try: # if any old error file remove it since it worked os.remove(os.path.join(self.error, target_id)) except OSError: pass shutil.rmtree(package['base']) return class NotificationException(Exception): def __init__(self, package, result): self.package = package self.result = result