Skip to content

Commit

Permalink
DAG Paths (#931)
Browse files Browse the repository at this point in the history
* Add  custom DAG locations and versioning, support for saving final dag png and graphmls in workflow archive

* add the apoc install recipe and updated bee_install.sh for ci integration tests

---------

Co-authored-by: leahh <leahh@lanl.gov>
  • Loading branch information
Leahh02 and leahh authored Oct 7, 2024
1 parent cd51dc2 commit b48c061
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 32 deletions.
30 changes: 28 additions & 2 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,35 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),


@app.command()
def dag(wf_id: str = typer.Argument(..., callback=match_short_id),
        output_dir: pathlib.Path = typer.Argument(
            ..., help='Path to the directory where the dag output will be'),
        no_dag_dir: bool = typer.Option(
            False, '--no-dag-dir',
            help='do not make a subdirectory within output_dir for the dags')):
    """Export a DAG of the workflow to a GraphML file."""
    # The docstring is intentionally a single line: typer uses it as the
    # command's help text.
    #
    # wf_id:      workflow ID (short IDs are expanded by match_short_id).
    # output_dir: existing directory that receives the exported DAG.
    # no_dag_dir: when set, the DAG is written directly into output_dir
    #             instead of a per-workflow subdirectory.

    # Expand '~' BEFORE resolving. Resolving first anchors an unexpanded
    # '~' to the current directory, which turns expanduser into a no-op.
    output_dir = output_dir.expanduser().resolve()
    if not output_dir.exists():
        error_exit(f"Path for dag directory \"{output_dir}\" doesn't exist")
    if not output_dir.is_dir():
        error_exit(f"Path for dag directory \"{output_dir}\" is not a directory")

    # The export helpers build their paths by string concatenation, so hand
    # them a plain string rather than a Path.
    output_dir = str(output_dir)

    # Check if the workflow is archived. Archiving stores the state either
    # as 'Archived' or as 'Archived/<final_state>', so match on the prefix.
    # NOTE(review): assumes get_wf_status returns that state string as-is.
    wf_status = get_wf_status(wf_id)
    if isinstance(wf_status, str) and wf_status.startswith('Archived'):
        bee_workdir = wf_utils.get_bee_workdir()
        mount_dir = os.path.join(bee_workdir, 'gdb_mount')
        graphmls_dir = mount_dir + '/graphmls'
        typer.secho("Workflow has been archived. All new DAGs will look the same as the one "
                    "in the archive directory.",
                    fg=typer.colors.MAGENTA)
    else:
        wf_dir = wf_utils.get_workflow_dir(wf_id)
        graphmls_dir = wf_dir + '/graphmls'
    # Idempotent: guarantees the GraphML directory exists before exporting.
    os.makedirs(graphmls_dir, exist_ok=True)
    wf_utils.export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir)
    typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
                fg=typer.colors.GREEN)

Expand Down
4 changes: 1 addition & 3 deletions beeflow/common/deps/neo4j_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
run_dir = mount_dir + '/run'
certs_dir = mount_dir + '/certificates'
confs_dir = mount_dir + "/conf"
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"
graphmls_dir = mount_dir + '/graphmls'
container_path = container_manager.get_container_dir('neo4j')
log = bee_logging.setup('neo4j')

Expand Down Expand Up @@ -61,7 +60,6 @@ def setup_mounts():
os.makedirs(mount_dir, exist_ok=True)
os.makedirs(certs_dir, exist_ok=True)
os.makedirs(run_dir, exist_ok=True)
os.makedirs(dags_dir, exist_ok=True)
os.makedirs(graphmls_dir, exist_ok=True)


Expand Down
27 changes: 18 additions & 9 deletions beeflow/common/gdb/generate_graph.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
"""Module to make a png of a graph from a graphml file."""

import os
import shutil
import networkx as nx
import graphviz

from beeflow.common import paths

bee_workdir = paths.workdir()
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"


