from pathlib import Path

from .preprocessor import Preprocessor
from ..utils import raise_for_structure, canonical_jobspec

# Expected structure of a queue item of type 'default'; handed to
# raise_for_structure() as the reference to check incoming items against.
canonical_default = {
    'type': 'default',
    'data': canonical_jobspec}


@Preprocessor.register
class DefaultProcessor(Preprocessor):
    """
    Preprocessing for standard jobs.

    It passes the job onwards unchanged since the incoming job
    should already conform to the unified job format.
    """

    def validate(self, queueitem):
        """
        Validate a queue item before it is preprocessed.

        The item is checked against ``canonical_default`` via
        ``raise_for_structure`` (presumably raising on a structural
        mismatch -- see ``..utils`` for the exact exception). If the job
        data contains an ``upload_dir`` entry, that path must exist and
        must be a directory.

        :param queueitem: mapping with at least a ``'data'`` mapping.
        :returns: ``True`` when all checks pass.
        :raises ValueError: if ``upload_dir`` is given but does not exist
            or is not a directory.
        """
        raise_for_structure(queueitem, canonical_default)

        data = queueitem['data']
        if 'upload_dir' in data:
            upload_dir = Path(data['upload_dir'])
            if not upload_dir.exists():
                raise ValueError('Specified upload_dir does not exist.')
            # An existing regular file would pass exists(); reject it here
            # instead of letting a later stage fail on a non-directory.
            if not upload_dir.is_dir():
                raise ValueError('Specified upload_dir is not a directory.')
        return True

    def _preprocess(self, job):
        """
        Return the job specification unchanged, paired with its job id.

        :param job: mapping with a ``'jobid'`` entry and a ``'queueitem'``
            mapping that holds the job specification under ``'data'``.
        :returns: dict with the keys ``'jobid'`` and ``'jobspec'``.
        """
        jobid = job['jobid']
        jobspec = job['queueitem']['data']
        return {'jobid': jobid, 'jobspec': jobspec}