Merge pull request #112 from 4dn-dcic/lighter_unicorn
Lighter unicorn
SooLee authored Jul 22, 2018
2 parents bad2c15 + 88db39a commit a6c63a0
Showing 25 changed files with 1,055 additions and 986 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -7,6 +7,7 @@ python:
install:
- pip install six
- pip install -U -r requirements.txt
- pip install -U -r requirements-4dn.txt
- pip install codacy-coverage
script:
- invoke test
23 changes: 12 additions & 11 deletions README.md
@@ -39,6 +39,7 @@ Tibanna has been evolving: originally developed for Desktop workflow submitter t
* Python 2.7
* Pip 9.0.3 / 10.0.1
* The other dependencies are listed in `requirements.txt` and are auto-installed in the following steps.
* If you are a 4DN-DCIC user, use the dependencies specified in `requirements-4dn.txt`. These include all the base requirements in `requirements.txt`, as well as other 4DN-specific packages.

### Admin
As an admin, you first need to set up the Tibanna environment on your AWS account and create a usergroup with shared permission to the environment.
@@ -52,7 +53,7 @@ python -m pip install pip==9.0.3 # or curl https://bootstrap.pypa.io/get-pip.py
git clone https://github.com/4dn-dcic/tibanna
cd tibanna
pip install -r requirements.txt
pip install -r requirements.txt # if you're 4dn-dcic, use requirements-4dn.txt instead
```

Set up `awscli`: for more details see https://github.com/4dn-dcic/tibanna/blob/master/tutorials/tibanna_unicorn.md#set-up-aws-cli
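Before deploying, it can be worth confirming that the credentials configured through `awscli` are actually visible to `boto3`, which the Tibanna lambdas use. A minimal sketch, not part of this commit and assuming only `boto3`:
```
import boto3

# STS reports which identity the configured credentials map to;
# this call fails immediately if awscli/boto3 credentials are missing or invalid.
identity = boto3.client('sts').get_caller_identity()
print("Using identity %s in account %s" % (identity['Arn'], identity['Account']))
```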
@@ -89,11 +90,11 @@ Tibanna usergroup default_6206 has been created on AWS.
Then, deploy tibanna (unicorn) to your AWS account for a specific user group (for more details about tibanna deployment, see below):
* Note: you can only use unicorn (the core with no communication with the 4DN portal). Pony is reserved for 4DN-DCIC.
```
invoke deploy_tibanna --usergroup=<usergroup> --no-tests --sfn-type=unicorn
invoke deploy_tibanna --usergroup=<usergroup> --sfn-type=unicorn
```
As an example,
```
invoke deploy_tibanna --usergroup=default_6206 --no-tests --sfn-type=unicorn
invoke deploy_tibanna --usergroup=default_6206 --sfn-type=unicorn
```

To run a workflow on the tibanna (unicorn) deployed for the usergroup (for more details about running workflows, see below),
@@ -147,7 +148,7 @@ SECRET # aws secret key

To create a copy of tibanna (step function + lambdas)
```
invoke deploy_tibanna [--suffix=<suffixname>] [--sfn_type=<sfn_type>] [--no-tests]
invoke deploy_tibanna [--suffix=<suffixname>] [--sfn_type=<sfn_type>] [--tests]
# (use suffix for development version)
# example <suffixname> : dev
# <sfn_type> (step function type) is either 'pony' or 'unicorn' (default pony)
@@ -162,7 +163,7 @@ The above command will create a step function named `tibanna_pony_dev2` that use
```
invoke deploy_tibanna --suffix=dev --sfn_type=unicorn
```
This example creates a step function named `tibanna_unicorn_dev` that uses a set of lambdas with suffix `_dev`, and deploys these lambdas.
This example creates a step function named `tibanna_unicorn_dev` that uses a set of lambdas with suffix `_dev`, and deploys these lambdas. Using the `--tests` argument will ensure tests pass before deploying; currently this is **NOT** available for users outside of 4DN-DCIC.

To deploy lambda functions (use suffix for development version lambdas)
```
@@ -258,16 +259,16 @@ cat webdevtestlist | xargs -I{} sh -c "invoke run_workflow --workflow=tibanna_po
"uuid": "1f53df95-4cf3-41cc-971d-81bb16c486dd",
"bucket_name": "elasticbeanstalk-fourfront-webdev-files"
},
{
"workflow_argument_name": "fastq1",
{
"workflow_argument_name": "fastq1",
"bucket_name": "elasticbeanstalk-fourfront-webdev-files",
"uuid": "1150b428-272b-4a0c-b3e6-4b405c148f7c",
"uuid": "1150b428-272b-4a0c-b3e6-4b405c148f7c",
"object_key": "4DNFIVOZN511.fastq.gz"
},
{
"workflow_argument_name": "fastq2",
{
"workflow_argument_name": "fastq2",
"bucket_name": "elasticbeanstalk-fourfront-webdev-files",
"uuid": "f4864029-a8ad-4bb8-93e7-5108f462ccaa",
"uuid": "f4864029-a8ad-4bb8-93e7-5108f462ccaa",
"object_key": "4DNFIRSRJH45.fastq.gz"
}
],
2 changes: 1 addition & 1 deletion core/_version.py
@@ -1,4 +1,4 @@
"""Version information."""

# The following line *must* be the last in the module, exactly as formatted:
__version__ = "0.3.7"
__version__ = "0.3.8"
23 changes: 16 additions & 7 deletions core/check_task_awsem/service.py
@@ -1,20 +1,30 @@
# -*- coding: utf-8 -*-

from dcicutils import s3_utils
import logging
import boto3
from core.utils import (
StillRunningException,
EC2StartingException,
AWSEMJobErrorException,
powerup
)
import json
from core.ec2_utils import does_key_exist

LOG = logging.getLogger(__name__)


def metadata_only(event):
event.update({'postrunjson': 'metadata_only'})
return event


def read_s3(bucket, object_name):
response = boto3.client('s3').get_object(Bucket=bucket, Key=object_name)
LOG.info(str(response))
return response['Body'].read()


@powerup('check_task_awsem', metadata_only)
def handler(event, context):
'''
@@ -24,7 +34,6 @@ def handler(event, context):

# s3 bucket that stores the output
bucket_name = event['config']['log_bucket']
s3 = s3_utils.s3Utils(bucket_name, bucket_name, bucket_name)

# info about the jobby job
jobid = event['jobid']
@@ -37,18 +46,18 @@
postrunjson_location = "https://s3.amazonaws.com/%s/%s" % (bucket_name, postrunjson)

# check to see ensure this job has started else fail
if not s3.does_key_exist(job_started):
if not does_key_exist(bucket_name, job_started):
raise EC2StartingException("Failed to find jobid %s, ec2 is probably still booting" % jobid)

# check to see if job has error, report if so
if s3.does_key_exist(job_error):
if does_key_exist(bucket_name, job_error):
raise AWSEMJobErrorException("Job encountered an error check log at %s" % job_log_location)

# check to see if job has completed if not throw retry error
if s3.does_key_exist(job_success):
if not s3.does_key_exist(postrunjson):
if does_key_exist(bucket_name, job_success):
if not does_key_exist(bucket_name, postrunjson):
raise Exception("Postrun json not found at %s" % postrunjson_location)
postrunjsoncontent = json.loads(s3.read_s3(postrunjson))
postrunjsoncontent = json.loads(read_s3(bucket_name, postrunjson))
if len(str(postrunjsoncontent)) + len(str(event)) < RESPONSE_JSON_CONTENT_INCLUSION_LIMIT:
event['postrunjson'] = postrunjsoncontent
else:
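The effect of the changes above is that `check_task_awsem` no longer goes through `dcicutils.s3_utils`; job state is read directly from the log bucket with `boto3`. A rough standalone sketch of the same status-check flow, assuming only `boto3` and using hypothetical bucket/key names (the real key names are built from the job id elsewhere in `service.py`):
```
import json
import boto3

s3 = boto3.client('s3')

def key_exists(bucket, key):
    # Mirrors does_key_exist: return the object's metadata, or False if head_object fails.
    try:
        return s3.head_object(Bucket=bucket, Key=key)
    except Exception:
        return False

bucket = 'my-tibanna-log-bucket'       # hypothetical log bucket
job_started = 'JOBID.job_started'      # hypothetical key names
job_error = 'JOBID.error'
job_success = 'JOBID.success'
postrunjson = 'JOBID.postrun.json'

if not key_exists(bucket, job_started):
    print('EC2 is probably still booting')
elif key_exists(bucket, job_error):
    print('job reported an error; check the log in the same bucket')
elif key_exists(bucket, job_success):
    content = json.loads(s3.get_object(Bucket=bucket, Key=postrunjson)['Body'].read())
    print('job finished; postrun json has %d top-level keys' % len(content))
else:
    print('job is still running')
```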
126 changes: 25 additions & 101 deletions core/ec2_utils.py
@@ -8,7 +8,6 @@
import subprocess
import logging
# from invoke import run
from dcicutils import s3_utils
import botocore.session
import boto3
from Benchmark import run as B
@@ -274,17 +273,16 @@ def update_config(config, app_name, input_files, parameters):
input_size_in_bytes = dict()
for argname, f in input_files.iteritems():
bucket = f['bucket_name']
s3 = s3_utils.s3Utils(bucket, bucket, bucket)
if isinstance(f['object_key'], list):
size = []
for key in f['object_key']:
try:
size.append(s3.get_file_size(key, bucket))
size.append(get_file_size(key, bucket))
except:
raise Exception("Can't get input file size")
else:
try:
size = s3.get_file_size(f['object_key'], bucket)
size = get_file_size(f['object_key'], bucket)
except:
raise Exception("Can't get input file size")
input_size_in_bytes.update({str(argname): size})
@@ -320,100 +318,26 @@ def update_config(config, app_name, input_files, parameters):
raise Exception("EBS_optimized cannot be determined nor given")


class WorkflowFile(object):

def __init__(self, bucket, key, runner, accession=None, output_type=None,
filesize=None, md5=None):
self.bucket = bucket
self.key = key
self.s3 = s3_utils.s3Utils(self.bucket, self.bucket, self.bucket)
self.runner = runner
self.accession = accession
self.output_type = output_type
self.filesize = filesize
self.md5 = md5

@property
def status(self):
exists = self.s3.does_key_exist(self.key, self.bucket)
if exists:
return "COMPLETED"
else:
return "FAILED"

def read(self):
return self.s3.read_s3(self.key).strip()


# TODO: refactor this to inherit from an abstrat class called Runner
# then implement for SBG as well
class Awsem(object):

def __init__(self, json):
self.args = json['args']
self.config = json['config']
self.output_s3 = self.args['output_S3_bucket']
self.app_name = self.args['app_name']
self.output_files_meta = json['ff_meta']['output_files']
self.output_info = None
if isinstance(json.get('postrunjson'), dict):
self.output_info = json['postrunjson']['Job']['Output']['Output files']

def output_files(self):
files = dict()
output_types = dict()
for x in self.output_files_meta:
output_types[x['workflow_argument_name']] = x['type']
for k, v in self.args.get('output_target').iteritems():
if k in output_types:
out_type = output_types[k]
else:
out_type = None
if out_type == 'Output processed file':
file_name = v.split('/')[-1]
accession = file_name.split('.')[0].strip('/')
else:
accession = None
if self.output_info:
md5 = self.output_info[k].get('md5sum', '')
filesize = self.output_info[k].get('size', 0)
wff = {k: WorkflowFile(self.output_s3, v, self, accession,
output_type=out_type, filesize=filesize, md5=md5)}
else:
wff = {k: WorkflowFile(self.output_s3, v, self, accession,
output_type=out_type)}
files.update(wff)
return files

def secondary_output_files(self):
files = dict()
for k, v in self.args.get('secondary_output_target').iteritems():
wff = {k: WorkflowFile(self.output_s3, v, self)}
files.update(wff)
return files

def input_files(self):
files = dict()
for arg_name, item in self.args.get('input_files').iteritems():
file_name = item.get('object_key').split('/')[-1]
accession = file_name.split('.')[0].strip('/')
wff = {arg_name: WorkflowFile(item.get('bucket_name'),
item.get('object_key'),
self,
accession)}
files.update(wff)
return files

def all_files(self):
files = dict()
files.update(self.input_files())
files.update(self.output_files())
return files

@property
def inputfile_accessions(self):
return {k: v.accession for k, v in self.input_files().iteritems()}

@property
def all_file_accessions(self):
return {k: v.accession for k, v in self.all_files().iteritems()}
def get_file_size(key, bucket, size_in_gb=False):
'''
default returns file size in bytes,
unless size_in_gb = True
'''
meta = does_key_exist(bucket, key)
if not meta:
raise Exception("key not found")
one_gb = 1073741824
size = meta['ContentLength']
if size_in_gb:
size = size / one_gb
return size


def does_key_exist(bucket, object_name):
try:
file_metadata = boto3.client('s3').head_object(Bucket=bucket, Key=object_name)
except Exception as e:
print("object %s not found on bucket %s" % (str(object_name), str(bucket)))
print(str(e))
return False
return file_metadata
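With the `WorkflowFile` and `Awsem` classes removed, `core/ec2_utils.py` keeps only these two thin S3 helpers. A rough usage sketch, assuming the module is importable as in this commit and using a hypothetical bucket and object key:
```
from core.ec2_utils import does_key_exist, get_file_size

bucket = 'my-input-bucket'                 # hypothetical bucket
key = 'some-uuid/4DNFIVOZN511.fastq.gz'    # hypothetical object key layout

if does_key_exist(bucket, key):
    # get_file_size returns bytes by default; size_in_gb=True divides by 2**30
    # (integer division under the Python 2.7 listed in the README).
    print('%s is %d bytes (~%d GB)' % (key, get_file_size(key, bucket),
                                       get_file_size(key, bucket, size_in_gb=True)))
else:
    print('missing input: s3://%s/%s' % (bucket, key))
```
This mirrors how `update_config` above now computes input file sizes with `get_file_size` directly instead of instantiating `s3Utils`.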