diff --git a/.travis.yml b/.travis.yml index d71e2bcc0..bad172ec4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ python: install: - pip install six - pip install -U -r requirements.txt +- pip install -U -r requirements-4dn.txt - pip install codacy-coverage script: - invoke test diff --git a/README.md b/README.md index 3a0161fca..5933e396d 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ Tibanna has been evolving: originally developed for Desktop workflow submitter t * Python 2.7 * Pip 9.0.3 / 10.0.1 * The other dependencies are listed in `requirements.txt` and are auto-installed in the following steps. +* If you are a 4DN-DCIC user, use the dependencies specified in `requirements-4dn.txt`. These include all the base requirements in `requirements.txt`, as well as other 4DN-specific packages. ### Admin As admin, you need to first set up Tibanna environment on your AWS account and create a usergroup with a shared permission to the environment. @@ -52,7 +53,7 @@ python -m pip install pip==9.0.3 # or curl https://bootstrap.pypa.io/get-pip.py git clone https://github.com/4dn-dcic/tibanna cd tibanna -pip install -r requirements.txt +pip install -r requirements.txt # if you're 4dn-dcic, use requirements-4dn.txt instead ``` Set up `awscli`: for more details see https://github.com/4dn-dcic/tibanna/blob/master/tutorials/tibanna_unicorn.md#set-up-aws-cli @@ -89,11 +90,11 @@ Tibanna usergroup default_6206 has been created on AWS. Then, deploy tibanna (unicorn) to your aws account for a specific user group (for more details about tibanna deployment, see below) * Note: you can only use unicorn (the core with no communication with 4DN portal). Pony is reserved for 4DN-DCIC. ``` -invoke deploy_tibanna --usergroup= --no-tests --sfn-type=unicorn +invoke deploy_tibanna --usergroup= --sfn-type=unicorn ``` As an example, ``` -invoke deploy_tibanna --usergroup=default_6206 --no-tests --sfn-type=unicorn +invoke deploy_tibanna --usergroup=default_6206 --sfn-type=unicorn ``` To run a workflow on the tibanna (unicorn) deployed for the usergroup (for more details about running workflows, see below), @@ -147,7 +148,7 @@ SECRET # aws secret key To create a copy of tibanna (step function + lambdas) ``` -invoke deploy_tibanna [--suffix=] [--sfn_type=] [--no-tests] +invoke deploy_tibanna [--suffix=] [--sfn_type=] [--tests] # (use suffix for development version) # example : dev # (step function type) is either 'pony' or 'unicorn' (default pony) @@ -162,7 +163,7 @@ The above command will create a step function named `tibanna_pony_dev2` that use ``` invoke deploy_tibanna --suffix=dev --sfn_type=unicorn ``` -This example creates a step function named `tibanna_unicorn_dev` that uses a set of lambdas with suffix `_dev`, and deploys these lambdas. +This example creates a step function named `tibanna_unicorn_dev` that uses a set of lambdas with suffix `_dev`, and deploys these lambdas. Using the `--tests` argument will ensure tests pass before deploying; currently this is **NOT** available for users outside of 4DN-DCIC.
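For 4DN-DCIC developers, one possible invocation combining the flags above (the `dev` suffix is just the illustrative value used earlier) would be ``` invoke deploy_tibanna --suffix=dev --sfn_type=unicorn --tests ```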
To deploy lambda functions (use suffix for development version lambdas) ``` @@ -258,16 +259,16 @@ cat webdevtestlist | xargs -I{} sh -c "invoke run_workflow --workflow=tibanna_po "uuid": "1f53df95-4cf3-41cc-971d-81bb16c486dd", "bucket_name": "elasticbeanstalk-fourfront-webdev-files" }, - { - "workflow_argument_name": "fastq1", + { + "workflow_argument_name": "fastq1", "bucket_name": "elasticbeanstalk-fourfront-webdev-files", - "uuid": "1150b428-272b-4a0c-b3e6-4b405c148f7c", + "uuid": "1150b428-272b-4a0c-b3e6-4b405c148f7c", "object_key": "4DNFIVOZN511.fastq.gz" }, - { - "workflow_argument_name": "fastq2", + { + "workflow_argument_name": "fastq2", "bucket_name": "elasticbeanstalk-fourfront-webdev-files", - "uuid": "f4864029-a8ad-4bb8-93e7-5108f462ccaa", + "uuid": "f4864029-a8ad-4bb8-93e7-5108f462ccaa", "object_key": "4DNFIRSRJH45.fastq.gz" } ], diff --git a/core/_version.py b/core/_version.py index 1da5f08e3..234b49909 100644 --- a/core/_version.py +++ b/core/_version.py @@ -1,4 +1,4 @@ """Version information.""" # The following line *must* be the last in the module, exactly as formatted: -__version__ = "0.3.7" +__version__ = "0.3.8" diff --git a/core/check_task_awsem/service.py b/core/check_task_awsem/service.py index f0eaf15f4..899b538e0 100644 --- a/core/check_task_awsem/service.py +++ b/core/check_task_awsem/service.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- -from dcicutils import s3_utils +import logging +import boto3 from core.utils import ( StillRunningException, EC2StartingException, @@ -8,6 +9,9 @@ powerup ) import json +from core.ec2_utils import does_key_exist + +LOG = logging.getLogger(__name__) def metadata_only(event): @@ -15,6 +19,12 @@ def metadata_only(event): return event +def read_s3(bucket, object_name): + response = boto3.client('s3').get_object(Bucket=bucket, Key=object_name) + LOG.info(str(response)) + return response['Body'].read() + + @powerup('check_task_awsem', metadata_only) def handler(event, context): ''' @@ -24,7 +34,6 @@ def handler(event, context): # s3 bucket that stores the output bucket_name = event['config']['log_bucket'] - s3 = s3_utils.s3Utils(bucket_name, bucket_name, bucket_name) # info about the jobby job jobid = event['jobid'] @@ -37,18 +46,18 @@ def handler(event, context): postrunjson_location = "https://s3.amazonaws.com/%s/%s" % (bucket_name, postrunjson) # check to see ensure this job has started else fail - if not s3.does_key_exist(job_started): + if not does_key_exist(bucket_name, job_started): raise EC2StartingException("Failed to find jobid %s, ec2 is probably still booting" % jobid) # check to see if job has error, report if so - if s3.does_key_exist(job_error): + if does_key_exist(bucket_name, job_error): raise AWSEMJobErrorException("Job encountered an error check log at %s" % job_log_location) # check to see if job has completed if not throw retry error - if s3.does_key_exist(job_success): - if not s3.does_key_exist(postrunjson): + if does_key_exist(bucket_name, job_success): + if not does_key_exist(bucket_name, postrunjson): raise Exception("Postrun json not found at %s" % postrunjson_location) - postrunjsoncontent = json.loads(s3.read_s3(postrunjson)) + postrunjsoncontent = json.loads(read_s3(bucket_name, postrunjson)) if len(str(postrunjsoncontent)) + len(str(event)) < RESPONSE_JSON_CONTENT_INCLUSION_LIMIT: event['postrunjson'] = postrunjsoncontent else: diff --git a/core/ec2_utils.py b/core/ec2_utils.py index 2652a5a33..093c23c61 100644 --- a/core/ec2_utils.py +++ b/core/ec2_utils.py @@ -8,7 +8,6 @@ import subprocess import logging # 
from invoke import run -from dcicutils import s3_utils import botocore.session import boto3 from Benchmark import run as B @@ -274,17 +273,16 @@ def update_config(config, app_name, input_files, parameters): input_size_in_bytes = dict() for argname, f in input_files.iteritems(): bucket = f['bucket_name'] - s3 = s3_utils.s3Utils(bucket, bucket, bucket) if isinstance(f['object_key'], list): size = [] for key in f['object_key']: try: - size.append(s3.get_file_size(key, bucket)) + size.append(get_file_size(key, bucket)) except: raise Exception("Can't get input file size") else: try: - size = s3.get_file_size(f['object_key'], bucket) + size = get_file_size(f['object_key'], bucket) except: raise Exception("Can't get input file size") input_size_in_bytes.update({str(argname): size}) @@ -320,100 +318,26 @@ def update_config(config, app_name, input_files, parameters): raise Exception("EBS_optimized cannot be determined nor given") -class WorkflowFile(object): - - def __init__(self, bucket, key, runner, accession=None, output_type=None, - filesize=None, md5=None): - self.bucket = bucket - self.key = key - self.s3 = s3_utils.s3Utils(self.bucket, self.bucket, self.bucket) - self.runner = runner - self.accession = accession - self.output_type = output_type - self.filesize = filesize - self.md5 = md5 - - @property - def status(self): - exists = self.s3.does_key_exist(self.key, self.bucket) - if exists: - return "COMPLETED" - else: - return "FAILED" - - def read(self): - return self.s3.read_s3(self.key).strip() - - -# TODO: refactor this to inherit from an abstrat class called Runner -# then implement for SBG as well -class Awsem(object): - - def __init__(self, json): - self.args = json['args'] - self.config = json['config'] - self.output_s3 = self.args['output_S3_bucket'] - self.app_name = self.args['app_name'] - self.output_files_meta = json['ff_meta']['output_files'] - self.output_info = None - if isinstance(json.get('postrunjson'), dict): - self.output_info = json['postrunjson']['Job']['Output']['Output files'] - - def output_files(self): - files = dict() - output_types = dict() - for x in self.output_files_meta: - output_types[x['workflow_argument_name']] = x['type'] - for k, v in self.args.get('output_target').iteritems(): - if k in output_types: - out_type = output_types[k] - else: - out_type = None - if out_type == 'Output processed file': - file_name = v.split('/')[-1] - accession = file_name.split('.')[0].strip('/') - else: - accession = None - if self.output_info: - md5 = self.output_info[k].get('md5sum', '') - filesize = self.output_info[k].get('size', 0) - wff = {k: WorkflowFile(self.output_s3, v, self, accession, - output_type=out_type, filesize=filesize, md5=md5)} - else: - wff = {k: WorkflowFile(self.output_s3, v, self, accession, - output_type=out_type)} - files.update(wff) - return files - - def secondary_output_files(self): - files = dict() - for k, v in self.args.get('secondary_output_target').iteritems(): - wff = {k: WorkflowFile(self.output_s3, v, self)} - files.update(wff) - return files - - def input_files(self): - files = dict() - for arg_name, item in self.args.get('input_files').iteritems(): - file_name = item.get('object_key').split('/')[-1] - accession = file_name.split('.')[0].strip('/') - wff = {arg_name: WorkflowFile(item.get('bucket_name'), - item.get('object_key'), - self, - accession)} - files.update(wff) - return files - - def all_files(self): - files = dict() - files.update(self.input_files()) - files.update(self.output_files()) - return files - - @property - def 
inputfile_accessions(self): - return {k: v.accession for k, v in self.input_files().iteritems()} - - @property - def all_file_accessions(self): - return {k: v.accession for k, v in self.all_files().iteritems()} +def get_file_size(key, bucket, size_in_gb=False): + ''' + default returns file size in bytes, + unless size_in_gb = True + ''' + meta = does_key_exist(bucket, key) + if not meta: + raise Exception("key not found") + one_gb = 1073741824 + size = meta['ContentLength'] + if size_in_gb: + size = size / one_gb + return size + + +def does_key_exist(bucket, object_name): + try: + file_metadata = boto3.client('s3').head_object(Bucket=bucket, Key=object_name) + except Exception as e: + print("object %s not found on bucket %s" % (str(object_name), str(bucket))) + print(str(e)) + return False + return file_metadata diff --git a/core/pony_utils.py b/core/pony_utils.py new file mode 100644 index 000000000..d496cc835 --- /dev/null +++ b/core/pony_utils.py @@ -0,0 +1,502 @@ +from __future__ import print_function +import json +import os +import sys +import datetime +from uuid import uuid4 +from dcicutils.ff_utils import ( + get_metadata, + post_metadata, + patch_metadata, + generate_rand_accession, + search_metadata +) +from dcicutils.s3_utils import s3Utils +from core.utils import run_workflow as _run_workflow +from core.utils import _tibanna_settings +from time import sleep +import logging + +########################################### +# These utils exclusively live in Tibanna # +########################################### + + +########################### +# Config +########################### + +# logger +LOG = logging.getLogger(__name__) + + +def create_ffmeta_awsem(workflow, app_name, input_files=None, + parameters=None, title=None, uuid=None, + output_files=None, award='1U01CA200059-01', lab='4dn-dcic-lab', + run_status='started', run_platform='AWSEM', run_url='', tag=None, + aliases=None, awsem_postrun_json=None, submitted_by=None, extra_meta=None, + **kwargs): + + input_files = [] if input_files is None else input_files + parameters = [] if parameters is None else parameters + if award is None: + award = '1U01CA200059-01' + if lab is None: + lab = '4dn-dcic-lab' + + if title is None: + if tag is None: + title = app_name + " run " + str(datetime.datetime.now()) + else: + title = app_name + ' ' + tag + " run " + str(datetime.datetime.now()) + + return WorkflowRunMetadata(workflow=workflow, app_name=app_name, input_files=input_files, + parameters=parameters, uuid=uuid, award=award, + lab=lab, run_platform=run_platform, run_url=run_url, + title=title, output_files=output_files, run_status=run_status, + aliases=aliases, awsem_postrun_json=awsem_postrun_json, + submitted_by=submitted_by, extra_meta=extra_meta) + + +class WorkflowRunMetadata(object): + ''' + fourfront metadata + ''' + + def __init__(self, workflow, app_name, input_files=[], + parameters=[], uuid=None, + award='1U01CA200059-01', lab='4dn-dcic-lab', + run_platform='AWSEM', title=None, output_files=None, + run_status='started', awsem_job_id=None, + run_url='', aliases=None, awsem_postrun_json=None, + submitted_by=None, extra_meta=None, **kwargs): + """Class for WorkflowRun that matches the 4DN Metadata schema + Workflow (uuid of the workflow to run) has to be given. + Workflow_run uuid is auto-generated when the object is created. 
+ """ + if run_platform == 'AWSEM': + self.awsem_app_name = app_name + # self.app_name = app_name + if awsem_job_id is None: + self.awsem_job_id = '' + else: + self.awsem_job_id = awsem_job_id + else: + raise Exception("invalid run_platform {} - it must be AWSEM".format(run_platform)) + + self.run_status = run_status + self.uuid = uuid if uuid else str(uuid4()) + self.workflow = workflow + self.run_platform = run_platform + if run_url: + self.run_url = run_url + + self.title = title + if aliases: + if isinstance(aliases, basestring): # noqa + aliases = [aliases, ] + self.aliases = aliases + self.input_files = input_files + if output_files: + self.output_files = output_files + self.parameters = parameters + self.award = award + self.lab = lab + if awsem_postrun_json: + self.awsem_postrun_json = awsem_postrun_json + if submitted_by: + self.submitted_by = submitted_by + + if extra_meta: + for k, v in extra_meta.iteritems(): + self.__dict__[k] = v + + def append_outputfile(self, outjson): + self.output_files.append(outjson) + + def as_dict(self): + return self.__dict__ + + def toJSON(self): + return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4) + + def post(self, key, type_name=None): + if not type_name: + if self.run_platform == 'AWSEM': + type_name = 'workflow_run_awsem' + else: + raise Exception("cannot determine workflow schema type from the run platform: should be AWSEM.") + return post_metadata(self.as_dict(), type_name, key=key) + + def patch(self, key, type_name=None): + return patch_metadata(self.as_dict(), key=key) + + +class ProcessedFileMetadata(object): + def __init__(self, uuid=None, accession=None, file_format='', lab='4dn-dcic-lab', + extra_files=None, source_experiments=None, + award='1U01CA200059-01', status='to be uploaded by workflow', + md5sum=None, file_size=None, other_fields=None, **kwargs): + self.uuid = uuid if uuid else str(uuid4()) + self.accession = accession if accession else generate_rand_accession() + self.status = status + self.lab = lab + self.award = award + self.file_format = file_format + if extra_files: + self.extra_files = extra_files + if source_experiments: + self.source_experiments = source_experiments + if md5sum: + self.md5sum = md5sum + if file_size: + self.file_size = file_size + if other_fields: + for field in other_fields: + setattr(self, field, other_fields[field]) + + def as_dict(self): + return self.__dict__ + + def toJSON(self): + return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4) + + def post(self, key): + return post_metadata(self.as_dict(), "file_processed", key=key, add_on='force_md5') + + def patch(self, key): + return patch_metadata(self.as_dict(), key=key, add_on='force_md5') + + @classmethod + def get(cls, uuid, key, ff_env=None, check_queue=False, return_data=False): + data = get_metadata(uuid, + key=key, + ff_env=ff_env, + add_on='frame=object', + check_queue=check_queue) + if type(data) is not dict: + raise Exception("unable to find object with unique key of %s" % uuid) + if 'FileProcessed' not in data.get('@type', {}): + raise Exception("you can only load ProcessedFiles into this object") + + pf = ProcessedFileMetadata(**data) + if return_data: + return pf, data + else: + return pf + + +def aslist(x): + if isinstance(x, list): + return x + else: + return [x] + + +def ensure_list(val): + if isinstance(val, (list, tuple)): + return val + return [val] + + +def get_extra_file_key(infile_format, infile_key, extra_file_format, fe_map): + infile_extension = fe_map.get(infile_format) 
+ extra_file_extension = fe_map.get(extra_file_format) + return infile_key.replace(infile_extension, extra_file_extension) + + +def get_format_extension_map(ff_keys): + try: + fp_schema = get_metadata("profiles/file_processed.json", key=ff_keys) + fe_map = fp_schema.get('file_format_file_extension') + except Exception as e: + raise Exception("Can't get format-extension map from file_processed schema. %s\n" % e) + return fe_map + + +def get_source_experiment(input_file_uuid, ff_keys, ff_env): + """ + Connects to fourfront and gets source experiment info as a unique list + Takes a single input file uuid. + """ + pf_source_experiments_set = set() + inf_uuids = aslist(input_file_uuid) + for inf_uuid in inf_uuids: + infile_meta = get_metadata(inf_uuid, + key=ff_keys, + ff_env=ff_env, + add_on='frame=object') + if infile_meta.get('experiments'): + for exp in infile_meta.get('experiments'): + exp_obj = get_metadata(exp, + key=ff_keys, + ff_env=ff_env, + add_on='frame=raw') + pf_source_experiments_set.add(exp_obj['uuid']) + if infile_meta.get('source_experiments'): + # this field is an array of strings, not linkTo's + pf_source_experiments_set.update(infile_meta.get('source_experiments')) + return list(pf_source_experiments_set) + + +def merge_source_experiments(input_file_uuids, ff_keys, ff_env=None): + """ + Connects to fourfront and gets source experiment info as a unique list + Takes a list of input file uuids. + """ + pf_source_experiments = set() + for input_file_uuid in input_file_uuids: + pf_source_experiments.update(get_source_experiment(input_file_uuid, ff_keys, ff_env)) + return list(pf_source_experiments) + + +class Tibanna(object): + + def __init__(self, env, ff_keys=None, sbg_keys=None, settings=None): + self.env = env + self.s3 = s3Utils(env=env) + + if not ff_keys: + ff_keys = self.s3.get_access_keys() + self.ff_keys = ff_keys + + if not settings: + settings = {} + self.settings = settings + + def get_reporter(self): + ''' + a reporter is a generic name for something that reports on the results of each step + of the workflow. For our immediate purposes this will return an ffmetadata object + (which is a connection to our metadata repository); eventually it should, through + polymorphism, support any type of reporter a user might develop. + ''' + return None + + def get_runner(self): + ''' + a runner is an object that implements a set API to run the workflow one step at a time. + Currently this is sbg for us.
+ ''' + return None + + def as_dict(self): + return {'env': self.env, + 'settings': self.settings} + + +def current_env(): + return os.environ.get('ENV_NAME', 'test') + + +def is_prod(): + return current_env().lower() == 'prod' + + +class WorkflowFile(object): + + def __init__(self, bucket, key, runner, accession=None, output_type=None, + filesize=None, md5=None): + self.bucket = bucket + self.key = key + self.s3 = s3Utils(self.bucket, self.bucket, self.bucket) + self.runner = runner + self.accession = accession + self.output_type = output_type + self.filesize = filesize + self.md5 = md5 + + @property + def status(self): + exists = self.s3.does_key_exist(self.key, self.bucket) + if exists: + return "COMPLETED" + else: + return "FAILED" + + def read(self): + return self.s3.read_s3(self.key).strip() + + +# TODO: refactor this to inherit from an abstrat class called Runner +# then implement for SBG as well +class Awsem(object): + + def __init__(self, json): + self.args = json['args'] + self.config = json['config'] + self.output_s3 = self.args['output_S3_bucket'] + self.app_name = self.args['app_name'] + self.output_files_meta = json['ff_meta']['output_files'] + self.output_info = None + if isinstance(json.get('postrunjson'), dict): + self.output_info = json['postrunjson']['Job']['Output']['Output files'] + + def output_files(self): + files = dict() + output_types = dict() + for x in self.output_files_meta: + output_types[x['workflow_argument_name']] = x['type'] + for k, v in self.args.get('output_target').iteritems(): + if k in output_types: + out_type = output_types[k] + else: + out_type = None + if out_type == 'Output processed file': + file_name = v.split('/')[-1] + accession = file_name.split('.')[0].strip('/') + else: + accession = None + if self.output_info: + md5 = self.output_info[k].get('md5sum', '') + filesize = self.output_info[k].get('size', 0) + wff = {k: WorkflowFile(self.output_s3, v, self, accession, + output_type=out_type, filesize=filesize, md5=md5)} + else: + wff = {k: WorkflowFile(self.output_s3, v, self, accession, + output_type=out_type)} + files.update(wff) + return files + + def secondary_output_files(self): + files = dict() + for k, v in self.args.get('secondary_output_target').iteritems(): + wff = {k: WorkflowFile(self.output_s3, v, self)} + files.update(wff) + return files + + def input_files(self): + files = dict() + for arg_name, item in self.args.get('input_files').iteritems(): + file_name = item.get('object_key').split('/')[-1] + accession = file_name.split('.')[0].strip('/') + wff = {arg_name: WorkflowFile(item.get('bucket_name'), + item.get('object_key'), + self, + accession)} + files.update(wff) + return files + + def all_files(self): + files = dict() + files.update(self.input_files()) + files.update(self.output_files()) + return files + + @property + def inputfile_accessions(self): + return {k: v.accession for k, v in self.input_files().iteritems()} + + @property + def all_file_accessions(self): + return {k: v.accession for k, v in self.all_files().iteritems()} + + +def run_md5(env, accession, uuid): + tibanna = Tibanna(env=env) + meta_data = get_metadata(accession, key=tibanna.ff_keys) + file_name = meta_data['upload_key'].split('/')[-1] + + input_json = make_input(env=env, workflow='md5', object_key=file_name, uuid=uuid) + return _run_workflow(input_json, accession) + + +def batch_fastqc(env, batch_size=20): + ''' + try to run fastqc on everythign that needs it ran + ''' + files_processed = 0 + files_skipped = 0 + + # handle ctrl-c + import signal + + def 
report(signum, frame): + print("Processed %s files, skipped %s files" % (files_processed, files_skipped)) + sys.exit(-1) + + signal.signal(signal.SIGINT, report) + + tibanna = Tibanna(env=env) + uploaded_files = search_metadata("search/?type=File&status=uploaded&limit=%s" % batch_size, + key=tibanna.ff_key, ff_env=tibanna.env) + + # TODO: need to change submit 4dn to not overwrite my limit + if len(uploaded_files['@graph']) > batch_size: + limited_files = uploaded_files['@graph'][:batch_size] + else: + limited_files = uploaded_files['@graph'] + + for ufile in limited_files: + fastqc_run = False + for wfrun in ufile.get('workflow_run_inputs', []): + if 'fastqc' in wfrun: + fastqc_run = True + if not fastqc_run: + print("running fastqc for %s" % ufile.get('accession')) + run_fastqc(env, ufile.get('accession'), ufile.get('uuid')) + files_processed += 1 + else: + print("******** fastqc already run for %s skipping" % ufile.get('accession')) + files_skipped += 1 + sleep(5) + if files_processed % 10 == 0: + sleep(60) + + print("Processed %s files, skipped %s files" % (files_processed, files_skipped)) + + +def run_fastqc(env, accession, uuid): + if not accession.endswith(".fastq.gz"): + accession += ".fastq.gz" + input_json = make_input(env=env, workflow='fastqc-0-11-4-1', object_key=accession, uuid=uuid) + return _run_workflow(input_json, accession) + + +_workflows = {'md5': + {'uuid': 'd3f25cd3-e726-4b3c-a022-48f844474b41', + 'arg_name': 'input_file' + }, + 'fastqc-0-11-4-1': + {'uuid': '2324ad76-ff37-4157-8bcc-3ce72b7dace9', + 'arg_name': 'input_fastq' + }, + } + + +def make_input(env, workflow, object_key, uuid): + bucket = "elasticbeanstalk-%s-files" % env + output_bucket = "elasticbeanstalk-%s-wfoutput" % env + workflow_uuid = _workflows[workflow]['uuid'] + workflow_arg_name = _workflows[workflow]['arg_name'] + + data = {"parameters": {}, + "app_name": workflow, + "workflow_uuid": workflow_uuid, + "input_files": [ + {"workflow_argument_name": workflow_arg_name, + "bucket_name": bucket, + "uuid": uuid, + "object_key": object_key, + } + ], + "output_bucket": output_bucket, + "config": { + "ebs_type": "io1", + "json_bucket": "4dn-aws-pipeline-run-json", + "ebs_iops": 500, + "shutdown_min": 30, + "ami_id": "ami-cfb14bb5", + "copy_to_s3": True, + "script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/", + "launch_instance": True, + "password": "thisisnotmypassword", + "log_bucket": "tibanna-output", + "key_name": "" + }, + } + data.update(_tibanna_settings({'run_id': str(object_key), + 'run_type': workflow, + 'env': env, + })) + return data diff --git a/core/run_task_awsem/event_md5.json b/core/run_task_awsem/event_md5.json index b910e10d4..ade2e2809 100644 --- a/core/run_task_awsem/event_md5.json +++ b/core/run_task_awsem/event_md5.json @@ -8,6 +8,7 @@ "report": "3c5eb494-38de-472c-a596-c50dde2fd51e/report" }, "cwl_main_filename": "md5.cwl", + "cwl_version": "draft3", "secondary_files": {}, "output_S3_bucket": "elasticbeanstalk-fourfront-webprod-wfoutput", "app_version": "0.0.4", @@ -60,7 +61,6 @@ "pf_meta": [], "parameters": {}, "config": { - "cwl_version": "draft3", "ebs_size": 0, "ebs_type": "io1", "key_name": "", @@ -68,9 +68,7 @@ "ebs_iops": 500, "shutdown_min": 30, "instance_type": "", - "copy_to_s3": true, - "json_bucket": "4dn-aws-pipeline-run-json", "launch_instance": true, "password": "thisisnotmypassword", @@ -84,5 +82,6 @@ "workflow_argument_name": "input_file", "object_key": "4DNFIZQZ39L9.fastq.gz" } - ] + ], + "push_error_to_end": true } diff --git 
a/core/run_task_awsem/event_repliseq.json b/core/run_task_awsem/event_repliseq.json index c7e8b2439..fb774462b 100644 --- a/core/run_task_awsem/event_repliseq.json +++ b/core/run_task_awsem/event_repliseq.json @@ -25,6 +25,7 @@ "output_S3_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", "app_version": null, "cwl_directory_url": "https://raw.githubusercontent.com/4dn-dcic/pipelines-cwl/dev/cwl_awsem/", + "cwl_version": "draft3", "input_files": { "bwaIndex": { "bucket_name": "elasticbeanstalk-fourfront-webdev-files", @@ -184,7 +185,6 @@ "nthreads": 4 }, "config": { - "cwl_version": "draft3", "ebs_size": 0, "ebs_type": "io1", "json_bucket": "4dn-aws-pipeline-run-json", diff --git a/core/start_run_awsem/service.py b/core/start_run_awsem/service.py index 8c42604cb..d6bd0547a 100644 --- a/core/start_run_awsem/service.py +++ b/core/start_run_awsem/service.py @@ -3,10 +3,10 @@ # import json import boto3 from dcicutils import ff_utils -from core.utils import ( +from core.utils import powerup +from core.utils import TibannaStartException +from core.pony_utils import ( Tibanna, - powerup, - TibannaStartException, merge_source_experiments, ensure_list, create_ffmeta_awsem, @@ -29,6 +29,8 @@ def metadata_only(event): @powerup('start_run_awsem', metadata_only) def handler(event, context): + if event.get('push_error_to_end', True): + event['push_error_to_end'] = True # push error to end by default for pony return real_handler(event, context) diff --git a/core/update_ffmeta_awsem/service.py b/core/update_ffmeta_awsem/service.py index 6dca3bdc3..42bb0c71c 100644 --- a/core/update_ffmeta_awsem/service.py +++ b/core/update_ffmeta_awsem/service.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- import logging from dcicutils import ff_utils -from core import utils, ec2_utils +from core import pony_utils +from core.utils import powerup import boto3 from collections import defaultdict from core.fastqc_utils import parse_qc_table @@ -45,8 +46,8 @@ def update_processed_file_metadata(status, pf, tibanna, export): if pf.file_format == 'bg' and export.bucket in ff_utils.HIGLASS_BUCKETS: for pfextra in pf.extra_files: if pfextra.get('file_format') == 'bw': - fe_map = utils.get_format_extension_map(ff_key) - extra_file_key = utils.get_extra_file_key('bg', export.key, 'bw', fe_map) + fe_map = pony_utils.get_format_extension_map(ff_key) + extra_file_key = pony_utils.get_extra_file_key('bg', export.key, 'bw', fe_map) pf.__dict__['higlass_uid'] = register_to_higlass( tibanna, export.bucket, extra_file_key, 'bigwig', 'vector' ) @@ -254,7 +255,7 @@ def metadata_only(event): return real_handler(event, None) -@utils.powerup('update_ffmeta_awsem', metadata_only) +@powerup('update_ffmeta_awsem', metadata_only) def handler(event, context): return real_handler(event, context) @@ -268,16 +269,16 @@ def real_handler(event, context): # get data # used to automatically determine the environment tibanna_settings = event.get('_tibanna', {}) - tibanna = utils.Tibanna(tibanna_settings['env'], settings=tibanna_settings) - ff_meta = utils.create_ffmeta_awsem( + tibanna = pony_utils.Tibanna(tibanna_settings['env'], settings=tibanna_settings) + ff_meta = pony_utils.create_ffmeta_awsem( app_name=event.get('ff_meta').get('awsem_app_name'), **event.get('ff_meta') ) - pf_meta = [utils.ProcessedFileMetadata(**pf) for pf in event.get('pf_meta')] + pf_meta = [pony_utils.ProcessedFileMetadata(**pf) for pf in event.get('pf_meta')] # ensure this bad boy is always initialized patch_meta = False - awsem = ec2_utils.Awsem(event) + awsem = 
pony_utils.Awsem(event) # go through this and replace export_report with awsf format # actually interface should be look through ff_meta files and call diff --git a/core/utils.py b/core/utils.py index 22f82ab36..ea2de5c99 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,38 +1,57 @@ from __future__ import print_function -import json -import boto3 -import os -import datetime -from uuid import uuid4 -from dcicutils.ff_utils import ( - get_metadata, - post_metadata, - patch_metadata, - generate_rand_accession -) -from dcicutils.s3_utils import s3Utils from core.iam_utils import get_stepfunction_role_name import logging import traceback +import os +import boto3 +import json +from uuid import uuid4 ########################################### # These utils exclusively live in Tibanna # ########################################### -########################### -# Config -########################### - -# production step function AWS_ACCOUNT_NUMBER = os.environ.get('AWS_ACCOUNT_NUMBER') AWS_REGION = os.environ.get('TIBANNA_AWS_REGION') BASE_ARN = 'arn:aws:states:' + AWS_REGION + ':' + AWS_ACCOUNT_NUMBER + ':%s:%s' WORKFLOW_NAME = 'tibanna_pony' STEP_FUNCTION_ARN = BASE_ARN % ('stateMachine', WORKFLOW_NAME) + # just store this in one place _tibanna = '_tibanna' + +def _tibanna_settings(settings_patch=None, force_inplace=False, env=''): + tibanna = {"run_id": str(uuid4()), + "env": env, + "url": '', + 'run_type': 'generic', + 'run_name': '', + } + in_place = None + if force_inplace: + if not settings_patch.get(_tibanna): + settings_patch[_tibanna] = {} + if settings_patch: + in_place = settings_patch.get(_tibanna, None) + if in_place is not None: + tibanna.update(in_place) + else: + tibanna.update(settings_patch) + + # generate run name + if not tibanna.get('run_name'): + # aws doesn't like / in names + tibanna['run_name'] = "%s_%s" % (tibanna['run_type'].replace('/', '-'), tibanna['run_id']) + + if in_place is not None: + settings_patch[_tibanna] = tibanna + return settings_patch + else: + return {_tibanna: tibanna} + + # logger LOG = logging.getLogger(__name__) @@ -58,224 +77,54 @@ class FdnConnectionException(Exception): pass -def create_ffmeta_awsem(workflow, app_name, input_files=None, - parameters=None, title=None, uuid=None, - output_files=None, award='1U01CA200059-01', lab='4dn-dcic-lab', - run_status='started', run_platform='AWSEM', run_url='', tag=None, - aliases=None, awsem_postrun_json=None, submitted_by=None, extra_meta=None, - **kwargs): - - input_files = [] if input_files is None else input_files - parameters = [] if parameters is None else parameters - if award is None: - award = '1U01CA200059-01' - if lab is None: - lab = '4dn-dcic-lab' - - if title is None: - if tag is None: - title = app_name + " run " + str(datetime.datetime.now()) - else: - title = app_name + ' ' + tag + " run " + str(datetime.datetime.now()) - - return WorkflowRunMetadata(workflow=workflow, app_name=app_name, input_files=input_files, - parameters=parameters, uuid=uuid, award=award, - lab=lab, run_platform=run_platform, run_url=run_url, - title=title, output_files=output_files, run_status=run_status, - aliases=aliases, awsem_postrun_json=awsem_postrun_json, - submitted_by=submitted_by, extra_meta=extra_meta) - - -class WorkflowRunMetadata(object): +def powerup(lambda_name, metadata_only_func): ''' - fourfront metadata + friendly wrapper for your lambda functions, based on input_json / event coming in... + 1. Logs basic input for all functions + 2. if 'skip' key == 'lambda_name', skip the function + 3.
catch exceptions raised by lambda, and if not in list of ignored exceptions, add + the exception to output json + 4. 'metadata' only parameter, if set to true, just create metadata instead of running the workflow + ''' + def decorator(function): + import logging + logging.basicConfig() + logger = logging.getLogger('logger') + ignored_exceptions = [EC2StartingException, StillRunningException, + TibannaStartException, FdnConnectionException] - def __init__(self, workflow, app_name, input_files=[], - parameters=[], uuid=None, - award='1U01CA200059-01', lab='4dn-dcic-lab', - run_platform='AWSEM', title=None, output_files=None, - run_status='started', awsem_job_id=None, - run_url='', aliases=None, awsem_postrun_json=None, - submitted_by=None, extra_meta=None, **kwargs): - """Class for WorkflowRun that matches the 4DN Metadata schema - Workflow (uuid of the workflow to run) has to be given. - Workflow_run uuid is auto-generated when the object is created. - """ - if run_platform == 'AWSEM': - self.awsem_app_name = app_name - # self.app_name = app_name - if awsem_job_id is None: - self.awsem_job_id = '' - else: - self.awsem_job_id = awsem_job_id - else: - raise Exception("invalid run_platform {} - it must be AWSEM".format(run_platform)) - - self.run_status = run_status - self.uuid = uuid if uuid else str(uuid4()) - self.workflow = workflow - self.run_platform = run_platform - if run_url: - self.run_url = run_url - - self.title = title - if aliases: - if isinstance(aliases, basestring): # noqa - aliases = [aliases, ] - self.aliases = aliases - self.input_files = input_files - if output_files: - self.output_files = output_files - self.parameters = parameters - self.award = award - self.lab = lab - if awsem_postrun_json: - self.awsem_postrun_json = awsem_postrun_json - if submitted_by: - self.submitted_by = submitted_by - - if extra_meta: - for k, v in extra_meta.iteritems(): - self.__dict__[k] = v - - def append_outputfile(self, outjson): - self.output_files.append(outjson) - - def as_dict(self): - return self.__dict__ - - def toJSON(self): - return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4) - - def post(self, key, type_name=None): - if not type_name: - if self.run_platform == 'AWSEM': - type_name = 'workflow_run_awsem' + def wrapper(event, context): + if context: + logger.info(context) + logger.info(event) + if lambda_name in event.get('skip', []): + logger.info('skipping %s since skip was set in input_json' % lambda_name) + return event + elif event.get('push_error_to_end', False) and event.get('error', False) \ + and lambda_name != 'update_ffmeta_awsem': + logger.info('skipping %s since a value for "error" is in input json ' + 'and lambda is not update_ffmeta_awsem' % lambda_name) + return event + elif event.get('metadata_only', False): + return metadata_only_func(event) + else: - raise Exception("cannot determine workflow schema type from the run platform: should be AWSEM.") - return post_metadata(self.as_dict(), type_name, key=key) - - def patch(self, key, type_name=None): - return patch_metadata(self.as_dict(), key=key) - - -class ProcessedFileMetadata(object): - def __init__(self, uuid=None, accession=None, file_format='', lab='4dn-dcic-lab', - extra_files=None, source_experiments=None, - award='1U01CA200059-01', status='to be uploaded by workflow', - md5sum=None, file_size=None, other_fields=None, **kwargs): - self.uuid = uuid if uuid else str(uuid4()) - self.accession = accession if accession else generate_rand_accession() - self.status = status - self.lab = lab -
self.award = award - self.file_format = file_format - if extra_files: - self.extra_files = extra_files - if source_experiments: - self.source_experiments = source_experiments - if md5sum: - self.md5sum = md5sum - if file_size: - self.file_size = file_size - if other_fields: - for field in other_fields: - setattr(self, field, other_fields[field]) - - def as_dict(self): - return self.__dict__ - - def toJSON(self): - return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4) - - def post(self, key): - return post_metadata(self.as_dict(), "file_processed", key=key, add_on='force_md5') - - def patch(self, key): - return patch_metadata(self.as_dict(), key=key, add_on='force_md5') - - @classmethod - def get(cls, uuid, key, ff_env=None, check_queue=False, return_data=False): - data = get_metadata(uuid, - key=key, - ff_env=ff_env, - add_on='frame=object', - check_queue=check_queue) - if type(data) is not dict: - raise Exception("unable to find object with unique key of %s" % uuid) - if 'FileProcessed' not in data.get('@type', {}): - raise Exception("you can only load ProcessedFiles into this object") - - pf = ProcessedFileMetadata(**data) - if return_data: - return pf, data - else: - return pf - - -def aslist(x): - if isinstance(x, list): - return x - else: - return [x] - - -def ensure_list(val): - if isinstance(val, (list, tuple)): - return val - return [val] - - -def get_extra_file_key(infile_format, infile_key, extra_file_format, fe_map): - infile_extension = fe_map.get(infile_format) - extra_file_extension = fe_map.get(extra_file_format) - return infile_key.replace(infile_extension, extra_file_extension) - - -def get_format_extension_map(ff_keys): - try: - fp_schema = get_metadata("profiles/file_processed.json", key=ff_keys) - fe_map = fp_schema.get('file_format_file_extension') - except Exception as e: - raise Exception("Can't get format-extension map from file_processed schema. %s\n" % e) - return fe_map - - -def get_source_experiment(input_file_uuid, ff_keys, ff_env): - """ - Connects to fourfront and get source experiment info as a unique list - Takes a single input file uuid. - """ - pf_source_experiments_set = set() - inf_uuids = aslist(input_file_uuid) - for inf_uuid in inf_uuids: - infile_meta = get_metadata(inf_uuid, - key=ff_keys, - ff_env=ff_env, - add_on='frame=object') - if infile_meta.get('experiments'): - for exp in infile_meta.get('experiments'): - exp_obj = get_metadata(exp, - key=ff_keys, - ff_env=ff_env, - add_on='frame=raw') - pf_source_experiments_set.add(exp_obj['uuid']) - if infile_meta.get('source_experiments'): - # this field is an array of strings, not linkTo's - pf_source_experiments_set.update(infile_meta.get('source_experiments')) - return list(pf_source_experiments_set) - - -def merge_source_experiments(input_file_uuids, ff_keys, ff_env=None): - """ - Connects to fourfront and get source experiment info as a unique list - Takes a list of input file uuids. - """ - pf_source_experiments = set() - for input_file_uuid in input_file_uuids: - pf_source_experiments.update(get_source_experiment(input_file_uuid, ff_keys, ff_env)) - return list(pf_source_experiments) + try: + return function(event, context) + except Exception as e: + if type(e) in ignored_exceptions: + raise e + # update ff_meta to error status + elif lambda_name == 'update_ffmeta_awsem' or not event.get('push_error_to_end', False): + # for last step just pit out error + raise e + else: + error_msg = 'Error on step: %s. 
Full traceback: %s' % (lambda_name, traceback.format_exc()) + event['error'] = error_msg + logger.info(error_msg) + return event + return wrapper + return decorator def run_workflow(input_json, accession='', workflow='tibanna_pony', @@ -457,124 +306,3 @@ def create_stepfunction(dev_suffix=None, raise(e) # sfn_arn = response['stateMachineArn'] return(response) - - -class Tibanna(object): - - def __init__(self, env, ff_keys=None, sbg_keys=None, settings=None): - self.env = env - self.s3 = s3Utils(env=env) - - if not ff_keys: - ff_keys = self.s3.get_access_keys() - self.ff_keys = ff_keys - - if not settings: - settings = {} - self.settings = settings - - def get_reporter(self): - ''' - a reporter is a generic name for somethign that reports on the results of each step - of the workflow. For our immediate purposes this will return ffmetadata object - (which is a connection to our metadata repository, eventually it should, through - polymorphism support any type of reporter a user might develop. - ''' - return None - - def get_runner(self): - ''' - a runner is an object that implements a set api to run the workflow one step at a time. - Currently this is sbg for us. - ''' - return None - - def as_dict(self): - return {'env': self.env, - 'settings': self.settings} - - -def _tibanna_settings(settings_patch=None, force_inplace=False, env=''): - tibanna = {"run_id": str(uuid4()), - "env": env, - "url": '', - 'run_type': 'generic', - 'run_name': '', - } - in_place = None - if force_inplace: - if not settings_patch.get(_tibanna): - settings_patch[_tibanna] = {} - if settings_patch: - in_place = settings_patch.get(_tibanna, None) - if in_place is not None: - tibanna.update(in_place) - else: - tibanna.update(settings_patch) - - # generate run name - if not tibanna.get('run_name'): - # aws doesn't like / in names - tibanna['run_name'] = "%s_%s" % (tibanna['run_type'].replace('/', '-'), tibanna['run_id']) - - if in_place is not None: - settings_patch[_tibanna] = tibanna - return settings_patch - else: - return {_tibanna: tibanna} - - -def current_env(): - return os.environ.get('ENV_NAME', 'test') - - -def is_prod(): - return current_env().lower() == 'prod' - - -def powerup(lambda_name, metadata_only_func): - ''' - friendly wrapper for your lambda functions, based on input_json / event comming in... - 1. Logs basic input for all functions - 2. if 'skip' key == 'lambda_name', skip the function - 3. catch exceptions raised by labmda, and if not in list of ignored exceptions, added - the exception to output json - 4. 
'metadata' only parameter, if set to true, just create metadata instead of run workflow - - ''' - def decorator(function): - import logging - logging.basicConfig() - logger = logging.getLogger('logger') - ignored_exceptions = [EC2StartingException, StillRunningException, - TibannaStartException, FdnConnectionException] - - def wrapper(event, context): - logger.info(context) - logger.info(event) - if lambda_name in event.get('skip', []): - logger.info('skipping %s since skip was set in input_json' % lambda_name) - return event - elif event.get('error', False) and lambda_name != 'update_ffmeta_awsem': - logger.info('skipping %s since a value for "error" is in input json ' - 'and lambda is not update_ffmeta_awsem' % lambda_name) - return event - elif event.get('metadata_only', False): - return metadata_only_func(event) - else: - try: - return function(event, context) - except Exception as e: - if type(e) in ignored_exceptions: - raise e - # update ff_meta to error status - elif lambda_name == 'update_ffmeta_awsem': - # for last step just pit out error - raise e - else: - error_msg = 'Error on step: %s. Full traceback: %s' % (lambda_name, traceback.format_exc()) - event['error'] = error_msg - logger.info(error_msg) - return event - return wrapper - return decorator diff --git a/core/validate_md5_s3_trigger/service.py b/core/validate_md5_s3_trigger/service.py index c972cd741..b6654319e 100644 --- a/core/validate_md5_s3_trigger/service.py +++ b/core/validate_md5_s3_trigger/service.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import boto3 -from core.utils import _tibanna_settings, Tibanna, STEP_FUNCTION_ARN +from core.utils import _tibanna_settings, STEP_FUNCTION_ARN +from core.pony_utils import Tibanna from dcicutils.ff_utils import get_metadata import json diff --git a/requirements-4dn.txt b/requirements-4dn.txt new file mode 100644 index 000000000..4ceb51c4e --- /dev/null +++ b/requirements-4dn.txt @@ -0,0 +1,12 @@ +dcicutils==0.4.0 +-e git+https://github.com/4dn-dcic/python-lambda.git#egg=python_lambda +-e git+https://github.com/SooLee/Benchmark.git#egg=Benchmark +# for tasks.py +invoke==0.18.1 +# syntax checker +flake8==2.4.1 +# for testing +pytest==3.0.5 +pytest-cov==2.3.1 +pytest-runner +mock diff --git a/requirements-lambda-pony.txt b/requirements-lambda-pony.txt new file mode 100644 index 000000000..070858412 --- /dev/null +++ b/requirements-lambda-pony.txt @@ -0,0 +1,2 @@ +dcicutils==0.4.0 +-e git+https://github.com/SooLee/Benchmark.git#egg=Benchmark diff --git a/requirements-lambda-unicorn.txt b/requirements-lambda-unicorn.txt new file mode 100644 index 000000000..2eeaec5ac --- /dev/null +++ b/requirements-lambda-unicorn.txt @@ -0,0 +1,4 @@ +boto3==1.7.42 +botocore==1.10.42 +urllib3==1.22 +-e git+https://github.com/SooLee/Benchmark.git#egg=Benchmark diff --git a/requirements.txt b/requirements.txt index 90fbbf781..d424a9cd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,19 +1,16 @@ awscli==1.15.42 boto3==1.7.42 botocore==1.10.42 +urllib3==1.22 +requests==2.19.1 -e git+https://github.com/4dn-dcic/python-lambda.git#egg=python_lambda -e git+https://github.com/SooLee/Benchmark.git#egg=Benchmark -# Build tasks +# for tasks.py invoke==0.18.1 -# Syntax checking +# syntax checker flake8==2.4.1 -aws_requests_auth==0.4.1 -elasticsearch-curator==5.5.4 -elasticsearch==5.5.2 -pytest-runner -dcicutils==0.3.4 -urllib3 # for testing pytest==3.0.5 pytest-cov==2.3.1 +pytest-runner mock diff --git a/setup.py b/setup.py index 7709ebecb..9f1d02126 100644 --- a/setup.py +++ b/setup.py @@ -8,23 
+8,22 @@ except: pass # don't know why this fails with tox -requires = [ - 'awscli==1.15.42', - 'boto3==1.7.42', - 'botocore==1.10.42', - 'invoke==0.18.1', - 'flake8==2.4.1', - 'dcicutils==0.3.1', - 'urllib3', - 'packaging@https://github.com/j1z0/python-lambda.git#egg=python_lambda', - 'packaging@https://github.com/SooLee/Benchmark.git#egg=Benchmark' -] +# for the time being, we are not pip packaging, so do not use +# setup_requires or install_requires +with open('requirements-lambda-unicorn.txt') as f: + inst_parsed = f.read().splitlines() +install_requires = [req.strip() for req in inst_parsed if 'git+' not in req] + +# full requirements for unicorn (does not require dcicutils) +with open('requirements.txt') as f: + set_parsed = f.read().splitlines() +setup_requires = [req.strip() for req in set_parsed if 'git+' not in req] + +# full requirements for pony and running tests (includes dcicutils) +with open('requirements-4dn.txt') as f: + tests_parsed = f.read().splitlines() +tests_require = [req.strip() for req in tests_parsed if 'git+' not in req] -tests_require = [ - 'pytest>=3.0.1', - 'pytest-mock', - 'pytest-cov', -] setup( name='core', @@ -42,15 +41,15 @@ 'Programming Language :: Python', 'Programming Language :: Python :: 2.7', ], - install_requires=requires, + install_requires=[], include_package_data=True, tests_require=tests_require, extras_require={ 'test': tests_require, }, - setup_requires=['pytest-runner', ], + setup_requires=[], dependency_links=[ - 'git+https://github.com/j1z0/python-lambda.git#egg=python_lambda', + 'git+https://github.com/4dn-dcic/python-lambda.git#egg=python_lambda', 'git+https://github.com/SooLee/Benchmark.git#egg=Benchmark' ] ) diff --git a/tasks.py b/tasks.py index 24632414c..b0e0bd711 100644 --- a/tasks.py +++ b/tasks.py @@ -9,17 +9,11 @@ import contextlib import shutil # from botocore.errorfactory import ExecutionAlreadyExists +from core.ec2_utils import AWS_S3_ROLE_NAME +from core.utils import AWS_REGION, AWS_ACCOUNT_NUMBER from core.utils import run_workflow as _run_workflow from core.utils import create_stepfunction as _create_stepfunction -from core.utils import _tibanna_settings, Tibanna from core.utils import _tibanna -from core.utils import AWS_REGION, AWS_ACCOUNT_NUMBER -from core.ec2_utils import AWS_S3_ROLE_NAME -from dcicutils.s3_utils import s3Utils -from dcicutils.ff_utils import ( - get_metadata, - search_metadata -) from core.launch_utils import rerun as _rerun from core.launch_utils import rerun_many as _rerun_many from core.launch_utils import kill_all as _kill_all @@ -27,7 +21,6 @@ from core.iam_utils import get_bucket_role_name, get_lambda_role_name from contextlib import contextmanager import aws_lambda -from time import sleep import requests import random @@ -40,6 +33,7 @@ AMI_ID_CWL_DRAFT3 = 'ami-cfb14bb5' TIBANNA_REPO_NAME = os.environ.get('TIBANNA_REPO_NAME', '4dn-dcic/tibanna') TIBANNA_REPO_BRANCH = os.environ.get('TIBANNA_REPO_BRANCH', 'master') +UNICORN_LAMBDAS = ['run_task_awsem', 'check_task_awsem'] def get_random_line_in_gist(url): @@ -76,13 +70,13 @@ def setenv(**kwargs): def get_all_core_lambdas(): return [ - 'validate_md5_s3_trigger', - 'start_run_awsem', - 'run_task_awsem', - 'check_task_awsem', - 'update_ffmeta_awsem', - 'run_workflow', - ] + 'validate_md5_s3_trigger', + 'start_run_awsem', + 'run_task_awsem', + 'check_task_awsem', + 'update_ffmeta_awsem', + 'run_workflow', + ] def env_list(name): @@ -91,6 +85,9 @@ def env_list(name): if secret is None: raise RuntimeError("SECRET should be defined in env") envlist = { 
+ 'run_workflow': {'SECRET': secret, + 'TIBANNA_AWS_REGION': AWS_REGION, + 'AWS_ACCOUNT_NUMBER': AWS_ACCOUNT_NUMBER}, 'start_run_awsem': {'SECRET': secret, 'TIBANNA_AWS_REGION': AWS_REGION, 'AWS_ACCOUNT_NUMBER': AWS_ACCOUNT_NUMBER}, @@ -247,20 +244,25 @@ def deploy_chalice(ctx, name='lambda_sbg', version=None): @task -def deploy_core(ctx, name, version=None, no_tests=False, suffix=None, usergroup=None): +def deploy_core(ctx, name, version=None, tests=False, suffix=None, usergroup=None): print("preparing for deploy...") - print("make sure tests pass") - if no_tests is False: + if tests: + print("running tests...") if test(ctx) != 0: print("tests need to pass first before deploy") return + else: + print("skipping tests. execute with --tests flag to run them") if name == 'all': names = get_all_core_lambdas() - print(names) + + elif name == 'unicorn': + names = UNICORN_LAMBDAS else: names = [name, ] + print('deploying the following lambdas: %s' % names) - # dist directores are the enemy, clean the all + # dist directores are the enemy, clean them all for name in get_all_core_lambdas(): print("cleaning house before deploying") with chdir("./core/%s" % (name)): @@ -305,8 +307,13 @@ def deploy_lambda_package(ctx, name, suffix=None, usergroup=None): else: new_name = name new_src = '../' + new_name + # use the lightweight requirements for the lambdas to simplify deployment + if name in UNICORN_LAMBDAS: + requirements_file = '../../requirements-lambda-unicorn.txt' + else: + requirements_file = '../../requirements-lambda-pony.txt' with chdir(new_src): - aws_lambda.deploy(os.getcwd(), local_package='../..', requirements='../../requirements.txt') + aws_lambda.deploy(os.getcwd(), local_package='../..', requirements=requirements_file) # add environment variables lambda_update_config = {'FunctionName': new_name} envs = env_list(name) @@ -410,127 +417,6 @@ def publish(ctx, test=False): run('twine upload dist/*', echo=True) -@task -def run_md5(ctx, env, accession, uuid): - tibanna = Tibanna(env=env) - meta_data = get_metadata(accession, key=tibanna.ff_keys) - file_name = meta_data['upload_key'].split('/')[-1] - - input_json = make_input(env=env, workflow='md5', object_key=file_name, uuid=uuid) - return _run_workflow(input_json, accession) - - -@task -def batch_fastqc(ctx, env, batch_size=20): - ''' - try to run fastqc on everythign that needs it ran - ''' - files_processed = 0 - files_skipped = 0 - - # handle ctrl-c - import signal - - def report(signum, frame): - print("Processed %s files, skipped %s files" % (files_processed, files_skipped)) - sys.exit(-1) - - signal.signal(signal.SIGINT, report) - - tibanna = Tibanna(env=env) - uploaded_files = search_metadata("search/?type=File&status=uploaded&limit=%s" % batch_size, - key=tibanna.ff_key, ff_env=tibanna.env) - - # TODO: need to change submit 4dn to not overwrite my limit - if len(uploaded_files['@graph']) > batch_size: - limited_files = uploaded_files['@graph'][:batch_size] - else: - limited_files = uploaded_files['@graph'] - - for ufile in limited_files: - fastqc_run = False - for wfrun in ufile.get('workflow_run_inputs', []): - if 'fastqc' in wfrun: - fastqc_run = True - if not fastqc_run: - print("running fastqc for %s" % ufile.get('accession')) - run_fastqc(ctx, env, ufile.get('accession'), ufile.get('uuid')) - files_processed += 1 - else: - print("******** fastqc already run for %s skipping" % ufile.get('accession')) - files_skipped += 1 - sleep(5) - if files_processed % 10 == 0: - sleep(60) - - print("Processed %s files, skipped %s files" % 
(files_processed, files_skipped)) - - -@task -def run_fastqc(ctx, env, accession, uuid): - if not accession.endswith(".fastq.gz"): - accession += ".fastq.gz" - input_json = make_input(env=env, workflow='fastqc-0-11-4-1', object_key=accession, uuid=uuid) - return _run_workflow(input_json, accession) - - -_workflows = {'md5': - {'uuid': 'd3f25cd3-e726-4b3c-a022-48f844474b41', - 'arg_name': 'input_file' - }, - 'fastqc-0-11-4-1': - {'uuid': '2324ad76-ff37-4157-8bcc-3ce72b7dace9', - 'arg_name': 'input_fastq' - }, - } - - -def calc_ebs_size(bucket, key): - s3 = s3Utils(bucket, bucket, bucket) - size = s3.get_file_size(key, bucket, add_gb=3, size_in_gb=True) - if size < 10: - size = 10 - return size - - -def make_input(env, workflow, object_key, uuid): - bucket = "elasticbeanstalk-%s-files" % env - output_bucket = "elasticbeanstalk-%s-wfoutput" % env - workflow_uuid = _workflows[workflow]['uuid'] - workflow_arg_name = _workflows[workflow]['arg_name'] - - data = {"parameters": {}, - "app_name": workflow, - "workflow_uuid": workflow_uuid, - "input_files": [ - {"workflow_argument_name": workflow_arg_name, - "bucket_name": bucket, - "uuid": uuid, - "object_key": object_key, - } - ], - "output_bucket": output_bucket, - "config": { - "ebs_type": "io1", - "json_bucket": "4dn-aws-pipeline-run-json", - "ebs_iops": 500, - "shutdown_min": 30, - "ami_id": "ami-cfb14bb5", - "copy_to_s3": True, - "script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/", - "launch_instance": True, - "password": "thisisnotmypassword", - "log_bucket": "tibanna-output", - "key_name": "" - }, - } - data.update(_tibanna_settings({'run_id': str(object_key), - 'run_type': workflow, - 'env': env, - })) - return data - - @task def run_workflow(ctx, input_json='', workflow=''): with open(input_json) as input_file: @@ -554,18 +440,17 @@ def setup_tibanna_env(ctx, buckets='', usergroup_tag='default'): @task -def deploy_tibanna(ctx, suffix=None, sfn_type='pony', usergroup=None, version=None, no_tests=False): - print("creating a new workflow..") +def deploy_tibanna(ctx, suffix=None, sfn_type='pony', usergroup=None, version=None, tests=False): + print("creating a new workflow...") if sfn_type not in ['pony', 'unicorn']: raise Exception("Invalid sfn_type : it must be either pony or unicorn.") res = _create_stepfunction(suffix, sfn_type, usergroup=usergroup) print(res) - print("deploying lambdas..") + print("deploying lambdas...") if sfn_type == 'pony': - deploy_core(ctx, 'all', version=version, no_tests=no_tests, suffix=suffix, usergroup=usergroup) + deploy_core(ctx, 'all', version=version, tests=tests, suffix=suffix, usergroup=usergroup) else: - deploy_core(ctx, 'run_task_awsem', version=version, no_tests=no_tests, suffix=suffix, usergroup=usergroup) - deploy_core(ctx, 'check_task_awsem', version=version, no_tests=no_tests, suffix=suffix, usergroup=usergroup) + deploy_core(ctx, 'unicorn', version=version, tests=tests, suffix=suffix, usergroup=usergroup) @task diff --git a/tests/core/check_task_awsem/test_handler.py b/tests/core/check_task_awsem/test_handler.py index 9c5c2cf37..e11aeba95 100644 --- a/tests/core/check_task_awsem/test_handler.py +++ b/tests/core/check_task_awsem/test_handler.py @@ -12,7 +12,8 @@ @pytest.fixture() def check_task_input(): return {"config": {"log_bucket": "tibanna-output"}, - "jobid": "test_job" + "jobid": "test_job", + "push_error_to_end": True } diff --git a/tests/core/start_run_awsem/test_handler.py b/tests/core/start_run_awsem/test_handler.py index 4e162b077..121fc6c4b 100644 --- 
a/tests/core/start_run_awsem/test_handler.py +++ b/tests/core/start_run_awsem/test_handler.py @@ -7,7 +7,7 @@ add_secondary_files_to_args, ) from ..conftest import valid_env -from core.utils import Tibanna, ProcessedFileMetadata +from core.pony_utils import Tibanna, ProcessedFileMetadata from dcicutils import ff_utils import mock @@ -59,7 +59,7 @@ def test_proc_file_for_arg_name(run_awsem_event_data_processed_files, proc_file_ file_with_type = proc_file_in_webdev.copy() file_with_type['@type'] = ['FileProcessed', 'Item', 'whatever'] - with mock.patch('core.utils.get_metadata', return_value=file_with_type): + with mock.patch('core.pony_utils.get_metadata', return_value=file_with_type): pf, resp = proc_file_for_arg_name(of, 'output_file1', tibanna) assert type(pf) == ProcessedFileMetadata assert pf.__dict__ == proc_file_in_webdev diff --git a/tests/core/test_ec2_utils.py b/tests/core/test_ec2_utils.py index 35af111d7..0281fb839 100644 --- a/tests/core/test_ec2_utils.py +++ b/tests/core/test_ec2_utils.py @@ -1,44 +1,4 @@ -from core.ec2_utils import Awsem, update_config - - -def test_create_awsem(update_ffmeta_event_data, tibanna_env): - update_ffmeta_event_data.update(tibanna_env) - awsem = Awsem(update_ffmeta_event_data) - assert awsem.args - assert awsem.config - assert awsem.app_name - assert awsem.output_s3 - assert awsem.output_files_meta - - -def test_get_output_files(update_ffmeta_event_data, tibanna_env): - update_ffmeta_event_data.update(tibanna_env) - awsem = Awsem(update_ffmeta_event_data) - of = awsem.output_files() - first_key = of.keys()[0] - assert 1 == len(of) - assert of[first_key].runner == awsem - assert of[first_key].bucket == awsem.output_s3 - assert of[first_key].key == 'lalala/md5_report' - assert of[first_key].output_type == 'Output report file' - - -def test_get_input_files(update_ffmeta_event_data, tibanna_env): - update_ffmeta_event_data.update(tibanna_env) - awsem = Awsem(update_ffmeta_event_data) - infiles = awsem.input_files() - first_key = infiles.keys()[0] - assert 1 == len(infiles) - assert infiles[first_key].runner == awsem - assert infiles[first_key].bucket == 'elasticbeanstalk-fourfront-webdev-files' - assert infiles[first_key].key == 'f4864029-a8ad-4bb8-93e7-5108f462ccaa/4DNFIRSRJH45.fastq.gz' - assert infiles[first_key].accession == '4DNFIRSRJH45' - - -def test_get_inputfile_accession(update_ffmeta_event_data, tibanna_env): - update_ffmeta_event_data.update(tibanna_env) - awsem = Awsem(update_ffmeta_event_data) - assert awsem.inputfile_accessions['input_file'] == '4DNFIRSRJH45' +from core.ec2_utils import update_config def test_update_config(run_task_awsem_event_data): diff --git a/tests/core/test_ff_utils.py b/tests/core/test_ff_utils.py deleted file mode 100644 index 2efdee852..000000000 --- a/tests/core/test_ff_utils.py +++ /dev/null @@ -1,87 +0,0 @@ -import pytest -import mock -from .conftest import valid_env -from core.utils import ( - Tibanna, - merge_source_experiments, - ProcessedFileMetadata, - get_format_extension_map, - get_extra_file_key -) -import logging - -LOG = logging.getLogger(__name__) - - -@pytest.fixture() -def proc_file_in_webdev(): - return {'status': 'released', - 'uuid': 'f6d5ba22-aaf9-48e9-8df4-bc5c131c96af', - 'file_format': 'normvector_juicerformat', - 'accession': '4DNFIRO3UX7I', - 'award': '/awards/1U01CA200059-01/', - 'lab': '/labs/4dn-dcic-lab/'} - - -def test_create_ProcessedFileMetadata_from_get_error_if_no_at_type(ff_keys, proc_file_in_webdev): - # can use acc, uuid, @id, any valid url - with 
mock.patch('core.utils.get_metadata', return_value=proc_file_in_webdev): - with pytest.raises(Exception) as expinfo: - ProcessedFileMetadata.get(proc_file_in_webdev['accession'], ff_keys) - assert "only load ProcessedFiles" in str(expinfo.value) - - -def test_create_ProcessedFileMetadata_from_get(ff_keys, proc_file_in_webdev): - # can use acc, uuid, @id, any valid url - file_with_type = proc_file_in_webdev.copy() - file_with_type['@type'] = ['FileProcessed', 'Item', 'whatever'] - with mock.patch('core.utils.get_metadata', return_value=file_with_type) as ff: - pf = ProcessedFileMetadata.get(proc_file_in_webdev['accession'], ff_keys) - assert pf.__dict__ == proc_file_in_webdev - assert type(pf) is ProcessedFileMetadata - ff.was_called_once() - - -@valid_env -@pytest.mark.webtest -def test_get_format_extension_map(run_awsem_event_data): - tibanna_settings = run_awsem_event_data.get('_tibanna', {}) - # if they don't pass in env guess it from output_bucket - env = tibanna_settings.get('env') - # tibanna provides access to keys based on env and stuff like that - tibanna = Tibanna(env, ff_keys=run_awsem_event_data.get('ff_keys'), - settings=tibanna_settings) - - fe_map = get_format_extension_map(tibanna.ff_keys) - assert(fe_map) - assert 'pairs' in fe_map.keys() - - -@valid_env -@pytest.mark.webtest -def test_merge_source_experiment(run_awsem_event_data): - input_file = { - "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", - "workflow_argument_name": "input_pairs", - "uuid": ["d2c897ec-bdb2-47ce-b1b1-845daccaa571", "d2c897ec-bdb2-47ce-b1b1-845daccaa571"], - "object_key": ["4DNFI25JXLLI.pairs.gz", "4DNFI25JXLLI.pairs.gz"] - } - data = run_awsem_event_data - tibanna_settings = data.get('_tibanna', {}) - # if they don't pass in env guess it from output_bucket - env = tibanna_settings.get('env') - # tibanna provides access to keys based on env and stuff like that - tibanna = Tibanna(env, ff_keys=data.get('ff_keys'), - settings=tibanna_settings) - res = merge_source_experiments(input_file['uuid'], tibanna.ff_keys, tibanna.env) - LOG.info(res) - assert 'fake_source_experiment' in res - - -def test_get_extra_file_key(): - fe_map = {'bg': '.bedGraph.gz', 'bw': '.bw'} - infile_key = 'hahaha/lalala.bedGraph.gz' - infile_format = 'bg' - extra_file_format = 'bw' - extra_file_key = get_extra_file_key(infile_format, infile_key, extra_file_format, fe_map) - assert extra_file_key == 'hahaha/lalala.bw' diff --git a/tests/core/test_pony_utils.py b/tests/core/test_pony_utils.py new file mode 100644 index 000000000..85fd29dfe --- /dev/null +++ b/tests/core/test_pony_utils.py @@ -0,0 +1,306 @@ +from core.pony_utils import ( + Tibanna, + WorkflowRunMetadata, + ensure_list, + Awsem, + merge_source_experiments, + ProcessedFileMetadata, + get_format_extension_map, + get_extra_file_key +) +import pytest +from conftest import valid_env +from test_utils import awsem_error_fun +import logging +import mock + +LOG = logging.getLogger(__name__) + + +@pytest.fixture +def ff_metadata(): + return { + "app_name": "md5", + "_tibanna": { + "env": "fourfront-webprod", + "settings": { + "url": "", + "run_type": "md5", + "run_name": "md5_4DNFIIE1QWPL.fastq.gz", + "env": "fourfront-webprod", + "run_id": "4DNFIIE1QWPL.fastq.gz" + } + }, + "ff_meta": { + "run_platform": "AWSEM", + "uuid": "71d4d068-1b17-4e99-8b59-cfc561266b45", + "parameters": [], + "workflow": "d3f25cd3-e726-4b3c-a022-48f844474b41", + "title": "md5 run 2018-02-06 21:41:42.750987", + "award": "1U01CA200059-01", + "awsem_job_id": "", + "awsem_app_name": "md5", + 
"lab": "4dn-dcic-lab", + "run_status": "started", + "output_files": [ + { + "type": "Output report file", + "workflow_argument_name": "report" + } + ], + "input_files": [ + { + "ordinal": 1, + "workflow_argument_name": "input_file", + "value": "b4f6807c-6f93-4b7d-91ff-ff95e801165c" + } + ] + }, + "push_error_to_end": True + } + + +@pytest.fixture +def json_request(): + return { + "input_files": [ + { + "bucket_name": "encoded-4dn-files", + "object_key": "4DNFI067AFHV.fastq.gz", + "uuid": "46e82a90-49e5-4c33-afab-9ec90d65cca1", + "workflow_argument_name": "fastq1" + }, + { + "bucket_name": "encoded-4dn-files", + "object_key": "4DNFI067AFHX.fastq.gz", + "uuid": "46e82a90-49e5-4c33-afab-9ec90d65cca2", + "workflow_argument_name": "fastq2" + }, + { + "bucket_name": "encoded-4dn-files", + "object_key": "4DNFIZQZ39L9.bwaIndex.tgz", + "uuid": "1f53df95-4cf3-41cc-971d-81bb16c486dd", + "workflow_argument_name": "bwa_index" + } + ], + "workflow_uuid": "02d636b9-d82d-4da9-950c-2ca994a0943e", + "app_name": "hi-c-processing-parta", + "parameters": { + "nThreads": 8, + "teststring": "test", + } + } + + +@pytest.fixture +def workflow_event_data(): + return {"workflow": {"import_id_list": ["FHtnXozBk1C5Fyp2dRmSa2yhFCBBoEcN"], + "app_name": "md5", + "task_id": "", + "task_input": {"app": "4dn-dcic/dev/md5", + "project": "4dn-dcic/dev", + "name": "md5", + "inputs": {"input_file": {"class": "File", + "name": "4DNFI7RAJFJ4.fasta.gz", + "path": "5877fc32e4b0f31cb4bc37a1"}}}, + "volume_list": [{"id": "4dn-labor/4dn_s32588y8f6", "name": "4dn_s32588y8f6"}], + "header": {"X-SBG-Auth-Token": "1234", "Content-type": "application/json"}, + "token": "1234", "export_report": [], "project_id": "4dn-dcic/dev", + "export_id_list": [], "output_volume_id": "4dn-labor/4dn_s32588y8f7"}} + + +@valid_env +@pytest.mark.webtest +def test_read_s3(s3_utils): + filename = '__test_data/test_file.txt' + read = s3_utils.read_s3(filename) + assert read.strip() == 'thisisatest' + + +@valid_env +@pytest.mark.webtest +def test_get_file_size(s3_utils): + filename = '__test_data/test_file.txt' + size = s3_utils.get_file_size(filename) + assert size == 12 + + +@valid_env +@pytest.mark.webtest +def test_get_file_size_in_bg(s3_utils): + filename = '__test_data/test_file.txt' + size = s3_utils.get_file_size(filename, add_gb=2, size_in_gb=True) + assert size == 2 + + +@valid_env +@pytest.mark.webtest +def test_read_s3_zip(s3_utils): + filename = '__test_data/fastqc_report.zip' + files = s3_utils.read_s3_zipfile(filename, ['summary.txt', 'fastqc_data.txt']) + assert files['summary.txt'] + assert files['fastqc_data.txt'] + assert files['summary.txt'].startswith('PASS') + + +@valid_env +@pytest.mark.webtest +def test_unzip_s3_to_s3(s3_utils): + prefix = '__test_data/extracted' + filename = '__test_data/fastqc_report.zip' + s3_utils.s3_delete_dir(prefix) + + # ensure this thing was deleted + # if no files there will be no Contents in response + objs = s3_utils.s3_read_dir(prefix) + assert [] == objs.get('Contents', []) + + # now copy to that dir we just deleted + retfile_list = ['summary.txt', 'fastqc_data.txt', 'fastqc_report.html'] + ret_files = s3_utils.unzip_s3_to_s3(filename, prefix, retfile_list) + assert 3 == len(ret_files.keys()) + assert ret_files['fastqc_report.html']['s3key'].startswith("https://s3.amazonaws.com") + + objs = s3_utils.s3_read_dir(prefix) + assert objs.get('Contents', None) + + +def test_create_workflowrun_from_event_parameter(update_ffmeta_event_data_newmd5): + meta = update_ffmeta_event_data_newmd5['ff_meta'].copy() + 
meta['app_name'] = 'md5' + ff_wfr = WorkflowRunMetadata(**meta) + assert ff_wfr + + +def test_tibanna(): + data = {'env': 'fourfront-webdev', + 'settings': {'1': '1'}} + tibanna = Tibanna(**data) + assert tibanna + assert tibanna.as_dict() == data + + +def test_ensure_list(): + assert ensure_list(5) == [5] + assert ensure_list('hello') == ['hello'] + assert ensure_list(['hello']) == ['hello'] + assert ensure_list({'a': 'b'}) == [{'a': 'b'}] + + +def test_create_awsem(update_ffmeta_event_data, tibanna_env): + update_ffmeta_event_data.update(tibanna_env) + awsem = Awsem(update_ffmeta_event_data) + assert awsem.args + assert awsem.config + assert awsem.app_name + assert awsem.output_s3 + assert awsem.output_files_meta + + +def test_get_output_files(update_ffmeta_event_data, tibanna_env): + update_ffmeta_event_data.update(tibanna_env) + awsem = Awsem(update_ffmeta_event_data) + of = awsem.output_files() + first_key = of.keys()[0] + assert 1 == len(of) + assert of[first_key].runner == awsem + assert of[first_key].bucket == awsem.output_s3 + assert of[first_key].key == 'lalala/md5_report' + assert of[first_key].output_type == 'Output report file' + + +def test_get_input_files(update_ffmeta_event_data, tibanna_env): + update_ffmeta_event_data.update(tibanna_env) + awsem = Awsem(update_ffmeta_event_data) + infiles = awsem.input_files() + first_key = infiles.keys()[0] + assert 1 == len(infiles) + assert infiles[first_key].runner == awsem + assert infiles[first_key].bucket == 'elasticbeanstalk-fourfront-webdev-files' + assert infiles[first_key].key == 'f4864029-a8ad-4bb8-93e7-5108f462ccaa/4DNFIRSRJH45.fastq.gz' + assert infiles[first_key].accession == '4DNFIRSRJH45' + + +def test_get_inputfile_accession(update_ffmeta_event_data, tibanna_env): + update_ffmeta_event_data.update(tibanna_env) + awsem = Awsem(update_ffmeta_event_data) + assert awsem.inputfile_accessions['input_file'] == '4DNFIRSRJH45' + + +@pytest.fixture() +def proc_file_in_webdev(): + return {'status': 'released', + 'uuid': 'f6d5ba22-aaf9-48e9-8df4-bc5c131c96af', + 'file_format': 'normvector_juicerformat', + 'accession': '4DNFIRO3UX7I', + 'award': '/awards/1U01CA200059-01/', + 'lab': '/labs/4dn-dcic-lab/'} + + +def test_create_ProcessedFileMetadata_from_get_error_if_no_at_type(ff_keys, proc_file_in_webdev): + # can use acc, uuid, @id, any valid url + with mock.patch('core.pony_utils.get_metadata', return_value=proc_file_in_webdev): + with pytest.raises(Exception) as expinfo: + ProcessedFileMetadata.get(proc_file_in_webdev['accession'], ff_keys) + assert "only load ProcessedFiles" in str(expinfo.value) + + +def test_create_ProcessedFileMetadata_from_get(ff_keys, proc_file_in_webdev): + # can use acc, uuid, @id, any valid url + file_with_type = proc_file_in_webdev.copy() + file_with_type['@type'] = ['FileProcessed', 'Item', 'whatever'] + with mock.patch('core.pony_utils.get_metadata', return_value=file_with_type) as ff: + pf = ProcessedFileMetadata.get(proc_file_in_webdev['accession'], ff_keys) + assert pf.__dict__ == proc_file_in_webdev + assert type(pf) is ProcessedFileMetadata + ff.was_called_once() + + +@valid_env +@pytest.mark.webtest +def test_get_format_extension_map(run_awsem_event_data): + tibanna_settings = run_awsem_event_data.get('_tibanna', {}) + # if they don't pass in env guess it from output_bucket + env = tibanna_settings.get('env') + # tibanna provides access to keys based on env and stuff like that + tibanna = Tibanna(env, ff_keys=run_awsem_event_data.get('ff_keys'), + settings=tibanna_settings) + + fe_map = 
get_format_extension_map(tibanna.ff_keys) + assert(fe_map) + assert 'pairs' in fe_map.keys() + + +@valid_env +@pytest.mark.webtest +def test_merge_source_experiment(run_awsem_event_data): + input_file = { + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "workflow_argument_name": "input_pairs", + "uuid": ["d2c897ec-bdb2-47ce-b1b1-845daccaa571", "d2c897ec-bdb2-47ce-b1b1-845daccaa571"], + "object_key": ["4DNFI25JXLLI.pairs.gz", "4DNFI25JXLLI.pairs.gz"] + } + data = run_awsem_event_data + tibanna_settings = data.get('_tibanna', {}) + # if they don't pass in env guess it from output_bucket + env = tibanna_settings.get('env') + # tibanna provides access to keys based on env and stuff like that + tibanna = Tibanna(env, ff_keys=data.get('ff_keys'), + settings=tibanna_settings) + res = merge_source_experiments(input_file['uuid'], tibanna.ff_keys, tibanna.env) + LOG.info(res) + assert 'fake_source_experiment' in res + + +def test_get_extra_file_key(): + fe_map = {'bg': '.bedGraph.gz', 'bw': '.bw'} + infile_key = 'hahaha/lalala.bedGraph.gz' + infile_format = 'bg' + extra_file_format = 'bw' + extra_file_key = get_extra_file_key(infile_format, infile_key, extra_file_format, fe_map) + assert extra_file_key == 'hahaha/lalala.bw' + + +def test_powerup_add_awsem_error_to_output(ff_metadata): + res = awsem_error_fun(ff_metadata, None) + assert ('error' in res) diff --git a/tests/core/test_utils.py b/tests/core/test_utils.py index be40eed1d..bb7e275b0 100644 --- a/tests/core/test_utils.py +++ b/tests/core/test_utils.py @@ -1,186 +1,12 @@ from core.utils import ( - Tibanna, powerup, StillRunningException, AWSEMJobErrorException, - WorkflowRunMetadata, - ensure_list, - ) import pytest -from conftest import valid_env import mock -@pytest.fixture -def ff_metadata(): - return { - "app_name": "md5", - "_tibanna": { - "env": "fourfront-webprod", - "settings": { - "url": "", - "run_type": "md5", - "run_name": "md5_4DNFIIE1QWPL.fastq.gz", - "env": "fourfront-webprod", - "run_id": "4DNFIIE1QWPL.fastq.gz" - } - }, - "ff_meta": { - "run_platform": "AWSEM", - "uuid": "71d4d068-1b17-4e99-8b59-cfc561266b45", - "parameters": [], - "workflow": "d3f25cd3-e726-4b3c-a022-48f844474b41", - "title": "md5 run 2018-02-06 21:41:42.750987", - "award": "1U01CA200059-01", - "awsem_job_id": "", - "awsem_app_name": "md5", - "lab": "4dn-dcic-lab", - "run_status": "started", - "output_files": [ - { - "type": "Output report file", - "workflow_argument_name": "report" - } - ], - "input_files": [ - { - "ordinal": 1, - "workflow_argument_name": "input_file", - "value": "b4f6807c-6f93-4b7d-91ff-ff95e801165c" - } - ] - } - } - - -@pytest.fixture -def json_request(): - return { - "input_files": [ - { - "bucket_name": "encoded-4dn-files", - "object_key": "4DNFI067AFHV.fastq.gz", - "uuid": "46e82a90-49e5-4c33-afab-9ec90d65cca1", - "workflow_argument_name": "fastq1" - }, - { - "bucket_name": "encoded-4dn-files", - "object_key": "4DNFI067AFHX.fastq.gz", - "uuid": "46e82a90-49e5-4c33-afab-9ec90d65cca2", - "workflow_argument_name": "fastq2" - }, - { - "bucket_name": "encoded-4dn-files", - "object_key": "4DNFIZQZ39L9.bwaIndex.tgz", - "uuid": "1f53df95-4cf3-41cc-971d-81bb16c486dd", - "workflow_argument_name": "bwa_index" - } - ], - "workflow_uuid": "02d636b9-d82d-4da9-950c-2ca994a0943e", - "app_name": "hi-c-processing-parta", - "parameters": { - "nThreads": 8, - "teststring": "test", - } - } - - -@pytest.fixture -def workflow_event_data(): - return {"workflow": {"import_id_list": ["FHtnXozBk1C5Fyp2dRmSa2yhFCBBoEcN"], - "app_name": "md5", - 
"task_id": "", - "task_input": {"app": "4dn-dcic/dev/md5", - "project": "4dn-dcic/dev", - "name": "md5", - "inputs": {"input_file": {"class": "File", - "name": "4DNFI7RAJFJ4.fasta.gz", - "path": "5877fc32e4b0f31cb4bc37a1"}}}, - "volume_list": [{"id": "4dn-labor/4dn_s32588y8f6", "name": "4dn_s32588y8f6"}], - "header": {"X-SBG-Auth-Token": "1234", "Content-type": "application/json"}, - "token": "1234", "export_report": [], "project_id": "4dn-dcic/dev", - "export_id_list": [], "output_volume_id": "4dn-labor/4dn_s32588y8f7"}} - - -@valid_env -@pytest.mark.webtest -def test_read_s3(s3_utils): - filename = '__test_data/test_file.txt' - read = s3_utils.read_s3(filename) - assert read.strip() == 'thisisatest' - - -@valid_env -@pytest.mark.webtest -def test_get_file_size(s3_utils): - filename = '__test_data/test_file.txt' - size = s3_utils.get_file_size(filename) - assert size == 12 - - -@valid_env -@pytest.mark.webtest -def test_get_file_size_in_bg(s3_utils): - filename = '__test_data/test_file.txt' - size = s3_utils.get_file_size(filename, add_gb=2, size_in_gb=True) - assert size == 2 - - -@valid_env -@pytest.mark.webtest -def test_read_s3_zip(s3_utils): - filename = '__test_data/fastqc_report.zip' - files = s3_utils.read_s3_zipfile(filename, ['summary.txt', 'fastqc_data.txt']) - assert files['summary.txt'] - assert files['fastqc_data.txt'] - assert files['summary.txt'].startswith('PASS') - - -@valid_env -@pytest.mark.webtest -def test_unzip_s3_to_s3(s3_utils): - prefix = '__test_data/extracted' - filename = '__test_data/fastqc_report.zip' - s3_utils.s3_delete_dir(prefix) - - # ensure this thing was deleted - # if no files there will be no Contents in response - objs = s3_utils.s3_read_dir(prefix) - assert [] == objs.get('Contents', []) - - # now copy to that dir we just deleted - retfile_list = ['summary.txt', 'fastqc_data.txt', 'fastqc_report.html'] - ret_files = s3_utils.unzip_s3_to_s3(filename, prefix, retfile_list) - assert 3 == len(ret_files.keys()) - assert ret_files['fastqc_report.html']['s3key'].startswith("https://s3.amazonaws.com") - - objs = s3_utils.s3_read_dir(prefix) - assert objs.get('Contents', None) - - -def test_create_workflowrun_from_event_parameter(update_ffmeta_event_data_newmd5): - meta = update_ffmeta_event_data_newmd5['ff_meta'].copy() - meta['app_name'] = 'md5' - ff_wfr = WorkflowRunMetadata(**meta) - assert ff_wfr - - -def test_tibanna(): - data = {'env': 'fourfront-webdev', - 'settings': {'1': '1'}} - tibanna = Tibanna(**data) - assert tibanna - assert tibanna.as_dict() == data - - -def test_ensure_list(): - assert ensure_list(5) == [5] - assert ensure_list('hello') == ['hello'] - assert ensure_list(['hello']) == ['hello'] - assert ensure_list({'a': 'b'}) == [{'a': 'b'}] - - @powerup("wrapped_fun", mock.Mock(side_effect=StillRunningException("metadata"))) def wrapped_fun(event, context): raise StillRunningException("I should not be called") @@ -203,7 +29,7 @@ def awsem_error_fun(event, context): def test_powerup_errors_are_dumped_into_return_dict(): - res = error_fun({'some': 'data'}, None) + res = error_fun({'some': 'data', 'push_error_to_end': True}, None) assert res['some'] == 'data' assert res['error'] assert 'Error on step: error_fun' in res['error'] @@ -212,7 +38,8 @@ def test_powerup_errors_are_dumped_into_return_dict(): def test_powerup_throws_if_error_set_in_input_json(): # only throw the error because lambda name is update_ffmeta_awsem with pytest.raises(Exception): - update_ffmeta_error_fun({'error': 'same like skip'}, None) + event = {'error': 'same like 
skip', 'push_error_to_end': True} + update_ffmeta_error_fun(event, None) def test_powerup_error_thrown_if_ignored_exceptions(): @@ -226,7 +53,7 @@ def test_powerup_error_propogates(): # skip throwing an error because 'error' is in event json and the # lambda name != update_ffmeta_awsem. error is propagated to the res # and will be returned exactly as input - res = wrapped_fun({'error': 'should not raise'}, None) + res = wrapped_fun({'error': 'should not raise', 'push_error_to_end': True}, None) assert res['error'] == 'should not raise' @@ -251,8 +78,3 @@ def test_powerup_calls_metadata_only_func(): assert exec_nfo assert 'metadata' in str(exec_nfo.value) - - -def test_powerup_add_awsem_error_to_output(ff_metadata): - res = awsem_error_fun(ff_metadata, None) - assert ('error' in res) diff --git a/tests/core/update_ffmeta_awsem/test_service.py b/tests/core/update_ffmeta_awsem/test_service.py index 35ac20859..d31646d2f 100644 --- a/tests/core/update_ffmeta_awsem/test_service.py +++ b/tests/core/update_ffmeta_awsem/test_service.py @@ -5,8 +5,8 @@ md5_updater, _md5_updater ) -from core.ec2_utils import Awsem -from core import utils +from core.pony_utils import Awsem +from core import pony_utils # from core.check_export_sbg.service import get_inputfile_accession import pytest from ..conftest import valid_env @@ -126,7 +126,7 @@ def test__md5_updater_8(): def test_md5_updater_oldmd5(update_ffmeta_event_data): event = update_ffmeta_event_data tibanna_settings = event.get('_tibanna', {}) - tibanna = utils.Tibanna(**tibanna_settings) + tibanna = pony_utils.Tibanna(**tibanna_settings) awsem = Awsem(update_ffmeta_event_data) ouf = awsem.output_files()['report'] md5_updater('uploaded', ouf, None, tibanna) @@ -137,7 +137,7 @@ def test_md5_updater_oldmd5(update_ffmeta_event_data): def test_md5_updater_newmd5(update_ffmeta_event_data_newmd5): event = update_ffmeta_event_data_newmd5 tibanna_settings = event.get('_tibanna', {}) - tibanna = utils.Tibanna(**tibanna_settings) + tibanna = pony_utils.Tibanna(**tibanna_settings) awsem = Awsem(update_ffmeta_event_data_newmd5) ouf = awsem.output_files()['report'] md5_updater('uploaded', ouf, None, tibanna) @@ -164,7 +164,7 @@ def test_update_ffmeta_awsem_e2e(update_ffmeta_event_data, tibanna_env): @valid_env def test_mcool_updates_fourfront_higlass(update_ffmeta_mcool, tibanna_env): update_ffmeta_mcool.update(tibanna_env) - with mock.patch('core.utils.patch_metadata'): + with mock.patch('core.pony_utils.patch_metadata'): with mock.patch('requests.post') as mock_request: ret = handler(update_ffmeta_mcool, None) mock_request.assert_called_once() @@ -175,7 +175,7 @@ def test_mcool_updates_fourfront_higlass(update_ffmeta_mcool, tibanna_env): @pytest.mark.webtest def test_metadata_only(update_ffmeta_metaonly_data2, tibanna_env): update_ffmeta_metaonly_data2.update(tibanna_env) - with mock.patch('core.utils.patch_metadata') as mock_request: + with mock.patch('core.pony_utils.patch_metadata') as mock_request: ret = handler(update_ffmeta_metaonly_data2, None) # once for patch pf once for workflow run mock_request.call_count == 2 @@ -186,7 +186,7 @@ def test_metadata_only(update_ffmeta_metaonly_data2, tibanna_env): def test_register_to_higlass(used_env): bucket = 'elasticbeanstalk-fourfront-webdev-wfoutput' mcool_key = 'a940cf00-6001-473e-80d1-1e4a43866863/4DNFI75GAT6T.mcool' - tibanna = utils.Tibanna(used_env) + tibanna = pony_utils.Tibanna(used_env) with mock.patch('requests.post') as mock_request: res = register_to_higlass(tibanna, bucket, mcool_key, 'cooler', 'matrix') 
mock_request.assert_called_once() @@ -198,7 +198,7 @@ def test_register_to_higlass(used_env): def test_register_to_higlass2(used_env): bucket = 'elasticbeanstalk-fourfront-webdev-wfoutput' bigwig_key = 'a940cf00-6001-473e-80d1-1e4a43866863/4DNFI75GAT6T.bw' - tibanna = utils.Tibanna(used_env) + tibanna = pony_utils.Tibanna(used_env) with mock.patch('requests.post') as mock_request: res = register_to_higlass(tibanna, bucket, bigwig_key, 'bigwig', 'vector') mock_request.assert_called_once()
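A note on the `tasks.py` hunks earlier in this diff: test runs for the deploy tasks are now opt-in (`tests=False` by default, enabled with `--tests`), `deploy_core` accepts `'unicorn'` as a name that expands to the `UNICORN_LAMBDAS` list, and `deploy_lambda_package` packages each lambda against a flavor-specific requirements file rather than the full `requirements.txt`. Below is a minimal sketch of that selection logic, assuming `UNICORN_LAMBDAS` contains the two lambdas that the old unicorn branch of `deploy_tibanna` used to deploy one by one; the actual list lives in `tasks.py` and is not shown in this diff.

```
# Sketch only: mirrors the requirements selection added to deploy_lambda_package.
# UNICORN_LAMBDAS membership is assumed here, inferred from the lambdas the
# previous unicorn branch of deploy_tibanna deployed individually.
UNICORN_LAMBDAS = ['run_task_awsem', 'check_task_awsem']


def pick_requirements_file(lambda_name):
    """Unicorn-only lambdas get the lightweight requirements file."""
    if lambda_name in UNICORN_LAMBDAS:
        return '../../requirements-lambda-unicorn.txt'
    return '../../requirements-lambda-pony.txt'


# example: a pony-specific lambda name is assumed for illustration
assert pick_requirements_file('run_task_awsem') == '../../requirements-lambda-unicorn.txt'
assert pick_requirements_file('update_ffmeta_awsem') == '../../requirements-lambda-pony.txt'
```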
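The test changes above also add a `push_error_to_end` flag to the input events (the `check_task_awsem` fixture, the pony `ff_metadata` fixture, and the `powerup` tests in `test_utils.py`). The matching change to the `powerup` decorator itself is not part of this diff, but the updated tests imply that an exception raised inside a step is only folded into the returned event when the flag is set, and is re-raised otherwise. The following is a minimal sketch of that inferred behaviour, not the actual `core.utils.powerup` implementation; the real decorator also handles metadata-only runs, ignored exception types, and the `update_ffmeta_awsem` special case, none of which is reproduced here.

```
import functools


# Illustrative sketch only -- the real decorator lives in core.utils.powerup.
def powerup_sketch(lambda_name):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(event, context):
            try:
                return func(event, context)
            except Exception as e:
                if event.get('push_error_to_end'):
                    # fold the failure into the event so the step function can
                    # carry it to the end instead of aborting mid-run
                    event['error'] = 'Error on step: %s: %s' % (lambda_name, str(e))
                    return event
                raise  # without the flag the exception surfaces as before
        return wrapper
    return decorator
```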