#!/usr/bin/env python3
"""Integration tests for package storage and the job pipeline.

The tests read their fixtures from the ``testfiles`` directory in the
current working directory (see the message printed by the ``__main__``
guard at the bottom of this file for the expected layout) and write all
runtime state into a uniquely named scratch directory that is removed
after each test.
"""

from configparser import ConfigParser
from glob import iglob
from itertools import cycle
from os import makedirs, path
from shutil import copyfile, rmtree
from time import sleep, strftime
import json
import unittest
import uuid

from pipeline import Pipeline
from pipeline.package import PackageManager
from pipeline.utils import raise_for_structure, canonical_manifest


# Fixture layout, relative to the current working directory.
filesdir = 'testfiles'
configfile = 'config.ini'
subssubdir = 'subs'
videosubdir = 'video'
imagesubdir = 'image'

subsglob = path.join(filesdir, subssubdir, '*.vtt')
videoglob = path.join(filesdir, videosubdir, '*')
imageglob = path.join(filesdir, imagesubdir, '*')


class DaemonTest(unittest.TestCase):
    """Shared fixture: configuration plus a throwaway directory tree."""

    def setUp(self):
        """Load the test config and point it at fresh scratch directories.

        The Pipeline directories and the Notifier url are overridden with
        paths below a new uuid-named directory in the current working
        directory; every one of those paths is created on disk.
        """
        config = ConfigParser(empty_lines_in_values=False)
        config.read(path.join(filesdir, configfile))

        self.testbase = f"./{uuid.uuid4()}"

        pipeconf = {
            ('Pipeline', 'queuedir'): path.join(self.testbase, 'queue'),
            ('Pipeline', 'uploaddir'): path.join(self.testbase, 'uploads'),
            ('Pipeline', 'packagedir'): path.join(self.testbase, 'packages'),
            ('Pipeline', 'tempdir'): path.join(self.testbase, 'temp'),
            ('Pipeline', 'cachedir'): path.join(self.testbase, 'cache'),
            ('Notifier', 'url'): path.join(self.testbase, 'results'),
        }
        for (section, key), value in pipeconf.items():
            if section not in config:
                config.add_section(section)
            config[section][key] = value
            makedirs(value)

        self.config = config
        self.pipeconf = config['Pipeline']
        self.notify_url = config['Notifier']['url']

        # Endless iterators over the fixture files, so that tests can ask
        # for more files than actually exist on disk.
        self.subspaths = cycle(iglob(subsglob))
        self.videopaths = cycle(iglob(videoglob))
        self.imagepaths = cycle(iglob(imageglob))

    def tearDown(self):
        """Remove the scratch directory tree created by setUp."""
        rmtree(self.testbase)

    def _open_package(self, pkg_id):
        """Return a PackageManager for pkg_id below the configured packagedir."""
        return PackageManager(pkg_id,
                              path.join(self.pipeconf['packagedir'], pkg_id))


