Revert commit f91109fb3e (cbf29c4962): the WorkThread class was just adding
unnecessary complexity, so this revert deletes the WorkThread class and its
associated tests. The pipeline now uses mp.pool to manage ffmpeg jobs as
before.

526 lines · 21 KiB · Python · Executable File
#!/usr/bin/env python3

import json
import sys
import unittest
import uuid

from configparser import ConfigParser
from glob import iglob
from itertools import cycle
from os import makedirs, path
from shutil import copyfile, rmtree
from time import sleep, strftime

from pipeline import Pipeline
from pipeline.package import PackageManager
from pipeline.utils import raise_for_structure, canonical_manifest
|
|
|
|
|
|
# Local fixture directory and the layout expected inside it.
filesdir = 'testfiles'
# Config file (inside filesdir) providing the non-test sections.
configfile = 'config.ini'
# Fixture subdirectories: subtitle tracks, video clips and images.
subssubdir = 'subs'
videosubdir = 'video'
imagesubdir = 'image'

# Glob patterns used to enumerate the fixture files fed into test jobs.
subsglob = path.join(filesdir, subssubdir, '*.vtt')
videoglob = path.join(filesdir, videosubdir, '*')
imageglob = path.join(filesdir, imagesubdir, '*')
|
|
|
|
|
|
class DaemonTest(unittest.TestCase):
    """Base class for the pipeline test suite.

    Creates a unique scratch directory tree (queue, uploads, packages,
    temp, cache, results) for each test, injects those paths into the
    configuration read from the fixture config file, and prepares endless
    iterators over the fixture files.
    """

    def setUp(self):
        config = ConfigParser(empty_lines_in_values=False)
        # Note: a missing config file is silently ignored by config.read();
        # the __main__ guard checks for the fixture directory up front.
        config.read(path.join(filesdir, configfile))

        # Unique per-test working directory so runs cannot collide.
        # (uuid objects stringify inside the f-string; no str() needed.)
        self.testbase = f"./{uuid.uuid4()}"
        pipeconf = {
            ('Pipeline', 'queuedir'): path.join(self.testbase, 'queue'),
            ('Pipeline', 'uploaddir'): path.join(self.testbase, 'uploads'),
            ('Pipeline', 'packagedir'): path.join(self.testbase, 'packages'),
            ('Pipeline', 'tempdir'): path.join(self.testbase, 'temp'),
            ('Pipeline', 'cachedir'): path.join(self.testbase, 'cache'),
            ('Notifier', 'url'): path.join(self.testbase, 'results'),
        }
        for (section, key), value in pipeconf.items():
            if section not in config:
                config.add_section(section)
            config[section][key] = value
            # Every configured path is a directory that must exist,
            # including the local 'results' notification target.
            makedirs(value)

        self.config = config
        self.pipeconf = config['Pipeline']
        self.notify_url = config['Notifier']['url']
        # Endless fixture iterators: tests can draw as many subtitle,
        # video and image files as they need without exhausting them.
        self.subspaths = cycle(iglob(subsglob))
        self.videopaths = cycle(iglob(videoglob))
        self.imagepaths = cycle(iglob(imageglob))

    def tearDown(self):
        # Remove the whole per-test scratch tree.
        rmtree(self.testbase)
