Skip to content

Commit

Permalink
Fix reset error issue #757: caused errors when there were active work…
Browse files Browse the repository at this point in the history
…flows. This commit performs the following to solve the errors:

 - allow reset when there are active workflows, remove workflows
 - add ability to remove or cancel paused workflows
 - check workflows for active states
 - warn user there are active states before continuing
 - stop all processes for active workflows
 - add get_workflow_list method to eliminate duplicate code
 - make response text message clearer to read in code
 - add timestamp to name for unique backup if archive flag is set
 - simplify complexity of reset function
 - print warning and possible course of action when initializing workflows cause directory removal to fail
  • Loading branch information
pagrubel committed Mar 19, 2024
1 parent 2ea76d9 commit c42435f
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 94 deletions.
55 changes: 22 additions & 33 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,25 @@ def _resource(tag=""):
return _url() + str(tag)


def get_wf_list():
    """Get the list of all workflows.

    Sends a GET request to the workflow manager URL and returns the decoded
    ``workflow_list`` field of the JSON response. ``error_exit`` is called
    when the manager cannot be reached or answers with a non-OK status code.
    """
    try:
        conn = _wfm_conn()
        resp = conn.get(_url(), timeout=60)
    except requests.exceptions.ConnectionError:
        # NOTE(review): relies on error_exit not returning; otherwise ``resp``
        # would be unbound below -- confirm against error_exit's definition.
        error_exit('Could not reach WF Manager.')

    if resp.status_code != requests.codes.okay:  # pylint: disable=no-member
        error_exit('WF Manager did not return workflow list')

    # Pass the response text as a logging argument: the previous plain string
    # had no f-prefix and logged the literal text "{resp.text}".
    logging.info('List Jobs: %s', resp.text)
    return jsonpickle.decode(resp.json()['workflow_list'])


def check_short_id_collision():
"""Check short workflow IDs for colliions; increase short ID length if detected."""
global short_id_len #noqa: Not a constant
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit(f"Checking for ID collision failed: {resp.status_code}")

workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
while short_id_len < MAX_ID_LEN:
id_list = [_short_id(job[1]) for job in workflow_list]
Expand All @@ -133,18 +143,7 @@ def check_short_id_collision():
def match_short_id(wf_id):
"""Match user-provided short workflow ID to full workflow IDs."""
matched_ids = []

try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit(f'Could not match ID: {wf_id}. Code {resp.status_code}')
# raise ApiError("GET /jobs".format(resp.status_code))

workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
for job in workflow_list:
if job[1].startswith(wf_id):
Expand Down Expand Up @@ -370,12 +369,12 @@ def package(wf_path: pathlib.Path = typer.Argument(...,

@app.command()
def remove(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Remove a cancelled or archived workflow with a workflow ID."""
"""Remove cancelled, paused, or archived workflow with a workflow ID."""
long_wf_id = wf_id

wf_status = get_wf_status(wf_id)
print(f"Workflow Status is {wf_status}")
if wf_status in ('Cancelled', 'Archived'):
if wf_status in ('Cancelled', 'Archived', 'Paused'):
verify = f"All stored information for workflow {_short_id(wf_id)} will be removed."
verify += "\nContinue to remove? yes(y)/no(n): """
response = input(verify)
Expand Down Expand Up @@ -421,17 +420,7 @@ def unpackage(package_path, dest_path):
@app.command('list')
def list_workflows():
"""List all workflows."""
try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')

logging.info('List Jobs: {resp.text}')
workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
typer.secho("Name\tID\tStatus", fg=typer.colors.GREEN)

Expand Down Expand Up @@ -521,10 +510,10 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)):

@app.command()
def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a workflow."""
"""Cancel a paused or running workflow."""
long_wf_id = wf_id
wf_status = get_wf_status(wf_id)
if wf_status == "Running":
if wf_status in ('Running', 'Paused'):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)
Expand Down
172 changes: 111 additions & 61 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,24 @@
import socket
import sys
import shutil
import datetime
import time

import daemon
import typer

from beeflow.client import bee_client
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common import cli_connection
from beeflow.common import paths
from beeflow.wf_manager.resources import wf_utils

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db
from beeflow.wf_manager.common import dep_manager

db_path = wf_utils.get_db_path()


class ComponentManager:
"""Component manager class."""
Expand Down Expand Up @@ -431,12 +439,16 @@ def status():


@app.command()
def stop():
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
stop_msg = ("\n** Please ensure all workflows are complete before stopping beeflow. **"
+ "\n** Check the status of workflows by running 'beeflow list'. **"
+ "\nAre you sure you want to kill beeflow components? [y/n] ")
ans = input(stop_msg)
if query == 'yes':
ans = input("""
** Please ensure all workflows are complete before stopping beeflow. **
** Check the status of workflows by running 'beeflow list'. **
Are you sure you want to kill beeflow components? [y/n] """)
else:
ans = 'y'
if ans.lower() != 'y':
return
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
Expand All @@ -448,74 +460,112 @@ def stop():
sys.exit(1)
# As long as it returned something, we should be good
beeflow_log = paths.log_fname('beeflow')
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')
if query == "no":
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')


def kill_active_workflows(active_states, workflow_list):
    """Kill the recorded process of every workflow that is in an active state.

    Each entry of ``workflow_list`` is unpacked as ``(name, wf_id, state)``.
    For entries whose state is in ``active_states`` the PID returned by
    ``get_gdb_pid`` is looked up in the workflow manager database and passed
    to ``dep_manager.kill_gdb``.

    Returns True when every active workflow had a positive PID to kill,
    False when at least one did not.
    """
    database = connect_db(wfm_db, db_path)
    all_killed = True
    for wf_name, wf_id, wf_state in workflow_list:
        if wf_state not in active_states:
            continue
        gdb_pid = database.workflows.get_gdb_pid(wf_id)
        if gdb_pid > 0:
            dep_manager.kill_gdb(gdb_pid)
            continue
        # Failure most likely caused by an Initializing workflow.
        print(f"No process for {wf_name}, {wf_id}, {wf_state}.")
        all_killed = False
    return all_killed


def archive_dir(dir_to_archive):
    """Archive directories for archive flag in reset.

    Copies the ``logs``, ``container_archive``, ``archives`` and ``workflows``
    subdirectories of ``dir_to_archive`` into a sibling directory named
    ``<dir_to_archive>.<YYYY-MM-DD_HHMMSS>``. Subdirectories that do not
    exist are skipped.

    :param dir_to_archive: path of the directory whose contents are backed up
    """
    archive_dirs = ['logs', 'container_archive', 'archives', 'workflows']
    # Normalize before building the backup name: with a trailing separator
    # ("workdir/") the backup would become "workdir/.<timestamp>", i.e. a
    # directory INSIDE the one that reset removes right afterwards.
    source_dir = os.path.normpath(dir_to_archive)
    date_str = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
    backup_dir = f"{source_dir}.{date_str}"
    for a_dir in archive_dirs:
        try:
            shutil.copytree(os.path.join(source_dir, a_dir),
                            os.path.join(backup_dir, a_dir))
        except FileNotFoundError:
            # Best effort: a subdirectory that was never created is skipped.
            pass
    print("Archive flag enabled.",
          "Existing logs, containers, and workflows backed up in:\n"
          f"{backup_dir}")


def handle_rm_error(err, dir_to_check, wf_list):
    """Handle IO error caused by either initializing workflows or nfs files.

    Called after removing ``dir_to_check`` failed with ``err``. If the only
    entries left in the directory are ``.nfs*`` files (or it is empty) the
    failure is ignored; otherwise the user is told what went wrong and asked
    to remove the directory manually.

    :param err: the OSError raised while removing the directory
    :param dir_to_check: path of the directory that could not be removed
    :param wf_list: workflow rows; a row containing 'Initializing' triggers
        an extra warning
    """
    try:
        dir_list = os.listdir(dir_to_check)
    except FileNotFoundError:
        # The removal itself fails with an OSError when the directory is
        # already gone; listing it again must not crash the error handler.
        print(f"{dir_to_check} does not exist.")
        return
    # Check if only leftover .nfs files are causing the problem and ignore
    nfs_list = [entry for entry in dir_list if entry.startswith('.nfs')]
    if dir_list and (dir_list != nfs_list):
        # strerror may be None for some OSError instances; fall back to str().
        reason = err.strerror or str(err)
        print(f"Unable to remove {dir_to_check} \n {reason}")
        # Often initializing workflows cause a problem
        if any('Initializing' in sublist for sublist in wf_list):
            warn('Initializing workflows may have prevented removal.\n')
        print(f"Try removing {dir_to_check} manually, to complete reset.")


@app.command()
def reset(archive: bool = typer.Option(False, '--archive', '-a',
help='Archive bee_workdir before removal')):
"""Delete the bee_workdir directory."""
# Check to see if the user is absolutely sure. Warning Message.
"""Stop all components and delete the bee_workdir directory."""
# Check workflow states; warn if there are active states.
workflow_list = bee_client.get_wf_list()
active_states = {'Running', 'Paused', 'Initializing', 'Waiting'}
if {item for row in workflow_list for item in row}.intersection(active_states):
caution = """
**************************************************************
Caution: There are active workflows! They will be removed!
Try 'beeflow list' to view them.
**************************************************************
"""
absolutely_sure = ""
dir_to_delete = os.path.expanduser(wf_utils.get_bee_workdir())
warn(f"\n A reset will remove this directory: {dir_to_delete}\n")
if archive:
print(" Archive flag is set: logs, workflows and containers will be backed up.")
print("""
A reset will:
Shutdown beeflow and all BEE components.
Delete the bee_workdir directory which results in:
Removing the archive of all workflows.
Removing the archive of workflow containers
(unless container_archive is configured elsewhere).
Reset all databases associated with the beeflow app.
Removing all beeflow logs.
Beeflow configuration files from bee_cfg will not be deleted.
""")
warn(f"{caution}\nAre you sure you want to reset?")
while absolutely_sure != "y" or absolutely_sure != "n":
# Get the user's bee_workdir directory
directory_to_delete = os.path.expanduser(wf_utils.get_bee_workdir())
print(f"A reset will remove this directory: {directory_to_delete}")

absolutely_sure = input(
"""
Are you sure you want to reset?
Please ensure all workflows are complete before running a reset
Check the status of workflows by running 'beeflow list'
A reset will shutdown beeflow and its components.
A reset will delete the bee_workdir directory which results in:
Removing the archive of workflows executed.
Removing the archive of workflow containers.
Reset all databases associated with the beeflow app.
Removing all beeflow logs.
Beeflow configuration files from bee_cfg will remain.
Respond with yes(y)/no(n): """)
absolutely_sure = input("Respond with yes(y)/no(n): ")
if absolutely_sure in ("n", "no"):
# Exit out if the user didn't really mean to do a reset
sys.exit()
if absolutely_sure in ("y", "yes"):
elif absolutely_sure in ("y", "yes"):
# First stop all active workflow processes
workflow_list = bee_client.get_wf_list()
kill_active_workflows(active_states, workflow_list)
# Stop all of the beeflow processes
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
if resp is not None:
print("Beeflow has been shutdown.")
print("Waiting for components to cleanly stop.")
# This wait is essential. It takes a minute to shut down.
time.sleep(5)

if os.path.exists(directory_to_delete):
# Save the bee_workdir directory if the archive option was set
if archive:
if os.path.exists(directory_to_delete + "/logs"):
shutil.copytree(directory_to_delete + "/logs",
directory_to_delete + ".backup/logs")
if os.path.exists(directory_to_delete + "/container_archive"):
shutil.copytree(directory_to_delete + "/container_archive",
directory_to_delete + ".backup/container_archive")
if os.path.exists(directory_to_delete + "/archives"):
shutil.copytree(directory_to_delete + "/archives",
directory_to_delete + ".backup/archives")
if os.path.exists(directory_to_delete + "/workflows"):
shutil.copytree(directory_to_delete + "/workflows",
directory_to_delete + ".backup/workflows")
print("Archive flag enabled,")
print("Existing logs, containers, and workflows backed up in:")
print(f"{directory_to_delete}.backup")
shutil.rmtree(directory_to_delete)
print(f"{directory_to_delete} has been removed.")
sys.exit()
stop("quiet")
print("Beeflow is shutting down.")
print("Waiting for components to cleanly stop.")
# This wait is essential. It takes a minute to shut down.
time.sleep(5)

# Save the bee_workdir directory if the archive option was set
if archive:
archive_dir(dir_to_delete)
try:
shutil.rmtree(dir_to_delete)
except OSError as err:
handle_rm_error(err, dir_to_delete, workflow_list)
else:
print(f"{directory_to_delete} does not exist. Exiting.")
sys.exit()
print(f"{dir_to_delete} has been removed.")
sys.exit()
print("Please respond with either the letter (y) or (n).")


Expand Down

0 comments on commit c42435f

Please sign in to comment.