import logging
import multiprocessing as mp
import os
import shutil
import time

import ffmpeg


class Transcoder:
    """Transcode every source of a package into a fixed set of variants.

    The ffmpeg work is farmed out to a ``multiprocessing`` pool; file
    copies (thumbnails and posters) are done in the calling process.
    """

    def __init__(self, config):
        """Set up the transcoder from *config*.

        :param config: mapping in which ``config['transcoder']['jobs']``
            holds the number of pool processes (anything ``int()`` accepts).
        """
        self.jobs = int(config['transcoder']['jobs'])
        self.logger = logging.getLogger('play-daemon')
        self.worker = _Worker(self.logger)
        # Maximum output height -> (crf, preset) handed to ffmpeg.
        self.variants = {720: (26, 'fast'),
                         1080: (26, 'fast')}

    def convert(self, package):
        """Transcode all sources of *package* and return the updated package.

        *package* is modified in place:

        * ``package['thumb']``, when set, is copied from ``base`` into
          ``workbase`` as ``thumb<ext>`` and replaced by that file name.
        * Every source's ``'video'`` entry becomes a dict mapping each
          configured maximum height to the name of the file produced in
          ``workbase``.
        * Every source's ``'poster'`` entry becomes the name of a poster
          file in ``workbase`` (copied when provided, otherwise generated
          from the video).
        * If the package still has no thumb, the poster of a source with a
          truthy ``'playAudio'`` is used as the package thumb.

        :param package: dict with the keys ``'base'``, ``'workbase'``,
            ``'id'``, ``'thumb'`` and ``'sources'``. Each source is a dict
            with ``'poster'``, ``'playAudio'`` and either ``'demux_file'``
            (slides) or ``'video'`` (regular video).
        :returns: the same *package* object.
        :raises KeyError: if a required key is missing, or a source has
            neither ``'demux_file'`` nor ``'video'``.
        """
        base = package['base']
        workbase = package['workbase']
        package_id = package['id']

        # Fail before any work is started. Without this check such a source
        # would be queued with no jobs and only blow up with an opaque
        # KeyError once every other stream had already been transcoded.
        # KeyError is kept so the exception type seen by callers is unchanged.
        for stream in package['sources']:
            if 'demux_file' not in stream and 'video' not in stream:
                raise KeyError(
                    "{} - Source has neither 'demux_file' nor 'video'"
                    .format(package_id))

        if package['thumb']:
            thumbpath = package['thumb']
            thumbname = "thumb{}".format(os.path.splitext(thumbpath)[1])
            shutil.copy2(os.path.join(base, thumbpath),
                         os.path.join(workbase, thumbname))
            package['thumb'] = thumbname

        pending = []
        self.logger.info("%s - Started transcoding", package_id)
        with mp.Pool(processes=self.jobs) as pool:
            self.logger.debug("%s - Pool created", package_id)
            for stream in package['sources']:
                if 'demux_file' in stream:
                    # A collection of slide images that has to be turned
                    # into a video.
                    jobs = self._queue_slides(pool, package_id, stream,
                                              workbase)
                else:
                    # A regular video (guaranteed by the validation above)
                    # that has to be transcoded to the variant resolutions.
                    jobs = self._queue_video(pool, package_id, stream,
                                             base, workbase)
                # Remember the jobs together with the stream they belong to.
                pending.append({'jobs': jobs, 'data': stream})
            # No more jobs will be submitted; wait for all of them to end.
            pool.close()
            pool.join()
        self.logger.info("%s - Finished transcoding", package_id)

        package['sources'] = []
        for item in pending:
            stream = item['data']
            jobs = item['jobs']
            # The pool has been joined, so get() returns immediately; it
            # re-raises any exception that happened inside the worker.
            stream['video'] = {maxheight: jobs[maxheight].get()
                               for maxheight in self.variants}
            if 'poster' in jobs:
                stream['poster'] = jobs['poster'].get()
            if not package['thumb'] and stream['playAudio']:
                package['thumb'] = stream['poster']
            package['sources'].append(stream)
        return package

    def _submit_variants(self, pool, func, package_id, source, workbase):
        """Queue one *func* job per variant; return the results by height."""
        return {
            maxheight: pool.apply_async(
                func,
                (self.worker, package_id, source, workbase,
                 maxheight, preset, crf))
            for maxheight, (crf, preset) in self.variants.items()
        }

    def _queue_slides(self, pool, package_id, stream, workbase):
        """Queue the slide-video jobs for *stream* and copy its poster."""
        self.logger.debug("%s - Processing stream %s",
                          package_id, 'slides job in demux.txt')
        jobs = self._submit_variants(pool, _Worker.make_slides_video,
                                     package_id, stream['demux_file'],
                                     workbase)
        # NOTE(review): unlike the other posters this path is not joined
        # with the package base; presumably it is already usable as given -
        # confirm against the producer of 'demux_file' streams.
        _, ext = os.path.splitext(stream['poster'])
        slides_thumb = 'slides_thumb{}'.format(ext)
        shutil.copy2(stream['poster'], os.path.join(workbase, slides_thumb))
        stream['poster'] = slides_thumb
        # The demux file reference is no longer needed in the result.
        stream.pop('demux_file')
        return jobs

    def _queue_video(self, pool, package_id, stream, base, workbase):
        """Queue the transcode jobs for *stream* and arrange for its poster."""
        streampath_rel = stream['video']
        streampath_abs = os.path.join(base, streampath_rel)
        self.logger.debug("%s - Processing stream %s",
                          package_id, streampath_rel)
        jobs = self._submit_variants(pool, _Worker.transcode,
                                     package_id, streampath_abs, workbase)

        thumbpath_rel = stream['poster']
        if thumbpath_rel:
            # A poster was supplied: copy it, named after the video file.
            _, ext = os.path.splitext(thumbpath_rel)
            thumbbase, _ = os.path.splitext(os.path.basename(streampath_abs))
            thumbname = '{}{}'.format(thumbbase, ext)
            shutil.copy2(os.path.join(base, thumbpath_rel),
                         os.path.join(workbase, thumbname))
            stream['poster'] = thumbname
        else:
            # No poster supplied: grab one from the video in the pool.
            jobs['poster'] = pool.apply_async(
                _Worker.make_thumb,
                (self.worker, streampath_abs, workbase))
        return jobs


