Skip to content

Commit

Permalink
Merge branch 'develop' for release 0.1.6
Browse files Browse the repository at this point in the history
  • Loading branch information
pagrubel committed Dec 8, 2023
2 parents e6ac1d2 + 577faa1 commit e3be834
Show file tree
Hide file tree
Showing 36 changed files with 939 additions and 1,574 deletions.
76 changes: 39 additions & 37 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import tempfile
import textwrap
import time
import importlib.metadata
import jsonpickle
import requests
import typer
Expand Down Expand Up @@ -174,8 +175,12 @@ def match_short_id(wf_id):
@app.command()
def submit(wf_name: str = typer.Argument(..., help='the workflow name'), # pylint:disable=R0915
wf_path: pathlib.Path = typer.Argument(..., help='path to the workflow .tgz or dir'),
main_cwl: str = typer.Argument(..., help='filename of main CWL file'),
yaml: str = typer.Argument(..., help='filename of YAML file'),
main_cwl: str = typer.Argument(...,
help='filename of main CWL (if using CWL tarball), '
+ 'path of main CWL (if using CWL directory)'),
yaml: str = typer.Argument(...,
help='filename of yaml file (if using CWL tarball), '
+ 'path of yaml file (if using CWL directory)'),
workdir: pathlib.Path = typer.Argument(...,
help='working directory for workflow containing input + output files',),
no_start: bool = typer.Option(False, '--no-start', '-n',
Expand All @@ -189,7 +194,7 @@ def is_parent(parent, path):

tarball_path = ""
if os.path.exists(wf_path):
# Check to see if the wf_path is a tarball or a directory. Run package() if directory
# Check to see if the wf_path is a tarball or a directory. Package if directory
if os.path.isdir(wf_path):
print("Detected directory instead of packaged workflow. Packaging Directory...")
main_cwl_path = pathlib.Path(main_cwl).resolve()
Expand All @@ -200,45 +205,37 @@ def is_parent(parent, path):
if not yaml_path.exists():
error_exit(f'YAML file {yaml} does not exist')

# Parse workflow
parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
job=str(yaml_path))
tasks = [jsonpickle.encode(task) for task in tasks]

# Packaging in temp dir, after copying alternate cwl_main or yaml file
cwl_indir = is_parent(wf_path, main_cwl_path)
yaml_indir = is_parent(wf_path, yaml_path)
# The CWL and YAML file are already in the workflow directory
# so we don't need to do anything
tempdir_path = pathlib.Path(tempfile.mkdtemp())
if cwl_indir and yaml_indir:
package_path = package(wf_path, tempdir_path)
else:
# Create a temp wf directory
tempdir_wf_path = pathlib.Path(tempdir_path / wf_path.name)
shutil.copytree(wf_path, tempdir_wf_path, dirs_exist_ok=False)
if not cwl_indir:
shutil.copy2(main_cwl, tempdir_wf_path)
if not yaml_indir:
shutil.copy2(yaml, tempdir_wf_path)
package_path = package(tempdir_wf_path, tempdir_path)
wf_tarball = open(package_path, 'rb')
shutil.rmtree(tempdir_path)
else:
# Untar and parse workflow
tempdir_path = pathlib.Path(tempfile.mkdtemp())
tempdir_wf_path = unpackage(wf_path, tempdir_path)
main_cwl_path = pathlib.Path(tempdir_wf_path / main_cwl).resolve()
yaml_path = pathlib.Path(tempdir_wf_path / yaml).resolve()

parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
job=str(yaml_path))

package_path = wf_path
# Untar and parse workflow
untar_path = pathlib.Path(tempfile.mkdtemp())
untar_wf_path = unpackage(package_path, untar_path)
main_cwl_path = untar_wf_path / pathlib.Path(main_cwl).name
yaml_path = untar_wf_path / pathlib.Path(yaml).name
parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
job=str(yaml_path))
tasks = [jsonpickle.encode(task) for task in tasks]

wf_tarball = open(package_path, 'rb')
shutil.rmtree(untar_path)
if os.path.isdir(wf_path):
shutil.rmtree(tempdir_path)
wf_tarball = open(wf_path, 'rb')
else:
error_exit(f'Workflow tarball {wf_path} cannot be found')

Expand All @@ -254,7 +251,8 @@ def is_parent(parent, path):
'wf_filename': os.path.basename(wf_path).encode(),
'workdir': workdir,
'workflow': jsonpickle.encode(workflow),
'tasks': jsonpickle.encode(tasks, warn=True)
'tasks': jsonpickle.encode(tasks, warn=True),
'no_start': no_start,
}
files = {
'workflow_archive': wf_tarball
Expand Down Expand Up @@ -283,10 +281,6 @@ def is_parent(parent, path):
if tarball_path:
os.remove(tarball_path)

# Start the workflow
if not no_start:
start(wf_id)

return wf_id


Expand Down Expand Up @@ -354,7 +348,7 @@ def unpackage(package_path, dest_path):
if not package_str.endswith('.tgz'):
# No cleanup, maybe we should rm dest_path?
error_exit("Invalid package name, please use the beeflow package command")
wf_dir = package_str[:-4]
wf_dir = pathlib.Path(package_path).stem

return_code = subprocess.run(['tar', '-C', dest_path, '-xf', package_path],
check=True).returncode
Expand All @@ -363,14 +357,12 @@ def unpackage(package_path, dest_path):
error_exit("Unpackage failed")
else:
print(f"Package {package_str} unpackaged successfully")


return dest_path/wf_dir # noqa: Not an arithmetic operation
return pathlib.Path(dest_path / wf_dir)


@app.command('list')
def list_workflows():
"""List all worklfows."""
"""List all workflows."""
try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
Expand Down Expand Up @@ -536,6 +528,16 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
return wf_id


@app.callback(invoke_without_command=True)
def version_callback(version: bool = False):
"""Beeflow."""
# Print out the current version of the app, and then exit
# Note above docstring gets used in the help menu
if version:
version = importlib.metadata.version("hpc-beeflow")
print(version)


def main():
"""Execute bee_client."""
global _INTERACTIVE
Expand Down
Loading

0 comments on commit e3be834

Please sign in to comment.