#@unittest.skip
class PackageTest(DaemonTest):
    """Tests for manifest validation and PackageManager persistence."""

    def test_validating(self):
        """raise_for_structure accepts a valid manifest and rejects bad types."""
        contents = {
            'title': {'sv': 'Testpaket',
                      'en': 'Test package'},
            'description': 'Testbeskrivning',
            'created': 1000,
            'duration': 500.0,
            'presenters': ['fooPresenter', 'barPresenter'],
            'courses': [{'designation': 'fooCourse',
                         'semester': 'vt23'},
                        {'designation': 'barCourse',
                         'semester': 'ht22'}],
            'tags': ['fooTag', 'barTag'],
            'thumb': 'somethumb.jpg',
            'subtitles': {'English': path.basename(next(self.subspaths))},
            'sources': {'main': {'poster': 'fooPoster',
                                 'playAudio': True,
                                 'video': {'720': 'fooVideo720',
                                           '1080': 'fooVideo1080'}},
                        'second': {'poster': 'barPoster',
                                   'playAudio': False,
                                   'video': {'720': 'barVideo720',
                                             '1080': 'barVideo1080'}}},
        }

        # Shallow copy; only top-level keys are reassigned below.
        testable = dict(contents)
        self.assertTrue(raise_for_structure(testable, canonical_manifest))

        testable['duration'] = 40.5
        self.assertTrue(raise_for_structure(testable, canonical_manifest))

        testable['duration'] = "foo"
        with self.assertRaises(ValueError):
            raise_for_structure(testable, canonical_manifest)

        testable['duration'] = 50
        testable['presenters'] = 4
        with self.assertRaises(ValueError):
            raise_for_structure(testable, canonical_manifest)

        testable['presenters'] = []
        testable['sources'] = "a string"
        with self.assertRaises(ValueError):
            raise_for_structure(testable, canonical_manifest)

    def test_creating(self):
        """Values written through one PackageManager are read back by another."""
        pkg_id = str(uuid.uuid4())
        contents = {
            'title': {'sv': 'Testpaket',
                      'en': 'Test package'},
            'description': 'Testbeskrivning',
            'created': 1000,
            'duration': 500.0,
            'presenters': ['fooPresenter', 'barPresenter'],
            'courses': [{'designation': 'fooCourse',
                         'semester': 'vt23'}],
            'tags': ['fooTag', 'barTag'],
            'sources': {'main': {'poster': 'fooPoster',
                                 'playAudio': True,
                                 'video': {'720': 'fooVideo720',
                                           '1080': 'fooVideo1080'}},
                        'second': {'poster': 'barPoster',
                                   'playAudio': False,
                                   'video': {'720': 'barVideo720',
                                             '1080': 'barVideo1080'}}},
        }

        # Save package to disk
        with self._open_package(pkg_id) as package:
            for key, value in contents.items():
                package[key] = value

        # Read it back and validate
        with self._open_package(pkg_id) as package:
            for key, value in contents.items():
                self.assertEqual(value, package._contents[key])

    def test_updating(self):
        """In-place edits made inside a PackageManager context are persisted."""
        pkg_id = str(uuid.uuid4())
        contents = {
            'title': {'sv': 'Testpaket',
                      'en': 'Test package'},
            'description': 'Testbeskrivning',
            'created': 1000,
            'duration': 500.0,
            'presenters': ['fooPresenter', 'barPresenter'],
            'sources': {'main': {'poster': 'fooPoster',
                                 'playAudio': True,
                                 'video': {'1080': 'fooVideo1080',
                                           '720': 'fooVideo720'}},
                        'second': {'poster': 'barPoster',
                                   'playAudio': False,
                                   'video': {'720': 'barVideo720',
                                             '1080': 'barVideo1080'}}},
        }

        # Save initial version to disk
        with self._open_package(pkg_id) as package:
            for key, value in contents.items():
                package[key] = value

        # Read it back, modify it and save it again
        with self._open_package(pkg_id) as package:
            package['presenters'].remove('fooPresenter')
            package['sources'].pop('second')
            package['sources']['main']['poster'] = 'newFooPoster'
            package['tags'] = ['fooTag', 'barTag']

        # Update canonical contents to match
        contents['presenters'].remove('fooPresenter')
        contents['sources'].pop('second')
        contents['sources']['main']['poster'] = 'newFooPoster'
        contents['tags'] = ['fooTag', 'barTag']

        # Read it back and validate
        with self._open_package(pkg_id) as package:
            for key, value in contents.items():
                self.assertEqual(value, package._contents[key])


