Skip to content

Commit

Permalink
Linting for refactor wf status
Browse files Browse the repository at this point in the history
  • Loading branch information
kabir-vats committed Sep 10, 2024
1 parent 298071b commit 4734930
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 10 deletions.
8 changes: 4 additions & 4 deletions beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def teardown_workflow():
def setup_teardown_workflow(teardown_workflow, mocker, temp_db):
"""Set up and tear down for tests that use the workflow directory."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda:temp_db.db_file)
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db)
wf_utils.create_workflow_dir(WF_ID)
wf_utils.create_wf_status(WF_ID)
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_start_workflow(client, mocker, temp_db):
def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db):
"""Test getting workflow status."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
wf_name = 'wf'
Expand Down Expand Up @@ -252,7 +252,7 @@ def test_resume_workflow(client, mocker, setup_teardown_workflow, temp_db):
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.tests.mocks.MockWFI.get_workflow_state',
return_value='PAUSED')
return_value='PAUSED')
mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db)
mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_tm', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_scheduler', return_value=None)
Expand Down
5 changes: 1 addition & 4 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(self):

def post(self, wf_id):
"""Start workflow. Send ready tasks to the task manager."""
db = connect_db(wfm_db, db_path)
if wf_utils.start_workflow(wf_id):
resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200)
else:
Expand All @@ -45,7 +44,7 @@ def get(wf_id):

for task in tasks:
tasks_status.append((task.id, task.name, task.state))
wf_status = wf_utils.read_wf_status(wf_id)
wf_status = wf_utils.read_wf_status(wf_id)

resp = make_response(jsonify(tasks_status=tasks_status,
wf_status=wf_status, status='ok'), 200)
Expand All @@ -57,7 +56,6 @@ def delete(self, wf_id):
option = self.reqparse.parse_args()['option']
db = connect_db(wfm_db, db_path)
if option == "cancel":
wfi = wf_utils.get_workflow_interface(wf_id)
# Remove all tasks currently in the database
wf_utils.update_wf_status(wf_id, 'Cancelled')
log.info(f"Workflow {wf_id} cancelled")
Expand All @@ -76,7 +74,6 @@ def delete(self, wf_id):

def patch(self, wf_id):
"""Pause or resume workflow."""
db = connect_db(wfm_db, db_path)
self.reqparse.add_argument('option', type=str, location='json')
option = self.reqparse.parse_args()['option']

Expand Down
4 changes: 2 additions & 2 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def update_wf_status(wf_id, status_msg):
wfi = get_workflow_interface(wf_id)
wfi.set_workflow_state(status_msg)


def read_wf_status(wf_id):
"""Read workflow status metadata file."""
bee_workdir = get_bee_workdir()
Expand Down Expand Up @@ -302,9 +303,8 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,

def start_workflow(wf_id):
"""Attempt to start the workflow, returning True if successful."""
db = connect_db(wfm_db, get_db_path())
wfi = get_workflow_interface(wf_id)
state = read_wf_status(wf_id)
state = read_wf_status(wf_id)
if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
return False
wfi.execute_workflow()
Expand Down

0 comments on commit 4734930

Please sign in to comment.