
Export dag #909

Merged: 32 commits (Sep 16, 2024)

Commits:
e3069ed
added changes made to visualize-dag from 08/19/24
Aug 28, 2024
5681127
added changes from e2f35ac95c694bc0fae75977f20e0cf82d1151ee in visual…
Aug 28, 2024
accafb3
added changes from 12c3725f405245b91539c961df2630dc8abc8d8d in visual…
Aug 28, 2024
2e284d1
added changes from 6a244f8fa49c2b78463ba6fd727b9e69091d15ef on visual…
Aug 28, 2024
2854c94
decreased verbosity of beeflow dag command
Aug 28, 2024
090a17e
fixed some linting errors
Aug 29, 2024
e65a6a8
linting error
Aug 29, 2024
eb2ca01
fixed indent
Aug 29, 2024
16e71fa
liniting
Aug 29, 2024
811a449
white space fix
Aug 29, 2024
27f1ad3
white space fix
Aug 29, 2024
8d0672b
changed names of functions that export the graphml to export_graphml
Sep 10, 2024
39939ad
integrated generate_graph into the bee code
Sep 10, 2024
ebc7e35
don't export intermediate .dot file
Sep 10, 2024
19613a4
fixed linting error
Sep 11, 2024
bb3f845
added command to documentation
Sep 11, 2024
69441bc
updated visualization page
Sep 11, 2024
b933512
added reference to visualization page
Sep 11, 2024
99bbe69
updated neo4j path instructions
Sep 11, 2024
2462781
hopefully fixed merge conflict
Sep 11, 2024
58d876d
attempt to fix conflict
Sep 11, 2024
eb0e1cb
added graphviz to the poetry.lock in develop
Sep 12, 2024
b9ce644
added graphml_key_updater to add any missing keys
Sep 12, 2024
c8a9762
added hint and requirement nodes to generate_graph
Sep 12, 2024
e506c69
fixed graphml_key_updater linting errors
Sep 12, 2024
647bf66
refactored generate_graph to fix linting error C901
Sep 12, 2024
c2f081a
Merge branch 'develop' into export-dag
Sep 12, 2024
d65a90f
fixed orientation of hint and requirement nodes in generate_graph
Sep 12, 2024
c2fe32d
Updated coverage.svg
github-actions[bot] Sep 12, 2024
f09f3e1
fixed generate_graph and graphml_key_updater linting errors
Sep 12, 2024
85d4edb
Merge branch 'export-dag' of github.com:lanl/BEE into export-dag
Sep 12, 2024
fbd05a4
shortened id in secho statement and made text green
Sep 13, 2024
9 changes: 9 additions & 0 deletions beeflow/client/bee_client.py
@@ -41,6 +41,7 @@


logging.basicConfig(level=logging.WARNING)
logging.getLogger("neo4j").setLevel(logging.WARNING)
WORKFLOW_MANAGER = 'bee_wfm/v1/jobs/'


@@ -623,6 +624,14 @@ def reexecute(wf_name: str = typer.Argument(..., help='The workflow name'),
return wf_id


@app.command()
def dag(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Export a DAG of the workflow to a GraphML file."""
wf_utils.export_dag(wf_id)
typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
fg=typer.colors.GREEN)


@app.callback(invoke_without_command=True)
def version_callback(version: bool = False):
"""Beeflow."""
26 changes: 26 additions & 0 deletions beeflow/common/deps/neo4j_manager.py
@@ -23,6 +23,8 @@
run_dir = mount_dir + '/run'
certs_dir = mount_dir + '/certificates'
confs_dir = mount_dir + "/conf"
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"
container_path = container_manager.get_container_dir('neo4j')
log = bee_logging.setup('neo4j')

@@ -59,6 +61,8 @@ def setup_mounts():
os.makedirs(mount_dir, exist_ok=True)
os.makedirs(certs_dir, exist_ok=True)
os.makedirs(run_dir, exist_ok=True)
os.makedirs(dags_dir, exist_ok=True)
os.makedirs(graphmls_dir, exist_ok=True)


def setup_configs(bolt_port, http_port, https_port):
@@ -86,6 +90,24 @@ def setup_configs(bolt_port, http_port, https_port):
with open(gdb_configfile, "wt", encoding="utf8") as cfile:
cfile.write(data)

apoc_configfile = os.path.join(confs_dir, "apoc.conf")
if not os.path.exists(apoc_configfile):
with open(apoc_configfile, "wt", encoding="utf8") as afile:
afile.write("apoc.export.file.enabled=true\n")
log.info(f"Created {apoc_configfile} with apoc.export.file.enabled=true")
else:
with open(apoc_configfile, "rt", encoding="utf8") as afile:
apoc_data = afile.read()

apoc_config = r'#?(apoc.export.file.enabled=)[^\n]*'
if re.search(apoc_config, apoc_data):
apoc_data = re.sub(apoc_config, r'apoc.export.file.enabled=true', apoc_data)
else:
apoc_data += "\napoc.export.file.enabled=true\n"

with open(apoc_configfile, "wt", encoding="utf8") as afile:
afile.write(apoc_data)


def create_credentials():
"""Create the password and set the logfiles in environment."""
@@ -95,10 +117,12 @@ def create_credentials():
subprocess.run([
"ch-run",
"--set-env=" + container_path + "/ch/environment",
"--set-env=apoc.export.file.enabled=true",
"-b", confs_dir + ":/var/lib/neo4j/conf",
"-b", data_dir + ":/data",
"-b", logs_dir + ":/logs",
"-b", run_dir + ":/var/lib/neo4j/run", container_path,
"-W", "-b", graphmls_dir + ":/var/lib/neo4j/import",
"--", *command
], check=True)
except subprocess.CalledProcessError:
@@ -112,11 +136,13 @@ def create_database():
proc = subprocess.Popen([ #noqa can't use with because returning
"ch-run",
"--set-env=" + container_path + "/ch/environment",
"--set-env=apoc.export.file.enabled=true",
"-b", confs_dir + ":/var/lib/neo4j/conf",
"-b", data_dir + ":/data",
"-b", logs_dir + ":/logs",
"-b", run_dir + ":/var/lib/neo4j/run",
"-b", certs_dir + ":/var/lib/neo4j/certificates",
"-W", "-b", graphmls_dir + ":/var/lib/neo4j/import",
container_path, "--", *command
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
wait_gdb()
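The apoc.conf handling in ``setup_configs`` above uses a small idempotent pattern worth calling out: if any assignment of the setting already exists, even a commented-out one, it is replaced in place; otherwise the line is appended. That way repeated daemon starts never stack duplicate lines. A standalone sketch of the pattern (the function name is ours, not BEE's):

```python
import re

def ensure_setting(config_text, key, value):
    """Force `key=value` in a config file body, replacing any existing
    assignment of the key, even one commented out with '#'."""
    pattern = rf'#?({re.escape(key)}=)[^\n]*'
    if re.search(pattern, config_text):
        # Overwrite the existing (or commented-out) assignment in place.
        return re.sub(pattern, f'{key}={value}', config_text)
    # Key never mentioned: append a fresh assignment.
    return config_text + f"\n{key}={value}\n"
```

Applying it a second time is a no-op, which is why the real code can safely rewrite apoc.conf on every start.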
4 changes: 4 additions & 0 deletions beeflow/common/gdb/gdb_driver.py
@@ -287,3 +287,7 @@ def workflow_completed(self):
@abstractmethod
def close(self):
"""Close the connection to the graph database."""

@abstractmethod
def export_graphml(self):
"""Export a BEE workflow as a graphml."""
81 changes: 81 additions & 0 deletions beeflow/common/gdb/generate_graph.py
@@ -0,0 +1,81 @@
"""Module to make a png of a graph from a graphml file."""

import os
import networkx as nx
import graphviz

from beeflow.common import paths

bee_workdir = paths.workdir()
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"


def generate_viz(wf_id):
"""Generate a PNG of a workflow graph from a GraphML file."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
output_path = dags_dir + "/" + short_id

# Load the GraphML file using NetworkX
graph = nx.read_graphml(graphml_path)

# Initialize Graphviz graph
dot = graphviz.Digraph(comment='Hierarchical Graph')

# Add nodes and edges using helper functions
add_nodes_to_dot(graph, dot)
add_edges_to_dot(graph, dot)

# Render the graph and save as PNG
png_data = dot.pipe(format='png')
with open(output_path + ".png", "wb") as png_file:
png_file.write(png_data)


def add_nodes_to_dot(graph, dot):
"""Add nodes from the graph to the Graphviz object with labels and colors."""
label_to_color = {
":Workflow": 'steelblue',
":Output": 'mediumseagreen',
":Metadata": 'skyblue',
":Task": 'lightcoral',
":Input": 'sandybrown',
":Hint": 'plum',
":Requirement": 'lightpink1'
}

for node_id, attributes in graph.nodes(data=True):
label = attributes.get('labels', node_id)
node_label, color = get_node_label_and_color(label, attributes, label_to_color)
dot.node(node_id, label=node_label, style='filled', fillcolor=color)


def get_node_label_and_color(label, attributes, label_to_color):
"""Return the appropriate node label and color based on node type."""
label_to_attribute = {
":Workflow": "Workflow",
":Output": attributes.get('value', label),
":Metadata": attributes.get('state', label),
":Task": attributes.get('name', label),
":Input": attributes.get('source', label),
":Hint": attributes.get('class', label),
":Requirement": attributes.get('class', label)
}

# Check if the label is in the predefined labels
if label in label_to_attribute:
return label_to_attribute[label], label_to_color.get(label, 'gray')

# Default case if no match
return label, 'gray'


def add_edges_to_dot(graph, dot):
"""Add edges from the graph to the Graphviz object with appropriate labels."""
for source, target, attributes in graph.edges(data=True):
edge_label = attributes.get('label', '')
if edge_label in ('INPUT_OF', 'DESCRIBES', 'HINT_OF', 'REQUIREMENT_OF'):
dot.edge(source, target, label=edge_label, fontsize="10")
else:
dot.edge(target, source, label=edge_label, fontsize="10")
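One subtlety in ``add_edges_to_dot`` above: relationships such as ``INPUT_OF`` are already stored pointing the way we want them drawn, while every other relationship type is flipped so the rendered DAG reads top-down. The orientation rule in isolation (a sketch mirroring the logic above):

```python
# Relationship labels drawn in their stored direction; any other label
# (e.g. a dependency edge) is flipped so the PNG flows top-down.
FORWARD_LABELS = {'INPUT_OF', 'DESCRIBES', 'HINT_OF', 'REQUIREMENT_OF'}

def oriented_edge(source, target, label):
    """Return the (tail, head) pair Graphviz should draw for one edge."""
    if label in FORWARD_LABELS:
        return source, target
    return target, source
```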
60 changes: 60 additions & 0 deletions beeflow/common/gdb/graphml_key_updater.py
@@ -0,0 +1,60 @@
"""Module to make sure all required keys are present."""

import xml.etree.ElementTree as ET
import os

from beeflow.common import paths

bee_workdir = paths.workdir()
dags_dir = os.path.join(bee_workdir, 'dags')
graphmls_dir = dags_dir + "/graphmls"

expected_keys = {"id", "name", "state", "class", "type", "value", "source",
"workflow_id", "base_command", "stdout", "stderr", "default",
"prefix", "position", "value_from", "glob"}

default_key_definitions = {
"id": {"for": "node", "attr.name": "id", "attr.type": "string"},
"name": {"for": "node", "attr.name": "name", "attr.type": "string"},
"state": {"for": "node", "attr.name": "state", "attr.type": "string"},
"class": {"for": "node", "attr.name": "class", "attr.type": "string"},
"type": {"for": "node", "attr.name": "type", "attr.type": "string"},
"value": {"for": "node", "attr.name": "value", "attr.type": "string"},
"source": {"for": "node", "attr.name": "source", "attr.type": "string"},
"workflow_id": {"for": "node", "attr.name": "workflow_id", "attr.type": "string"},
"base_command": {"for": "node", "attr.name": "base_command", "attr.type": "string"},
"stdout": {"for": "node", "attr.name": "stdout", "attr.type": "string"},
"stderr": {"for": "node", "attr.name": "stderr", "attr.type": "string"},
"default": {"for": "node", "attr.name": "default", "attr.type": "string"},
"prefix": {"for": "node", "attr.name": "prefix", "attr.type": "string"},
"position": {"for": "node", "attr.name": "position", "attr.type": "long"},
"value_from": {"for": "node", "attr.name": "value_from", "attr.type": "string"},
"glob": {"for": "node", "attr.name": "glob", "attr.type": "string"},
}


def update_graphml(wf_id):
"""Update GraphML file by ensuring required keys are present and updating its structure."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
# Parse the GraphML file and preserve namespaces
tree = ET.parse(graphml_path)
root = tree.getroot()

name_space = {'graphml': 'http://graphml.graphdrawing.org/xmlns'}
defined_keys = {key.attrib['id'] for key in root.findall('graphml:key', name_space)}
used_keys = {data.attrib['key'] for data in root.findall('.//graphml:data', name_space)}

missing_keys = used_keys - defined_keys

# Insert default key definitions for missing keys
for missing_key in missing_keys:
if missing_key in expected_keys:
default_def = default_key_definitions[missing_key]
key_element = ET.Element(f'{{{name_space["graphml"]}}}key',
id=missing_key,
**default_def)
root.insert(0, key_element)

# Save the updated GraphML file by overwriting the original one
tree.write(graphml_path, encoding='UTF-8', xml_declaration=True)
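The core check in ``update_graphml``, finding ``<data>`` keys that are used but never declared, can be exercised on an in-memory document with nothing but the standard library. The namespace below is the real GraphML one; the sample document and function name are ours:

```python
import xml.etree.ElementTree as ET

GRAPHML_NS = 'http://graphml.graphdrawing.org/xmlns'

def find_missing_keys(xml_text):
    """Return ids referenced by <data key="..."> elements but never
    declared by a top-level <key> element."""
    root = ET.fromstring(xml_text)
    defined = {k.attrib['id'] for k in root.findall(f'{{{GRAPHML_NS}}}key')}
    used = {d.attrib['key'] for d in root.findall(f'.//{{{GRAPHML_NS}}}data')}
    return used - defined

sample = (
    f'<graphml xmlns="{GRAPHML_NS}">'
    '<key id="name" for="node" attr.name="name" attr.type="string"/>'
    '<graph><node id="n0">'
    '<data key="name">task0</data>'
    '<data key="state">READY</data>'
    '</node></graph></graphml>'
)
```

Here ``find_missing_keys(sample)`` reports ``{'state'}``, which is exactly the case the updater patches with a default key definition before the file reaches NetworkX.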
16 changes: 16 additions & 0 deletions beeflow/common/gdb/neo4j_cypher.py
@@ -725,3 +725,19 @@ def cleanup(tx):
cleanup_query = "MATCH (n) DETACH DELETE n"

tx.run(cleanup_query)


def export_graphml(tx, wf_id):
"""Export BEE workflow as graphml."""
short_id = wf_id[:6]
export_query = (
"WITH \"MATCH (n1)-[r]->(n2) "
f"WHERE n1.workflow_id = '{wf_id}' OR n2.workflow_id = '{wf_id}' "
"RETURN r, n1, n2\" AS query "
f"CALL apoc.export.graphml.query(query, '{short_id}.graphml', {{useTypes: true}}) "
"YIELD file, source, format, nodes, relationships, properties, time, rows, batchSize, "
"batches, done, data "
"RETURN file, source, format, nodes, relationships, properties, time, rows, batchSize, "
"batches, done, data"
)
tx.run(export_query)
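The export query above is string-built: the outer Cypher quotes an inner query and hands it to ``apoc.export.graphml.query``, which writes the file into Neo4j's import directory (hence the ``graphmls_dir`` bind mount to ``/var/lib/neo4j/import`` in the manager). Because ``wf_id`` is interpolated directly, it must never contain single quotes; BEE's hex workflow ids satisfy that. A trimmed sketch of the construction (shortened YIELD/RETURN lists):

```python
def build_export_query(wf_id):
    """Build the APOC GraphML export statement (trimmed version of
    export_graphml above; YIELD/RETURN lists abbreviated)."""
    short_id = wf_id[:6]  # the exported file is named after the short id
    return (
        'WITH "MATCH (n1)-[r]->(n2) '
        f"WHERE n1.workflow_id = '{wf_id}' OR n2.workflow_id = '{wf_id}' "
        'RETURN r, n1, n2" AS query '
        f"CALL apoc.export.graphml.query(query, '{short_id}.graphml', "
        "{useTypes: true}) "
        "YIELD file, nodes, relationships "
        "RETURN file, nodes, relationships"
    )
```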
5 changes: 5 additions & 0 deletions beeflow/common/gdb/neo4j_driver.py
@@ -473,6 +473,11 @@ def _write_transaction(self, tx_fun, **kwargs):
with self._driver.session() as session:
session.write_transaction(tx_fun, **kwargs)

def export_graphml(self, workflow_id):
"""Export a BEE workflow as a graphml."""
with self._driver.session() as session:
session.write_transaction(tx.export_graphml, wf_id=workflow_id)


def _reconstruct_requirements(req_records):
"""Reconstruct requirements by their records retrieved from Neo4j.
4 changes: 4 additions & 0 deletions beeflow/common/wf_interface.py
@@ -304,3 +304,7 @@ def workflow_completed(self):
:rtype: bool
"""
return self._gdb_driver.workflow_completed(self._workflow_id)

def export_graphml(self):
"""Export a BEE workflow as a graphml."""
self._gdb_driver.export_graphml(self._workflow_id)
10 changes: 10 additions & 0 deletions beeflow/wf_manager/resources/wf_utils.py
@@ -8,6 +8,8 @@
from beeflow.common import log as bee_logging
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common.gdb import neo4j_driver
from beeflow.common.gdb.generate_graph import generate_viz
from beeflow.common.gdb.graphml_key_updater import update_graphml
from beeflow.common.wf_interface import WorkflowInterface
from beeflow.common.connection import Connection
from beeflow.common import paths
@@ -292,6 +294,14 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
start_workflow(wf_id)


def export_dag(wf_id):
"""Export the DAG of the workflow."""
wfi = get_workflow_interface(wf_id)
wfi.export_graphml()
update_graphml(wf_id)
generate_viz(wf_id)


def start_workflow(wf_id):
"""Attempt to start the workflow, returning True if successful."""
db = connect_db(wfm_db, get_db_path())
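``export_dag`` in wf_utils is the glue for the whole feature: APOC dumps the GraphML, the key updater repairs declarations the export may leave out, and the Graphviz step renders the PNG. The ordering is load-bearing, since each stage consumes the file the previous stage produced. A dependency-injected sketch that makes the ordering easy to test (names are ours):

```python
def run_export_pipeline(wf_id, export_graphml, fix_keys, render_png):
    """Run the three DAG-export stages in order. The real code calls
    wfi.export_graphml(), update_graphml(wf_id), generate_viz(wf_id)."""
    export_graphml(wf_id)  # Neo4j/APOC writes <short_id>.graphml
    fix_keys(wf_id)        # patch missing <key> definitions in place
    render_png(wf_id)      # read the GraphML, emit <short_id>.png
```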
4 changes: 2 additions & 2 deletions coverage.svg
5 changes: 5 additions & 0 deletions docs/sphinx/commands.rst
@@ -102,6 +102,11 @@ Arguments:

``beeflow reexecute``: Reexecute an archived workflow.

Arguments:
WF_ID [required]

``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point in the workflow's lifetime. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then run ``beeflow dag``. DAGs are exported to ~/.beeflow/dags. See :ref:`workflow-visualization` for more information.

Arguments:
WF_ID [required]

Binary file added docs/sphinx/images/941cd5.png
2 changes: 1 addition & 1 deletion docs/sphinx/installation.rst
@@ -17,7 +17,7 @@ Requirements:
* **Containers**:
Two Charliecloud dependency containers are currently required for BEE: one for the Neo4j graph database and another for Redis. The paths to these containers will need to be set in the BEE configuration later, using the ``neo4j_image`` and the ``redis_image`` options respectively. BEE only supports Neo4j 5.x. We are currently using the latest version of Redis supplied on Docker Hub (as of 2023).

For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j-5-17.tar.gz**, **/usr/projects/BEE/redis.tar.gz**.
For LANL systems, please use the containers supplied by the BEE team: **/usr/projects/BEE/neo4j.tar.gz**, **/usr/projects/BEE/redis.tar.gz**.

For other users, these containers can be pulled from Docker Hub (after following `Installation:`_ below) using ``beeflow core pull-deps``, which will download and report the container paths to be set in the config later.