def generate_viz(wf_id):
def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):
"""Generate a PNG of a workflow graph from a GraphML file."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
output_path = dags_dir + "/" + short_id

if no_dag_dir:
dags_dir = output_dir
else:
dags_dir = output_dir + "/" + short_id + "-dags"
os.makedirs(dags_dir, exist_ok=True)

output_path = dags_dir + "/" + short_id + ".png"
if os.path.exists(output_path):
i = 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
while os.path.exists(backup_path):
i += 1
backup_path = f'{dags_dir}/{short_id}_v{i}.png'
shutil.copy(output_path, backup_path)

# Load the GraphML file using NetworkX
graph = nx.read_graphml(graphml_path)
Expand All @@ -29,7 +38,7 @@ def generate_viz(wf_id):

# Render the graph and save as PNG
png_data = dot.pipe(format='png')
with open(output_path + ".png", "wb") as png_file:
with open(output_path, "wb") as png_file:
png_file.write(png_data)


Expand Down
24 changes: 17 additions & 7 deletions beeflow/common/gdb/graphml_key_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import xml.etree.ElementTree as ET
import os
import shutil

from beeflow.common import paths

bee_workdir = paths.workdir()
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"
mount_dir = os.path.join(bee_workdir, 'gdb_mount')
gdb_graphmls_dir = mount_dir + '/graphmls'

expected_keys = {"id", "name", "state", "class", "type", "value", "source",
"workflow_id", "base_command", "stdout", "stderr", "default",
Expand All @@ -33,12 +34,21 @@
}


def update_graphml(wf_id):
def update_graphml(wf_id, graphmls_dir):
"""Update GraphML file by ensuring required keys are present and updating its structure."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
gdb_graphml_path = gdb_graphmls_dir + "/" + short_id + ".graphml"
output_graphml_path = graphmls_dir + "/" + short_id + ".graphml"
# Handle making multiple versions of the graphmls without overriding old ones
if os.path.exists(output_graphml_path):
i = 1
backup_path = f'{graphmls_dir}/{short_id}_v{i}.graphml'
while os.path.exists(backup_path):
i += 1
backup_path = f'{graphmls_dir}/{short_id}_v{i}.graphml'
shutil.copy(output_graphml_path, backup_path)
# Parse the GraphML file and preserve namespaces
tree = ET.parse(graphml_path)
tree = ET.parse(gdb_graphml_path)
root = tree.getroot()

name_space = {'graphml': 'http://graphml.graphdrawing.org/xmlns'}
Expand All @@ -56,5 +66,5 @@ def update_graphml(wf_id):
**default_def)
root.insert(0, key_element)

# Save the updated GraphML file by overwriting the original one
tree.write(graphml_path, encoding='UTF-8', xml_declaration=True)
# Save the updated GraphML file
tree.write(output_graphml_path, encoding='UTF-8', xml_declaration=True)
2 changes: 2 additions & 0 deletions beeflow/common/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import shutil
import sys
import time
import uuid
import typer

Expand Down Expand Up @@ -293,6 +294,7 @@ def main(tests = typer.Option(None, '--tests', '-t', # noqa (conflict on '=' si
timeout: int = typer.Option(utils.TIMEOUT, '--timeout',
help='workflow timeout in seconds')):
"""Launch the integration tests."""
time.sleep(60)
if show_tests:
print('INTEGRATION TEST CASES:')
for test_name, ignore in TEST_RUNNER.test_details():
Expand Down
4 changes: 4 additions & 0 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ def archive_workflow(db, wf_id, final_state=None):
workflow_dir = wf_utils.get_workflow_dir(wf_id)
shutil.copyfile(os.path.expanduser("~") + '/.config/beeflow/bee.conf',
workflow_dir + '/' + 'bee.conf')
# Archive Completed DAG
graphmls_dir = workflow_dir + "/graphmls"
os.makedirs(graphmls_dir, exist_ok=True)
wf_utils.export_dag(wf_id, workflow_dir, graphmls_dir, no_dag_dir=True)

wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
Expand Down
6 changes: 3 additions & 3 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
start_workflow(wf_id)


def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir):
    """Export the workflow's DAG as GraphML and render it to an image.

    Runs three steps in order: export the GraphML through the workflow
    interface, update the GraphML into ``graphmls_dir``, then generate the
    visualization under ``output_dir``.

    wf_id: ID of the workflow to export.
    output_dir: directory that receives the rendered DAG.
    graphmls_dir: directory that receives the updated GraphML file.
    no_dag_dir: if true, no per-workflow subdirectory is created in
        ``output_dir``.
    """
    # Step 1: export the raw GraphML through the workflow interface.
    get_workflow_interface(wf_id).export_graphml()
    # Step 2: write the updated GraphML into graphmls_dir.
    update_graphml(wf_id, graphmls_dir)
    # Step 3: render the visualization from that GraphML.
    generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir)


