Skip to content

Commit

Permalink
No Start State (#914)
Browse files (browse the repository at this point in the history)
* Add "No Start" workflow state for workflows that are submitted with the --no-start flag

* Updated coverage.svg

---------

Co-authored-by: leahh <leahh@lanl.gov>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 11, 2024
1 parent 398ea68 commit ad84827
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 34 deletions.
7 changes: 5 additions & 2 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,10 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)):
wf_status = resp.json()['wf_status']
typer.echo(wf_status)
for _task_id, task_name, task_state in tasks_status:
typer.echo(f'{task_name}--{task_state}')
if wf_status == 'No Start':
typer.echo(f'{task_name}')
else:
typer.echo(f'{task_name}--{task_state}')

logging.info('Query workflow: {resp.text}')
return wf_status, tasks_status
Expand Down Expand Up @@ -525,7 +528,7 @@ def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a paused or running workflow."""
long_wf_id = wf_id
wf_status = get_wf_status(wf_id)
if wf_status in ('Running', 'Paused'):
if wf_status in ('Running', 'Paused', 'No Start'):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)
Expand Down
2 changes: 1 addition & 1 deletion beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def reset_workflow(self, new_id):
"""

@abstractmethod
def load_task(self, task):
def load_task(self, task, task_state):
"""Load a task into a stored workflow.
Dependencies should be automatically deduced and generated by the graph database
Expand Down
8 changes: 4 additions & 4 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def create_task_output_nodes(tx, task):
value=output.value, glob=output.glob)


def create_task_metadata_node(tx, task):
def create_task_metadata_node(tx, task, task_state):
"""Create a task metadata node in the Neo4j database.
The node holds metadata about a task's execution state.
Expand All @@ -192,9 +192,9 @@ def create_task_metadata_node(tx, task):
:type task: Task
"""
metadata_query = ("MATCH (t:Task {id: $task_id}) "
"CREATE (m:Metadata {state: 'WAITING'})-[:DESCRIBES]->(t)")
"CREATE (m:Metadata {state: $task_state})-[:DESCRIBES]->(t)")

tx.run(metadata_query, task_id=task.id)
tx.run(metadata_query, task_id=task.id, task_state=task_state)


def add_dependencies(tx, task, old_task=None, restarted_task=False):
Expand Down Expand Up @@ -329,7 +329,7 @@ def get_workflow_tasks(tx, wf_id):
:type wf_id: str
:rtype: neo4j.Result
"""
workflow_query = "MATCH t:Task WHERE t.workflow_id = $wf_id RETURN t"
workflow_query = "MATCH (t:Task) WHERE t.workflow_id = $wf_id RETURN t"

return [rec['t'] for rec in tx.run(workflow_query, wf_id=wf_id)]

Expand Down
5 changes: 3 additions & 2 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def reset_workflow(self, old_id, new_id):
session.write_transaction(tx.reset_tasks_metadata, wf_id=old_id)
session.write_transaction(tx.reset_workflow_id, old_id=old_id, new_id=new_id)

def load_task(self, task):
def load_task(self, task, task_state):
"""Load a task into a workflow stored in the Neo4j database.
Dependencies are automatically deduced and generated by Neo4j upon loading
Expand All @@ -148,7 +148,8 @@ def load_task(self, task):
session.write_transaction(tx.create_task_requirement_nodes, task=task)
session.write_transaction(tx.create_task_input_nodes, task=task)
session.write_transaction(tx.create_task_output_nodes, task=task)
session.write_transaction(tx.create_task_metadata_node, task=task)
session.write_transaction(tx.create_task_metadata_node, task=task,
task_state=task_state)
session.write_transaction(tx.add_dependencies, task=task)

def initialize_ready_tasks(self, workflow_id):
Expand Down
4 changes: 2 additions & 2 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def reset_workflow(self, workflow_id):
self._workflow_id = workflow_id
self._gdb_driver.set_workflow_state(self._workflow_id, 'SUBMITTED')

def add_task(self, task):
def add_task(self, task, task_state):
"""Add a new task to a BEE workflow.
:param task: the name of the file to which to redirect stderr
Expand All @@ -90,7 +90,7 @@ def add_task(self, task):
task.hints = []

# Load the new task into the graph database
self._gdb_driver.load_task(task)
self._gdb_driver.load_task(task, task_state)

def restart_task(self, task, checkpoint_file):
"""Restart a failed BEE workflow task.
Expand Down
7 changes: 4 additions & 3 deletions beeflow/tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ def reset_workflow(self, old_id, new_id): #noqa not using parameter in mock
self.task_metadata[task_id] = {}
self.task_states[task_id] = 'WAITING'

def load_task(self, task):
def load_task(self, task, task_state):
"""Load a task into a workflow in the graph database."""
self.tasks[task.id] = task
self.task_states[task.id] = 'WAITING'
self.task_states[task.id] = task_state
self.task_metadata[task.id] = {}
self.inputs[task.id] = {}
self.outputs[task.id] = {}
Expand All @@ -186,7 +186,8 @@ def initialize_ready_tasks(self, workflow_id): #noqa not using parameter in mock

def restart_task(self, _old_task, new_task):
"""Create a new task from a failed task checkpoint restart enabled."""
self.load_task(new_task)
task_state = "WAITING"
self.load_task(new_task, task_state)

def finalize_task(self, task):
"""Set a task's state to completed."""
Expand Down
45 changes: 31 additions & 14 deletions beeflow/tests/test_wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ def test_add_task(self):
stderr=stderr,
workflow_id=workflow_id)

