Erik Thuning cbf29c4962 Implementing a unified job pool for transcodes and subtitles never panned out,
so the code was just adding unnecessary complexity. The pipeline now uses
mp.pool to manage ffmpeg jobs as before.

This reverts commit f91109fb3e4795ce652b966ba0563ba4e100b3d1 and deletes the
WorkThread class and its associated tests.
2024-10-17 11:34:00 +02:00

211 lines
7.4 KiB
Python

import logging
from os import path, remove, rename
import ffmpeg
from .handler import Handler
from ..exceptions import FFmpegException, ValidationException
def _has_audio(filepath):
"""
Helper function to determine whether a video file has an audio
stream or not.
"""
for stream in ffmpeg.probe(filepath)['streams']:
if stream['codec_type'] == 'audio':
return True
return False
def _get_dimensions(filepath):
"""
Helper function to get the dimensions of a video file.
Returns a tuple (width, height).
If no video stream is present, returns (0,0).
"""
for stream in ffmpeg.probe(filepath)['streams']:
if stream['codec_type'] == 'video':
return (int(stream['width']), int(stream['height']))
return (0,0)
def _get_videocodec(filepath):
"""
Helper function to get the video codec for a file.
Returns the codec as a string, or None if no video stream is present.
"""
for stream in ffmpeg.probe(filepath)['streams']:
if stream['codec_type'] == 'video':
return stream['codec_name']
return None
def _scale(insize, maxheight):
"""
Scale the given (height, width) tuple from insize so the
height is at most maxheight, retaining aspect ratio.
Returns a new tuple (scaled_width, scaled_height, scale_factor).
"""
width, height = insize
scaled_height = min(maxheight, height)
scale_factor = scaled_height / height
scaled_width = int(width * scale_factor)
return (scaled_width, scaled_height, scale_factor)
def _do_transcode(inpath, outpath, maxheight):
"""
Transcode a video file.
Intended to be called asynchronously from a handler thread.
"""
codec = _get_videocodec(inpath)
width, height, scale = _scale(_get_dimensions(inpath), maxheight)
audiosettings = {'c:a': 'aac',
'audio_bitrate': '160k'}
videosettings_in = {}
videosettings_out = {'c:v': 'libx264',
# Quality settings
'movflags': '+faststart',
'preset': 'fast',
'crf': 26,
# Ensure correct container format
'f': 'mp4'}
# Scale the output if necessary
if scale < 1:
# The width must be an even number.
# Setting width to -2 will allow ffmpeg to automatically deal with
# any scalings where the width would have become odd.
videosettings_out['vf'] = f'scale=-2:{height}'
logger = logging.getLogger('play-daemon.TranscodeHandler._do_transcode')
logger.info('Starting ffmpeg transcode job for %s', inpath)
try:
initial = ffmpeg.input(inpath, **videosettings_in)
result = initial.output(outpath,
**videosettings_out,
**audiosettings)
if logger.getEffectiveLevel() <= logging.INFO:
cmdline = ' '.join(result.compile())
logger.info('FFmpeg commandline: %s', cmdline)
# Need to pass an input string to not break terminal
return result.run(input='', quiet=True)
except ffmpeg.Error as e:
raise FFmpegException(inpath, outpath, maxheight, e.stderr)
@Handler.register
class TranscodeHandler(Handler):
"""
This class handles video transcoding.
"""
# These keys are strings in order to escape json validation hell
variants = ['720', '1080']
@classmethod
def wants(cls, jobspec, existing_package):
"""
Return True if this handler wants to process this jobspec.
Raises an exception if the job is wanted but doesn't pass validation.
A job is wanted if a source has been passed that contains
a 'video' key.
"""
if 'sources' in jobspec and any('video' in source
for source
in jobspec['sources'].values()):
return cls._validate(jobspec, existing_package)
return False
@classmethod
def _validate(cls, jobspec, existing_package):
"""
Return True if the job is valid for this handler.
Validity requirements are:
- 'upload_dir' must exist
- all 'video' values must point to valid files
"""
super()._validate(jobspec, existing_package)
if 'upload_dir' not in jobspec:
raise ValidationException("upload_dir missing")
for name, source in jobspec['sources'].items():
if not path.isfile(path.join(jobspec['upload_dir'],
source['video'])):
raise ValidationException(
f"video file invalid for source {name}")
return True
def _handle(self, jobspec, existing_package, tempdir):
"""
Return a function to apply changes to the stored package.
All transcoding jobs are run before apply_func is returned. apply_func
then moves the generated files into the package's basedir and updates
its metadata. Old files for affectes sources are removed, and the
package's 'duration' field is updated.
"""
transcodes = []
resultfiles = {}
# Start the transcode jobs
for sourcename, source in jobspec['sources'].items():
if 'video' in source:
resultfiles[sourcename] = {}
inpath = path.join(jobspec['upload_dir'], source['video'])
infile = path.basename(inpath)
name_noext, ext = path.splitext(infile)
for maxheight in self.variants:
outfile = f'{name_noext}-{maxheight}.mp4'
outpath = path.join(tempdir, outfile)
args = (inpath,
outpath,
int(maxheight))
transcode = self.asyncjob(_do_transcode, args)
transcodes.append(transcode)
resultfiles[sourcename][maxheight] = outfile
self.logger.info("Waiting for transcodes")
for item in transcodes:
item.get()
self.logger.info("Done, making apply_func")
def apply_func(package):
sources = package['sources']
for name, variants in resultfiles.items():
if name in sources:
if 'video' in sources[name]:
# Remove existing variant files
oldvariants = sources[name]['video']
for resolution in list(oldvariants.keys()):
remove(path.join(package.basedir,
oldvariants.pop(resolution)))
else:
# Initialize a new source
# set playAudio to False by default so we don't conflict
# with the audio handler, set enabled to True so we don't
# conflict with the visibility handler
sources[name] = {'playAudio': False,
'enabled': True}
# Save new source videos
sources[name]['video'] = variants
for resultfile in variants.values():
rename(path.join(tempdir, resultfile),
path.join(package.basedir, resultfile))
package.set_duration()
return apply_func