Commit bebe886: Merge branch 'develop' for Release 0.1.9

pagrubel committed Sep 17, 2024
2 parents 6db4ab5 + a69459b commit bebe886
Showing 122 changed files with 7,250 additions and 9,561 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/docs.yml
@@ -5,11 +5,12 @@ on:
push:
branches: [main, develop]
pull_request:
- types: [opened, synchronize, edited]
+ types: [opened, synchronize, edited, labeled, unlabeled]
branches: [main, develop]

jobs:
docs:
+ if: ${{ !(contains(github.event.pull_request.labels.*.name, 'WIP (no-ci)')) && !(contains(github.event.pull_request.labels.*.name, 'WIP (lint-only)')) }}
name: Build Docs
runs-on: ubuntu-latest
steps:
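The new if: expression skips the job whenever the pull request carries a 'WIP (no-ci)' or 'WIP (lint-only)' label; the labeled/unlabeled trigger types added above make sure the gate is re-evaluated when labels change. Distilled into plain Python (the label list below is hypothetical, purely for illustration):

    # Sketch of the label gate's boolean logic; the label list is made up.
    labels = ['WIP (no-ci)']  # labels attached to an imaginary pull request
    run_job = ('WIP (no-ci)' not in labels) and ('WIP (lint-only)' not in labels)
    print(run_job)  # False: the docs job would be skipped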
6 changes: 3 additions & 3 deletions .github/workflows/integration.yml
@@ -14,13 +14,13 @@ env: {}

jobs:
integration-test:
- if: ${{ !(contains(github.event.pull_request.labels.*.name, 'WIP') || contains(github.event.pull_request.labels.*.name, 'lint-only')) }}
+ if: ${{ !(contains(github.event.pull_request.labels.*.name, 'WIP (no-ci)')) && !(contains(github.event.pull_request.labels.*.name, 'WIP (lint-only)')) }}
name: BEE Integration Test
strategy:
matrix:
- batch_scheduler: [Slurm, Flux]
+ bee_worker: [Slurmrestd, SlurmCommands, Flux]
env:
- BATCH_SCHEDULER: ${{ matrix.batch_scheduler }}
+ BEE_WORKER: ${{ matrix.bee_worker }}
# Note: Needs to run on 22.04 or later since slurmrestd doesn't seem to be
# available on 20.04
runs-on: ubuntu-22.04
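With the matrix key renamed from batch_scheduler to bee_worker, each CI job now selects a worker implementation (two Slurm variants plus Flux) rather than just a scheduler. A rough sketch of how a test script might dispatch on the new variable; the names below are assumptions, not taken from BEE's ci scripts:

    import os

    # Hypothetical dispatch on BEE_WORKER; accepted values mirror the matrix above.
    worker = os.environ.get('BEE_WORKER', 'SlurmCommands')
    if worker not in ('Slurmrestd', 'SlurmCommands', 'Flux'):
        raise ValueError(f'unknown BEE_WORKER value: {worker}')
    print(f'running integration tests against the {worker} worker')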
3 changes: 2 additions & 1 deletion .github/workflows/pylama.yml
@@ -7,11 +7,12 @@ on:
push:
branches: [main, develop]
pull_request:
- types: [opened, synchronize, edited]
+ types: [opened, synchronize, edited, labeled, unlabeled]
branches: [main, develop]

jobs:
pylama:
+ if: ${{ !(contains(github.event.pull_request.labels.*.name, 'WIP (no-ci)')) }}
name: PyLama Lint
runs-on: ubuntu-latest
steps:
39 changes: 36 additions & 3 deletions .github/workflows/unit-tests.yml
@@ -14,11 +14,11 @@ env: {}

jobs:
integration-test:
- if: ${{ !(contains(github.event.pull_request.labels.*.name, 'WIP') || contains(github.event.pull_request.labels.*.name, 'lint-only')) }}
+ if: ${{ !(contains(github.event.pull_request.labels.*.name, 'WIP (no-ci)')) && !(contains(github.event.pull_request.labels.*.name, 'WIP (lint-only)')) }}
name: BEE Unit Tests
env:
- # Unit tests are only run with Slurm right now
- BATCH_SCHEDULER: Slurm
+ # Unit tests are only run with slurmrestd right now
+ BEE_WORKER: Slurmrestd
# Note: Needs to run on 22.04 or later since slurmrestd doesn't seem to be
# available on 20.04
runs-on: ubuntu-22.04
@@ -35,3 +35,36 @@ jobs:
run: |
. ./ci/env.sh
./ci/unit_tests.sh
+ - name: Coverage Badge
+ if: github.event_name == 'pull_request'
+ uses: tj-actions/coverage-badge-py@v2
+
+ - name: Verify Changed Files
+ if: github.event_name == 'pull_request'
+ uses: tj-actions/verify-changed-files@v20
+ id: verify-changed-files
+ with:
+ files: |
+ *.svg
+ - name: Commit files
+ if: steps.verify-changed-files.outputs.files_changed == 'true' && github.actor != 'github-actions[bot]' && github.event_name == 'pull_request'
+ run: |
+ git config --local user.email "github-actions[bot]@users.noreply.github.com"
+ git config --local user.name "github-actions[bot]"
+ git add coverage.svg
+ git commit -m "Updated coverage.svg"
+ - name: Fetch latest changes again
+ if: steps.verify-changed-files.outputs.files_changed == 'true' && github.actor != 'github-actions[bot]' && github.event_name == 'pull_request'
+ run: |
+ git fetch origin
+ git rebase origin/${{ github.event.pull_request.head.ref }}
+ - name: Push changes
+ if: steps.verify-changed-files.outputs.files_changed == 'true' && github.actor != 'github-actions[bot]' && github.event_name == 'pull_request'
+ uses: ad-m/github-push-action@master
+ with:
+ github_token: ${{ secrets.github_token }}
+ branch: ${{ github.event.pull_request.head.ref }}
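Taken together, these steps regenerate coverage.svg on pull requests and push it back to the PR branch, guarded so the bot does not trigger itself. Conceptually the badge step reduces to something like the following sketch, assuming the coverage and coverage-badge command-line tools; the action's internals may differ:

    import subprocess

    # Run the test suite under coverage, then render the percentage as an SVG badge.
    # Assumes pytest, coverage, and coverage-badge are installed.
    subprocess.run(['coverage', 'run', '-m', 'pytest'], check=True)
    subprocess.run(['coverage-badge', '-f', '-o', 'coverage.svg'], check=True)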
14 changes: 13 additions & 1 deletion README.rst
@@ -1,7 +1,17 @@
BEE: Build and Execution Environment
************************************

- BEE is a workflow orchestration system designed to build containerized HPC applications and orchestrate workflows across HPC and cloud systems. BEE has adopted the Common Workflow Language (`CWL <https://www.commonwl.org/>`_) for specifying workflows. Complex scientific workflows specified by CWL are managed and visualized through a graph database, giving the user the ability to monitor the state of each task in the workflow. BEE runs jobs using the workload scheduler (i.e. Slurm or LSF) on the HPC system that tasks are specified to run on.
+ Coverage in Develop
+
+ .. image:: https://github.com/lanl/BEE/raw/develop/coverage.svg
+    :alt: Coverage Badge for develop
+
+ Coverage in Main
+
+ .. image:: https://github.com/lanl/BEE/raw/main/coverage.svg
+    :alt: Coverage Badge for main
+
+ BEE is a workflow orchestration system designed to build containerized HPC applications and orchestrate workflows across HPC and cloud systems. BEE has adopted the Common Workflow Language (`CWL <https://www.commonwl.org/>`_) for specifying workflows. Complex scientific workflows specified by CWL are managed and visualized through a graph database, giving the user the ability to monitor the state of each task in the workflow. BEE runs jobs using the workload scheduler (i.e. Slurm or Flux) on the HPC system that tasks are specified to run on.

BEE workflows can be archived for provenance and reproducibility. BEE can orchestrate workflows with containerized applications or those built locally on a system. However, there are advantages to containerizing an application.

@@ -46,10 +56,12 @@ Contributors:
* Patricia Grubel - `pagrubel <https://github.com/pagrubel>`_
* Qiang Guan - `guanxyz <https://github.com/guanxyz>`_
* Ragini Gupta - `raginigupta6 <https://github.com/raginigupta6>`_
+ * Leah Howell - `Leahh02 <https://github.com/Leahh02>`_
* Andres Quan - `aquan9 <https://github.com/aquan9>`_
* Quincy Wofford - `qwofford <https://github.com/qwofford>`_
* Tim Randles - `trandles-lanl <https://github.com/trandles-lanl>`_
* Jacob Tronge - `jtronge <https://github.com/jtronge>`_
+ * Kabir Vats - `kabir-vats <https://github.com/kabir-vats>`_

Concept and Design Contributors

12 changes: 10 additions & 2 deletions RELEASE.rst
@@ -18,7 +18,10 @@ Verify all current changes in develop run correctly on nightly tests.
5. Once merged, on the GitHub web interface create a release and tag based on the main branch
that matches the version in pyproject.toml
6. Follow step 2, but uncheck Allow specified actors to bypass and don't forget to save
- 7. Log into your PYPI account and get a token for hpc-beeflow.
+ 7. Log into your PyPI account and get a token for hpc-beeflow via:
+
+ > Your projects > hpc-beeflow > Manage > Settings > Create a token
+
8. Finally, on the command line: check out the main branch and make sure you pull the latest version

Then publish by:
@@ -28,12 +31,17 @@ Verify all current changes in develop run correctly on nightly tests.


Check the documentation at: `https://lanl.github.io/BEE/ <https://lanl.github.io/BEE/>`_

Also upgrade the pip version in your Python or Anaconda environment and check the version:

``pip install --upgrade pip``

``pip install --upgrade hpc-beeflow``

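To confirm the upgrade took effect, the installed version can also be checked from Python (a quick sanity check; hpc-beeflow is the PyPI distribution name):

    import importlib.metadata

    # Print the installed hpc-beeflow version; it should match the new release.
    print(importlib.metadata.version('hpc-beeflow'))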
**WARNING**: Once a version is pushed to PyPI, it cannot be undone. You can
'delete' the version from the package settings, but you can no longer publish
an update to that same version.

9. After the version is published, change the version in develop to a pre-release of the next version
- (example new version will be 0.1.x edit pyproject.toml version to be 0.1.xrc1
+ (example: if the new version will be 0.1.x, edit the pyproject.toml version to be 0.1.Xdev)

104 changes: 78 additions & 26 deletions beeflow/client/bee_client.py
@@ -11,13 +11,15 @@
import pathlib
import shutil
import subprocess
+ import tarfile
import tempfile
import textwrap
import time
import importlib.metadata
import jsonpickle
import requests
import typer
+ import yaml

from beeflow.common import config_driver
from beeflow.common.cli import NaturalOrderGroup
@@ -39,6 +41,7 @@


logging.basicConfig(level=logging.WARNING)
+ logging.getLogger("neo4j").setLevel(logging.WARNING)
WORKFLOW_MANAGER = 'bee_wfm/v1/jobs/'


@@ -192,7 +195,7 @@ def submit(wf_name: str = typer.Argument(..., help='the workflow name'), # pyli
main_cwl: str = typer.Argument(...,
help='filename of main CWL (if using CWL tarball), '
+ 'path of main CWL (if using CWL directory)'),
- yaml: str = typer.Argument(...,
+ yaml_file: str = typer.Argument(...,
help='filename of yaml file (if using CWL tarball), '
+ 'path of yaml file (if using CWL directory)'),
workdir: pathlib.Path = typer.Argument(...,
@@ -206,40 +209,43 @@ def is_parent(parent, path):
path = os.path.abspath(path)
return os.path.commonpath([parent]) == os.path.commonpath([parent, path])

wf_path = wf_path.resolve()
workdir = workdir.resolve()

tarball_path = ""
if os.path.exists(wf_path):
# Check to see if the wf_path is a tarball or a directory. Package if directory
if os.path.isdir(wf_path):
print("Detected directory instead of packaged workflow. Packaging Directory...")
main_cwl_path = pathlib.Path(main_cwl).resolve()
- yaml_path = pathlib.Path(yaml).resolve()
+ yaml_path = pathlib.Path(yaml_file).resolve()

if not main_cwl_path.exists():
error_exit(f'Main CWL file {main_cwl} does not exist')
if not yaml_path.exists():
- error_exit(f'YAML file {yaml} does not exist')
+ error_exit(f'YAML file {yaml_file} does not exist')

# Packaging in temp dir, after copying alternate cwl_main or yaml file
cwl_indir = is_parent(wf_path, main_cwl_path)
yaml_indir = is_parent(wf_path, yaml_path)

+ # Always create temp dir for the workflow
tempdir_path = pathlib.Path(tempfile.mkdtemp())
- if cwl_indir and yaml_indir:
- package_path = package(wf_path, tempdir_path)
- else:
- tempdir_wf_path = pathlib.Path(tempdir_path / wf_path.name)
- shutil.copytree(wf_path, tempdir_wf_path, dirs_exist_ok=False)
- if not cwl_indir:
- shutil.copy2(main_cwl, tempdir_wf_path)
- if not yaml_indir:
- shutil.copy2(yaml, tempdir_wf_path)
- package_path = package(tempdir_wf_path, tempdir_path)
+ tempdir_wf_path = pathlib.Path(tempdir_path / wf_name)
+ shutil.copytree(wf_path, tempdir_wf_path, dirs_exist_ok=False)
+ if not cwl_indir:
+ shutil.copy2(main_cwl, tempdir_wf_path)
+ if not yaml_indir:
+ shutil.copy2(yaml_file, tempdir_wf_path)
+ package_path = package(tempdir_wf_path, tempdir_path)
else:
package_path = wf_path

# Untar and parse workflow
untar_path = pathlib.Path(tempfile.mkdtemp())
untar_wf_path = unpackage(package_path, untar_path)
main_cwl_path = untar_wf_path / pathlib.Path(main_cwl).name
- yaml_path = untar_wf_path / pathlib.Path(yaml).name
+ yaml_path = untar_wf_path / pathlib.Path(yaml_file).name
parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
Expand All @@ -259,6 +265,12 @@ def is_parent(parent, path):
if not os.path.exists(workdir):
error_exit(f"Workflow working directory \"{workdir}\" doesn't exist")

+ # Make sure the workdir is not in /tmp or /var/tmp
+ if os.path.commonpath([os.path.realpath('/tmp'), workdir]) == os.path.realpath('/tmp'):
+ error_exit("Workflow working directory cannot be in \"/tmp\"")
+ if os.path.commonpath([os.path.realpath('/var/tmp'), workdir]) == os.path.realpath('/var/tmp'):
+ error_exit("Workflow working directory cannot be in \"/var/tmp\"")
+
# TODO: Can all of this information be sent as a file?
data = {
'wf_name': wf_name.encode(),
@@ -297,16 +309,19 @@ def is_parent(parent, path):

# Store provided arguments in text file for future reference
wf_dir = wf_utils.get_workflow_dir(wf_id)
- sub_wf_dir = wf_dir + "/submit_command_args.txt"
+ sub_wf_dir = wf_dir + "/submit_command_args.yaml"

- f_name = open(sub_wf_dir, "w", encoding="utf-8")
- f_name.write(f"wf_name: {wf_name}\n")
- f_name.write(f"wf_path: {wf_path}\n")
- f_name.write(f"main_cwl: {main_cwl}\n")
- f_name.write(f"yaml: {yaml}\n")
- f_name.write(f"workdir: {workdir}\n")
- f_name.write(f"wf_id: {wf_id}")
- f_name.close()
+ cmd = {
+ 'wf_name': wf_name,
+ 'wf_path': str(wf_path),
+ 'main_cwl': main_cwl,
+ 'yaml': yaml_file,
+ 'workdir': workdir,
+ 'wf_id': wf_id
+ }
+
+ with open(sub_wf_dir, "w", encoding='utf-8') as command_file:
+ yaml.dump(cmd, command_file)

return wf_id

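Both the is_parent helper and the new workdir guards lean on the same os.path.commonpath idiom: a path lies inside a parent exactly when the common prefix of the two equals the parent itself. A standalone sketch of the idiom:

    import os

    def is_inside(parent, path):
        """Return True if path falls under parent (both normalized to absolute)."""
        parent = os.path.abspath(parent)
        path = os.path.abspath(path)
        return os.path.commonpath([parent]) == os.path.commonpath([parent, path])

    print(is_inside('/tmp', '/tmp/run42'))   # True: submit() would reject this workdir
    print(is_inside('/tmp', '/home/user'))   # False: an acceptable location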
@@ -374,7 +389,7 @@ def remove(wf_id: str = typer.Argument(..., callback=match_short_id)):

wf_status = get_wf_status(wf_id)
print(f"Workflow Status is {wf_status}")
- if wf_status in ('Cancelled', 'Archived', 'Paused'):
+ if wf_status in ('Cancelled', 'Paused') or 'Archived' in wf_status:
verify = f"All stored information for workflow {_short_id(wf_id)} will be removed."
verify += "\nContinue to remove? yes(y)/no(n): """
response = input(verify)
@@ -450,7 +465,10 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)):
wf_status = resp.json()['wf_status']
typer.echo(wf_status)
for _task_id, task_name, task_state in tasks_status:
- typer.echo(f'{task_name}--{task_state}')
+ if wf_status == 'No Start':
+ typer.echo(f'{task_name}')
+ else:
+ typer.echo(f'{task_name}--{task_state}')

logging.info(f'Query workflow: {resp.text}')
return wf_status, tasks_status
@@ -511,7 +529,7 @@ def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a paused or running workflow."""
long_wf_id = wf_id
wf_status = get_wf_status(wf_id)
- if wf_status in ('Running', 'Paused'):
+ if wf_status in ('Running', 'Paused', 'No Start'):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)
@@ -558,6 +576,32 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
else:
error_exit(f'Workflow tarball {wf_path} cannot be found')

+ # Make sure the workdir is an absolute path and exists
+ workdir = os.path.expanduser(workdir)
+ workdir = os.path.abspath(workdir)
+ if not os.path.exists(workdir):
+ error_exit(f"Workflow working directory \"{workdir}\" doesn't exist")
+ cwl_path = pathlib.Path(tempfile.mkdtemp())
+ archive_id = str(wf_path.stem)
+ with tarfile.open(wf_path) as archive:
+ archive_cmd = yaml.load(archive.extractfile(
+ str(pathlib.Path(archive_id) / 'submit_command_args.yaml')).read(),
+ Loader=yaml.Loader)
+
+ cwl_files = [
+ tarinfo for tarinfo in archive.getmembers()
+ if tarinfo.name.startswith(archive_id + '/cwl_files/')
+ and tarinfo.isreg()
+ ]
+ for path in cwl_files:
+ path.name = os.path.basename(path.name)
+ archive.extractall(path=cwl_path, members=cwl_files)
+
+ main_cwl = cwl_path / pathlib.Path(archive_cmd['main_cwl']).name
+ yaml_file = cwl_path / pathlib.Path(archive_cmd['yaml']).name
+
+ return submit(wf_name, pathlib.Path(cwl_path), main_cwl, yaml_file, pathlib.Path(workdir))
+
data = {
'wf_filename': os.path.basename(wf_path).encode(),
'wf_name': wf_name.encode(),
@@ -580,6 +624,14 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
return wf_id


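reexecute now rebuilds a fresh submission from an archived workflow: it loads the saved submit_command_args.yaml straight out of the tarball, extracts only the regular files under <archive>/cwl_files/ with their paths flattened, and hands the result back to submit. The extraction pattern, condensed into a sketch (the archive name and contents here are hypothetical):

    import os
    import tarfile
    import yaml

    archive_id = 'example_wf'  # hypothetical archive stem
    with tarfile.open(f'{archive_id}.tgz') as archive:  # assumes the tarball exists
        # Read back the submit arguments saved by a previous run.
        cmd = yaml.safe_load(
            archive.extractfile(f'{archive_id}/submit_command_args.yaml').read())
        # Keep only regular files under cwl_files/ and flatten their names.
        members = [m for m in archive.getmembers()
                   if m.name.startswith(f'{archive_id}/cwl_files/') and m.isreg()]
        for member in members:
            member.name = os.path.basename(member.name)
        archive.extractall(path='extracted_cwl', members=members)
    print(cmd['main_cwl'], cmd['yaml'])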
+ @app.command()
+ def dag(wf_id: str = typer.Argument(..., callback=match_short_id)):
+ """Export a DAG of the workflow to a GraphML file."""
+ wf_utils.export_dag(wf_id)
+ typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
+ fg=typer.colors.GREEN)


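The new dag subcommand is a thin wrapper over wf_utils.export_dag; from the shell it would presumably be invoked as beeflow dag <wf_id>, using the same short workflow ID the other subcommands accept.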
@app.callback(invoke_without_command=True)
def version_callback(version: bool = False):
"""Beeflow."""