|
|
|
|
#@unittest.skip
|
|
class PackageTest(DaemonTest):
    """Tests for manifest validation and PackageManager persistence."""

    def _package(self, pkg_id):
        """Return a PackageManager rooted in this test's package dir."""
        return PackageManager(pkg_id,
                              path.join(self.pipeconf['packagedir'], pkg_id))

    def test_validating(self):
        """raise_for_structure accepts a canonical manifest and raises
        ValueError on type violations."""
        contents = {
            'title': {'sv': 'Testpaket',
                      'en': 'Test package'},
            'description': 'Testbeskrivning',
            'created': 1000,
            'duration': 500.0,
            'presenters': ['fooPresenter', 'barPresenter'],
            'courses': [{'designation': 'fooCourse',
                         'semester': 'vt23'},
                        {'designation': 'barCourse',
                         'semester': 'ht22'}],
            'tags': ['fooTag', 'barTag'],
            'thumb': 'somethumb.jpg',
            'subtitles': {'English': path.basename(next(self.subspaths))},
            'sources': {'main': {'poster': 'fooPoster',
                                 'playAudio': True,
                                 'video': {'720': 'fooVideo720',
                                           '1080': 'fooVideo1080'}},
                        'second': {'poster': 'barPoster',
                                   'playAudio': False,
                                   'video': {'720': 'barVideo720',
                                             '1080': 'barVideo1080'}}}
        }
        # Shallow copy (same as the original key-by-key loop) so the
        # invalid mutations below never touch `contents` itself.
        testable = dict(contents)
        self.assertTrue(raise_for_structure(testable, canonical_manifest))
        # A different float value is still valid.
        testable['duration'] = 40.5
        self.assertTrue(raise_for_structure(testable, canonical_manifest))

        with self.assertRaises(ValueError):
            testable['duration'] = "foo"
            raise_for_structure(testable, canonical_manifest)
        with self.assertRaises(ValueError):
            testable['duration'] = 50
            testable['presenters'] = 4
            raise_for_structure(testable, canonical_manifest)
        with self.assertRaises(ValueError):
            testable['presenters'] = []
            testable['sources'] = "a string"
            raise_for_structure(testable, canonical_manifest)

    def test_creating(self):
        """A package written through PackageManager reads back unchanged."""
        pkg_id = str(uuid.uuid4())
        contents = {
            'title': {'sv': 'Testpaket',
                      'en': 'Test package'},
            'description': 'Testbeskrivning',
            'created': 1000,
            'duration': 500.0,
            'presenters': ['fooPresenter', 'barPresenter'],
            'courses': [{'designation': 'fooCourse',
                         'semester': 'vt23'}],
            'tags': ['fooTag', 'barTag'],
            'sources': {'main': {'poster': 'fooPoster',
                                 'playAudio': True,
                                 'video': {'720': 'fooVideo720',
                                           '1080': 'fooVideo1080'}},
                        'second': {'poster': 'barPoster',
                                   'playAudio': False,
                                   'video': {'720': 'barVideo720',
                                             '1080': 'barVideo1080'}}}
        }

        # Save package to disk
        with self._package(pkg_id) as package:
            for key, value in contents.items():
                package[key] = value

        # Read it back and validate
        with self._package(pkg_id) as package:
            for key, value in contents.items():
                self.assertEqual(value, package._contents[key])

    def test_updating(self):
        """Modifications made through PackageManager are persisted."""
        pkg_id = str(uuid.uuid4())
        contents = {
            'title': {'sv': 'Testpaket',
                      'en': 'Test package'},
            'description': 'Testbeskrivning',
            'created': 1000,
            'duration': 500.0,
            'presenters': ['fooPresenter', 'barPresenter'],
            'sources': {'main': {'poster': 'fooPoster',
                                 'playAudio': True,
                                 'video': {'1080': 'fooVideo1080',
                                           '720': 'fooVideo720'}},
                        'second': {'poster': 'barPoster',
                                   'playAudio': False,
                                   'video': {'720': 'barVideo720',
                                             '1080': 'barVideo1080'}}}
        }

        # Save initial version to disk
        with self._package(pkg_id) as package:
            for key, value in contents.items():
                package[key] = value

        # Read it back, modify it and save it again
        with self._package(pkg_id) as package:
            package['presenters'].remove('fooPresenter')
            package['sources'].pop('second')
            package['sources']['main']['poster'] = 'newFooPoster'
            package['tags'] = ['fooTag', 'barTag']

        # Update canonical contents to match
        contents['presenters'].remove('fooPresenter')
        contents['sources'].pop('second')
        contents['sources']['main']['poster'] = 'newFooPoster'
        contents['tags'] = ['fooTag', 'barTag']

        # Read it back and validate
        with self._package(pkg_id) as package:
            for key, value in contents.items():
                self.assertEqual(value, package._contents[key])
