Compare commits

...

2 Commits

5 changed files with 128 additions and 119 deletions

@ -1,19 +1,36 @@
from .handler import Handler
from .transcode import TranscodeHandler
from ..exceptions import ValidationException
from ..utils import make_poster
from os import path, remove, rename
from shutil import copy2
import ffmpeg
from .handler import Handler
from .transcode import TranscodeHandler
from ..exceptions import ValidationException
def make_poster(inpath, outdir):
"""
Create a poster from the video file at inpath.
"""
indir, infile = path.split(inpath)
infile_noext, _ = path.splitext(infile)
outfile = f"{infile_noext}-poster.jpg"
outpath = path.join(outdir, outfile)
ffmpeg.input(inpath, ss=1) \
.output(outpath, vframes=1) \
.run(quiet=True)
return (outdir, outfile)
@Handler.register
class PosterHandler(Handler):
'''
"""
This class generates poster files and imports uploaded ones.
'''
"""
def wants(self, jobspec, existing_package):
'''
"""
Return True if this handler wants to process this jobspec.
Raises an exception if the job is wanted but doesn't pass validation.
@ -21,7 +38,7 @@ class PosterHandler(Handler):
- a 'poster' key exists in a source
- a video has been uploaded for a source without a
corresponding poster
'''
"""
if 'sources' in jobspec:
# A poster has been uploaded
uploaded = {name: source['poster']
@ -35,13 +52,13 @@ class PosterHandler(Handler):
return False
def _validate(self, jobspec, existing_package, uploaded):
'''
"""
Return True if the job is valid for this handler.
Validity requirements are:
- 'upload_dir' must exist if a 'poster' value is truthy
- all truthy 'poster' values must point to valid files
'''
"""
super()._validate(jobspec, existing_package)
if uploaded:
if 'upload_dir' not in jobspec:
@ -54,25 +71,25 @@ class PosterHandler(Handler):
return True
def apply_after(self, handler):
'''
"""
Return True if this handler must be applied after
the supplied handler.
This handler must be applied after the transcode handler,
so posters can be generated from the right files.
'''
"""
if isinstance(handler, TranscodeHandler):
return True
return False
def _handle(self, jobspec, existing_package, tempdir):
'''
"""
Return a function to apply changes to the stored package.
The returned function generates posters as appropriate, moves poster
files into the package's basedir and updates the package metadata.
Replaced posters are deleted.
'''
"""
def apply_func(package):
uploaded = {name: source['poster']
for name, source in jobspec['sources'].items()
@ -121,14 +138,14 @@ class PosterHandler(Handler):
return apply_func
def _needs_poster(source):
'''
"""
Returns True if a poster should be generated for this source,
otherwise False.
Poster should be generated if:
- a poster key has been passed with falsy value
- a video has been uploaded without a corresponding poster file
'''
"""
if 'poster' in source and not source['poster']:
return True
if 'video' in source and not source.get('poster', None):

