diff --git a/core/ec2_utils.py b/core/ec2_utils.py index 08b3bffc1..3050f23d7 100644 --- a/core/ec2_utils.py +++ b/core/ec2_utils.py @@ -300,12 +300,13 @@ def update_config(config, app_name, input_files, parameters): class WorkflowFile(object): - def __init__(self, bucket, key, runner, accession=None): + def __init__(self, bucket, key, runner, accession=None, output_type=None): self.bucket = bucket self.key = key self.s3 = utils.s3Utils(self.bucket, self.bucket, self.bucket) self.runner = runner self.accession = accession + self.output_type = output_type @property def status(self): @@ -328,11 +329,19 @@ def __init__(self, json): self.config = json['config'] self.output_s3 = self.args['output_S3_bucket'] self.app_name = self.args['app_name'] + self.output_files_meta = json['ff_meta']['output_files'] def output_files(self): files = dict() + output_types = dict() + for x in self.output_files_meta: + output_types[x['workflow_argument_name']] = x['type'] for k, v in self.args.get('output_target').iteritems(): - wff = {k: WorkflowFile(self.output_s3, v, self)} + if k in output_types: + out_type = output_types[k] + else: + out_type = None + wff = {k: WorkflowFile(self.output_s3, v, self, output_type=out_type)} files.update(wff) return files @@ -357,4 +366,4 @@ def input_files(self): @property def inputfile_accessions(self): - return [v.accession for k, v in self.input_files().iteritems()] + return {k: v.accession for k, v in self.input_files().iteritems()} diff --git a/core/fastqc_utils.py b/core/fastqc_utils.py index 71cd3e45e..89504f745 100644 --- a/core/fastqc_utils.py +++ b/core/fastqc_utils.py @@ -68,7 +68,11 @@ def parse_qc_table(data_list, url, qc_schema): if a[0] in qc_schema and qc_schema.get(a[0]).get('type') == 'string': qc_json.update({a[0]: str(a[1])}) elif a[0] in qc_schema and qc_schema.get(a[0]).get('type') == 'number': - qc_json.update({a[0]: number(a[1])}) + qc_json.update({a[0]: number(a[1].replace(',', ''))}) + if a[1] in qc_schema and 
qc_schema.get(a[1]).get('type') == 'string': + qc_json.update({a[1]: str(a[0])}) + elif a[1] in qc_schema and qc_schema.get(a[1]).get('type') == 'number': + qc_json.update({a[1]: number(a[0].replace(',', ''))}) except IndexError: # pragma: no cover # maybe a blank line or something pass diff --git a/core/run_task_awsf/event.json b/core/run_task_awsf/event.json index 0501d7eb5..1a50778ee 100644 --- a/core/run_task_awsf/event.json +++ b/core/run_task_awsf/event.json @@ -24,13 +24,13 @@ } }, "config": { - "ebs_size": 10, + "ebs_size": 0, "ebs_type": "io1", "json_bucket": "4dn-aws-pipeline-run-json", - "EBS_optimized": false, + "EBS_optimized": "", "ebs_iops": 500, "shutdown_min": 120, - "instance_type": "t2.micro", + "instance_type": "", "s3_access_arn": "arn:aws:iam::643366669028:instance-profile/S3_access", "ami_id": "ami-cfb14bb5", "copy_to_s3": true, diff --git a/core/update_ffmeta_awsf/event_fastqc.json b/core/update_ffmeta_awsf/event_fastqc.json new file mode 100644 index 000000000..14e456bb2 --- /dev/null +++ b/core/update_ffmeta_awsf/event_fastqc.json @@ -0,0 +1,98 @@ +{ + "ff_meta": { + "run_platform": "AWSEM", + "uuid": "18564368-1299-4ce0-82d1-c8917450d834", + "parameters": [], + "workflow": "2324ad76-ff37-4157-8bcc-3ce72b7dace9", + "title": "fastqc-0-11-4-1 hahaha run 2018-01-16 23:08:21.335210", + "award": "1U01CA200059-01", + "awsem_job_id": "", + "run_url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:fastqc_613a0edc-f130-49a0-b97c-b812a6a42d84", + "awsem_app_name": "fastqc-0-11-4-1", + "lab": "4dn-dcic-lab", + "run_status": "started", + "output_files": [ + { + "type": "Output QC file", + "workflow_argument_name": "report_zip" + } + ], + "input_files": [ + { + "ordinal": 1, + "workflow_argument_name": "input_fastq", + "value": "f4864029-a8ad-4bb8-93e7-5108f462ccaa" + } + ] + }, + "instance_ip": "54.164.70.164", + "app_name": "fastqc-0-11-4-1", + 
"_tibanna": { + "env": "fourfront-webdev", + "settings": { + "url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:fastqc_613a0edc-f130-49a0-b97c-b812a6a42d84", + "run_type": "fastqc", + "run_id": "613a0edc-f130-49a0-b97c-b812a6a42d84", + "env": "fourfront-webdev", + "run_name": "fastqc_613a0edc-f130-49a0-b97c-b812a6a42d84" + } + }, + "start_time": "20180116-23:08:29-UTC", + "args": { + "secondary_output_target": {}, + "app_name": "fastqc-0-11-4-1", + "input_parameters": {}, + "cwl_child_filenames": [ + "fastqc-0-11-4.cwl" + ], + "output_target": { + "report_zip": "18564368-1299-4ce0-82d1-c8917450d834/report_zip" + }, + "cwl_main_filename": "fastqc-0-11-4-1.cwl", + "secondary_files": {}, + "output_S3_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "app_version": "0.2.0", + "cwl_directory_url": "https://raw.githubusercontent.com/4dn-dcic/pipelines-cwl/0.2.0/cwl_awsem/", + "input_files": { + "input_fastq": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "object_key": "f4864029-a8ad-4bb8-93e7-5108f462ccaa/4DNFIRSRJH45.fastq.gz" + } + } + }, + "output_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "jobid": "eUJZ9nH7Rsz2", + "instance_id": "i-04df1c1ca7a06ef66", + "tag": "hahaha", + "pf_meta": [], + "parameters": {}, + "config": { + "ebs_size": 10, + "password": "", + "ebs_type": "io1", + "json_bucket": "4dn-aws-pipeline-run-json", + "json_dir": "/tmp/json", + "EBS_optimized": false, + "ebs_iops": 500, + "userdata_dir": "/tmp/userdata", + "shutdown_min": "300", + "instance_type": "t2.micro", + "s3_access_arn": "arn:aws:iam::643366669028:instance-profile/S3_access", + "job_tag": "fastqc-0-11-4-1", + "copy_to_s3": true, + "script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/", + "key_name": "4dn-encode", + "launch_instance": true, + "ami_id": "ami-cfb14bb5", + "log_bucket": "tibanna-output" + }, + 
"workflow_uuid": "2324ad76-ff37-4157-8bcc-3ce72b7dace9", + "input_files": [ + { + "workflow_argument_name": "input_fastq", + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "uuid": "f4864029-a8ad-4bb8-93e7-5108f462ccaa", + "object_key": "4DNFIRSRJH45.fastq.gz" + } + ] +} diff --git a/core/update_ffmeta_awsf/event_pairsqc.json b/core/update_ffmeta_awsf/event_pairsqc.json new file mode 100644 index 000000000..d9e110e38 --- /dev/null +++ b/core/update_ffmeta_awsf/event_pairsqc.json @@ -0,0 +1,130 @@ +{ + "ff_meta": { + "run_platform": "AWSEM", + "uuid": "3d71f45f-6a77-47b4-9839-eb02204c6dae", + "parameters": [ + { + "workflow_argument_name": "enzyme", + "value": "6" + }, + { + "workflow_argument_name": "sample_name", + "value": "4DNFI1ZLO9D7" + } + ], + "workflow": "ae3a87cb-3fa2-469e-97c7-540fc2d0a117", + "title": "pairsqc-single run 2018-01-16 23:08:25.250148", + "award": "1U01CA200059-01", + "awsem_job_id": "", + "run_url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:pairsqc-single_649e1149-68b5-43b3-bc92-65ca077dc983", + "awsem_app_name": "pairsqc-single", + "lab": "4dn-dcic-lab", + "run_status": "started", + "output_files": [ + { + "type": "Output QC file", + "workflow_argument_name": "report" + } + ], + "input_files": [ + { + "ordinal": 1, + "workflow_argument_name": "input_pairs", + "value": "817f3faa-0573-45c0-8230-02ec19de6544" + }, + { + "ordinal": 1, + "workflow_argument_name": "chromsizes", + "value": "4a6d10ee-2edb-4402-a98f-0edb1d58f5e9" + } + ] + }, + "instance_ip": "52.87.161.131", + "app_name": "pairsqc-single", + "_tibanna": { + "env": "fourfront-webdev", + "settings": { + "url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:pairsqc-single_649e1149-68b5-43b3-bc92-65ca077dc983", + "run_type": "pairsqc-single", + "run_id": 
"649e1149-68b5-43b3-bc92-65ca077dc983", + "env": "fourfront-webdev", + "run_name": "pairsqc-single_649e1149-68b5-43b3-bc92-65ca077dc983" + } + }, + "start_time": "20180116-23:08:32-UTC", + "args": { + "secondary_output_target": {}, + "app_name": "pairsqc-single", + "input_parameters": { + "enzyme": "6", + "sample_name": "4DNFI1ZLO9D7" + }, + "cwl_child_filenames": [], + "output_target": { + "report": "3d71f45f-6a77-47b4-9839-eb02204c6dae/report" + }, + "cwl_main_filename": "pairsqc-single.cwl", + "secondary_files": { + "input_pairs": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "object_key": "817f3faa-0573-45c0-8230-02ec19de6544/4DNFI1ZLO9D7.pairs.gz.px2" + } + }, + "output_S3_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "app_version": "dev", + "cwl_directory_url": "https://raw.githubusercontent.com/4dn-dcic/pipelines-cwl/dev/cwl_awsem/", + "input_files": { + "chromsizes": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "object_key": "4a6d10ee-2edb-4402-a98f-0edb1d58f5e9/4DNFI823LSII.chrom.sizes" + }, + "input_pairs": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "object_key": "817f3faa-0573-45c0-8230-02ec19de6544/4DNFI1ZLO9D7.pairs.gz" + } + } + }, + "output_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "jobid": "11Jz5nf2xJGt", + "instance_id": "i-0901c30deadd480e4", + "pf_meta": [], + "parameters": { + "enzyme": "6", + "sample_name": "4DNFI1ZLO9D7" + }, + "config": { + "ebs_size": 13, + "password": "dragonfly", + "ebs_type": "io1", + "json_bucket": "4dn-aws-pipeline-run-json", + "json_dir": "/tmp/json", + "EBS_optimized": false, + "ebs_iops": 500, + "userdata_dir": "/tmp/userdata", + "shutdown_min": 120, + "instance_type": "t2.micro", + "s3_access_arn": "arn:aws:iam::643366669028:instance-profile/S3_access", + "job_tag": "pairsqc-single", + "copy_to_s3": true, + "script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/", + "key_name": "4dn-encode", + 
"launch_instance": true, + "ami_id": "ami-cfb14bb5", + "log_bucket": "tibanna-output" + }, + "workflow_uuid": "ae3a87cb-3fa2-469e-97c7-540fc2d0a117", + "input_files": [ + { + "workflow_argument_name": "input_pairs", + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "uuid": "817f3faa-0573-45c0-8230-02ec19de6544", + "object_key": "4DNFI1ZLO9D7.pairs.gz" + }, + { + "object_key": "4DNFI823LSII.chrom.sizes", + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "workflow_argument_name": "chromsizes", + "uuid": "4a6d10ee-2edb-4402-a98f-0edb1d58f5e9" + } + ] +} diff --git a/core/update_ffmeta_awsf/service.py b/core/update_ffmeta_awsf/service.py index 4c0e47e18..eee98de26 100644 --- a/core/update_ffmeta_awsf/service.py +++ b/core/update_ffmeta_awsf/service.py @@ -3,7 +3,7 @@ from core import utils, ff_utils, ec2_utils import boto3 from collections import defaultdict -from core.fastqc_utils import parse_fastqc, parse_qc_table +from core.fastqc_utils import parse_qc_table LOG = logging.getLogger(__name__) s3 = boto3.resource('s3') @@ -30,7 +30,25 @@ def update_processed_file_metadata(status, pf_meta, tibanna): return pf_meta -def fastqc_updater(status, wf_file, ff_meta, tibanna): +def qc_updater(status, wf_file, ff_meta, tibanna): + if ff_meta.awsem_app_name == 'fastqc-0-11-4-1': + return _qc_updater(status, wf_file, ff_meta, tibanna, + quality_metric='quality_metric_fastqc', + inputfile_argument='input_fastq', + report_html='fastqc_report.html', + datafiles=['summary.txt', 'fastqc_data.txt']) + elif ff_meta.awsem_app_name == 'pairsqc-single': + inputfile_argument = 'input_pairs' + input_accession = str(wf_file.runner.inputfile_accessions[inputfile_argument]) + return _qc_updater(status, wf_file, ff_meta, tibanna, + quality_metric="quality_metric_pairsqc", + inputfile_argument=inputfile_argument, report_html='pairsqc_report.html', + datafiles=[input_accession + '.summary.out']) + + +def _qc_updater(status, wf_file, ff_meta, tibanna, 
quality_metric='quality_metric_fastqc', + inputfile_argument='input_fastq', report_html='fastqc_report.html', + datafiles=['summary.txt', 'fastqc_data.txt']): if status == 'uploading': # wait until this bad boy is finished return @@ -38,63 +56,10 @@ def fastqc_updater(status, wf_file, ff_meta, tibanna): ff_key = tibanna.ff_keys # move files to proper s3 location # need to remove sbg from this line - accession = wf_file.runner.inputfile_accessions[0] + accession = wf_file.runner.inputfile_accessions[inputfile_argument] zipped_report = wf_file.key - files_to_parse = ['summary.txt', 'fastqc_data.txt', 'fastqc_report.html'] - LOG.info("accession is %s" % accession) - - try: - files = wf_file.s3.unzip_s3_to_s3(zipped_report, accession, files_to_parse, - acl='public-read') - except Exception as e: - LOG.info(tibanna.s3.__dict__) - raise Exception("%s (key={})\n".format(zipped_report) % e) - # parse fastqc metadata - meta = parse_fastqc(files['summary.txt']['data'], - files['fastqc_data.txt']['data'], - url=files['fastqc_report.html']['s3key']) - LOG.info("fastqc meta is %s" % meta) - - # post fastq metadata - qc_meta = ff_utils.post_to_metadata(meta, 'quality_metric_fastqc', key=ff_key) - if qc_meta.get('@graph'): - qc_meta = qc_meta['@graph'][0] - - LOG.info("qc_meta is %s" % qc_meta) - # update original file as well - try: - original_file = ff_utils.get_metadata(accession, key=ff_key) - LOG.info("original_file is %s" % original_file) - except Exception as e: - raise Exception("Couldn't get metadata for accession {} : ".format(accession) + str(e)) - patch_file = {'quality_metric': qc_meta['@id']} - try: - ff_utils.patch_metadata(patch_file, original_file['uuid'], key=ff_key) - except Exception as e: - raise Exception("patch_metadata failed in fastqc_updater." + str(e) + - "original_file ={}\n".format(str(original_file))) - - # patch the workflow run, value_qc is used to make drawing graphs easier. 
- output_files = ff_meta.output_files - output_files[0]['value_qc'] = qc_meta['@id'] - retval = {"output_quality_metrics": [{"name": "quality_metric_fastqc", "value": qc_meta['@id']}], - 'output_files': output_files} - - LOG.info("retval is %s" % retval) - return retval - - -def pairsqc_updater(status, wf_file, ff_meta, tibanna, quality_metric="quality_metric_pairsqc"): - if status == 'uploading': - # wait until this bad boy is finished - return - # keys - ff_key = tibanna.ff_keys - # move files to proper s3 location - # need to remove sbg from this line - accession = wf_file.runner.inputfile_accessions[0] - zipped_report = wf_file.key - files_to_parse = [accession + '.summary.out', 'pairsqc_report.html'] + files_to_parse = list(datafiles) + files_to_parse.append(report_html) LOG.info("accession is %s" % accession) try: @@ -108,8 +73,10 @@ def pairsqc_updater(status, wf_file, ff_meta, tibanna, quality_metric="quality_m qc_schema = ff_utils.get_metadata("profiles/" + quality_metric + ".json", key=ff_key) # parse fastqc metadata - meta = parse_qc_table([files[accession + '.summary.out']['data']], - url=files['pairsqc_report.html']['s3key'], + LOG.info("files : %s" % str(files)) + filedata = [files[_]['data'] for _ in datafiles] + meta = parse_qc_table(filedata, + url=files[report_html]['s3key'], qc_schema=qc_schema.get('properties')) LOG.info("fastqc meta is %s" % meta) @@ -129,13 +96,13 @@ def pairsqc_updater(status, wf_file, ff_meta, tibanna, quality_metric="quality_m try: ff_utils.patch_metadata(patch_file, original_file['uuid'], key=ff_key) except Exception as e: - raise Exception("patch_metadata failed in pairsqc_updater." + str(e) + + raise Exception("patch_metadata failed in _qc_updater." + str(e) + "original_file ={}\n".format(str(original_file))) # patch the workflow run, value_qc is used to make drawing graphs easier. 
output_files = ff_meta.output_files output_files[0]['value_qc'] = qc_meta['@id'] - retval = {"output_quality_metrics": [{"name": "quality_metric_pairsqc", "value": qc_meta['@id']}], + retval = {"output_quality_metrics": [{"name": quality_metric, "value": qc_meta['@id']}], 'output_files': output_files} LOG.info("retval is %s" % retval) @@ -146,7 +113,7 @@ def md5_updater(status, wf_file, ff_meta, tibanna): # get key ff_key = tibanna.ff_keys # get metadata about original input file - accession = wf_file.runner.inputfile_accessions[0] + accession = wf_file.runner.inputfile_accessions['input_file'] original_file = ff_utils.get_metadata(accession, key=ff_key) if status.lower() == 'uploaded': @@ -222,11 +189,11 @@ def handler(event, context): status = export.status print("export res is %s", status) if status == 'COMPLETED': - patch_meta = OUTFILE_UPDATERS[awsem.app_name]('uploaded', export, ff_meta, tibanna) + patch_meta = OUTFILE_UPDATERS[export.output_type]('uploaded', export, ff_meta, tibanna) if pf_meta: pf_meta = update_processed_file_metadata('uploaded', pf_meta, tibanna) elif status in ['FAILED']: - patch_meta = OUTFILE_UPDATERS[awsem.app_name]('upload failed', export, ff_meta, tibanna) + patch_meta = OUTFILE_UPDATERS[export.output_type]('upload failed', export, ff_meta, tibanna) ff_meta.run_status = 'error' ff_meta.post(key=tibanna.ff_keys) raise Exception("Failed to export file %s" % (upload_key)) @@ -255,8 +222,5 @@ def handler(event, context): # Cardinal knowledge of all workflow updaters OUTFILE_UPDATERS = defaultdict(lambda: donothing) -OUTFILE_UPDATERS['md5'] = md5_updater -OUTFILE_UPDATERS['validatefiles'] = md5_updater -OUTFILE_UPDATERS['fastqc-0-11-4-1/1'] = fastqc_updater -OUTFILE_UPDATERS['fastqc-0-11-4-1'] = fastqc_updater -OUTFILE_UPDATERS['pairsqc-single'] = pairsqc_updater +OUTFILE_UPDATERS['Output report file'] = md5_updater +OUTFILE_UPDATERS['Output QC file'] = qc_updater diff --git a/core/update_ffmeta_awsf_dev/event_fastqc.json 
b/core/update_ffmeta_awsf_dev/event_fastqc.json new file mode 100644 index 000000000..14e456bb2 --- /dev/null +++ b/core/update_ffmeta_awsf_dev/event_fastqc.json @@ -0,0 +1,98 @@ +{ + "ff_meta": { + "run_platform": "AWSEM", + "uuid": "18564368-1299-4ce0-82d1-c8917450d834", + "parameters": [], + "workflow": "2324ad76-ff37-4157-8bcc-3ce72b7dace9", + "title": "fastqc-0-11-4-1 hahaha run 2018-01-16 23:08:21.335210", + "award": "1U01CA200059-01", + "awsem_job_id": "", + "run_url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:fastqc_613a0edc-f130-49a0-b97c-b812a6a42d84", + "awsem_app_name": "fastqc-0-11-4-1", + "lab": "4dn-dcic-lab", + "run_status": "started", + "output_files": [ + { + "type": "Output QC file", + "workflow_argument_name": "report_zip" + } + ], + "input_files": [ + { + "ordinal": 1, + "workflow_argument_name": "input_fastq", + "value": "f4864029-a8ad-4bb8-93e7-5108f462ccaa" + } + ] + }, + "instance_ip": "54.164.70.164", + "app_name": "fastqc-0-11-4-1", + "_tibanna": { + "env": "fourfront-webdev", + "settings": { + "url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:fastqc_613a0edc-f130-49a0-b97c-b812a6a42d84", + "run_type": "fastqc", + "run_id": "613a0edc-f130-49a0-b97c-b812a6a42d84", + "env": "fourfront-webdev", + "run_name": "fastqc_613a0edc-f130-49a0-b97c-b812a6a42d84" + } + }, + "start_time": "20180116-23:08:29-UTC", + "args": { + "secondary_output_target": {}, + "app_name": "fastqc-0-11-4-1", + "input_parameters": {}, + "cwl_child_filenames": [ + "fastqc-0-11-4.cwl" + ], + "output_target": { + "report_zip": "18564368-1299-4ce0-82d1-c8917450d834/report_zip" + }, + "cwl_main_filename": "fastqc-0-11-4-1.cwl", + "secondary_files": {}, + "output_S3_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "app_version": "0.2.0", + 
"cwl_directory_url": "https://raw.githubusercontent.com/4dn-dcic/pipelines-cwl/0.2.0/cwl_awsem/", + "input_files": { + "input_fastq": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "object_key": "f4864029-a8ad-4bb8-93e7-5108f462ccaa/4DNFIRSRJH45.fastq.gz" + } + } + }, + "output_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "jobid": "eUJZ9nH7Rsz2", + "instance_id": "i-04df1c1ca7a06ef66", + "tag": "hahaha", + "pf_meta": [], + "parameters": {}, + "config": { + "ebs_size": 10, + "password": "", + "ebs_type": "io1", + "json_bucket": "4dn-aws-pipeline-run-json", + "json_dir": "/tmp/json", + "EBS_optimized": false, + "ebs_iops": 500, + "userdata_dir": "/tmp/userdata", + "shutdown_min": "300", + "instance_type": "t2.micro", + "s3_access_arn": "arn:aws:iam::643366669028:instance-profile/S3_access", + "job_tag": "fastqc-0-11-4-1", + "copy_to_s3": true, + "script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/", + "key_name": "4dn-encode", + "launch_instance": true, + "ami_id": "ami-cfb14bb5", + "log_bucket": "tibanna-output" + }, + "workflow_uuid": "2324ad76-ff37-4157-8bcc-3ce72b7dace9", + "input_files": [ + { + "workflow_argument_name": "input_fastq", + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "uuid": "f4864029-a8ad-4bb8-93e7-5108f462ccaa", + "object_key": "4DNFIRSRJH45.fastq.gz" + } + ] +} diff --git a/core/update_ffmeta_awsf_dev/event_pairsqc.json b/core/update_ffmeta_awsf_dev/event_pairsqc.json new file mode 100644 index 000000000..d9e110e38 --- /dev/null +++ b/core/update_ffmeta_awsf_dev/event_pairsqc.json @@ -0,0 +1,130 @@ +{ + "ff_meta": { + "run_platform": "AWSEM", + "uuid": "3d71f45f-6a77-47b4-9839-eb02204c6dae", + "parameters": [ + { + "workflow_argument_name": "enzyme", + "value": "6" + }, + { + "workflow_argument_name": "sample_name", + "value": "4DNFI1ZLO9D7" + } + ], + "workflow": "ae3a87cb-3fa2-469e-97c7-540fc2d0a117", + "title": "pairsqc-single run 2018-01-16 23:08:25.250148", + "award": 
"1U01CA200059-01", + "awsem_job_id": "", + "run_url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:pairsqc-single_649e1149-68b5-43b3-bc92-65ca077dc983", + "awsem_app_name": "pairsqc-single", + "lab": "4dn-dcic-lab", + "run_status": "started", + "output_files": [ + { + "type": "Output QC file", + "workflow_argument_name": "report" + } + ], + "input_files": [ + { + "ordinal": 1, + "workflow_argument_name": "input_pairs", + "value": "817f3faa-0573-45c0-8230-02ec19de6544" + }, + { + "ordinal": 1, + "workflow_argument_name": "chromsizes", + "value": "4a6d10ee-2edb-4402-a98f-0edb1d58f5e9" + } + ] + }, + "instance_ip": "52.87.161.131", + "app_name": "pairsqc-single", + "_tibanna": { + "env": "fourfront-webdev", + "settings": { + "url": "https://console.aws.amazon.com/states/home?region=us-east-1#/executions/details/arn:aws:states:us-east-1:643366669028:execution:run_awsem_new_pony-dev:pairsqc-single_649e1149-68b5-43b3-bc92-65ca077dc983", + "run_type": "pairsqc-single", + "run_id": "649e1149-68b5-43b3-bc92-65ca077dc983", + "env": "fourfront-webdev", + "run_name": "pairsqc-single_649e1149-68b5-43b3-bc92-65ca077dc983" + } + }, + "start_time": "20180116-23:08:32-UTC", + "args": { + "secondary_output_target": {}, + "app_name": "pairsqc-single", + "input_parameters": { + "enzyme": "6", + "sample_name": "4DNFI1ZLO9D7" + }, + "cwl_child_filenames": [], + "output_target": { + "report": "3d71f45f-6a77-47b4-9839-eb02204c6dae/report" + }, + "cwl_main_filename": "pairsqc-single.cwl", + "secondary_files": { + "input_pairs": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "object_key": "817f3faa-0573-45c0-8230-02ec19de6544/4DNFI1ZLO9D7.pairs.gz.px2" + } + }, + "output_S3_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "app_version": "dev", + "cwl_directory_url": "https://raw.githubusercontent.com/4dn-dcic/pipelines-cwl/dev/cwl_awsem/", + "input_files": { 
+ "chromsizes": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "object_key": "4a6d10ee-2edb-4402-a98f-0edb1d58f5e9/4DNFI823LSII.chrom.sizes" + }, + "input_pairs": { + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "object_key": "817f3faa-0573-45c0-8230-02ec19de6544/4DNFI1ZLO9D7.pairs.gz" + } + } + }, + "output_bucket": "elasticbeanstalk-fourfront-webdev-wfoutput", + "jobid": "11Jz5nf2xJGt", + "instance_id": "i-0901c30deadd480e4", + "pf_meta": [], + "parameters": { + "enzyme": "6", + "sample_name": "4DNFI1ZLO9D7" + }, + "config": { + "ebs_size": 13, + "password": "dragonfly", + "ebs_type": "io1", + "json_bucket": "4dn-aws-pipeline-run-json", + "json_dir": "/tmp/json", + "EBS_optimized": false, + "ebs_iops": 500, + "userdata_dir": "/tmp/userdata", + "shutdown_min": 120, + "instance_type": "t2.micro", + "s3_access_arn": "arn:aws:iam::643366669028:instance-profile/S3_access", + "job_tag": "pairsqc-single", + "copy_to_s3": true, + "script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/", + "key_name": "4dn-encode", + "launch_instance": true, + "ami_id": "ami-cfb14bb5", + "log_bucket": "tibanna-output" + }, + "workflow_uuid": "ae3a87cb-3fa2-469e-97c7-540fc2d0a117", + "input_files": [ + { + "workflow_argument_name": "input_pairs", + "bucket_name": "elasticbeanstalk-fourfront-webdev-wfoutput", + "uuid": "817f3faa-0573-45c0-8230-02ec19de6544", + "object_key": "4DNFI1ZLO9D7.pairs.gz" + }, + { + "object_key": "4DNFI823LSII.chrom.sizes", + "bucket_name": "elasticbeanstalk-fourfront-webdev-files", + "workflow_argument_name": "chromsizes", + "uuid": "4a6d10ee-2edb-4402-a98f-0edb1d58f5e9" + } + ] +} diff --git a/core/update_ffmeta_awsf_dev/service.py b/core/update_ffmeta_awsf_dev/service.py index 4c0e47e18..eee98de26 100644 --- a/core/update_ffmeta_awsf_dev/service.py +++ b/core/update_ffmeta_awsf_dev/service.py @@ -3,7 +3,7 @@ from core import utils, ff_utils, ec2_utils import boto3 from collections import defaultdict -from 
core.fastqc_utils import parse_fastqc, parse_qc_table +from core.fastqc_utils import parse_qc_table LOG = logging.getLogger(__name__) s3 = boto3.resource('s3') @@ -30,7 +30,25 @@ def update_processed_file_metadata(status, pf_meta, tibanna): return pf_meta -def fastqc_updater(status, wf_file, ff_meta, tibanna): +def qc_updater(status, wf_file, ff_meta, tibanna): + if ff_meta.awsem_app_name == 'fastqc-0-11-4-1': + return _qc_updater(status, wf_file, ff_meta, tibanna, + quality_metric='quality_metric_fastqc', + inputfile_argument='input_fastq', + report_html='fastqc_report.html', + datafiles=['summary.txt', 'fastqc_data.txt']) + elif ff_meta.awsem_app_name == 'pairsqc-single': + inputfile_argument = 'input_pairs' + input_accession = str(wf_file.runner.inputfile_accessions[inputfile_argument]) + return _qc_updater(status, wf_file, ff_meta, tibanna, + quality_metric="quality_metric_pairsqc", + inputfile_argument=inputfile_argument, report_html='pairsqc_report.html', + datafiles=[input_accession + '.summary.out']) + + +def _qc_updater(status, wf_file, ff_meta, tibanna, quality_metric='quality_metric_fastqc', + inputfile_argument='input_fastq', report_html='fastqc_report.html', + datafiles=['summary.txt', 'fastqc_data.txt']): if status == 'uploading': # wait until this bad boy is finished return @@ -38,63 +56,10 @@ def fastqc_updater(status, wf_file, ff_meta, tibanna): ff_key = tibanna.ff_keys # move files to proper s3 location # need to remove sbg from this line - accession = wf_file.runner.inputfile_accessions[0] + accession = wf_file.runner.inputfile_accessions[inputfile_argument] zipped_report = wf_file.key - files_to_parse = ['summary.txt', 'fastqc_data.txt', 'fastqc_report.html'] - LOG.info("accession is %s" % accession) - - try: - files = wf_file.s3.unzip_s3_to_s3(zipped_report, accession, files_to_parse, - acl='public-read') - except Exception as e: - LOG.info(tibanna.s3.__dict__) - raise Exception("%s (key={})\n".format(zipped_report) % e) - # parse fastqc 
metadata - meta = parse_fastqc(files['summary.txt']['data'], - files['fastqc_data.txt']['data'], - url=files['fastqc_report.html']['s3key']) - LOG.info("fastqc meta is %s" % meta) - - # post fastq metadata - qc_meta = ff_utils.post_to_metadata(meta, 'quality_metric_fastqc', key=ff_key) - if qc_meta.get('@graph'): - qc_meta = qc_meta['@graph'][0] - - LOG.info("qc_meta is %s" % qc_meta) - # update original file as well - try: - original_file = ff_utils.get_metadata(accession, key=ff_key) - LOG.info("original_file is %s" % original_file) - except Exception as e: - raise Exception("Couldn't get metadata for accession {} : ".format(accession) + str(e)) - patch_file = {'quality_metric': qc_meta['@id']} - try: - ff_utils.patch_metadata(patch_file, original_file['uuid'], key=ff_key) - except Exception as e: - raise Exception("patch_metadata failed in fastqc_updater." + str(e) + - "original_file ={}\n".format(str(original_file))) - - # patch the workflow run, value_qc is used to make drawing graphs easier. 
- output_files = ff_meta.output_files - output_files[0]['value_qc'] = qc_meta['@id'] - retval = {"output_quality_metrics": [{"name": "quality_metric_fastqc", "value": qc_meta['@id']}], - 'output_files': output_files} - - LOG.info("retval is %s" % retval) - return retval - - -def pairsqc_updater(status, wf_file, ff_meta, tibanna, quality_metric="quality_metric_pairsqc"): - if status == 'uploading': - # wait until this bad boy is finished - return - # keys - ff_key = tibanna.ff_keys - # move files to proper s3 location - # need to remove sbg from this line - accession = wf_file.runner.inputfile_accessions[0] - zipped_report = wf_file.key - files_to_parse = [accession + '.summary.out', 'pairsqc_report.html'] + files_to_parse = list(datafiles) + files_to_parse.append(report_html) LOG.info("accession is %s" % accession) try: @@ -108,8 +73,10 @@ def pairsqc_updater(status, wf_file, ff_meta, tibanna, quality_metric="quality_m qc_schema = ff_utils.get_metadata("profiles/" + quality_metric + ".json", key=ff_key) # parse fastqc metadata - meta = parse_qc_table([files[accession + '.summary.out']['data']], - url=files['pairsqc_report.html']['s3key'], + LOG.info("files : %s" % str(files)) + filedata = [files[_]['data'] for _ in datafiles] + meta = parse_qc_table(filedata, + url=files[report_html]['s3key'], qc_schema=qc_schema.get('properties')) LOG.info("fastqc meta is %s" % meta) @@ -129,13 +96,13 @@ def pairsqc_updater(status, wf_file, ff_meta, tibanna, quality_metric="quality_m try: ff_utils.patch_metadata(patch_file, original_file['uuid'], key=ff_key) except Exception as e: - raise Exception("patch_metadata failed in pairsqc_updater." + str(e) + + raise Exception("patch_metadata failed in _qc_updater." + str(e) + "original_file ={}\n".format(str(original_file))) # patch the workflow run, value_qc is used to make drawing graphs easier. 
output_files = ff_meta.output_files output_files[0]['value_qc'] = qc_meta['@id'] - retval = {"output_quality_metrics": [{"name": "quality_metric_pairsqc", "value": qc_meta['@id']}], + retval = {"output_quality_metrics": [{"name": quality_metric, "value": qc_meta['@id']}], 'output_files': output_files} LOG.info("retval is %s" % retval) @@ -146,7 +113,7 @@ def md5_updater(status, wf_file, ff_meta, tibanna): # get key ff_key = tibanna.ff_keys # get metadata about original input file - accession = wf_file.runner.inputfile_accessions[0] + accession = wf_file.runner.inputfile_accessions['input_file'] original_file = ff_utils.get_metadata(accession, key=ff_key) if status.lower() == 'uploaded': @@ -222,11 +189,11 @@ def handler(event, context): status = export.status print("export res is %s", status) if status == 'COMPLETED': - patch_meta = OUTFILE_UPDATERS[awsem.app_name]('uploaded', export, ff_meta, tibanna) + patch_meta = OUTFILE_UPDATERS[export.output_type]('uploaded', export, ff_meta, tibanna) if pf_meta: pf_meta = update_processed_file_metadata('uploaded', pf_meta, tibanna) elif status in ['FAILED']: - patch_meta = OUTFILE_UPDATERS[awsem.app_name]('upload failed', export, ff_meta, tibanna) + patch_meta = OUTFILE_UPDATERS[export.output_type]('upload failed', export, ff_meta, tibanna) ff_meta.run_status = 'error' ff_meta.post(key=tibanna.ff_keys) raise Exception("Failed to export file %s" % (upload_key)) @@ -255,8 +222,5 @@ def handler(event, context): # Cardinal knowledge of all workflow updaters OUTFILE_UPDATERS = defaultdict(lambda: donothing) -OUTFILE_UPDATERS['md5'] = md5_updater -OUTFILE_UPDATERS['validatefiles'] = md5_updater -OUTFILE_UPDATERS['fastqc-0-11-4-1/1'] = fastqc_updater -OUTFILE_UPDATERS['fastqc-0-11-4-1'] = fastqc_updater -OUTFILE_UPDATERS['pairsqc-single'] = pairsqc_updater +OUTFILE_UPDATERS['Output report file'] = md5_updater +OUTFILE_UPDATERS['Output QC file'] = qc_updater diff --git a/src/benchmark b/src/benchmark index 7effa6fa0..8bd69329f 
160000 --- a/src/benchmark +++ b/src/benchmark @@ -1 +1 @@ -Subproject commit 7effa6fa0a0cfb03b55059e8b365798a63d29649 +Subproject commit 8bd69329f59d5da269db9ac81e11a9026e18ee74 diff --git a/tests/core/conftest.py b/tests/core/conftest.py index f1976dec0..0d10825cf 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -86,20 +86,35 @@ def check_export_event_data(sbg_keys, ff_keys): @pytest.fixture(scope='session') -def run_awsf_event_data(sbg_keys, ff_keys): +def run_awsf_event_data(ff_keys): return get_event_file_for('start_run_awsf', ff_keys=ff_keys) @pytest.fixture(scope='session') -def run_awsf_event_data_secondary_files(sbg_keys, ff_keys): +def run_task_awsf_event_data(ff_keys): + return get_event_file_for('run_task_awsf', ff_keys=ff_keys) + + +@pytest.fixture(scope='session') +def run_awsf_event_data_secondary_files(ff_keys): return get_event_file_for('start_run_awsf', ff_keys=ff_keys, event_file='event2.json') @pytest.fixture(scope='session') -def update_ffmeta_event_data(sbg_keys, ff_keys): +def update_ffmeta_event_data(ff_keys): return get_event_file_for('update_ffmeta_awsf', ff_keys=ff_keys) +@pytest.fixture(scope='session') +def update_ffmeta_event_data_pairsqc(ff_keys): + return get_event_file_for('update_ffmeta_awsf', ff_keys=ff_keys, event_file='event_pairsqc.json') + + +@pytest.fixture(scope='session') +def update_ffmeta_event_data_fastqc(ff_keys): + return get_event_file_for('update_ffmeta_awsf', ff_keys=ff_keys, event_file='event_fastqc.json') + + def get_test_json(file_name): dir_path = os.path.dirname(os.path.realpath(__file__)) event_file_name = os.path.join(dir_path, '..', '..', 'test_json', file_name) diff --git a/tests/core/start_run_awsem/test_handler.py b/tests/core/start_run_awsem/test_handler.py index 1146e729e..bdceda96d 100644 --- a/tests/core/start_run_awsem/test_handler.py +++ b/tests/core/start_run_awsem/test_handler.py @@ -4,9 +4,6 @@ get_format_extension_map, handle_processed_files ) -from core.run_task_awsf.service 
import ( - update_config -) from ..conftest import valid_env from core.utils import Tibanna from core import ff_utils @@ -79,14 +76,3 @@ def test_handle_processed_files(run_awsf_event_data_secondary_files): assert pdict['extra_files'] == [{'file_format': 'pairs_px2'}] else: assert 'extra_files' not in pdict - - -@valid_env -@pytest.mark.webtest -def test_update_config(run_awsf_event_data): - data = run_awsf_event_data - config = update_config(data['config'], data['app_name'], data['input_files'], data['parameters']) - assert config['instance_type'] == 't2.nano' - assert config['EBS_optimized'] is False - assert config['ebs_size'] >= 10 - assert config['copy_to_s3'] is True # check the other fields are preserved in the returned config diff --git a/tests/core/test_ec2_utils.py b/tests/core/test_ec2_utils.py index 78d25e218..49304c571 100644 --- a/tests/core/test_ec2_utils.py +++ b/tests/core/test_ec2_utils.py @@ -1,4 +1,4 @@ -from core.ec2_utils import Awsem +from core.ec2_utils import Awsem, update_config def test_create_awsem(update_ffmeta_event_data, tibanna_env): @@ -8,6 +8,7 @@ def test_create_awsem(update_ffmeta_event_data, tibanna_env): assert awsem.config assert awsem.app_name assert awsem.output_s3 + assert awsem.output_files_meta def test_get_output_files(update_ffmeta_event_data, tibanna_env): @@ -19,6 +20,7 @@ def test_get_output_files(update_ffmeta_event_data, tibanna_env): assert of[first_key].runner == awsem assert of[first_key].bucket == awsem.output_s3 assert of[first_key].key == 'lalala/md5_report' + assert of[first_key].output_type == 'Output report file' def test_get_input_files(update_ffmeta_event_data, tibanna_env): @@ -36,4 +38,14 @@ def test_get_input_files(update_ffmeta_event_data, tibanna_env): def test_get_inputfile_accession(update_ffmeta_event_data, tibanna_env): update_ffmeta_event_data.update(tibanna_env) awsem = Awsem(update_ffmeta_event_data) - assert awsem.inputfile_accessions[0] == '4DNFIRSRJH45' + assert 
awsem.inputfile_accessions['input_file'] == '4DNFIRSRJH45' + + +def test_update_config(run_task_awsf_event_data): + data = run_task_awsf_event_data + config = data['config'] + update_config(config, data['args']['app_name'], data['args']['input_files'], data['args']['input_parameters']) + assert config['instance_type'] == 't2.micro' + assert config['EBS_optimized'] is False + assert config['ebs_size'] >= 10 + assert config['copy_to_s3'] is True # check the other fields are preserved in the returned config diff --git a/tests/core/test_fastqc_utils.py b/tests/core/test_fastqc_utils.py index 12840f95d..7d9ca7013 100644 --- a/tests/core/test_fastqc_utils.py +++ b/tests/core/test_fastqc_utils.py @@ -4,6 +4,7 @@ DIR = os.path.dirname(__file__) FASTQC_DIR = os.path.join(DIR, '..', 'files', 'fastqc_report') +PAIRSQC_DIR = os.path.join(DIR, '..', 'files', 'pairsqc_report') @pytest.fixture @@ -16,7 +17,45 @@ def data(): return open(os.path.join(FASTQC_DIR, 'fastqc_data.txt')).read() +@pytest.fixture +def pairsqc_summary(): + return open(os.path.join(PAIRSQC_DIR, "sample1.summary.out")).read() + + +@pytest.fixture +def quality_metric_pairsqc_schema(): + qc_schema = {'Total reads': {'type': 'number'}, + 'Cis/Trans ratio': {'type': 'number'}, + 'convergence': {'type': 'string'}} + return qc_schema + + +@pytest.fixture +def quality_metric_fastqc_schema(): + qc_schema = {'Total Sequences': {'type': 'number'}, + 'Kmer Content': {'type': 'string'}, + 'overall_quality_status': {'type': 'string'}} + return qc_schema + + def test_parse_fastqc(summary, data): meta = fastqc_utils.parse_fastqc(summary, data, url='test_url') assert meta['url'] == 'test_url' assert meta['overall_quality_status'] == 'PASS' + + +def test_parse_qc_table_pairsqc(pairsqc_summary, quality_metric_pairsqc_schema): + meta = fastqc_utils.parse_qc_table([pairsqc_summary], url='test_url', + qc_schema=quality_metric_pairsqc_schema) + assert meta['Total reads'] == 651962 + assert meta['Cis/Trans ratio'] == 64.141 + 
assert meta['convergence'] == 'Good' + + +def test_parse_qc_table_fastqc(summary, data, quality_metric_fastqc_schema): + meta = fastqc_utils.parse_qc_table([summary, data], url='test_url', + qc_schema=quality_metric_fastqc_schema) + assert meta['Total Sequences'] == 557747 + assert meta['Kmer Content'] == 'WARN' + assert meta['url'] == 'test_url' + assert meta['overall_quality_status'] == 'PASS' diff --git a/tests/core/test_utils.py b/tests/core/test_utils.py index b2e8e02c3..282ebd2fb 100644 --- a/tests/core/test_utils.py +++ b/tests/core/test_utils.py @@ -152,6 +152,28 @@ def test_unzip_s3_to_s3(s3_utils): assert objs.get('Contents', None) +@valid_env +@pytest.mark.webtest +def test_unzip_s3_to_s3_pairsqc(s3_utils): + prefix = '__test_data/extracted' + filename = '23d0a314-e401-4826-a76b-4356e019b059/report' + s3_utils.s3_delete_dir(prefix) + + # ensure this thing was deleted + # if no files there will be no Contents in response + objs = s3_utils.s3_read_dir(prefix) + assert [] == objs.get('Contents', []) + + # now copy to that dir we just deleted + retfile_list = ['4DNFI1ZLO9D7.summary.out', 'pairsqc_report.html'] + ret_files = s3_utils.unzip_s3_to_s3(filename, prefix, retfile_list) + assert 3 == len(ret_files.keys()) + assert ret_files['pairsqc_report.html']['s3key'].startswith("https://s3.amazonaws.com") + + objs = s3_utils.s3_read_dir(prefix) + assert objs.get('Contents', None) + + @valid_env @pytest.mark.webtest def test_create_sbg_workflow(sbg_project, sbg_keys): diff --git a/tests/files/pairsqc_report/sample1.summary.out b/tests/files/pairsqc_report/sample1.summary.out new file mode 100644 index 000000000..06f0a7c54 --- /dev/null +++ b/tests/files/pairsqc_report/sample1.summary.out @@ -0,0 +1,8 @@ +Total reads 651,962 +Short cis reads (<20kb) 221,017 +Cis reads (>20kb) 276,411 +Trans reads 154,534 +Cis/Trans ratio 64.141 +% Long-range intrachromosomal reads 42.397 +convergence Good +slope -0.95