|
|
|
|
|
|
#@unittest.skip("These tests are pretty slow")
|
|
class PipelineTest(DaemonTest):
    """End-to-end tests that run jobs through a live Pipeline instance."""

    def setUp(self):
        super().setUp()
        self.pipeline = Pipeline(self.config)
        self.pipeline.start()

    def tearDown(self):
        # Stop the pipeline before its working directories are removed.
        self.pipeline.stop()
        super().tearDown()

    def _submit_raw(self, queuedata):
        """Write raw queue data to a fresh queue file; return the job id.

        Mode 'x' guards against the (vanishingly unlikely) case of a
        colliding job id.
        """
        jobid = str(uuid.uuid4())
        with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
            json.dump(queuedata, f)
        return jobid

    def submit_default_job(self, jobspec):
        """Queue a job of type 'default' and return its job id."""
        return self._submit_raw({'type': 'default',
                                 'data': jobspec})

    def wait_for_result(self, jobid, handlers,
                        notification_url=None, timeout=10):
        """Poll for notification files from all expected handlers.

        Waits up to `timeout` seconds for one result file per handler,
        validates each file's JSON and job id, and returns the final
        notification: either the (single) error, or the notification whose
        'pending' list is empty. On success, also checks that the saved
        package matches the final notification. Fails the test on timeout,
        invalid JSON, or a missing final notification.
        """
        final_result = None
        if notification_url is None:
            notification_url = self.notify_url

        # One expected result file per handler, mapped to "seen yet?".
        # (Fixed: the original rebuilt this same dict once per handler in
        # a redundant outer loop.)
        resultfiles = {path.join(notification_url,
                                 f"{jobid}.{handler}"): False
                       for handler in handlers}
        for _try in range(1, timeout+1):
            sleep(1)
            for resultfile in resultfiles:
                if path.exists(resultfile):
                    resultfiles[resultfile] = True
            if all(resultfiles.values()):
                break
        else:
            self.fail(f"No result produced after {_try} seconds")

        for resultfile in resultfiles:
            with open(resultfile) as f:
                try:
                    result = json.load(f)
                except json.decoder.JSONDecodeError:
                    # Rewind first: json.load() already consumed the
                    # stream, so reading without seek(0) would print
                    # nothing.
                    f.seek(0)
                    print("¤ Contents of invalid notification file ¤")
                    print(f.read())
                    print("¤ End invalid notification file contents ¤")
                    self.fail("Invalid JSON in result file.")

            # Validate that this is the correct resultfile
            self.assertEqual(jobid, result['jobid'])
            if result['type'] == 'error':
                # Only a single error should ever be produced, so treat it
                # as the final notification and stop checking the rest.
                final_result = result
                break

            if result['pending'] == []:
                final_result = result

        if final_result is None:
            # Explicit failure instead of the TypeError the subscript
            # below would otherwise raise.
            self.fail("No final notification (error or empty 'pending') "
                      "was produced.")

        if final_result['type'] == 'success':
            # On success of all expected handlers, check match of saved
            # package and notification. Uses the final notification, not
            # whichever result file happened to be read last.
            package_id = final_result['package']['pkg_id']
            packagebase = path.join(self.pipeconf['packagedir'], package_id)
            package = PackageManager(package_id, packagebase).read()
            for key, value in final_result['package'].items():
                if key == 'pkg_id':
                    self.assertEqual(value, package.uuid)
                else:
                    self.assertEqual(value, package[key])

        return final_result

    def init_job(self, pkgid=False, url=False, subs=False, thumb=False,
                 source_count=0, disabled_source_count=0, poster_count=0):
        """Build a job specification for submit_default_job().

        pkgid: True generates a fresh package id; a string re-uses it.
        url:   True creates a dedicated notification directory.
        subs:  True uploads fixture subtitle files.
        thumb: '' requests in-pipeline generation; True uploads an image.
        source_count / disabled_source_count / poster_count control the
        uploaded video sources.
        """
        jobspec = {}

        if url is True:
            notification_dir = path.join(self.testbase,
                                         'results',
                                         str(uuid.uuid4()))
            makedirs(notification_dir)
            jobspec['notification_url'] = notification_dir

        # Set pkg_id
        if pkgid is True:
            # Generate new
            jobspec['pkg_id'] = str(uuid.uuid4())
        elif pkgid:
            # Re-use existing
            jobspec['pkg_id'] = pkgid

        if subs:
            self.add_subtitles(jobspec)

        # Set thumb
        if thumb == '':
            # Request generation in pipeline
            # NOTE(review): add_genthumb is not defined in this file —
            # confirm it exists (superclass?) or restore it.
            self.add_genthumb(jobspec)
        elif thumb is True:
            # Set an image to be included
            # NOTE(review): add_thumb is not defined in this file either.
            self.add_thumb(jobspec)

        # Set sources
        if source_count:
            # poster_count determines the number of posters to include.
            # Any remaining posters will be generated in pipeline
            self.add_sources(jobspec, source_count, poster_count,
                             disabled_source_count)

        return jobspec

    def ensure_uldir(self, jobspec):
        """Create (once) and return the job's upload directory."""
        if 'upload_dir' not in jobspec:
            uldir = path.join(self.pipeconf['uploaddir'],
                              str(uuid.uuid4()))
            jobspec['upload_dir'] = uldir
            makedirs(uldir)
            return uldir
        return jobspec['upload_dir']

    def add_subtitles(self, jobspec):
        """Upload one 'Svenska' and one 'English' fixture subtitle track
        and record them in the jobspec."""
        uldir = self.ensure_uldir(jobspec)

        subspec = {}
        for key in 'Svenska', 'English':
            subspath = next(self.subspaths)
            subsfile = path.basename(subspath)
            subspec[key] = subsfile
            copyfile(subspath, path.join(uldir, subsfile))
        jobspec['subtitles'] = subspec

    def add_sources(self, jobspec, count, poster_count=0, disabled_count=0):
        """Upload `count` fixture videos as sources 'source-0'..'source-N'.

        The first source plays audio; the first `disabled_count` sources
        are disabled; the first `poster_count` sources get an uploaded
        poster image (the pipeline generates posters for the rest).
        """
        uldir = self.ensure_uldir(jobspec)
        if 'sources' not in jobspec:
            jobspec['sources'] = {}
        posters = 0
        disabled = 0
        for i in range(count):
            videopath = next(self.videopaths)
            videofile = path.basename(videopath)
            copyfile(videopath, path.join(uldir, videofile))
            sourcedef = {'video': videofile,
                         'playAudio': False,
                         'enabled': True}
            if disabled < disabled_count:
                sourcedef['enabled'] = False
                disabled += 1
            if i == 0:
                sourcedef['playAudio'] = True
            if posters < poster_count:
                posterpath = next(self.imagepaths)
                posterfile = path.basename(posterpath)
                copyfile(posterpath, path.join(uldir, posterfile))
                sourcedef['poster'] = posterfile
                posters += 1
            jobspec['sources'][f"source-{i}"] = sourcedef

    def test_missing_typekey(self):
        """A queue entry without a 'type' key yields an error result."""
        jobid = self._submit_raw({'data': ''})

        result = self.wait_for_result(jobid, ['QueueReader'])
        self.assertEqual(result['type'], 'error')
        self.assertEqual(result['message'],
                         '"Job specification missing \'type\' key."')

    def test_invalid_typekey(self):
        """A queue entry with an unknown 'type' yields an error result."""
        jobid = self._submit_raw({'type': 'invalid_type',
                                  'data': ''})

        result = self.wait_for_result(jobid, ['QueueReader'])
        self.assertEqual(result['type'], 'error')
        self.assertEqual(result['message'],
                         "Invalid type 'invalid_type' in job specification.")

    def test_subtitles(self):
        """Uploaded subtitles end up in the package, the upload dir is
        removed, and notifications go to the job-specific URL."""
        jobspec = self.init_job(subs=True, url=True)

        jobid = self.submit_default_job(jobspec)
        url = jobspec['notification_url']
        result = self.wait_for_result(jobid,
                                      ['SubtitlesImportHandler'],
                                      notification_url=url)

        package_id = result['package']['pkg_id']
        with PackageManager(package_id, path.join(self.pipeconf['packagedir'],
                                                  package_id)) as package:
            for lang in jobspec['subtitles']:
                # Subsfile should be in place for each key
                subspath = path.join(package.basedir,
                                     package['subtitles'][lang])
                self.assertTrue(path.exists(subspath))

        # uldir should be gone
        self.assertFalse(path.exists(jobspec['upload_dir']))

    def test_metadata(self):
        """Plain metadata fields round-trip through the MetadataHandler."""
        jobspec = self.init_job()
        jobspec['title'] = {'sv': 'Testtitel',
                            'en': 'Test title'}
        jobspec['description'] = 'Test description'
        jobspec['created'] = 1234567
        jobspec['presenters'] = ['fooPerson', 'barPerson']
        jobspec['courses'] = [{'designation': 'fooCourse',
                               'semester': 'vt22'},
                              {'designation': 'barCourse',
                               'semester': 'ht07'}]
        jobspec['tags'] = ['foo', 'bar']

        jobid = self.submit_default_job(jobspec)
        result = self.wait_for_result(jobid, ['MetadataHandler'])

        package_id = result['package']['pkg_id']
        with PackageManager(package_id, path.join(self.pipeconf['packagedir'],
                                                  package_id)) as package:
            # Check match of saved package and jobspec
            for key, value in jobspec.items():
                if key == 'pkg_id':
                    continue
                self.assertEqual(value, package[key])

            # Check match of saved package and notification
            for key, value in result['package'].items():
                if key == 'pkg_id':
                    continue
                self.assertEqual(value, package[key])

    def test_transcoding(self):
        """Transcoding produces a poster and all video variants for every
        source, including disabled ones, and cleans up the upload dir."""
        jobspec = self.init_job(source_count=3, poster_count=2,
                                disabled_source_count=1)

        jobid = self.submit_default_job(jobspec)
        result = self.wait_for_result(jobid, ['AudioHandler',
                                              'TranscodeHandler',
                                              'PosterHandler'],
                                      timeout=180)

        package_id = result['package']['pkg_id']
        with PackageManager(package_id, path.join(self.pipeconf['packagedir'],
                                                  package_id)) as package:
            # Check match of saved package and jobspec
            for name, source in jobspec['sources'].items():
                pkgsource = package['sources'][name]
                self.assertEqual(source['playAudio'], pkgsource['playAudio'])
                self.assertTrue(path.exists(path.join(package.basedir,
                                                      pkgsource['poster'])))
                for filename in pkgsource['video'].values():
                    videopath = path.join(package.basedir, filename)
                    self.assertTrue(path.exists(videopath))

        # uldir should be gone
        self.assertFalse(path.exists(jobspec['upload_dir']))

    def test_generating_subs(self):
        """Whisper subtitle generation produces the requested tracks for
        several concurrently submitted jobs."""
        tracknames = ['Generated_1', 'Generated_2']
        job_id_spec_map = {}
        for _ in range(3):
            jobspec = self.init_job(source_count=1, poster_count=1)
            subsource = next(iter(jobspec['sources']))
            jobspec['generate_subtitles'] = {}
            for name in tracknames:
                jobspec['generate_subtitles'][name] = {'type': 'whisper',
                                                       'source': subsource}
            jobid = self.submit_default_job(jobspec)
            job_id_spec_map[jobid] = jobspec

        expected_handlers = ['AudioHandler',
                             'TranscodeHandler',
                             'SubtitlesWhisperHandler']
        job_result_map = {jobid: (jobspec,
                                  self.wait_for_result(jobid,
                                                       expected_handlers,
                                                       timeout=540))
                          for jobid, jobspec
                          in job_id_spec_map.items()}

        for jobspec, result in job_result_map.values():
            package_id = result['package']['pkg_id']
            with PackageManager(package_id,
                                path.join(self.pipeconf['packagedir'],
                                          package_id)) as package:
                # Check match of saved package and jobspec
                for name, source in jobspec['sources'].items():
                    pkgsource = package['sources'][name]
                    self.assertEqual(source['playAudio'],
                                     pkgsource['playAudio'])
                    self.assertTrue(
                        path.exists(path.join(package.basedir,
                                              pkgsource['poster'])))
                    for filename in pkgsource['video'].values():
                        videopath = path.join(package.basedir, filename)
                        self.assertTrue(path.exists(videopath))
                for name in tracknames:
                    subspath = path.join(package.basedir,
                                         package['subtitles'][name])
                    self.assertTrue(path.exists(subspath))
                    with open(subspath) as f:
                        print(f"¤ Subtitle track {name} ¤")
                        print(f.read())
                        print(f"¤ End subtitle track {name} ¤")

            # uldir should be gone
            self.assertFalse(path.exists(jobspec['upload_dir']))
|
|
|
|
|
|
if __name__ == '__main__':
    # Refuse to run without the fixture directory; every test depends on it.
    if not path.exists(f"./{filesdir}"):
        print(f"""
These tests require a directory called '{filesdir}' in the
current working directory.

'{filesdir}' should contain the following:

- A file named '{configfile}', containing sections as defined in
  config.ini.example. The sections Pipeline and Notifier should be omitted.
- A subdirectory named '{subssubdir}', containing at least two files with
  a .vtt suffix.
- A subdirectory named '{videosubdir}', containing at least one video clip.
  Tests may fail if this subdirectory contains non-video files.
- A subdirectory named '{imagesubdir}', containing at least one image file.
""")
        # sys.exit instead of the site-provided exit() builtin, which is
        # not guaranteed to exist in all execution environments.
        sys.exit(1)
    unittest.main()
|