Skip to content

Commit

Permalink
Merge pull request #710 from lanl/issue-706/combine-beeflow-beeclient
Browse files Browse the repository at this point in the history
Combine beeflow, beeclient, and beecfg
  • Loading branch information
pagrubel authored Aug 28, 2023
2 parents b63fcab + a05172c commit 78cb514
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 230 deletions.
13 changes: 7 additions & 6 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import requests
import typer

from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common import config_driver
from beeflow.common.cli import NaturalOrderGroup
from beeflow.common.connection import Connection
from beeflow.common import paths
from beeflow.common.parser import CwlParser
from beeflow.common.wf_data import generate_workflow_id

from beeflow.client import core

# Length of a shortened workflow ID
short_id_len = 6 #noqa: Not a constant
Expand Down Expand Up @@ -167,6 +167,8 @@ def match_short_id(wf_id):


app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup)
app.add_typer(core.app, name='core')
app.add_typer(config_driver.app, name='config')


@app.command()
Expand Down Expand Up @@ -351,7 +353,7 @@ def unpackage(package_path, dest_path):

if not package_str.endswith('.tgz'):
# No cleanup, maybe we should rm dest_path?
error_exit("Invalid package name, please use the beeclient package command")
error_exit("Invalid package name, please use the beeflow package command")
wf_dir = package_str[:-4]

