Merge pull request #322 from 4dn-dcic/encrypt_upload
Encrypt upload
SooLee authored Mar 16, 2021
2 parents 4ee9bed + f744962 commit 431c146
Showing 26 changed files with 1,393 additions and 156 deletions.
14 changes: 10 additions & 4 deletions awsf3-docker/run.sh
@@ -47,7 +47,9 @@ export INSTANCE_REGION=$(ec2metadata --availability-zone | sed 's/[a-z]$//')
export INSTANCE_AVAILABILITY_ZONE=$(ec2metadata --availability-zone)
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity| grep Account | sed 's/[^0-9]//g')
export AWS_REGION=$INSTANCE_REGION # this is for importing awsf3 package which imports tibanna package

export LOCAL_OUTDIR_CWL=$MOUNT_DIR_PREFIX$LOCAL_OUTDIR
export LOCAL_OUTDIR_WDL=/data1/wdl/cromwell-executions/
export LOCAL_WF_TMPDIR_CWL=$MOUNT_DIR_PREFIX$LOCAL_WF_TMPDIR


# function that executes a command and collects the log
@@ -203,7 +205,6 @@ exl echo
send_log
cwd0=$(pwd)
cd $LOCAL_WFDIR
mkdir -p $LOCAL_WF_TMPDIR
if [[ $LANGUAGE == 'wdl_v1' || $LANGUAGE == 'wdl' ]]
then
exl java -jar /usr/local/bin/cromwell.jar run $MAIN_WDL -i $cwd0/$INPUT_YML_FILE -m $LOGJSONFILE
@@ -233,7 +234,8 @@ else
fi
# cwltool cannot recognize symlinks and ends up copying output from the tmp directory instead of moving it.
# To prevent this, use the original directory name here.
exlj cwltool --enable-dev --non-strict --no-read-only --no-match-user --outdir $MOUNT_DIR_PREFIX$LOCAL_OUTDIR --tmp-outdir-prefix $MOUNT_DIR_PREFIX$LOCAL_WF_TMPDIR --tmpdir-prefix $MOUNT_DIR_PREFIX$LOCAL_WF_TMPDIR $PRESERVED_ENV_OPTION $SINGULARITY_OPTION $MAIN_CWL $cwd0/$INPUT_YML_FILE
mkdir -p $LOCAL_WF_TMPDIR_CWL
exlj cwltool --enable-dev --non-strict --no-read-only --no-match-user --outdir $LOCAL_OUTDIR_CWL --tmp-outdir-prefix $LOCAL_WF_TMPDIR_CWL --tmpdir-prefix $LOCAL_WF_TMPDIR_CWL $PRESERVED_ENV_OPTION $SINGULARITY_OPTION $MAIN_CWL $cwd0/$INPUT_YML_FILE
handle_error $?
fi
cd $cwd0
@@ -245,7 +247,11 @@ send_log
exl echo
exl echo "## Calculating md5sum of output files"
exl date
md5sum $LOCAL_OUTDIR/* | grep -v "$JOBID" >> $MD5FILE ; ## calculate md5sum for output files (except log file, to avoid confusion)
touch $MD5FILE
if [[ $LANGUAGE == 'cwl_v1' ]]
then
exl md5sum $LOCAL_OUTDIR_CWL/* | grep -v "$JOBID" >> $MD5FILE ; ## exclude log file, to avoid confusion
fi
exl cat $MD5FILE
mv $MD5FILE $LOCAL_OUTDIR
exl date ## done time
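The run.sh change above now computes md5s only for CWL runs and excludes anything carrying the job ID (i.e., the log file) from the manifest. A rough Python equivalent of that step, purely for illustration (local_outdir_cwl, job_id, and md5file are stand-ins for the shell variables above, not names used by the script):

import hashlib
import os

def write_md5_manifest(local_outdir_cwl, job_id, md5file):
    """Append 'md5sum  filename' lines for CWL output files, skipping the job's own log file."""
    with open(md5file, 'a') as manifest:  # like 'touch $MD5FILE': the file exists even if nothing matches
        for name in sorted(os.listdir(local_outdir_cwl)):
            path = os.path.join(local_outdir_cwl, name)
            if not os.path.isfile(path) or job_id in name:
                continue  # mirrors: grep -v "$JOBID"
            with open(path, 'rb') as fh:
                digest = hashlib.md5(fh.read()).hexdigest()
            manifest.write('%s  %s\n' % (digest, name))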
13 changes: 9 additions & 4 deletions awsf3/target.py
@@ -108,13 +108,16 @@ def unzip_source(self):
yield {'name': content_file_name, 'content': z.open(content_file_name).read()}
yield None

def upload_to_s3(self):
def upload_to_s3(self, encrypt_s3_upload=False):
"""upload target to s3, source can be either a file or a directory."""
if not self.is_valid:
raise Exception('Upload Error: source / dest must be specified first')
if not self.s3:
self.s3 = boto3.client('s3')
err_msg = "failed to upload output file %s to %s. %s"
upload_extra_args = {}
if encrypt_s3_upload:
upload_extra_args = {'ServerSideEncryption': 'aws:kms'}
if os.path.isdir(self.source):
print("source " + self.source + " is a directory")
print("uploading output directory %s to %s in bucket %s" % (self.source, self.dest, self.bucket))
@@ -132,7 +135,7 @@ def upload_to_s3(self):
print("source_f=" + source_f)
print("dest_f=" + dest_f)
try:
self.s3.upload_file(source_f, self.bucket, dest_f)
self.s3.upload_file(source_f, self.bucket, dest_f, ExtraArgs=upload_extra_args)
except Exception as e:
raise Exception(err_msg % (source_f, self.bucket + '/' + dest_f, str(e)))
elif self.unzip:
@@ -153,6 +156,8 @@ def upload_to_s3(self):
'Key': self.dest + arcfile['name'],
'Body': arcfile['content'],
'ContentType': content_type}
if encrypt_s3_upload:
put_object_args.update(upload_extra_args)
try:
print("Putting object %s to %s in bucket %s" % (arcfile['name'], self.dest + arcfile['name'], self.bucket))
self.s3.put_object(**put_object_args)
@@ -166,13 +171,13 @@ def upload_to_s3(self):
dest = os.path.join(self.dest, self.source_name)
print("uploading output source %s to %s in bucket %s" % (self.source, dest, self.bucket))
try:
self.s3.upload_file(self.source, self.bucket, dest)
self.s3.upload_file(self.source, self.bucket, dest, ExtraArgs=upload_extra_args)
except Exception as e:
raise Exception(err_msg % (self.source, self.bucket + '/' + dest, str(e)))
else:
try:
print("uploading output source %s to %s in bucket %s" % (self.source, self.dest, self.bucket))
self.s3.upload_file(self.source, self.bucket, self.dest)
self.s3.upload_file(self.source, self.bucket, self.dest, ExtraArgs=upload_extra_args)
except Exception as e:
raise Exception(err_msg % (self.source, self.bucket + '/' + self.dest, str(e)))

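For reference, a minimal standalone sketch (not taken from this repo) of the boto3 pattern the new encrypt_s3_upload flag enables in target.py: upload_file takes the encryption setting through ExtraArgs, while put_object takes ServerSideEncryption as a direct keyword argument. Bucket names and keys below are placeholders.

import boto3

s3 = boto3.client('s3')
extra_args = {'ServerSideEncryption': 'aws:kms'}  # optionally add 'SSEKMSKeyId': '<key-arn>' to pin a specific KMS key; the commit uses the account default

# file upload: encryption settings are passed via ExtraArgs
s3.upload_file('out/result.vcf.gz', 'my-output-bucket', 'results/result.vcf.gz',
               ExtraArgs=extra_args)

# in-memory upload: put_object accepts the same settings as top-level keyword arguments
s3.put_object(Bucket='my-output-bucket', Key='results/report.txt',
              Body=b'report contents', ContentType='text/plain',
              **extra_args)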
46 changes: 32 additions & 14 deletions awsf3/utils.py
@@ -275,13 +275,14 @@ def read_md5file(md5file):
return md5dict


def create_output_files_dict(language='cwl', execution_metadata=None, md5dict=None):
def create_output_files_dict(language='cwl', execution_metadata=None, md5dict=None, strict=True):
"""create a dictionary that contains 'path', 'secondaryFiles', 'md5sum' with argnames as keys.
For snakemake and shell, returns an empty dictionary (execution_metadata not required).
secondaryFiles is added only if the language is cwl.
execution_metadata is a dictionary read from wdl/cwl execution log json file.
md5dict is a dictionary with key=file path, value=md5sum (optional)."""
if language in ['cwl', 'cwl_v1', 'wdl'] and not execution_metadata:
md5dict is a dictionary with key=file path, value=md5sum (optional).
If strict is set to False, it does not check that execution_metadata exists for cwl/wdl."""
if language in ['cwl', 'cwl_v1', 'wdl', 'wdl_v1', 'wdl_draft2'] and not execution_metadata and strict:
raise Exception("execution_metadata is required for cwl/wdl.")
out_meta = dict()
if language in ['wdl', 'wdl_v1', 'wdl_draft2']:
@@ -338,38 +339,43 @@ def update_postrun_json_init(json_old, json_new):
write_postrun_json(json_new, prj)


def update_postrun_json_upload_output(json_old, execution_metadata_file, md5file, json_new, language='cwl_v1'):
"""Update postrun json with output files"""
def update_postrun_json_upload_output(json_old, execution_metadata_file, md5file, json_new,
language='cwl_v1', strict=True, upload=True):
"""Update postrun json with output files.
If strict is set to False, it does not require execution_metadata for cwl/wdl."""
# read old json file and prepare postrunjson skeleton
prj = read_postrun_json(json_old)

# read md5 file
md5dict = read_md5file(md5file)
print(md5dict)

# read execution metadata file
if execution_metadata_file:
with open(execution_metadata_file, 'r') as f:
execution_metadata = json.load(f)
else:
execution_metadata = None
output_files = create_output_files_dict(language, execution_metadata, md5dict)
output_files = create_output_files_dict(language, execution_metadata, md5dict, strict=strict)
print(output_files)

# create output files for postrun json
prj.Job.Output.add_output_files(output_files)

# upload output to S3 (this also updates postrun json)
upload_output(prj)
if upload:
upload_output(prj)

# write to new json file
write_postrun_json(json_new, prj)


def upload_output(prj):
# parsing output_target and uploading output files to output target
upload_to_output_target(prj.Job.Output)
upload_to_output_target(prj.Job.Output, prj.config.encrypt_s3_upload)


def upload_to_output_target(prj_out):
def upload_to_output_target(prj_out, encrypt_s3_upload=False):
# parsing output_target and uploading output files to output target
output_bucket = prj_out.output_bucket_directory
output_argnames = prj_out.output_files.keys()
@@ -384,7 +390,7 @@ def upload_to_output_target(prj_out):
target.parse_custom_target(k, output_target[k])
if target.is_valid:
print("Target is valid. Uploading..")
target.upload_to_s3()
target.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload)
else:
raise Exception("Invalid target %s -> %s: failed to upload" % k, output_target[k])
else:
@@ -393,7 +399,7 @@
target.parse_cwl_target(k, output_target.get(k, ''), prj_out.output_files)
if target.is_valid:
print("Target is valid. Uploading..")
target.upload_to_s3()
target.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload)
prj_out.output_files[k].add_target(target.dest)

# upload secondary files
@@ -403,7 +409,7 @@
stlist.parse_target_values(prj_out.secondary_output_target.get(k, []))
stlist.reorder_by_source([sf.path for sf in secondary_output_files])
for st in stlist.secondary_targets:
st.upload_to_s3()
st.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload)
for i, sf in enumerate(secondary_output_files):
sf.add_target(stlist.secondary_targets[i].dest)
else:
@@ -440,16 +446,28 @@ def postrun_json_final(prj, logfile=None):
prj_job.update(total_input_size=os.getenv('INPUTSIZE'))
prj_job.update(total_tmp_size=os.getenv('TEMPSIZE'))
prj_job.update(total_output_size=os.getenv('OUTPUTSIZE'))



def upload_postrun_json(jsonfile):
prj = read_postrun_json(jsonfile)
bucket = prj.Job.Log.log_bucket_directory
dest = prj.Job.JOBID + '.postrun.json'
if '/' in bucket:
bucket_dirs = bucket.split('/')
bucket = bucket_dirs.pop(0)
prefix = '/'.join(bucket_dirs)
dest = prefix + '/' + dest
if prj.config.public_postrun_json:
acl = 'public-read'
else:
acl = 'private'
s3 = boto3.client('s3')
s3.put_object(ACL=acl, Body=format_postrun_json(prj).encode('utf-8'), Bucket=bucket, Key=dest)
upload_arg = {
"Body": format_postrun_json(prj).encode('utf-8'),
"Bucket": bucket,
"Key": dest,
"ACL": acl
}
if prj.config.encrypt_s3_upload:
upload_arg.update({"ServerSideEncryption": "aws:kms"})
s3.put_object(**upload_arg)
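A hedged sketch (helper names are illustrative, not from the commit) of what the new upload_postrun_json logic does: when log_bucket_directory contains a '/', the first component is the bucket and the remainder becomes a key prefix, and the KMS encryption argument is added only when the config requests it.

def split_bucket_and_key(bucket_directory, filename):
    """Split 'bucket/some/prefix' into an S3 bucket name and a prefixed object key."""
    parts = bucket_directory.split('/')
    bucket = parts.pop(0)
    key = '/'.join(parts + [filename]) if parts else filename
    return bucket, key

def build_put_object_args(body, bucket_directory, filename, public=False, encrypt_s3_upload=False):
    bucket, key = split_bucket_and_key(bucket_directory, filename)
    args = {'Body': body, 'Bucket': bucket, 'Key': key,
            'ACL': 'public-read' if public else 'private'}
    if encrypt_s3_upload:
        args['ServerSideEncryption'] = 'aws:kms'  # request SSE-KMS only when enabled in config
    return args

# e.g. build_put_object_args(b'{}', 'tibanna-output/logs', 'GBPtlqb2rFGH.postrun.json')
# -> Bucket='tibanna-output', Key='logs/GBPtlqb2rFGH.postrun.json', ACL='private'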
149 changes: 149 additions & 0 deletions tests/awsf3/postrunjson/GBPtlqb2rFGH.postrun.json
@@ -0,0 +1,149 @@
{
"Job": {
"App": {
"App_name": "workflow_dbSNP_ID_fixer-check",
"App_version": "v21",
"cwl_url": "https://raw.githubusercontent.com/dbmi-bgm/cgap-pipeline/v21/cwl",
"language": "cwl_v1",
"main_cwl": "workflow_parallel_dbSNP_ID_fixer_plus_vcf-integrity-check.cwl",
"other_cwl_files": "parallel_dbSNP_ID_fixer.cwl,vcf-integrity-check.cwl"
},
"Input": {
"Env": {},
"Input_files_data": {
"dbSNP_ref_vcf": {
"class": "File",
"dir": "elasticbeanstalk-fourfront-cgapwolf-files",
"mount": true,
"path": "aa542c8e-b31c-4cff-b2d4-aa4037bb913c/GAPFIF4JKLTH.vcf.gz",
"profile": "",
"rename": "",
"unzip": ""
},
"input_vcf": {
"class": "File",
"dir": "elasticbeanstalk-fourfront-cgapwolf-wfoutput",
"mount": true,
"path": "6b82a145-3dba-452e-9648-1bb81b0d7b1d/GAPFIF3KTGCP.vcf.gz",
"profile": "",
"rename": "",
"unzip": ""
},
"region_file": {
"class": "File",
"dir": "elasticbeanstalk-fourfront-cgapwolf-files",
"mount": true,
"path": "1c07a3aa-e2a3-498c-b838-15991c4a2f28/GAPFIBGEOI72.txt",
"profile": "",
"rename": "",
"unzip": ""
}
},
"Input_parameters": {},
"Secondary_files_data": {
"dbSNP_ref_vcf": {
"class": "File",
"dir": "elasticbeanstalk-fourfront-cgapwolf-files",
"mount": true,
"path": "aa542c8e-b31c-4cff-b2d4-aa4037bb913c/GAPFIF4JKLTH.vcf.gz.tbi",
"profile": "",
"rename": "",
"unzip": ""
},
"input_vcf": {
"class": "File",
"dir": "elasticbeanstalk-fourfront-cgapwolf-wfoutput",
"mount": true,
"path": "6b82a145-3dba-452e-9648-1bb81b0d7b1d/GAPFIF3KTGCP.vcf.gz.tbi",
"profile": "",
"rename": "",
"unzip": ""
}
}
},
"JOBID": "GBPtlqb2rFGH",
"Log": {
"log_bucket_directory": "tibanna-output"
},
"Output": {
"Output files": {
"vcf": {
"basename": "fixed_GAPFIF3KTGCP.vcf.gz",
"checksum": "sha1$c2a428e01f90887b335b215300ea00828f4a73a1",
"class": "File",
"location": "file:///mnt/data1/out/fixed_GAPFIF3KTGCP.vcf.gz",
"md5sum": "80e0c0fad26272a96ef95e9362ff7419",
"path": "/mnt/data1/out/fixed_GAPFIF3KTGCP.vcf.gz",
"secondaryFiles": [
{
"basename": "fixed_GAPFIF3KTGCP.vcf.gz.tbi",
"checksum": "sha1$920e56345b7e31ab44413d9c792b6b6324e7cdea",
"class": "File",
"location": "file:///mnt/data1/out/fixed_GAPFIF3KTGCP.vcf.gz.tbi",
"md5sum": "56c95ec070f143c565744527150e6101",
"path": "/mnt/data1/out/fixed_GAPFIF3KTGCP.vcf.gz.tbi",
"size": 2187181
}
],
"size": 1723376222
},
"vcf-check": {
"basename": "integrity_check",
"checksum": "sha1$8296f8ccff8a4fad99be5d3d71175134d84e2621",
"class": "File",
"location": "file:///mnt/data1/out/integrity_check",
"md5sum": "475bff046ae43cc36f4edb68f5a35350",
"path": "/mnt/data1/out/integrity_check",
"size": 14
}
},
"alt_cond_output_argnames": {},
"output_bucket_directory": "elasticbeanstalk-fourfront-cgapwolf-wfoutput",
"output_target": {
"vcf": "2a6b9f05-2287-4cb2-be82-3bf6eff5f7be/GAPFIE8ISBQK.vcf.gz",
"vcf-check": "f18573ed-7aee-49d8-94c0-5d8ffef1b313/vcf-check553202857276"
},
"secondary_output_target": {
"vcf": [
"2a6b9f05-2287-4cb2-be82-3bf6eff5f7be/GAPFIE8ISBQK.vcf.gz.tbi"
]
}
},
"filesystem": "",
"instance_availablity_zone": "",
"instance_id": "",
"start_time": "20210312-14:04:23-UTC"
},
"config": {
"EBS_optimized": true,
"ami_id": "ami-0a7ddfc7e412ab6e0",
"availability_zone": "",
"awsf_image": "4dndcic/tibanna-awsf:1.0.5",
"behavior_on_capacity_limit": "wait_and_retry",
"cloudwatch_dashboard": false,
"cpu": "",
"ebs_iops": "",
"ebs_size": 45,
"ebs_type": "gp3",
"email": false,
"encrypt_s3_upload": false,
"instance_type": "c5n.4xlarge",
"job_tag": "workflow_dbSNP_ID_fixer-check",
"json_bucket": "tibanna-output",
"key_name": "",
"language": "cwl_v1",
"log_bucket": "tibanna-output",
"mem": 0,
"overwrite_input_extra": false,
"password": "",
"public_postrun_json": false,
"root_ebs_size": 8,
"run_name": "run_workflow_dbSNP_ID_fixer-check-bcc2009d-f1b0-4629-824c-ae7d0565e256",
"script_url": "https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf3/",
"security_group": "",
"shutdown_min": "now",
"spot_duration": "",
"spot_instance": true,
"subnet": ""
}
}