Fix reset error: addresses issue #757 #793

Closed
wants to merge 6 commits
55 changes: 22 additions & 33 deletions beeflow/client/bee_client.py
@@ -106,15 +106,25 @@ def _resource(tag=""):
return _url() + str(tag)


def get_wf_list():
"""Get the list of all workflows."""
try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')

logging.info(f'List Jobs: {resp.text}')
return jsonpickle.decode(resp.json()['workflow_list'])


def check_short_id_collision():
"""Check short workflow IDs for colliions; increase short ID length if detected."""
global short_id_len #noqa: Not a constant
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit(f"Checking for ID collision failed: {resp.status_code}")

workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
while short_id_len < MAX_ID_LEN:
id_list = [_short_id(job[1]) for job in workflow_list]
@@ -133,18 +143,7 @@ def check_short_id_collision():
def match_short_id(wf_id):
"""Match user-provided short workflow ID to full workflow IDs."""
matched_ids = []

try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit(f'Could not match ID: {wf_id}. Code {resp.status_code}')
# raise ApiError("GET /jobs".format(resp.status_code))

workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
for job in workflow_list:
if job[1].startswith(wf_id):
@@ -370,12 +369,12 @@ def package(wf_path: pathlib.Path = typer.Argument(...,

@app.command()
def remove(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Remove a cancelled or archived workflow with a workflow ID."""
"""Remove cancelled, paused, or archived workflow with a workflow ID."""
long_wf_id = wf_id

wf_status = get_wf_status(wf_id)
print(f"Workflow Status is {wf_status}")
if wf_status in ('Cancelled', 'Archived'):
if wf_status in ('Cancelled', 'Archived', 'Paused'):
verify = f"All stored information for workflow {_short_id(wf_id)} will be removed."
verify += "\nContinue to remove? yes(y)/no(n): "
response = input(verify)
@@ -421,17 +420,7 @@ def unpackage(package_path, dest_path):
@app.command('list')
def list_workflows():
"""List all workflows."""
try:
conn = _wfm_conn()
resp = conn.get(_url(), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('WF Manager did not return workflow list')

logging.info('List Jobs: {resp.text}')
workflow_list = jsonpickle.decode(resp.json()['workflow_list'])
workflow_list = get_wf_list()
if workflow_list:
typer.secho("Name\tID\tStatus", fg=typer.colors.GREEN)

@@ -521,10 +510,10 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)):

@app.command()
def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a workflow."""
"""Cancel a paused or running workflow."""
long_wf_id = wf_id
wf_status = get_wf_status(wf_id)
if wf_status == "Running":
if wf_status in ('Running', 'Paused'):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)
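To summarize the bee_client.py changes: the three commands that previously issued their own GET request to the workflow manager (list_workflows, check_short_id_collision, match_short_id) now delegate to the new get_wf_list() helper. The following is a minimal sketch of the resulting call pattern, not the verbatim module: it assumes the helpers and tuple layout visible in this diff (_wfm_conn, _url, error_exit, _short_id, and (name, wf_id, status) entries defined elsewhere in bee_client.py), and the caller body is abbreviated for illustration.

    import logging

    import jsonpickle
    import requests


    def get_wf_list():
        """Get the list of all workflows from the workflow manager."""
        try:
            conn = _wfm_conn()                       # existing connection helper in bee_client.py
            resp = conn.get(_url(), timeout=60)
        except requests.exceptions.ConnectionError:
            error_exit('Could not reach WF Manager.')

        if resp.status_code != requests.codes.okay:  # pylint: disable=no-member
            error_exit('WF Manager did not return workflow list')

        logging.info(f'List Jobs: {resp.text}')
        return jsonpickle.decode(resp.json()['workflow_list'])


    def list_workflows():
        """List all workflows (abbreviated caller body for illustration)."""
        workflow_list = get_wf_list()                # connection and error handling happen once, here
        for name, wf_id, status in workflow_list or []:
            print(f"{name}\t{_short_id(wf_id)}\t{status}")

Centralizing the request keeps the connection-error and status-code handling in one place, which is what lets check_short_id_collision and match_short_id drop their own copies in the hunks above.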
171 changes: 110 additions & 61 deletions beeflow/client/core.py
@@ -14,16 +14,24 @@
import socket
import sys
import shutil
import datetime
import time

import daemon
import typer

from beeflow.client import bee_client
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common import cli_connection
from beeflow.common import paths
from beeflow.wf_manager.resources import wf_utils

from beeflow.common.db import wfm_db
from beeflow.common.db.bdb import connect_db
from beeflow.wf_manager.common import dep_manager

db_path = wf_utils.get_db_path()


class ComponentManager:
"""Component manager class."""
@@ -431,12 +439,15 @@ def status():


@app.command()
def stop():
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
stop_msg = ("\n** Please ensure all workflows are complete before stopping beeflow. **"
+ "\n** Check the status of workflows by running 'beeflow list'. **"
+ "\nAre you sure you want to kill beeflow components? [y/n] ")
ans = input(stop_msg)
if query == 'yes':
stop_msg = ("\n** Please ensure all workflows are complete before stopping beeflow. **"
+ "\n** Check the status of workflows by running 'beeflow list'. **"
+ "\nAre you sure you want to kill beeflow components? [y/n] ")
ans = input(stop_msg)
else:
ans = 'y'
if ans.lower() != 'y':
return
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
@@ -448,74 +459,112 @@ def stop():
sys.exit(1)
# As long as it returned something, we should be good
beeflow_log = paths.log_fname('beeflow')
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')
if query == "no":
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')


def kill_active_workflows(active_states, workflow_list):
"""Kill workflows with active states."""
db = connect_db(wfm_db, db_path)
success = True
for name, wf_id, state in workflow_list:
if state in active_states:
pid = db.workflows.get_gdb_pid(wf_id)
if pid > 0:
dep_manager.kill_gdb(pid)
else:
# Failure most likely caused by an Initializing workflow.
print(f"No process for {name}, {wf_id}, {state}.")
success = False
return success


def archive_dir(dir_to_archive):
"""Archive directories for archive flag in reset."""
archive_dirs = ['logs', 'container_archive', 'archives', 'workflows']
date_str = f"{datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')}"
backup_dir = f"{dir_to_archive}.{date_str}"
for a_dir in archive_dirs:
try:
shutil.copytree(f"{dir_to_archive}/{a_dir}",
f"{backup_dir}/{a_dir}")
except FileNotFoundError:
pass
print("Archive flag enabled.",
"Existing logs, containers, and workflows backed up in:\n"
f"{backup_dir}")


def handle_rm_error(err, dir_to_check, wf_list):
"""Handle IO error caused by either initializing workflows or nfs files."""
# Check if only nfs mounts are causing the problem and ignore
dir_list = os.listdir(dir_to_check)
nfs_list = [x for x in dir_list if x.startswith('.nfs')]
if dir_list and (dir_list != nfs_list):
print(f"Unable to remove {dir_to_check} \n {err.strerror}")
# Often initializing workflows cause a problem
if any('Initializing' in sublist for sublist in wf_list):
warn('Initializing workflows may have prevented removal.\n')
print(f"Try removing {dir_to_check} manually, to complete reset.")


@app.command()
def reset(archive: bool = typer.Option(False, '--archive', '-a',
help='Archive bee_workdir before removal')):
"""Delete the bee_workdir directory."""
# Check to see if the user is absolutely sure. Warning Message.
"""Stop all components and delete the bee_workdir directory."""
# Check workflow states; warn if there are active states.
workflow_list = bee_client.get_wf_list()
active_states = {'Running', 'Paused', 'Initializing', 'Waiting'}
caution = ""  # default so the later warn() works even when no workflows are active
if {item for row in workflow_list for item in row}.intersection(active_states):
caution = """
**************************************************************
Caution: There are active workflows! They will be removed!
Try 'beeflow list' to view them.
**************************************************************
"""
absolutely_sure = ""
dir_to_delete = os.path.expanduser(wf_utils.get_bee_workdir())
warn(f"\n A reset will remove this directory: {dir_to_delete}\n")
if archive:
print(" Archive flag is set: logs, workflows and containers will be backed up.")
print("""
A reset will:
Shut down beeflow and all BEE components.
Delete the bee_workdir directory which results in:
Removing the archive of all workflows.
Removing the archive of workflow containers
(unless container_archive is configured elsewhere).
Resetting all databases associated with the beeflow app.
Removing all beeflow logs.
Beeflow configuration files from bee_cfg will not be deleted.
""")
warn(f"{caution}\nAre you sure you want to reset?")
while absolutely_sure != "y" or absolutely_sure != "n":
# Get the user's bee_workdir directory
directory_to_delete = os.path.expanduser(wf_utils.get_bee_workdir())
print(f"A reset will remove this directory: {directory_to_delete}")

absolutely_sure = input(
"""
Are you sure you want to reset?

Please ensure all workflows are complete before running a reset
Check the status of workflows by running 'beeflow list'

A reset will shutdown beeflow and its components.

A reset will delete the bee_workdir directory which results in:
Removing the archive of workflows executed.
Removing the archive of workflow containers.
Reset all databases associated with the beeflow app.
Removing all beeflow logs.

Beeflow configuration files from bee_cfg will remain.

Respond with yes(y)/no(n): """)
absolutely_sure = input("Respond with yes(y)/no(n): ")
if absolutely_sure in ("n", "no"):
# Exit out if the user didn't really mean to do a reset
sys.exit()
if absolutely_sure in ("y", "yes"):
elif absolutely_sure in ("y", "yes"):
# First stop all active workflow processes
workflow_list = bee_client.get_wf_list()
kill_active_workflows(active_states, workflow_list)
# Stop all of the beeflow processes
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
if resp is not None:
print("Beeflow has been shutdown.")
print("Waiting for components to cleanly stop.")
# This wait is essential. It takes a minute to shut down.
time.sleep(5)

if os.path.exists(directory_to_delete):
# Save the bee_workdir directory if the archive option was set
if archive:
if os.path.exists(directory_to_delete + "/logs"):
shutil.copytree(directory_to_delete + "/logs",
directory_to_delete + ".backup/logs")
if os.path.exists(directory_to_delete + "/container_archive"):
shutil.copytree(directory_to_delete + "/container_archive",
directory_to_delete + ".backup/container_archive")
if os.path.exists(directory_to_delete + "/archives"):
shutil.copytree(directory_to_delete + "/archives",
directory_to_delete + ".backup/archives")
if os.path.exists(directory_to_delete + "/workflows"):
shutil.copytree(directory_to_delete + "/workflows",
directory_to_delete + ".backup/workflows")
print("Archive flag enabled,")
print("Existing logs, containers, and workflows backed up in:")
print(f"{directory_to_delete}.backup")
shutil.rmtree(directory_to_delete)
print(f"{directory_to_delete} has been removed.")
sys.exit()
stop("quiet")
print("Beeflow is shutting down.")
print("Waiting for components to cleanly stop.")
# This wait is essential. It takes a minute to shut down.
time.sleep(5)

# Save the bee_workdir directory if the archive option was set
if archive:
archive_dir(dir_to_delete)
try:
shutil.rmtree(dir_to_delete)
except OSError as err:
handle_rm_error(err, dir_to_delete, workflow_list)
else:
print(f"{directory_to_delete} does not exist. Exiting.")
sys.exit()
print(f"{dir_to_delete} has been removed.")
sys.exit()
print("Please respond with either the letter (y) or (n).")


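Condensing the core.py hunks above into call order, the new reset flow looks roughly like the sketch below. This is not the verbatim function: the confirmation prompt and user-facing messages are abbreviated, and the helpers (kill_active_workflows, archive_dir, handle_rm_error, stop, bee_client.get_wf_list, wf_utils.get_bee_workdir) are the ones introduced or reused in this PR.

    import os
    import shutil
    import sys
    import time

    from beeflow.client import bee_client
    from beeflow.wf_manager.resources import wf_utils


    def reset(archive=False):
        """Stop all components and delete the bee_workdir directory (condensed sketch)."""
        workflow_list = bee_client.get_wf_list()
        active_states = {'Running', 'Paused', 'Initializing', 'Waiting'}
        dir_to_delete = os.path.expanduser(wf_utils.get_bee_workdir())

        if input("Respond with yes(y)/no(n): ") not in ("y", "yes"):
            sys.exit()

        kill_active_workflows(active_states, workflow_list)     # kill GDB processes for active workflows
        stop("quiet")                                           # shut beeflow down without re-prompting
        time.sleep(5)                                           # give components time to exit cleanly

        if archive:
            archive_dir(dir_to_delete)                          # timestamped backup of logs/archives/workflows
        try:
            shutil.rmtree(dir_to_delete)
        except OSError as err:
            handle_rm_error(err, dir_to_delete, workflow_list)  # ignore stray .nfs files, warn otherwise

Invoked through the typer app in core.py, the --archive / -a flag is what triggers archive_dir() before removal; without it the bee_workdir directory is simply deleted after components stop.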