return_code = subprocess.run(['tar', '-C', dest_path, '-xf', package_path],
Expand All @@ -366,8 +368,8 @@ def unpackage(package_path, dest_path):
return dest_path/wf_dir # noqa: Not an arithmetic operation


@app.command()
def listall():
@app.command('list')
def list_workflows():
"""List all worklfows."""
try:
conn = _wfm_conn()
Expand Down Expand Up @@ -538,7 +540,6 @@ def main():
"""Execute bee_client."""
global _INTERACTIVE
_INTERACTIVE = True
bc.init()
app()


Expand Down
166 changes: 78 additions & 88 deletions beeflow/cli.py → beeflow/client/core.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@
from beeflow.common import paths


bc.init()
# Max number of times a component can be restarted
MAX_RESTARTS = bc.get('DEFAULT', 'max_restarts')


class ComponentManager:
"""Component manager class."""

Expand Down Expand Up @@ -96,6 +91,8 @@ def run(self, base_components):

def poll(self):
"""Poll each process to check for errors, restart failed processes."""
# Max number of times a component can be restarted
max_restarts = bc.get('DEFAULT', 'max_restarts')
for name in self.procs: # noqa no need to iterate with items() since self.procs may be set
component = self.components[name]
if component['failed']:
Expand All @@ -104,8 +101,8 @@ def poll(self):
if returncode is not None:
log = paths.log_fname(name)
print(f'Component "{name}" failed, check log "{log}"')
if component['restart_count'] >= MAX_RESTARTS:
print(f'Component "{name}" has been restarted {MAX_RESTARTS} '
if component['restart_count'] >= max_restarts:
print(f'Component "{name}" has been restarted {max_restarts} '
'times, not restarting again')
component['failed'] = True
else:
Expand All @@ -128,9 +125,6 @@ def kill(self):
proc.terminate()


MGR = ComponentManager()


def warn(*pargs):
"""Print a red warning message."""
typer.secho(' '.join(pargs), fg=typer.colors.RED, file=sys.stderr)
Expand All @@ -149,66 +143,64 @@ def open_log(component):
return open(log, 'a', encoding='utf-8')


# Slurmrestd will be started only if we're running with Slurm and
# slurm::use_commands is not True
NEED_SLURMRESTD = (bc.get('DEFAULT', 'workload_scheduler') == 'Slurm'
and not bc.get('slurm', 'use_commands'))


@MGR.component('wf_manager', ('scheduler',))
def start_wfm():
"""Start the WFM."""
fp = open_log('wf_manager')
return launch_with_gunicorn('beeflow.wf_manager.wf_manager:create_app()',
paths.wfm_socket(), stdout=fp, stderr=fp)


TM_DEPS = []
if NEED_SLURMRESTD:
TM_DEPS.append('slurmrestd')


@MGR.component('task_manager', TM_DEPS)
def start_task_manager():
"""Start the TM."""
fp = open_log('task_manager')
return launch_with_gunicorn('beeflow.task_manager:flask_app', paths.tm_socket(),
stdout=fp, stderr=fp)


@MGR.component('scheduler', ())
def start_scheduler():
"""Start the scheduler."""
fp = open_log('scheduler')
# Using a function here because of the funny way that the scheduler's written
return launch_with_gunicorn('beeflow.scheduler.scheduler:create_app()',
paths.sched_socket(), stdout=fp, stderr=fp)


# Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
if NEED_SLURMRESTD:
@MGR.component('slurmrestd')
def start_slurm_restd():
"""Start BEESlurmRestD. Returns a Popen process object."""
bee_workdir = bc.get('DEFAULT', 'bee_workdir')
slurmrestd_log = '/'.join([bee_workdir, 'logs', 'restd.log'])
openapi_version = bc.get('slurm', 'openapi_version')
slurm_args = f'-s openapi/{openapi_version}'
slurm_socket = paths.slurm_socket()
subprocess.run(['rm', '-f', slurm_socket], check=True)
# log.info("Attempting to open socket: {}".format(slurm_socket))
fp = open(slurmrestd_log, 'w', encoding='utf-8') # noqa
cmd = ['slurmrestd']
cmd.extend(slurm_args.split())
cmd.append(f'unix:{slurm_socket}')
return subprocess.Popen(cmd, stdout=fp, stderr=fp)


def handle_terminate(signum, stack): # noqa
"""Handle a terminate signal."""
# Kill all subprocesses
MGR.kill()
sys.exit(1)
def need_slurmrestd():
"""Check if slurmrestd is needed."""
return (bc.get('DEFAULT', 'workload_scheduler') == 'Slurm'
and not bc.get('slurm', 'use_commands'))


def init_components():
"""Initialize the components and component manager."""
mgr = ComponentManager()

# Slurmrestd will be started only if we're running with Slurm and
# slurm::use_commands is not True

@mgr.component('wf_manager', ('scheduler',))
def start_wfm():
"""Start the WFM."""
fp = open_log('wf_manager')
return launch_with_gunicorn('beeflow.wf_manager.wf_manager:create_app()',
paths.wfm_socket(), stdout=fp, stderr=fp)

tm_deps = []
if need_slurmrestd():
tm_deps.append('slurmrestd')

@mgr.component('task_manager', tm_deps)
def start_task_manager():
"""Start the TM."""
fp = open_log('task_manager')
return launch_with_gunicorn('beeflow.task_manager:flask_app', paths.tm_socket(),
stdout=fp, stderr=fp)

@mgr.component('scheduler', ())
def start_scheduler():
"""Start the scheduler."""
fp = open_log('scheduler')
# Using a function here because of the funny way that the scheduler's written
return launch_with_gunicorn('beeflow.scheduler.scheduler:create_app()',
paths.sched_socket(), stdout=fp, stderr=fp)

# Workflow manager and task manager need to be opened with PIPE for their stdout/stderr
if need_slurmrestd():
@mgr.component('slurmrestd')
def start_slurm_restd():
"""Start BEESlurmRestD. Returns a Popen process object."""
bee_workdir = bc.get('DEFAULT', 'bee_workdir')
slurmrestd_log = '/'.join([bee_workdir, 'logs', 'restd.log'])
openapi_version = bc.get('slurm', 'openapi_version')
slurm_args = f'-s openapi/{openapi_version}'
slurm_socket = paths.slurm_socket()
subprocess.run(['rm', '-f', slurm_socket], check=True)
# log.info("Attempting to open socket: {}".format(slurm_socket))
fp = open(slurmrestd_log, 'w', encoding='utf-8') # noqa
cmd = ['slurmrestd']
cmd.extend(slurm_args.split())
cmd.append(f'unix:{slurm_socket}')
return subprocess.Popen(cmd, stdout=fp, stderr=fp)

return mgr


MIN_CHARLIECLOUD_VERSION = (0, 32)
Expand Down Expand Up @@ -291,8 +283,14 @@ def handle_client(self, server):
print(f'connection failed: {err}')


def daemonize(base_components):
def daemonize(mgr, base_components):
"""Start beeflow as a daemon, monitoring all processes."""
def handle_terminate(signum, stack): # noqa
"""Handle a terminate signal."""
# Kill all subprocesses
mgr.kill()
sys.exit(1)

# Now set signal handling, the log and finally daemonize
signal_map = {
signal.SIGINT: handle_terminate,
Expand All @@ -301,7 +299,7 @@ def daemonize(base_components):
fp = open_log('beeflow')
with daemon.DaemonContext(signal_map=signal_map, stdout=fp, stderr=fp, stdin=fp,
umask=0o002):
Beeflow(MGR, base_components).loop()
Beeflow(mgr, base_components).loop()


app = typer.Typer(no_args_is_help=True)
Expand All @@ -310,11 +308,12 @@ def daemonize(base_components):
@app.command()
def start(foreground: bool = typer.Option(False, '--foreground', '-F',
help='run in the foreground')):
"""Attempt to daemonize if not in debug and start all BEE components."""
"""Start all BEE components."""
mgr = init_components()
beeflow_log = paths.log_fname('beeflow')
check_dependencies()
sock_path = paths.beeflow_socket()
if bc.get('DEFAULT', 'workload_scheduler') == 'Slurm' and not NEED_SLURMRESTD:
if bc.get('DEFAULT', 'workload_scheduler') == 'Slurm' and not need_slurmrestd():
warn('Not using slurmrestd. Command-line interface will be used.')
# Note: there is a possible race condition here, however unlikely
if os.path.exists(sock_path):
Expand All @@ -335,18 +334,18 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
sys.exit(1)
print('Starting beeflow...')
if not foreground:
print(f'Check "{beeflow_log}" or run `beeflow status` for more information.')
print(f'Check "{beeflow_log}" or run `beeflow core status` for more information.')
# Create the log path if it doesn't exist yet
path = paths.log_path()
os.makedirs(path, exist_ok=True)
base_components = ['wf_manager', 'task_manager', 'scheduler']
if foreground:
try:
Beeflow(MGR, base_components).loop()
Beeflow(mgr, base_components).loop()
except KeyboardInterrupt:
MGR.kill()
mgr.kill()
else:
daemonize(base_components)
daemonize(mgr, base_components)


@app.command()
Expand All @@ -367,7 +366,7 @@ def status():
def stop():
"""Stop the current running beeflow daemon."""
stop_msg = ("\n** Please ensure all workflows are complete before stopping beeflow. **"
+ "\n** Check the status of workflows by running 'beeclient listall'. **"
+ "\n** Check the status of workflows by running 'beeflow list'. **"
+ "\nAre you sure you want to kill beeflow components? [y/n] ")
ans = input(stop_msg)
if ans.lower() != 'y':
Expand Down Expand Up @@ -400,12 +399,3 @@ def version_callback(version: bool = False):
if version:
version = importlib.metadata.version("hpc-beeflow")
print(version)


def main():
"""Start the beeflow app."""
app()


if __name__ == '__main__':
main()
7 changes: 0 additions & 7 deletions beeflow/common/build/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,6 @@ Each step in a workflow may include a reference to `DockerRequirement` in the CW
A few examples to use for testing:
## CharliecloudBuildDriver Examples

### Initialize BeeConfig for all the examples:

```
from beeflow.common.config_driver import BeeConfig as bc
bc.init()
```

### dockerPull
```
from beeflow.common.build.container_drivers import CharliecloudBuildDriver
Expand Down
Loading

0 comments on commit 78cb514

Please sign in to comment.