class _Worker:
    """The ffmpeg operations that are executed inside the pool processes."""

    def __init__(self, logger):
        """Store *logger* and the thumbnail seek position."""
        # Passed to ffmpeg as the ``ss`` input option when grabbing a thumb.
        self.thumbtime = 1
        self.logger = logger

    def _has_audio(self, instream):
        """Return True if ffmpeg's probe reports an audio stream."""
        return any(stream['codec_type'] == 'audio'
                   for stream in ffmpeg.probe(instream)['streams'])

    def _quiet(self):
        """Return False only when the logger is more verbose than INFO."""
        return self.logger.getEffectiveLevel() >= logging.INFO

    def make_thumb(self, instream, outdir):
        """Write a single-frame JPEG thumbnail of *instream* into *outdir*.

        :returns: the file name ``<stream name>-thumb.jpg`` (relative to
            *outdir*).
        """
        name, _ = os.path.splitext(os.path.basename(instream))
        thumb = '{}-thumb.jpg'.format(name)
        outpath = os.path.join(outdir, thumb)
        (ffmpeg
         .input(instream, ss=self.thumbtime)
         .output(outpath, vframes=1)
         .run(quiet=True))
        self.logger.debug("Thumb created.")
        return thumb

    def transcode(self, package_id, instream, outdir,
                  maxheight, preset, crf):
        """Transcode *instream* to an H.264/AAC mp4 of at most *maxheight*.

        :param package_id: identifier used only in log messages.
        :param instream: path of the input video.
        :param outdir: directory the result is written to.
        :param maxheight: upper bound for the output height.
        :param preset: libx264 preset passed to ffmpeg.
        :param crf: crf value passed to ffmpeg.
        :returns: the output file name (relative to *outdir*),
            ``<name>-<maxheight>-<preset>-<crf>.mp4``.
        """
        self.logger.debug("%s - %s - Preparing stream for processing",
                          package_id, instream)
        name, _ = os.path.splitext(os.path.basename(instream))
        outstream = '{}-{}-{}-{}.mp4'.format(name, maxheight, preset, crf)
        outpath = os.path.join(outdir, outstream)

        self.logger.debug("%s - %s - Building filter graph",
                          package_id, instream)
        initial = ffmpeg.input(instream)
        # Cap the height at maxheight; width=-2 lets ffmpeg derive a width
        # that keeps the aspect ratio.
        scaled = initial.video.filter(
            'scale',
            height='min(in_h, {})'.format(maxheight),
            width=-2)
        joined = scaled
        if self._has_audio(instream):
            # Re-attach the untouched audio to the scaled video.
            joined = ffmpeg.concat(scaled, initial.audio, v=1, a=1)
        result = joined.output(outpath,
                               vcodec='libx264',
                               acodec='aac',
                               audio_bitrate='160k',
                               movflags='+faststart',
                               preset=preset,
                               crf=crf)

        self.logger.debug("%s - %s - Calling ffmpeg", package_id, instream)
        # monotonic() is immune to wall-clock adjustments during the run.
        start = time.monotonic()
        result.run(quiet=self._quiet())
        runtime = time.monotonic() - start
        self.logger.info("%s - Transcoded %s in %s seconds",
                         package_id, outstream, runtime)
        return outstream

    # Function to make a video from images used for the slideshow
    def make_slides_video(self, package_id, demux_file, outdir,
                          maxheight, preset, crf):
        """Build the slide video described by *demux_file*.

        :param package_id: identifier used only in log messages.
        :param demux_file: path handed to ffmpeg as input (presumably an
            ffmpeg concat script listing the slide images - see the links
            below).
        :param outdir: directory the result is written to.
        :param maxheight: upper bound for the output height.
        :param preset: preset passed to ffmpeg.
        :param crf: crf value passed to ffmpeg.
        :returns: the output file name (relative to *outdir*),
            ``slides-<maxheight>-<preset>-<crf>.mp4``.
        """
        self.logger.debug("%s - Preparing slides for processing", package_id)
        outstream = '{}-{}-{}-{}.mp4'.format('slides', maxheight,
                                             preset, crf)
        outpath = os.path.join(outdir, outstream)

        self.logger.debug("%s - Building slide video from demux.txt",
                          package_id)
        quiet = self._quiet()

        self.logger.debug("%s - Calling ffmpeg", package_id)
        # monotonic() is immune to wall-clock adjustments during the run.
        start = time.monotonic()
        # https://ffmpeg.org/ffmpeg-formats.html#toc-concat-1
        # https://github.com/kkroening/ffmpeg-python/tree/master/examples
        (ffmpeg
         .input(demux_file, safe=0)
         .filter('scale',
                 height='min(in_h, {})'.format(maxheight),
                 width=-2)
         .output(outpath, crf=crf, preset=preset, pix_fmt='yuv420p')
         .overwrite_output()
         .run(quiet=quiet))
        runtime = time.monotonic() - start
        self.logger.info("%s - Transcoded %s in %s seconds",
                         package_id, outstream, runtime)
        return outstream