self.wfi.add_task(task)
task_state = "WAITING"

self.wfi.add_task(task, task_state)

# Task object assertions
self.assertEqual(task_name, task.name)
Expand Down Expand Up @@ -225,7 +227,9 @@ def test_restart_task(self):
stderr=stderr,
workflow_id=workflow_id)

self.wfi.add_task(task)
task_state = "WAITING"

self.wfi.add_task(task, task_state)

# Restart the task, should create a new Task
new_task = self.wfi.restart_task(task, test_checkpoint_file)
Expand Down Expand Up @@ -322,7 +326,9 @@ def test_get_task_by_id(self):
stderr=stderr,
workflow_id=workflow_id)

self.wfi.add_task(task)
task_state = "WAITING"

self.wfi.add_task(task, task_state)

self.assertEqual(task, self.wfi.get_task_by_id(task.id))

Expand Down Expand Up @@ -435,9 +441,11 @@ def test_get_task_state(self):
None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"

self.wfi.add_task(task, task_state)

# Should be WAITING because workflow not yet executed
# Should be WAITING
self.assertEqual("WAITING", self.wfi.get_task_state(task))

def test_set_task_state(self):
Expand All @@ -454,7 +462,8 @@ def test_set_task_state(self):
None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)

self.wfi.set_task_state(task, "RUNNING")

Expand All @@ -475,7 +484,8 @@ def test_get_task_metadata(self):
None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)
metadata = {"cluster": "fog", "crt": "charliecloud",
"container_md5": "67df538c1b6893f4276d10b2af34ccfe", "job_id": 1337}

Expand All @@ -496,7 +506,8 @@ def test_set_task_metadata(self):
None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)
metadata = {"cluster": "fog", "crt": "charliecloud",
"container_md5": "67df538c1b6893f4276d10b2af34ccfe", "job_id": 1337}

Expand All @@ -522,7 +533,8 @@ def test_get_task_input(self):
None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)

self.assertEqual(task.inputs[0], self.wfi.get_task_input(task, "test_input"))

Expand All @@ -539,7 +551,8 @@ def test_set_task_input(self):
[StepInput("test_input", "File", None, "default.txt", "test_input", None, None, None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)

test_input = StepInput("test_input", "File", "input.txt", "default.txt", "test_input",
None, None, None)
Expand All @@ -560,7 +573,8 @@ def test_get_task_output(self):
None)],
[StepOutput("test_task/output", "File", "output.txt", "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)

self.assertEqual(task.outputs[0], self.wfi.get_task_output(task, "test_task/output"))

Expand All @@ -578,7 +592,8 @@ def test_set_task_output(self):
None, None)],
[StepOutput("test_task/output", "File", None, "output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)

test_output = StepOutput("test_task/output", "File", "output.txt", "output.txt")
self.wfi.set_task_output(task, "test_task/output", "output.txt")
Expand All @@ -599,7 +614,8 @@ def test_workflow_completed(self):
[StepOutput("test_task/output", "File", "output.txt",
"output.txt")],
None, None, workflow_id)
self.wfi.add_task(task)
task_state = "WAITING"
self.wfi.add_task(task, task_state)

# Workflow not completed
self.assertFalse(self.wfi.workflow_completed())
Expand Down Expand Up @@ -662,8 +678,9 @@ def _create_test_tasks(self, workflow_id):
workflow_id=workflow_id)
]

task_state = "WAITING"
for task in tasks:
self.wfi.add_task(task)
self.wfi.add_task(task, task_state)

return tasks

Expand Down
18 changes: 14 additions & 4 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,20 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
create_wf_metadata(wf_id, wf_name)
db = connect_db(wfm_db, get_db_path())
for task in tasks:
wfi.add_task(task)
task_state = "" if no_start else "WAITING"
wfi.add_task(task, task_state)
metadata = wfi.get_task_metadata(task)
metadata['workdir'] = wf_workdir
wfi.set_task_metadata(task, metadata)
db.workflows.add_task(task.id, wf_id, task.name, "WAITING")
db.workflows.add_task(task.id, wf_id, task.name, task_state)

update_wf_status(wf_id, 'Waiting')
db.workflows.update_workflow_state(wf_id, 'Waiting')
if no_start:
update_wf_status(wf_id, 'No Start')
db.workflows.update_workflow_state(wf_id, 'No Start')
log.info('Not starting workflow, as requested')
else:
update_wf_status(wf_id, 'Waiting')
db.workflows.update_workflow_state(wf_id, 'Waiting')
log.info('Starting workflow')
db.workflows.update_workflow_state(wf_id, 'Running')
start_workflow(wf_id)
Expand All @@ -296,6 +299,13 @@ def start_workflow(wf_id):
state = wfi.get_workflow_state()
if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
return False
_, tasks = wfi.get_workflow()
tasks.reverse()
for task in tasks:
task_state = wfi.get_task_state(task)
if task_state == '':
wfi.set_task_state(task, 'WAITING')
db.workflows.update_task_state(task.id, wf_id, 'WAITING')
wfi.execute_workflow()
tasks = wfi.get_ready_tasks()
schedule_submit_tasks(wf_id, tasks)
Expand Down
4 changes: 2 additions & 2 deletions coverage.svg
Sorry, we cannot display this file (the diff for coverage.svg is not rendered in this view).

0 comments on commit ad84827

Please sign in to comment.