#@unittest.skip("These tests are pretty slow")
class PipelineTest(DaemonTest):
    """End-to-end tests that submit jobs to a running Pipeline."""

    def setUp(self):
        """Create the scratch directories, then start a Pipeline on them."""
        super().setUp()
        self.pipeline = Pipeline(self.config)
        self.pipeline.start()

    def tearDown(self):
        """Stop the Pipeline before the scratch directories are removed."""
        self.pipeline.stop()
        super().tearDown()

    def submit_default_job(self, jobspec):
        """Write jobspec to the queue directory as a 'default' job.

        Returns the generated job id, which is also the queue file name.
        """
        jobid = str(uuid.uuid4())
        queuedata = {'type': 'default',
                     'data': jobspec}
        # Mode 'x' refuses to overwrite an existing queue file.
        with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
            json.dump(queuedata, f)
        return jobid

    def wait_for_result(self, jobid, handlers,
                        notification_url=None, timeout=10):
        """Wait for the notification files of a job and return the final one.

        One file named ``<jobid>.<handler>`` is expected per entry in
        ``handlers`` inside ``notification_url`` (defaulting to the
        configured Notifier url). The directory is polled once per second,
        at most ``timeout`` times; the test fails if any file is still
        missing afterwards.

        Each file is parsed as JSON and its 'jobid' is checked. The final
        notification is the first one of type 'error', or else the last
        one read whose 'pending' list is empty. For a final notification
        of type 'success', the package it describes is compared against
        the package saved on disk.

        Returns the final notification as parsed from JSON.
        """
        if notification_url is None:
            notification_url = self.notify_url

        # Maps every expected notification file to "has been seen".
        resultfiles = {
            path.join(notification_url, f"{jobid}.{handler}"): False
            for handler in handlers
        }

        for _ in range(timeout):
            sleep(1)
            for resultfile in resultfiles:
                if path.exists(resultfile):
                    resultfiles[resultfile] = True
            if all(resultfiles.values()):
                break
        else:
            self.fail(f"No result produced after {timeout} seconds")

        final_result = None
        for resultfile in resultfiles:
            with open(resultfile) as f:
                try:
                    result = json.load(f)
                except json.decoder.JSONDecodeError:
                    # json.load has already consumed the file, so rewind
                    # before dumping the offending contents.
                    f.seek(0)
                    print("¤ Contents of invalid notification file ¤")
                    print(f.read())
                    print("¤ End invalid notification file contents ¤")
                    self.fail("Invalid JSON in result file.")

            # Validate that this is the correct resultfile
            self.assertEqual(jobid, result['jobid'])

            if result['type'] == 'error':
                # Only a single error should ever be produced, so treat it as
                # the final notification and stop checking the rest.
                final_result = result
                break

            if result['pending'] == []:
                final_result = result

        if final_result is None:
            self.fail("No final notification (type 'error' or empty "
                      f"'pending') found for job {jobid}")

        if final_result['type'] == 'success':
            # On success of all expected handlers, check match of saved
            # package and notification
            notified = final_result['package']
            package_id = notified['pkg_id']
            package = self._open_package(package_id).read()
            for key, value in notified.items():
                if key == 'pkg_id':
                    self.assertEqual(value, package.uuid)
                else:
                    self.assertEqual(value, package[key])

        return final_result

    def init_job(self, pkgid=False, url=False, subs=False, thumb=False,
                 source_count=0, disabled_source_count=0, poster_count=0):
        """Build a jobspec dict and stage the fixture files it refers to.

        pkgid: True generates a new package id, any other truthy value is
            used as the package id, falsy leaves 'pkg_id' unset.
        url: True creates a dedicated notification directory for the job
            and stores it under 'notification_url'.
        subs: truthy adds two subtitle tracks (see add_subtitles).
        thumb: '' requests thumbnail generation, True includes an image.
        source_count, poster_count, disabled_source_count: forwarded to
            add_sources when source_count is non-zero.
        """
        jobspec = {}

        if url is True:
            notification_dir = path.join(self.testbase,
                                         'results',
                                         str(uuid.uuid4()))
            makedirs(notification_dir)
            jobspec['notification_url'] = notification_dir

        # Set pkg_id
        if pkgid is True:
            # Generate new
            jobspec['pkg_id'] = str(uuid.uuid4())
        elif pkgid:
            # Re-use existing
            jobspec['pkg_id'] = pkgid

        if subs:
            self.add_subtitles(jobspec)

        # Set thumb
        # NOTE(review): add_genthumb and add_thumb are not defined in this
        # file; unless they are provided elsewhere, passing thumb='' or
        # thumb=True raises AttributeError -- confirm.
        if thumb == '':
            # Request generation in pipeline
            self.add_genthumb(jobspec)
        elif thumb is True:
            # Set an image to be included
            self.add_thumb(jobspec)

        # Set sources
        if source_count:
            # poster_count determines the number of posters to include.
            # Any remaining posters will be generated in pipeline
            self.add_sources(jobspec, source_count,
                             poster_count, disabled_source_count)

        return jobspec

    def ensure_uldir(self, jobspec):
        """Return the job's upload directory, creating it on first use."""
        if 'upload_dir' not in jobspec:
            uldir = path.join(self.pipeconf['uploaddir'], str(uuid.uuid4()))
            jobspec['upload_dir'] = uldir
            makedirs(uldir)
            return uldir
        return jobspec['upload_dir']

    def add_subtitles(self, jobspec):
        """Copy two subtitle fixtures to the upload dir and list them.

        The tracks are stored under the keys 'Svenska' and 'English' in
        jobspec['subtitles'], replacing any previous value.
        """
        uldir = self.ensure_uldir(jobspec)
        subspec = {}
        for key in ('Svenska', 'English'):
            subspath = next(self.subspaths)
            subsfile = path.basename(subspath)
            subspec[key] = subsfile
            copyfile(subspath, path.join(uldir, subsfile))
        jobspec['subtitles'] = subspec

    def add_sources(self, jobspec, count, poster_count=0, disabled_count=0):
        """Add `count` video sources named 'source-<i>' to the jobspec.

        The first source gets playAudio=True, the first `disabled_count`
        sources get enabled=False, and the first `poster_count` sources
        get a poster image copied alongside the video.
        """
        uldir = self.ensure_uldir(jobspec)
        sources = jobspec.setdefault('sources', {})

        for i in range(count):
            videopath = next(self.videopaths)
            videofile = path.basename(videopath)
            copyfile(videopath, path.join(uldir, videofile))

            sourcedef = {'video': videofile,
                         'playAudio': i == 0,
                         'enabled': i >= disabled_count}

            if i < poster_count:
                posterpath = next(self.imagepaths)
                posterfile = path.basename(posterpath)
                copyfile(posterpath, path.join(uldir, posterfile))
                sourcedef['poster'] = posterfile

            sources[f"source-{i}"] = sourcedef

    def _assert_sources_packaged(self, jobspec, package):
        """Check every jobspec source against the saved package.

        For each source the playAudio flag must match, and the poster and
        every video file listed in the package must exist on disk.
        """
        for name, source in jobspec['sources'].items():
            pkgsource = package['sources'][name]
            self.assertEqual(source['playAudio'], pkgsource['playAudio'])
            self.assertTrue(path.exists(path.join(package.basedir,
                                                  pkgsource['poster'])))
            for filename in pkgsource['video'].values():
                videopath = path.join(package.basedir, filename)
                self.assertTrue(path.exists(videopath))

    #@unittest.skip
    def test_missing_typekey(self):
        """A queue file without a 'type' key yields an error notification."""
        jobid = str(uuid.uuid4())
        queuedata = {'data': ''}
        with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
            json.dump(queuedata, f)

        result = self.wait_for_result(jobid, ['QueueReader'])

        self.assertEqual(result['type'], 'error')
        self.assertEqual(result['message'],
                         '"Job specification missing \'type\' key."')

    #@unittest.skip
    def test_invalid_typekey(self):
        """A queue file with an unknown 'type' yields an error notification."""
        jobid = str(uuid.uuid4())
        queuedata = {'type': 'invalid_type',
                     'data': ''}
        with open(path.join(self.pipeconf['queuedir'], jobid), 'x') as f:
            json.dump(queuedata, f)

        result = self.wait_for_result(jobid, ['QueueReader'])

        self.assertEqual(result['type'], 'error')
        self.assertEqual(result['message'],
                         "Invalid type 'invalid_type' in job specification.")

    #@unittest.skip
    def test_subtitles(self):
        """Uploaded subtitle files end up in the package; uploads are removed."""
        jobspec = self.init_job(subs=True, url=True)

        jobid = self.submit_default_job(jobspec)
        url = jobspec['notification_url']
        result = self.wait_for_result(jobid, ['SubtitlesImportHandler'],
                                      notification_url=url)

        package_id = result['package']['pkg_id']
        with self._open_package(package_id) as package:
            for lang in jobspec['subtitles']:
                # Subsfile should be in place for each key
                subspath = path.join(package.basedir,
                                     package['subtitles'][lang])
                self.assertTrue(path.exists(subspath))

        # uldir should be gone
        self.assertFalse(path.exists(jobspec['upload_dir']))

    #@unittest.skip
    def test_metadata(self):
        """Submitted metadata is stored in the package and echoed back."""
        jobspec = self.init_job()
        jobspec['title'] = {'sv': 'Testtitel',
                            'en': 'Test title'}
        jobspec['description'] = 'Test description'
        jobspec['created'] = 1234567
        jobspec['presenters'] = ['fooPerson', 'barPerson']
        jobspec['courses'] = [{'designation': 'fooCourse',
                               'semester': 'vt22'},
                              {'designation': 'barCourse',
                               'semester': 'ht07'}]
        jobspec['tags'] = ['foo', 'bar']

        jobid = self.submit_default_job(jobspec)
        result = self.wait_for_result(jobid, ['MetadataHandler'])

        package_id = result['package']['pkg_id']
        with self._open_package(package_id) as package:
            # Check match of saved package and jobspec
            for key, value in jobspec.items():
                if key == 'pkg_id':
                    continue
                self.assertEqual(value, package[key])

            # Check match of saved package and notification
            for key, value in result['package'].items():
                if key == 'pkg_id':
                    continue
                self.assertEqual(value, package[key])

    #@unittest.skip("This test is very slow")
    def test_transcoding(self):
        """Sources are transcoded, posters exist, and uploads are removed."""
        jobspec = self.init_job(source_count=3,
                                poster_count=2,
                                disabled_source_count=1)

        jobid = self.submit_default_job(jobspec)
        result = self.wait_for_result(jobid, ['AudioHandler',
                                              'TranscodeHandler',
                                              'PosterHandler'],
                                      timeout=180)

        package_id = result['package']['pkg_id']
        with self._open_package(package_id) as package:
            # Check match of saved package and jobspec
            self._assert_sources_packaged(jobspec, package)

        # uldir should be gone
        self.assertFalse(path.exists(jobspec['upload_dir']))

    #@unittest.skip("This test is very slow")
    def test_generating_subs(self):
        """Three jobs each request two whisper-generated subtitle tracks."""
        tracknames = ['Generated_1', 'Generated_2']

        # Submit all jobs before waiting on any of them.
        job_id_spec_map = {}
        for _ in range(3):
            jobspec = self.init_job(source_count=1, poster_count=1)
            subsource = next(iter(jobspec['sources']))
            jobspec['generate_subtitles'] = {
                name: {'type': 'whisper', 'source': subsource}
                for name in tracknames
            }
            jobid = self.submit_default_job(jobspec)
            job_id_spec_map[jobid] = jobspec

        expected_handlers = ['AudioHandler',
                             'TranscodeHandler',
                             'SubtitlesWhisperHandler']
        job_result_map = {
            jobid: (jobspec, self.wait_for_result(jobid,
                                                  expected_handlers,
                                                  timeout=540))
            for jobid, jobspec in job_id_spec_map.items()
        }

        for jobspec, result in job_result_map.values():
            package_id = result['package']['pkg_id']
            with self._open_package(package_id) as package:
                # Check match of saved package and jobspec
                self._assert_sources_packaged(jobspec, package)

                for name in tracknames:
                    subspath = path.join(package.basedir,
                                         package['subtitles'][name])
                    self.assertTrue(path.exists(subspath))
                    with open(subspath) as f:
                        print(f"¤ Subtitle track {name} ¤")
                        print(f.read())
                        print(f"¤ End subtitle track {name} ¤")

            # uldir should be gone
            self.assertFalse(path.exists(jobspec['upload_dir']))


if __name__ == '__main__':
    if not path.exists(f"./{filesdir}"):
        print(f"""
These tests require a directory called '{filesdir}' in the
current working directory.

'{filesdir}' should contain the following:

 - A file named '{configfile}', containing sections as defined
   in config.ini.example. The sections Pipeline and Notifier
   should be omitted.

 - A subdirectory named '{subssubdir}', containing at least two
   files with a .vtt suffix.

 - A subdirectory named '{videosubdir}', containing at least one
   video clip. Tests may fail if this subdirectory contains
   non-video files.

 - A subdirectory named '{imagesubdir}', containing at least one
   image file.
""")
        # SystemExit works even where the site-provided exit() is absent.
        raise SystemExit(1)
    unittest.main()