import json from copy import deepcopy from os import mkdir, path, rename, remove from shutil import copytree import ffmpeg from .utils import canonical_manifest, raise_for_structure class Package: def __init__(self, pkg_id, basedir, contents={}): self.uuid = pkg_id self.basedir = basedir self._manifest = path.join(basedir, 'package.json') self._contents = {'title': {'en': '', 'sv': ''}, 'description': '', 'created': None, 'duration': None, 'presenters': [], 'courses': [], 'tags': [], 'thumb': '', 'subtitles': {}, 'sources': {}} for key, value in contents.items(): self[key] = value def __setitem__(self, key, value): self._contents[key] = value def __getitem__(self, key): return self._contents[key] def __contains__(self, key): return key in self._contents def __repr__(self): return (f"Package({self.uuid}, " + f"{self.basedir}, " + f"contents={self._contents})") def __eq__(self, other): return (self.uuid == other.uuid and self.basedir == other.basedir and self._contents == other._contents) def get(self, key, default=None): if key in self._contents: return self._contents[key] else: return default def set_duration(self): """ Set duration by longest video file (they should all be the same length though) """ durations = [] for source in self._contents['sources'].values(): for video in source['video'].values(): videopath = path.join(self.basedir, video) try: duration = ffmpeg.probe(videopath)['format']['duration'] durations.append(float(duration)) except ffmpeg.Error as e: print(e.stdout) print(e.stderr) self['duration'] = max(durations) def set_subtitle_track(self, name, inpath): """ Set the subtitle track indicated by 'name' to the file at 'inpath'. If 'inpath' is None, the track is removed. """ subtitles = self._contents['subtitles'] if not inpath: # Delete this track if subtitles.get(name, None): remove(path.join(self.basedir, subtitles.pop(name))) return # Check if there is a subtitles file # already associated with this name existing = subtitles.get(name, None) if existing: # A subtitles file exists for this name, remove it remove(path.join(self.basedir, existing)) # Save the file to the correct place and update metadata rename(inpath, path.join(self.basedir, f"{name}.vtt")) subtitles[name] = f"{name}.vtt" def asdict(self): out = deepcopy(self._contents) out['pkg_id'] = self.uuid return out class PackageManager: def __init__(self, package_id, package_dir): self._package = Package(package_id, package_dir) def make_temporary_copy(self, tempbase): """ Returns a copy of the current package, with the base directory 'tempbase'. """ original = self.read() if path.exists(original.basedir): copytree(original.basedir, tempbase) parent = path.dirname(tempbase) return PackageManager(self._package.uuid, tempbase).read() def read(self): if path.exists(self._package._manifest): with open(self._package._manifest) as f: contents = json.load(f) raise_for_structure(contents, canonical_manifest) self._package = Package(self._package.uuid, self._package.basedir, contents=contents) return self._package def __enter__(self): # Make sure the package's basedir exists, # it may not have been created yet. if not path.exists(self._package.basedir): mkdir(self._package.basedir) return self.read() def __exit__(self, exc_type, exc_value, traceback): # In order to avoid races with the API, # first write to a temporary file tempfile = f"{self._package._manifest}.tmp" with open(tempfile, 'w') as f: json.dump(self._package._contents, f) # Then move the file atomically to the correct place rename(tempfile, self._package._manifest)