@ -1,19 +1,42 @@
from os import path, rename
from shutil import rmtree
import ffmpeg
from .handler import Handler
from ..exceptions import ValidationException
from ..utils import make_poster, make_slides_video
from .poster import make_poster
'''
This class processes slides into video streams.
def make_slides_video(basedir, demux_file, outpath,
maxheight, preset, crf):
"""
Make a video from slides based on a demux file.
Intended to be called asynchronously from a handler thread.
"""
chdir(basedir)
# https://ffmpeg.org/ffmpeg-formats.html#toc-concat-1
initial = ffmpeg.input(demux_file)
scaled = initial.filter('scale',
height=f'min(in_h, {maxheight})',
width=-2)
result = scaled.output(outpath,
crf=crf,
preset=preset,
pix_fmt='yuv420p')
# Need to pass an input string to not break terminal
return result.run(input='', quiet=True)
In order to accept a job as valid, the fields 'pkg_id', 'upload_dir' and
'slides' must exist.
'''
@Handler.register
class SlidesHandler(Handler):
"""
This class processes slides into video streams.
In order to accept a job as valid, the fields 'pkg_id', 'upload_dir' and
'slides' must exist.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# These keys are strings in order to escape json validation hell

@ -1,16 +1,53 @@
from os import path, rename
from shutil import rmtree
import ffmpeg
from .handler import Handler
from ..exceptions import ValidationException
from ..utils import do_transcode
def do_transcode(inpath, outpath, maxheight, crf, preset):
"""
Transcode a video file.
Intended to be called asynchronously from a handler thread.
"""
initial = ffmpeg.input(inpath)
scaled = initial.video.filter('scale',
height=f'min(in_h, {maxheight})',
width=-2)
joined = scaled
if _has_audio(inpath):
joined = ffmpeg.concat(scaled, initial.audio, v=1, a=1)
result = joined.output(outpath,
vcodec='libx264',
acodec='aac',
audio_bitrate='160k',
movflags='+faststart',
preset=preset,
crf=crf)
#print(f"FFmpeg commandline: {' '.join(result.compile())}")
# Need to pass an input string to not break terminal
return result.run(input='', quiet=True)
def _has_audio(filepath):
"""
Helper function to determine whether a video file has an audio
stream or not.
"""
for stream in ffmpeg.probe(filepath)['streams']:
if stream['codec_type'] == 'audio':
return True
return False
@Handler.register
class TranscodeHandler(Handler):
'''
"""
This class handles video transcoding.
'''
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# These keys are strings in order to escape json validation hell
@ -18,13 +55,13 @@ class TranscodeHandler(Handler):
'1080': (26, 'fast')}
def wants(self, jobspec, existing_package):
'''
"""
Return True if this handler wants to process this jobspec.
Raises an exception if the job is wanted but doesn't pass validation.
A job is wanted if a source has been passed that contains
a 'video' key.
'''
"""
if 'sources' in jobspec and any('video' in source
for source
in jobspec['sources'].values()):
@ -32,13 +69,13 @@ class TranscodeHandler(Handler):
return False
def _validate(self, jobspec, existing_package):
'''
"""
Return True if the job is valid for this handler.
Validity requirements are:
- 'upload_dir' must exist
- all 'video' values must point to valid files
'''
"""
super()._validate(jobspec, existing_package)
if 'upload_dir' not in jobspec:
raise ValidationException(f"upload_dir missing")
@ -50,14 +87,14 @@ class TranscodeHandler(Handler):
return True
def _handle(self, jobspec, existing_package, tempdir):
'''
"""
Return a function to apply changes to the stored package.
All transcoding jobs are run before apply_func is returned. apply_func
then moves the generated files into the package's basedir and updates
its metadata. Old files for affectes sources are removed, and the
package's 'duration' field is updated.
'''
"""
transcodes = []
resultfiles = {}

@ -1,12 +1,10 @@
from os import chdir, path
from types import FunctionType
import ffmpeg
'''
"""
The canonical structure of a package going into the pipeline.
'''
"""
canonical_jobspec = {
'pkg_id': str,
'upload_dir': str,
@ -27,9 +25,9 @@ canonical_jobspec = {
'slides': str,
}
'''
"""
The canonical structure of a fully populated package as stored on disk.
'''
"""
canonical_manifest = {
'title': {'en': str,
'sv': str},
@ -52,14 +50,14 @@ canonical_manifest = {
}
'''
Validate the structure of a json-like object (data) against an expected
format (structure), throwing an exception on invalid structure.
The data object need not contain all keys in the format, but provided keys
must match the format and have valid values.
'''
def raise_for_structure(data, structure):
"""
Validate the structure of a json-like object (data) against an expected
format (structure), throwing an exception on invalid structure.
The data object need not contain all keys in the format, but provided keys
must match the format and have valid values.
"""
if isinstance(structure, type):
# Base case, validate that data type matches structure type
if not isinstance(data, structure):
@ -99,80 +97,11 @@ def raise_for_structure(data, structure):
raise_for_structure(data[k], structure[k])
return True
'''
ConfigParser objects implement the dict.get() interface in a highly surprising
manner for sections, so we need a wrapper.
'''
def get_section(config, name):
"""
ConfigParser objects implement the dict.get() interface in a highly
surprising manner for sections, so we need a wrapper.
"""
if name in config:
return config[name]
return {}
'''
Make a video from slides based on a demux file.
Intended to be called asynchronously from a handler thread.
'''
def make_slides_video(basedir, demux_file, outpath,
maxheight, preset, crf):
chdir(basedir)
# https://ffmpeg.org/ffmpeg-formats.html#toc-concat-1
initial = ffmpeg.input(demux_file)
scaled = initial.filter('scale',
height=f'min(in_h, {maxheight})',
width=-2)
result = scaled.output(outpath,
crf=crf,
preset=preset,
pix_fmt='yuv420p')
# Need to pass an input string to not break terminal
return result.run(input='', quiet=True)
'''
Transcode a video file.
Intended to be called asynchronously from a handler thread.
'''
def do_transcode(inpath, outpath, maxheight, crf, preset):
initial = ffmpeg.input(inpath)
scaled = initial.video.filter('scale',
height=f'min(in_h, {maxheight})',
width=-2)
joined = scaled
if _has_audio(inpath):
joined = ffmpeg.concat(scaled, initial.audio, v=1, a=1)
result = joined.output(outpath,
vcodec='libx264',
acodec='aac',
audio_bitrate='160k',
movflags='+faststart',
preset=preset,
crf=crf)
#print(f"FFmpeg commandline: {' '.join(result.compile())}")
# Need to pass an input string to not break terminal
return result.run(input='', quiet=True)
'''
Helper function to determine whether a video file has an audio stream or not.
'''
def _has_audio(filepath):
for stream in ffmpeg.probe(filepath)['streams']:
if stream['codec_type'] == 'audio':
return True
return False
'''
Create a poster from the video file at inpath.
'''
def make_poster(inpath, outdir):
indir, infile = path.split(inpath)
infile_noext, _ = path.splitext(infile)
outfile = f"{infile_noext}-poster.jpg"
outpath = path.join(outdir, outfile)
ffmpeg.input(inpath, ss=1) \
.output(outpath, vframes=1) \
.run(quiet=True)
return (outdir, outfile)

@ -193,6 +193,7 @@ class PipelineTest(DaemonTest):
def wait_for_result(self, jobid, handlers, timeout=5):
resultfiles = {}
final_result = None
for handler in handlers:
resultfiles = {path.join(self.notify_url,
f"{jobid}.{handler}"): False
@ -207,7 +208,6 @@ class PipelineTest(DaemonTest):
else:
self.fail(f"No result produced after {_try} seconds")
final_result = None
for resultfile in resultfiles:
with open(resultfile) as f:
try:
@ -220,6 +220,9 @@ class PipelineTest(DaemonTest):
# Validate that this is the correct resultfile
self.assertEqual(jobid, result['jobid'])
if result['type'] == 'error':
final_result = result
break
if result['pending'] == []:
final_result = result