def start_workflow(wf_id):
Expand Down
5 changes: 3 additions & 2 deletions ci/bee_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ printf "**Setting up BEE containers**\n"
printf "\n\n"
mkdir -p $HOME/img
# Pull the Neo4j container
ch-image pull neo4j:5.17 || exit 1
ch-convert -i ch-image -o tar neo4j:5.17 $NEO4J_CONTAINER || exit 1
chmod +x ./ci/install_apoc_docker
ch-image build -t apoc_neo4j -f ./ci/install_apoc_docker ./ci || exit 1
ch-convert -i ch-image -o tar apoc_neo4j $NEO4J_CONTAINER || exit 1
# Pull the Redis container
ch-image pull redis || exit 1
ch-convert -i ch-image -o tar redis $REDIS_CONTAINER || exit 1
Expand Down
4 changes: 3 additions & 1 deletion ci/deps_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ sudo apt-get install -y slurmctld slurmd slurmrestd munge python3 python3-venv \
curl build-essential zlib1g-dev libncurses5-dev libgdbm-dev libnss3-dev \
libssl-dev libsqlite3-dev libreadline-dev libffi-dev libbz2-dev \
libmunge-dev \
libyaml-dev # needed for PyYAML
libyaml-dev # needed for PyYAML

sudo apt-get install -y graphviz libgraphviz-dev

# Install most recent Charliecloud
curl -O -L https://github.com/hpc/charliecloud/releases/download/v${CHARLIECLOUD_VERSION}/charliecloud-${CHARLIECLOUD_VERSION}.tar.gz
Expand Down
2 changes: 2 additions & 0 deletions ci/install_apoc_docker
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Neo4j 5.17 community image with the APOC core jar copied from the labs
# directory into the plugins directory.
FROM neo4j:5.17-community
# NOTE(review): the source jar is named 5.17.0 but the destination is named
# 5.17.8 -- confirm the rename is intentional and not a typo.
RUN cp /var/lib/neo4j/labs/apoc-5.17.0-core.jar /var/lib/neo4j/plugins/apoc-5.17.8-core.jar
8 changes: 6 additions & 2 deletions docs/sphinx/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,14 @@ Arguments:
Arguments:
WF_ID [required]

``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point of the workflow. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to ~/.beeflow/dags. See :ref:`workflow-visualization` for more information.
``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point of the workflow. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to $OUTPUT_DIR/$WF_ID-dags by default. If the ``--no-dag-dir`` flag is specified when the dag command is run, the DAG will be exported to $OUTPUT_DIR. The dag command makes multiple versions of the DAGs. The most recent version is $WF_ID.png and the others are $WF_ID_v1.png, $WF_ID_v2.png ... where v1 is the oldest. See :ref:`workflow-visualization` for more information.

Arguments:
WF_ID [required]
- WF_ID [required]
- OUTPUT_DIR, Directory for the output [required]

Options:
``--no-dag-dir``: Do not make a subdirectory within the output_dir for the DAGs.

Generating and Managing Configuration Files
===========================================
Expand Down
11 changes: 8 additions & 3 deletions docs/sphinx/visualization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Workflow Visualization
**********************

BEE includes a simple command for viewing BEE workflows. By using the ``beeflow
dag $ID`` command, you can view the directed acyclic graph (DAG) of any submitted
dag $ID $OUTPUT_DIR`` command, you can view the directed acyclic graph (DAG) of any submitted
workflow.

Creating DAGs
Expand All @@ -13,8 +13,13 @@ Creating DAGs
The dag command can be run at any point of the workflow, and can
be run multiple times. To see the DAG of a workflow before it runs, submit
the workflow with the ``--no-start`` flag and then use the dag command. The
DAGs are exported in PNG format to ~/.beeflow/dags. They follow the naming
convention ``$ID.png``
DAGs are exported in PNG format to $OUTPUT_DIR/$WF_ID-dags by default. If the
``--no-dag-dir`` flag is specified when the dag command is run, the DAG will be
exported to $OUTPUT_DIR. The dag command makes multiple versions of the DAGs. The
most recent version is $WF_ID.png and the others are $WF_ID_v1.png,
$WF_ID_v2.png ... where v1 is the oldest. The GraphML files used to make the DAGs are
saved in the workflow archive along with their version numbers. These GraphML files can
be useful for debugging when there are errors creating the DAGs.

Example DAG
===========
Expand Down

0 comments on commit b48c061

Please sign in to comment.