play-daemon-threaded/test.py

Commit cbf29c4962 by Erik Thuning, 2024-10-17 11:34:00 +02:00:

Implementing a unified job pool for transcodes and subtitles never panned out,
so the code was just adding unnecessary complexity. The pipeline now uses
mp.pool to manage ffmpeg jobs as before.

This reverts commit f91109fb3e and deletes the WorkThread class and its
associated tests.
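The mp.pool arrangement referred to above lives in the pipeline code, not in
this test file. As a rough illustration of the pattern the commit message
names, a multiprocessing.Pool fanning ffmpeg jobs out to worker processes
could look like the sketch below (file names, scaling settings and pool size
are hypothetical, not taken from the pipeline):

    from multiprocessing import Pool
    import subprocess

    def transcode(job):
        # Run a single ffmpeg invocation; blocks until the encode finishes.
        infile, outfile, height = job
        subprocess.run(['ffmpeg', '-y', '-i', infile,
                        '-vf', f'scale=-2:{height}', outfile],
                       check=True, capture_output=True)
        return outfile

    if __name__ == '__main__':
        jobs = [('in.mp4', 'out-720.mp4', 720),
                ('in.mp4', 'out-1080.mp4', 1080)]
        # Each job runs in its own worker process; results arrive as they finish.
        with Pool(processes=2) as pool:
            for done in pool.imap_unordered(transcode, jobs):
                print('finished', done)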


#!/usr/bin/env python3
from configparser import ConfigParser
from glob import iglob
from itertools import cycle
from os import makedirs, path
from shutil import copyfile, rmtree
from time import sleep
import json
import unittest
import uuid
from pipeline import Pipeline
from pipeline.package import PackageManager
from pipeline.utils import raise_for_structure, canonical_manifest
filesdir = 'testfiles'
configfile = 'config.ini'
subssubdir = 'subs'
videosubdir = 'video'
imagesubdir = 'image'
subsglob = path.join(filesdir, subssubdir, '*.vtt')
videoglob = path.join(filesdir, videosubdir, '*')
imageglob = path.join(filesdir, imagesubdir, '*')
class DaemonTest(unittest.TestCase):
def setUp(self):
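# Every test gets its own uuid-named directory tree holding the queue,
# upload, package, temp, cache and notification directories; tearDown
# removes the whole tree again.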
config = ConfigParser(empty_lines_in_values=False)
config.read(path.join(filesdir, configfile))
self.testbase = f"./{str(uuid.uuid4())}"
pipeconf = {('Pipeline', 'queuedir'): path.join(self.testbase,
'queue'),
('Pipeline', 'uploaddir'): path.join(self.testbase,
'uploads'),
('Pipeline', 'packagedir'): path.join(self.testbase,
'packages'),
('Pipeline', 'tempdir'): path.join(self.testbase, 'temp'),
('Pipeline', 'cachedir'): path.join(self.testbase,
'cache'),
('Notifier', 'url'): path.join(self.testbase, 'results')}
for (section, key), value in pipeconf.items():
if section not in config:
config.add_section(section)
config[section][key] = value
makedirs(value)
self.config = config
self.pipeconf = config['Pipeline']
self.notify_url = config['Notifier']['url']
self.subspaths = cycle(iglob(subsglob))
self.videopaths = cycle(iglob(videoglob))
self.imagepaths = cycle(iglob(imageglob))
def tearDown(self):
rmtree(self.testbase)
#@unittest.skip
class PackageTest(DaemonTest):
def test_validating(self):
contents = {
'title': {'sv': 'Testpaket',
'en': 'Test package'},
'description': 'Testbeskrivning',
'created': 1000,
'duration': 500.0,
'presenters': ['fooPresenter', 'barPresenter'],
'courses': [{'designation': 'fooCourse',
'semester': 'vt23'},
{'designation': 'barCourse',
'semester': 'ht22'}],
'tags': ['fooTag', 'barTag'],
'thumb': 'somethumb.jpg',
'subtitles': {'English': path.basename(next(self.subspaths))},
'sources': {'main': {'poster': 'fooPoster',
'playAudio': True,
'video': {'720': 'fooVideo720',
'1080': 'fooVideo1080'}},
'second': {'poster': 'barPoster',
'playAudio': False,
'video': {'720': 'barVideo720',
'1080': 'barVideo1080'}}}
}
# Work on a copy so the canonical contents stay intact
testable = dict(contents)
self.assertTrue(raise_for_structure(testable, canonical_manifest))
testable['duration'] = 40.5
self.assertTrue(raise_for_structure(testable, canonical_manifest))
with self.assertRaises(ValueError):
testable['duration'] = "foo"
raise_for_structure(testable, canonical_manifest)
with self.assertRaises(ValueError):
testable['duration'] = 50
testable['presenters'] = 4
raise_for_structure(testable, canonical_manifest)
with self.assertRaises(ValueError):
testable['presenters'] = []
testable['sources'] = "a string"
raise_for_structure(testable, canonical_manifest)
def test_creating(self):
pkg_id = str(uuid.uuid4())
contents = {
'title': {'sv': 'Testpaket',
'en': 'Test package'},
'description': 'Testbeskrivning',
'created': 1000,
'duration': 500.0,
'presenters': ['fooPresenter', 'barPresenter'],
'courses': [{'designation': 'fooCourse',
'semester': 'vt23'}],
'tags': ['fooTag', 'barTag'],
'sources': {'main': {'poster': 'fooPoster',
'playAudio': True,
'video': {'720': 'fooVideo720',
'1080': 'fooVideo1080'}},
'second': {'poster': 'barPoster',
'playAudio': False,
'video': {'720': 'barVideo720',
'1080': 'barVideo1080'}}}
}
# Save package to disk
with (PackageManager(pkg_id,
path.join(self.pipeconf['packagedir'], pkg_id))
as package):
for key, value in contents.items():
package[key] = value
# Read it back and validate
with (PackageManager(pkg_id,
path.join(self.pipeconf['packagedir'], pkg_id))
as package):
for key, value in contents.items():
self.assertEqual(value, package._contents[key])
def test_updating(self):
pkg_id = str(uuid.uuid4())
contents = {
'title': {'sv': 'Testpaket',
'en': 'Test package'},
'description': 'Testbeskrivning',
'created': 1000,
'duration': 500.0,
'presenters': ['fooPresenter', 'barPresenter'],
'sources': {'main': {'poster': 'fooPoster',
'playAudio': True,
'video': {'1080': 'fooVideo1080',
'720': 'fooVideo720'}},
'second': {'poster': 'barPoster',
'playAudio': False,
'video': {'720': 'barVideo720',
'1080': 'barVideo1080'}}}
}
# Save initial version to disk
with (PackageManager(pkg_id,
path.join(self.pipeconf['packagedir'], pkg_id))
as package):
for key, value in contents.items():
package[key] = value
# Read it back, modify it and save it again
with (PackageManager(pkg_id,
path.join(self.pipeconf['packagedir'], pkg_id))
as package):
package['presenters'].remove('fooPresenter')
package['sources'].pop('second')
package['sources']['main']['poster'] = 'newFooPoster'
package['tags'] = ['fooTag', 'barTag']
# Update canonical contents to match
contents['presenters'].remove('fooPresenter')
contents['sources'].pop('second')
contents['sources']['main']['poster'] = 'newFooPoster'
contents['tags'] = ['fooTag', 'barTag']
# Read it back and validate
with (PackageManager(pkg_id,
path.join(self.pipeconf['packagedir'], pkg_id))
as package):
for key, value in contents.items():
self.assertEqual(value, package._contents[key])
#@unittest.skip("These tests are pretty slow")
class PipelineTest(DaemonTest):
def setUp(self):
super().setUp()
self.pipeline = Pipeline(self.config)
self.pipeline.start()
def tearDown(self):
self.pipeline.stop()
super().tearDown()
def submit_default_job(self, jobspec):
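# Drops a job specification into the pipeline's queue directory as a JSON
# file named after a fresh job id; the running Pipeline picks it up from there.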
jobid = str(uuid.uuid4())
queuedata = {'type': 'default',
'data': jobspec}
with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
json.dump(queuedata, f)
return jobid
def wait_for_result(self, jobid, handlers,
notification_url=None, timeout=10):
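# Polls the notification directory once per second for one result file per
# expected handler, named "<jobid>.<handler>". An 'error' result is treated
# as terminal; otherwise the result whose 'pending' list is empty is taken
# to be the final one and checked against the stored package.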
final_result = None
if notification_url is None:
notification_url = self.notify_url
resultfiles = {path.join(notification_url,
f"{jobid}.{handler}"): False
for handler in handlers}
for _try in range(1, timeout+1):
sleep(1)
for resultfile, found in resultfiles.items():
if path.exists(resultfile):
resultfiles[resultfile] = True
if all(resultfiles.values()):
break
else:
self.fail(f"No result produced after {_try} seconds")
for resultfile in resultfiles:
with open(resultfile) as f:
try:
result = json.load(f)
except json.decoder.JSONDecodeError:
# json.load() has already consumed the file, so rewind before
# printing the offending contents.
f.seek(0)
print("¤ Contents of invalid notification file ¤")
print(f.read())
print("¤ End invalid notification file contents ¤")
self.fail("Invalid JSON in result file.")
# Validate that this is the correct resultfile
self.assertEqual(jobid, result['jobid'])
if result['type'] == 'error':
# Only a single error should ever be produced, so treat it as
# the final notification and stop checking the rest.
final_result = result
break
if result['pending'] == []:
final_result = result
if final_result['type'] == 'success':
# On success of all expected handlers, check match of saved
# package and notification
package_id = result['package']['pkg_id']
packagebase = path.join(self.pipeconf['packagedir'], package_id)
package = PackageManager(package_id, packagebase).read()
for key, value in result['package'].items():
if key == 'pkg_id':
self.assertEqual(value, package.uuid)
else:
self.assertEqual(value, package[key])
return final_result
def init_job(self, pkgid=False, url=False, subs=False, thumb=False,
source_count=0, disabled_source_count=0, poster_count=0):
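# Builds a job specification from flags:
#   pkgid: True generates a fresh package id, a string re-uses an existing one
#   url:   True creates a dedicated notification directory for this job
#   subs:  True uploads subtitle files and references them in the spec
#   thumb: '' requests thumbnail generation, True uploads an image
#   source_count, disabled_source_count and poster_count control how many
#   video sources are attached, how many of them are disabled and how many
#   get an uploaded poster image.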
jobspec = {}
if url == True:
notification_dir = path.join(self.testbase,
'results',
str(uuid.uuid4()))
makedirs(notification_dir)
jobspec['notification_url'] = notification_dir
# Set pkg_id
if pkgid == True:
# Generate new
jobspec['pkg_id'] = str(uuid.uuid4())
elif pkgid:
# Re-use existing
jobspec['pkg_id'] = pkgid
if subs:
self.add_subtitles(jobspec)
# Set thumb
if thumb == '':
# Request generation in pipeline
self.add_genthumb(jobspec)
elif thumb == True:
# Set an image to be included
self.add_thumb(jobspec)
# Set sources
if source_count:
# poster_count determines the number of posters to include.
# Any remaining posters will be generated in pipeline
self.add_sources(jobspec, source_count, poster_count, disabled_source_count)
return jobspec
def ensure_uldir(self, jobspec):
if 'upload_dir' not in jobspec:
uldir = path.join(self.pipeconf['uploaddir'],
str(uuid.uuid4()))
jobspec['upload_dir'] = uldir
makedirs(uldir)
return uldir
return jobspec['upload_dir']
def add_subtitles(self, jobspec):
uldir = self.ensure_uldir(jobspec)
subspec = {}
for key in 'Svenska', 'English':
subspath = next(self.subspaths)
subsfile = path.basename(subspath)
subspec[key] = subsfile
copyfile(subspath, path.join(uldir, subsfile))
jobspec['subtitles'] = subspec
def add_sources(self, jobspec, count, poster_count=0, disabled_count=0):
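# Attaches 'count' uploaded video sources to the job. The first source
# carries the audio, the first 'disabled_count' sources are marked disabled
# and the first 'poster_count' sources get an uploaded poster image; posters
# for the remaining sources are generated in the pipeline.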
uldir = self.ensure_uldir(jobspec)
if 'sources' not in jobspec:
jobspec['sources'] = {}
posters = 0
disabled = 0
for i in range(count):
videopath = next(self.videopaths)
videofile = path.basename(videopath)
copyfile(videopath, path.join(uldir, videofile))
sourcedef = {'video': videofile,
'playAudio': False,
'enabled': True}
if disabled < disabled_count:
sourcedef['enabled'] = False
disabled += 1
if i == 0:
sourcedef['playAudio'] = True
if posters < poster_count:
posterpath = next(self.imagepaths)
posterfile = path.basename(posterpath)
copyfile(posterpath, path.join(uldir, posterfile))
sourcedef['poster'] = posterfile
posters += 1
jobspec['sources'][f"source-{i}"] = sourcedef
#@unittest.skip
def test_missing_typekey(self):
jobid = str(uuid.uuid4())
queuedata = {'data': ''}
with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
json.dump(queuedata, f)
result = self.wait_for_result(jobid, ['QueueReader'])
self.assertEqual(result['type'], 'error')
self.assertEqual(result['message'],
'"Job specification missing \'type\' key."')
#@unittest.skip
def test_invalid_typekey(self):
jobid = str(uuid.uuid4())
queuedata = {'type': 'invalid_type',
'data': ''}
with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
json.dump(queuedata, f)
result = self.wait_for_result(jobid, ['QueueReader'])
self.assertEqual(result['type'], 'error')
self.assertEqual(result['message'],
"Invalid type 'invalid_type' in job specification.")
#@unittest.skip
def test_subtitles(self):
jobspec = self.init_job(subs=True, url=True)
jobid = self.submit_default_job(jobspec)
url = jobspec['notification_url']
result = self.wait_for_result(jobid,
['SubtitlesImportHandler'],
notification_url=url)
package_id = result['package']['pkg_id']
with PackageManager(package_id, path.join(self.pipeconf['packagedir'],
package_id)) as package:
for lang in jobspec['subtitles'].keys():
# Subsfile should be in place for each key
subspath = path.join(package.basedir,
package['subtitles'][lang])
self.assertTrue(path.exists(subspath))
# uldir should be gone
self.assertFalse(path.exists(jobspec['upload_dir']))
#@unittest.skip
def test_metadata(self):
jobspec = self.init_job()
jobspec['title'] = {'sv': 'Testtitel',
'en': 'Test title'}
jobspec['description'] = 'Test description'
jobspec['created'] = 1234567
jobspec['presenters'] = ['fooPerson', 'barPerson']
jobspec['courses'] = [{'designation': 'fooCourse',
'semester': 'vt22'},
{'designation': 'barCourse',
'semester': 'ht07'}]
jobspec['tags'] = ['foo', 'bar']
jobid = self.submit_default_job(jobspec)
result = self.wait_for_result(jobid, ['MetadataHandler'])
package_id = result['package']['pkg_id']
with PackageManager(package_id, path.join(self.pipeconf['packagedir'],
package_id)) as package:
# Check match of saved package and jobspec
for key, value in jobspec.items():
if key == 'pkg_id':
continue
self.assertEqual(value, package[key])
# Check match of saved package and notification
for key, value in result['package'].items():
if key == 'pkg_id':
continue
self.assertEqual(value, package[key])
#@unittest.skip("This test is very slow")
def test_transcoding(self):
jobspec = self.init_job(source_count=3, poster_count=2, disabled_source_count=1)
jobid = self.submit_default_job(jobspec)
result = self.wait_for_result(jobid, ['AudioHandler',
'TranscodeHandler',
'PosterHandler'],
timeout=180)
package_id = result['package']['pkg_id']
with PackageManager(package_id, path.join(self.pipeconf['packagedir'],
package_id)) as package:
# Check match of saved package and jobspec
for name, source in jobspec['sources'].items():
pkgsource = package['sources'][name]
self.assertEqual(source['playAudio'], pkgsource['playAudio'])
self.assertTrue(path.exists(path.join(package.basedir,
pkgsource['poster'])))
for variant, filename in pkgsource['video'].items():
videopath = path.join(package.basedir, filename)
self.assertTrue(path.exists(videopath))
# uldir should be gone
self.assertFalse(path.exists(jobspec['upload_dir']))
#@unittest.skip("This test is very slow")
def test_generating_subs(self):
tracknames = ['Generated_1', 'Generated_2']
job_id_spec_map = {}
for i in range(3):
jobspec = self.init_job(source_count=1, poster_count=1)
subsource = next(iter(jobspec['sources']))
jobspec['generate_subtitles'] = {}
for name in tracknames:
jobspec['generate_subtitles'][name] = {'type': 'whisper',
'source': subsource}
jobid = self.submit_default_job(jobspec)
job_id_spec_map[jobid] = jobspec
expected_handlers = ['AudioHandler',
'TranscodeHandler',
'SubtitlesWhisperHandler']
job_result_map = {jobid: (jobspec,
self.wait_for_result(jobid,
expected_handlers,
timeout=540))
for jobid, jobspec
in job_id_spec_map.items()}
for jobid, (jobspec, result) in job_result_map.items():
package_id = result['package']['pkg_id']
with PackageManager(package_id,
path.join(self.pipeconf['packagedir'],
package_id)) as package:
# Check match of saved package and jobspec
for name, source in jobspec['sources'].items():
pkgsource = package['sources'][name]
self.assertEqual(source['playAudio'],
pkgsource['playAudio'])
self.assertTrue(
path.exists(path.join(package.basedir,
pkgsource['poster'])))
for variant, filename in pkgsource['video'].items():
videopath = path.join(package.basedir, filename)
self.assertTrue(path.exists(videopath))
for name in tracknames:
subspath = path.join(package.basedir,
package['subtitles'][name])
self.assertTrue(path.exists(subspath))
with open(subspath) as f:
print(f"¤ Subtitle track {name} ¤")
print(f.read())
print(f"¤ End subtitle track {name} ¤")
# uldir should be gone
self.assertFalse(path.exists(jobspec['upload_dir']))
if __name__ == '__main__':
if not path.exists(f"./{filesdir}"):
print(f"""
These tests require a directory called '{filesdir}' in the
current working directory.
'{filesdir}' should contain the following:
- A file named '{configfile}', containing sections as defined in
config.ini.example. The sections Pipeline and Notifier should be omitted.
- A subdirectory named '{subssubdir}', containing at least two files with
a .vtt suffix.
- A subdirectory named '{videosubdir}', containing at least one video clip.
Tests may fail if this subdirectory contains non-video files.
- A subdirectory named '{imagesubdir}', containing at least one image file.
""")
exit(1)
unittest.main()
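# An example of the expected 'testfiles' layout (illustrative only, file
# names are placeholders; this script does not create it):
#
#   testfiles/
#       config.ini
#       subs/   first.vtt  second.vtt
#       video/  clip.mp4
#       image/  poster.jpg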