From fcb56648ae779fded2fb8a707c4509776ef3be8e Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Wed, 20 Dec 2023 11:39:35 -0700
Subject: [PATCH 1/7] Add long-running checkpoint-restart test

This adds a long-running checkpoint-restart workflow to the integration
tests; it should fail after exceeding the maximum number of restarts
allowed by 'num_tries'. Also updates the other integration tests to check
the final workflow status.
---
 .../checkpoint-too-long/Dockerfile   |  7 ++++
 .../checkpoint-too-long/input.yml    |  2 +
 .../checkpoint-too-long/step0.cwl    | 13 +++++++
 .../checkpoint-too-long/workflow.cwl | 32 ++++++++++++++++
 ci/integration_test.py               | 37 +++++++++++++++++--
 5 files changed, 88 insertions(+), 3 deletions(-)
 create mode 100644 beeflow/data/cwl/bee_workflows/checkpoint-too-long/Dockerfile
 create mode 100644 beeflow/data/cwl/bee_workflows/checkpoint-too-long/input.yml
 create mode 100644 beeflow/data/cwl/bee_workflows/checkpoint-too-long/step0.cwl
 create mode 100644 beeflow/data/cwl/bee_workflows/checkpoint-too-long/workflow.cwl

diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/Dockerfile b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/Dockerfile
new file mode 100644
index 000000000..ea9e89fbb
--- /dev/null
+++ b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/Dockerfile
@@ -0,0 +1,7 @@
+# Dummy container with program to run forever
+FROM alpine
+
+RUN printf "#!/bin/sh\n" > /usr/bin/checkpoint-program \
+    && printf "touch backup0.crx\n" >> /usr/bin/checkpoint-program \
+    && printf "sleep 10000000000\n" >> /usr/bin/checkpoint-program \
+    && chmod 755 /usr/bin/checkpoint-program

diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/input.yml b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/input.yml
new file mode 100644
index 000000000..4a1c105e0
--- /dev/null
+++ b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/input.yml
@@ -0,0 +1,2 @@
+# Dummy input
+fake_input: 64

diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/step0.cwl b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/step0.cwl
new file mode 100644
index 000000000..e0ae84547
--- /dev/null
+++ b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/step0.cwl
@@ -0,0 +1,13 @@
+class: CommandLineTool
+cwlVersion: v1.0
+
+baseCommand: /usr/bin/checkpoint-program
+stdout: checkpoint_stdout.txt
+inputs:
+  fake_input:
+    type: int?
+    inputBinding:
+      prefix: -f
+outputs:
+  step0_output:
+    type: stdout

diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/workflow.cwl b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/workflow.cwl
new file mode 100644
index 000000000..0fa9086e6
--- /dev/null
+++ b/beeflow/data/cwl/bee_workflows/checkpoint-too-long/workflow.cwl
@@ -0,0 +1,32 @@
+# Test workflow that will run forever, causing a failure when it is restarted
+# more than 'num_tries' times.
+class: Workflow
+cwlVersion: v1.0
+
+inputs:
+  # Dummy input for the first step
+  fake_input: int
+outputs:
+  step0_stdout:
+    type: File
+    outputSource: step0/step0_output
+
+steps:
+  step0:
+    run: step0.cwl
+    in:
+      fake_input: fake_input
+    out: [step0_output]
+    hints:
+      beeflow:CheckpointRequirement:
+        enabled: true
+        file_path: checkpoint_output
+        container_path: checkpoint_output
+        file_regex: backup[0-9]*.crx
+        restart_parameters: -R
+        num_tries: 1
+      beeflow:SchedulerRequirement:
+        timeLimit: 00:00:10
+      DockerRequirement:
+        dockerFile: "Dockerfile"
+        beeflow:containerName: "checkpoint-failure"

diff --git a/ci/integration_test.py b/ci/integration_test.py
index 99a37f130..89774b52b 100755
--- a/ci/integration_test.py
+++ b/ci/integration_test.py
@@ -86,11 +86,12 @@ def run(self):
         except bee_client.ClientError as error:
             raise CIError(*error.args) from error
 
+    @property
     def running(self):
         """Check if the workflow is running or about to run."""
-        print(bee_client.query(self.wf_id))
         return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending')
 
+    @property
     def status(self):
         """Get the status of the workflow."""
         return bee_client.query(self.wf_id)[0]
@@ -133,7 +134,7 @@ def _run_workflows(self, workflows):
         # Now run until all workflows are complete or until self.timeout is hit
         t = 0
         while t < self.timeout:
-            if all(not wfl.running() for wfl in workflows):
+            if all(not wfl.running for wfl in workflows):
                 # All workflows have completed
                 break
             time.sleep(2)
             t += 2
@@ -176,7 +177,11 @@ def run(self, test_names=None):
                 print('PASSED')
                 print('------')
                 # Only remove the outer_workdir if it passed
-                shutil.rmtree(outer_workdir)
+                try:
+                    shutil.rmtree(outer_workdir)
+                except OSError as err:
+                    print(f'WARNING: Failed to remove {outer_workdir}')
+                    print(err)
 
         print('######## WORKFLOW RESULTS ########')
         fails = sum(1 if result is not None else 0 for name, result in results.items())
@@ -230,6 +235,11 @@ def check_path_exists(path):
     ci_assert(os.path.exists(path), f'expected file "{path}" does not exist')
 
 
+def check_completed(workflow):
+    """Ensure the workflow has a completed status."""
+    ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}')
+
+
 #
 # Inline workflow generation code
 #
@@ -407,6 +417,7 @@ def copy_container(outer_workdir):
                         main_cwl=main_cwl, job_file=job_file, workdir=workdir,
                         containers=[container])
     yield [workflow]
+    check_completed(workflow)
     # Ensure the output file was created
     path = os.path.join(workdir, main_input)
     check_path_exists(path)
@@ -443,6 +454,7 @@ def use_container(outer_workdir):
                         main_cwl=main_cwl, job_file=job_file,
                         workdir=container_workdir, containers=[container])
     yield [workflow]
+    check_completed(workflow)
     path = os.path.join(container_workdir, main_input)
     check_path_exists(path)
     # This container should not have been copied into the container archive
@@ -481,6 +493,7 @@ def docker_file(outer_workdir):
                         main_cwl=main_cwl, job_file=job_file, workdir=workdir,
                         containers=[])
     yield [workflow]
+    check_completed(workflow)
     path = os.path.join(workdir, main_input)
     check_path_exists(path)
     # The container should have been copied into the archive
@@ -516,6 +529,7 @@ def docker_pull(outer_workdir):
                         main_cwl=main_cwl, job_file=job_file, workdir=workdir,
                         containers=[])
     yield [workflow]
+    check_completed(workflow)
     path = os.path.join(workdir, main_input)
     check_path_exists(path)
     # Check that the image tarball is in the archive
@@ -551,6 +565,8 @@ def multiple_workflows(outer_workdir):
         })
         workflows.append(workflow)
     yield workflows
+    for workflow in workflows:
+        check_completed(workflow)
     for wfl_info in workflow_data:
         workdir = wfl_info['workdir']
         workflow_path = wfl_info['workflow_path']
@@ -569,11 +585,26 @@ def checkpoint_restart(outer_workdir):
                         main_cwl='clamr_wf.cwl', job_file='clamr_job.yml',
                         workdir=workdir, containers=[])
     yield [workflow]
+    check_completed(workflow)
     # Check for the movie file
     path = Path(workdir, 'CLAMR_movie.mp4')
     check_path_exists(path)
 
 
+@TEST_RUNNER.add(ignore=True)
+def checkpoint_restart_failure(outer_workdir):
+    """Test a checkpoint restart workflow that continues past 'num_tries'."""
+    workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
+    os.makedirs(workdir)
+    workflow = Workflow('checkpoint-too-long',
+                        'beeflow/data/cwl/bee_workflows/checkpoint-too-long',
+                        main_cwl='workflow.cwl', job_file='input.yml',
+                        workdir=workdir, containers=[])
+    yield [workflow]
+    ci_assert(workflow.status == 'Failed',
+              f'Workflow did not fail as expected (final status: {workflow.status})')
+
+
 def test_input_callback(arg):
     """Parse a list of tests separated by commas."""
     return arg.split(',') if arg is not None else None
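A note on the test-case protocol used throughout this series, since it is easy to miss in diff form: each function registered with TEST_RUNNER.add() is a generator that the runner wraps with contextlib.contextmanager. Code before the `yield` performs setup, the yielded list of Workflow objects is submitted and polled until completion or timeout, and the code after the `yield` runs afterwards as post-run assertions. This keeps setup, execution, and checks in a single function per test. A minimal sketch using only the API shown in these patches; the test name and workflow paths here are hypothetical:

    @TEST_RUNNER.add()
    def example_test(outer_workdir):
        """Hypothetical test: set up, yield workflows, then assert."""
        workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
        os.makedirs(workdir)
        # 'example' and the paths below are placeholders, not real test data
        workflow = Workflow('example', 'path/to/example-workflow',
                            main_cwl='main.cwl', job_file='job.yml',
                            workdir=workdir, containers=[])
        yield [workflow]           # the runner drives this list to completion
        check_completed(workflow)  # post-run assertion: final status 'Archived'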
From 24b4dd17640a722be55c78054d2823b5518e2188 Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Wed, 20 Dec 2023 11:52:57 -0700
Subject: [PATCH 2/7] Move integration tests into beeflow package

---
 beeflow/common/integration_test.py | 632 ++++++++++++++++++++++++++++
 ci/integration_test.py             | 633 +----------------------------
 2 files changed, 634 insertions(+), 631 deletions(-)
 create mode 100644 beeflow/common/integration_test.py

diff --git a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py
new file mode 100644
index 000000000..72e0f0ddc
--- /dev/null
+++ b/beeflow/common/integration_test.py
@@ -0,0 +1,632 @@
+from contextlib import contextmanager
+from pathlib import Path
+import os
+import shutil
+import subprocess
+import sys
+import time
+import traceback
+import uuid
+import yaml
+import typer
+
+from beeflow.common.config_driver import BeeConfig as bc
+from beeflow.client import bee_client
+
+
+# Max time of each workflow
+TIMEOUT = 90
+INTEGRATION_TEST_DIR = os.path.expanduser('~/.beeflow-integration')
+
+
+class CIError(Exception):
+    """CI error class."""
+
+    def __init__(self, *args):
+        """Initialize the error with default args."""
+        self.args = args
+
+
+class Container:
+    """Container CI class for building BEE containers."""
+
+    def __init__(self, name, dockerfile, tarball):
+        """BEE CI container constructor."""
+        self.name = name
+        self.dockerfile = dockerfile
+        self.tarball = tarball
+        self.done = False
+
+    def build(self):
+        """Build the containers and save them in the proper location."""
+        if not self.done:
+            try:
+                subprocess.check_call(['ch-image', 'build', '-f', self.dockerfile, '-t', self.name,
+                                       '--force', 'seccomp', os.path.dirname(self.dockerfile)])
+                subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', self.name,
+                                       self.tarball])
+            except subprocess.CalledProcessError as error:
+                raise CIError(
+                    f'container build: {error}'
+                ) from None
+            self.done = True
+
+
+class Workflow:
+    """Workflow CI class for interacting with BEE."""
+
+    def __init__(self, name, path, main_cwl, job_file, workdir, containers):
+        """Workflow constructor."""
+        self.name = name
+        self.path = path
+        self.main_cwl = main_cwl
+        self.job_file = job_file
+        self.workdir = workdir
+        self.containers = containers
+        self.wf_id = None
+        self.tarball = None
+
+    def run(self):
+        """Attempt to submit and start the workflow."""
+        # Build all the containers first
+
print('Building all containers') + for ctr in self.containers: + ctr.build() + try: + tarball_dir = Path(self.path).parent + tarball = f'{Path(self.path).name}.tgz' + self.tarball = tarball_dir / tarball + print('Creating the workflow tarball', self.tarball) + bee_client.package(Path(self.path), Path(tarball_dir)) + print('Submitting and starting workflow') + self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl, + self.job_file, self.workdir, no_start=False) + except bee_client.ClientError as error: + raise CIError(*error.args) from error + + @property + def running(self): + """Check if the workflow is running or about to run.""" + return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending') + + @property + def status(self): + """Get the status of the workflow.""" + return bee_client.query(self.wf_id)[0] + + def cleanup(self): + """Clean up any leftover workflow data.""" + # Remove the generated tarball + os.remove(self.tarball) + + +class TestRunner: + """Test runner class.""" + + def __init__(self): + """Build a new test runner class.""" + self.test_cases = [] + self.timeout = TIMEOUT + + def add(self, ignore=False): + """Decorate a test case and add it to the runner instance.""" + + def wrap(test_case): + """Wrap the function.""" + # First turn it into a contextmanager + test_case = contextmanager(test_case) + self.test_cases.append((test_case, ignore)) + return test_case + + return wrap + + def test_details(self): + """Return a list of all the test details (test_name, ignore).""" + return [(test_case.__name__, ignore) for test_case, ignore in self.test_cases] + + def _run_workflows(self, workflows): + """Run a list of workflows to completion.""" + # Start all workflows at once + for wfl in workflows: + wfl.run() + # Now run until all workflows are complete or until self.timeout is hit + t = 0 + while t < self.timeout: + if all(not wfl.running for wfl in workflows): + # All workflows have completed + break + time.sleep(2) + t += 2 + if t >= self.timeout: + raise CIError('workflow timeout') + + def run(self, test_names=None): + """Test run all test cases.""" + results = {} + for test_case, ignore in self.test_cases: + if test_names is not None: + # Skip tests that aren't required to be run + if test_case.__name__ not in test_names: + continue + elif ignore: + # Skip ignored tests + continue + outer_workdir = os.path.join(INTEGRATION_TEST_DIR, uuid.uuid4().hex) + os.makedirs(outer_workdir) + msg0 = f'Starting test {test_case.__name__}' + msg1 = f'(running in "{outer_workdir}")' + count = max(len(msg0), len(msg1)) + print('#' * count) + print(msg0) + print(msg1) + print('-' * count) + try: + with test_case(outer_workdir) as workflows: + self._run_workflows(workflows) + except CIError as error: + traceback.print_exc() + results[test_case.__name__] = error + print('------') + print('FAILED') + print('------') + else: + results[test_case.__name__] = None + print('------') + print('PASSED') + print('------') + # Only remove the outer_workdir if it passed + try: + shutil.rmtree(outer_workdir) + except OSError as err: + print(f'WARNING: Failed to remove {outer_workdir}') + print(err) + + print('######## WORKFLOW RESULTS ########') + fails = sum(1 if result is not None else 0 for name, result in results.items()) + passes = len(results) - fails + print(f'{passes} passes, {fails} fails') + for name, result in results.items(): + error = result + if error is not None: + print(f'{name}: {error}') + print('##################################') + return 1 
if fails > 0 else 0 + + +TEST_RUNNER = TestRunner() + + +# +# Helper methods for running the test cases +# + + +def ch_image_delete(img_name): + """Execute a ch-image delete [img_name].""" + try: + subprocess.check_call(['ch-image', 'delete', img_name]) + except subprocess.CalledProcessError: + raise CIError( + f'failed when calling `ch-image delete {img_name}` to clean up' + ) from None + + +def ch_image_list(): + """Execute a ch-image list and return the images.""" + try: + res = subprocess.run(['ch-image', 'list'], stdout=subprocess.PIPE, check=True) + output = res.stdout.decode(encoding='utf-8') + images = [img for img in output.split('\n') if img] + return images + except subprocess.CalledProcessError as err: + raise CIError(f'failed when calling `ch-image list`: {err}') from err + + +def ci_assert(predicate, msg): + """Assert that the predicate is True, or raise a CIError.""" + if not predicate: + raise CIError(msg) + + +def check_path_exists(path): + """Check that the specified path exists.""" + ci_assert(os.path.exists(path), f'expected file "{path}" does not exist') + + +def check_completed(workflow): + """Ensure the workflow has a completed status.""" + ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') + + +# +# Inline workflow generation code +# + +def yaml_dump(path, data): + """Dump this data as a yaml file at path.""" + with open(path, 'w', encoding='utf-8') as fp: + yaml.dump(data, fp, Dumper=yaml.CDumper) + + +def generate_builder_workflow(output_path, docker_requirement, main_input): + """Generate a base workflow to be used for testing the builder.""" + os.makedirs(output_path, exist_ok=True) + main_cwl_file = 'workflow.cwl' + main_cwl_data = { + 'cwlVersion': 'v1.0', + 'class': 'Workflow', + 'inputs': { + 'main_input': 'string', + }, + 'outputs': { + 'main_output': { + 'outputSource': 'step0/step_output', + 'type': 'File', + }, + }, + 'steps': { + 'step0': { + 'run': 'step0.cwl', + 'in': { + 'step_input': 'main_input', + }, + 'out': ['step_output'], + 'hints': { + 'DockerRequirement': docker_requirement, + }, + } + }, + } + yaml_dump(os.path.join(output_path, main_cwl_file), main_cwl_data) + + step0_file = 'step0.cwl' + step0_data = { + 'cwlVersion': 'v1.0', + 'class': 'CommandLineTool', + 'baseCommand': 'touch', + 'inputs': { + 'step_input': { + 'type': 'string', + 'inputBinding': { + 'position': 1, + }, + }, + }, + 'outputs': { + 'step_output': { + 'type': 'stdout', + } + }, + 'stdout': 'output.txt', + } + yaml_dump(os.path.join(output_path, step0_file), step0_data) + + job_file = 'job.yml' + job_data = { + 'main_input': main_input, + } + yaml_dump(os.path.join(output_path, job_file), job_data) + + return (main_cwl_file, job_file) + + +def generate_simple_workflow(output_path, fname): + """Generate a simple workflow.""" + os.makedirs(output_path, exist_ok=True) + + task0_cwl = 'task0.cwl' + main_cwl = 'main.cwl' + main_cwl_file = str(Path(output_path, main_cwl)) + main_cwl_data = { + 'cwlVersion': 'v1.0', + 'class': 'Workflow', + 'requirements': {}, + 'inputs': { + 'in_fname': 'string', + }, + 'outputs': { + 'out': { + 'type': 'File', + 'outputSource': 'task0/out', + }, + }, + 'steps': { + 'task0': { + 'run': task0_cwl, + 'in': { + 'fname': 'in_fname', + }, + 'out': ['out'], + }, + }, + } + yaml_dump(main_cwl_file, main_cwl_data) + + task0_cwl_file = str(Path(output_path, task0_cwl)) + task0_data = { + 'cwlVersion': 'v1.0', + 'class': 'CommandLineTool', + 'baseCommand': 'touch', + 'inputs': { + 'fname': { + 'type': 'string', + 
'inputBinding': { + 'position': 1, + }, + }, + }, + 'stdout': 'touch.log', + 'outputs': { + 'out': { + 'type': 'stdout', + } + } + } + yaml_dump(task0_cwl_file, task0_data) + + job_yaml = 'job.yaml' + job_file = str(Path(output_path, job_yaml)) + job_data = { + 'in_fname': fname, + } + yaml_dump(job_file, job_data) + return (main_cwl, job_yaml) + + +BASE_CONTAINER = 'alpine' +SIMPLE_DOCKERFILE = """ +# Dummy docker file that really doesn't do much +FROM alpine + +# Install something that has minimal dependencies +RUN apk update && apk add bzip2 +""" +# Dump the Dockerfile to a path that the later code can reference +DOCKER_FILE_PATH = os.path.join('/tmp', f'Dockerfile-{uuid.uuid4().hex}') +with open(DOCKER_FILE_PATH, 'w', encoding='utf-8') as docker_file_fp: + docker_file_fp.write(SIMPLE_DOCKERFILE) + +# +# Workflows setup +# + +# Remove an existing base container +if BASE_CONTAINER in ch_image_list(): + ch_image_delete(BASE_CONTAINER) + + +@TEST_RUNNER.add() +def copy_container(outer_workdir): + """Prepare, check results of using `beeflow:copyContainer` then do cleanup.""" + # `beeflow:copyContainer` workflow + container_path = f'/tmp/copy_container-{uuid.uuid4().hex}.tar.gz' + container_name = 'copy-container' + workdir = os.path.join(outer_workdir, uuid.uuid4().hex) + os.makedirs(workdir) + container = Container(container_name, DOCKER_FILE_PATH, container_path) + workflow_path = os.path.join(outer_workdir, f'bee-cc-workflow-{uuid.uuid4().hex}') + main_input = 'copy_container' + docker_requirement = { + 'beeflow:copyContainer': container_path, + } + main_cwl, job_file = generate_builder_workflow(workflow_path, docker_requirement, + main_input) + workflow = Workflow('copy-container', workflow_path, + main_cwl=main_cwl, job_file=job_file, workdir=workdir, + containers=[container]) + yield [workflow] + check_completed(workflow) + # Ensure the output file was created + path = os.path.join(workdir, main_input) + check_path_exists(path) + os.remove(path) + # Ensure that the container has been copied into the archive + container_archive = bc.get('builder', 'container_archive') + basename = os.path.basename(container_path) + path = os.path.join(container_archive, basename) + check_path_exists(path) + os.remove(path) + ch_image_delete(container_name) + + +@TEST_RUNNER.add() +def use_container(outer_workdir): + """Prepare, check results of using `beeflow:useContainer` and clean up.""" + # `beeflow:useContainer` workflow + container_path = os.path.join(outer_workdir, f'use_container-{uuid.uuid4().hex}.tar.gz') + container_name = 'use-container' + container_workdir = os.path.join(outer_workdir, uuid.uuid4().hex) + os.makedirs(container_workdir) + container = Container(container_name, DOCKER_FILE_PATH, container_path) + workflow_path = os.path.join(outer_workdir, f'bee-use-ctr-workflow-{uuid.uuid4().hex}') + main_input = 'use_ctr' + docker_requirement = { + 'beeflow:useContainer': container_path, + } + main_cwl, job_file = generate_builder_workflow( + workflow_path, + docker_requirement, + main_input, + ) + workflow = Workflow('use-container', workflow_path, + main_cwl=main_cwl, job_file=job_file, + workdir=container_workdir, containers=[container]) + yield [workflow] + check_completed(workflow) + path = os.path.join(container_workdir, main_input) + check_path_exists(path) + # This container should not have been copied into the container archive + container_archive = bc.get('builder', 'container_archive') + basename = os.path.basename(container_path) + path = Path(container_archive, basename) + 
ci_assert( + not path.exists(), + f'the container "{basename}" was copied into the container archive' + ) + ch_image_delete(container_name) + + +@TEST_RUNNER.add() +def docker_file(outer_workdir): + """Ensure that the `dockerFile` example ran properly and then clean up.""" + # `dockerFile` workflow + workflow_path = os.path.join(outer_workdir, f'bee-df-workflow-{uuid.uuid4().hex}') + # Copy the Dockerfile to the workdir path + os.makedirs(workflow_path) + workdir = os.path.join(outer_workdir, uuid.uuid4().hex) + os.makedirs(workdir) + shutil.copy(DOCKER_FILE_PATH, os.path.join(workflow_path, 'Dockerfile')) + container_name = 'docker_file_test' + docker_requirement = { + 'dockerFile': 'Dockerfile', + 'beeflow:containerName': container_name, + } + main_input = 'docker_file' + main_cwl, job_file = generate_builder_workflow( + workflow_path, + docker_requirement, + main_input, + ) + workflow = Workflow('docker-file', workflow_path, + main_cwl=main_cwl, job_file=job_file, workdir=workdir, + containers=[]) + yield [workflow] + check_completed(workflow) + path = os.path.join(workdir, main_input) + check_path_exists(path) + # The container should have been copied into the archive + container_archive = bc.get('builder', 'container_archive') + tarball = f'{container_name}.tar.gz' + path = os.path.join(container_archive, tarball) + check_path_exists(path) + os.remove(path) + # Check that the container is listed + images = ch_image_list() + ci_assert(container_name in images, f'cannot find expected container "{container_name}"') + ch_image_delete(container_name) + + +@TEST_RUNNER.add() +def docker_pull(outer_workdir): + """Prepare, then check that the `dockerPull` option was successful.""" + # `dockerPull` workflow + workflow_path = os.path.join(outer_workdir, f'bee-dp-workflow-{uuid.uuid4().hex}') + workdir = os.path.join(outer_workdir, uuid.uuid4().hex) + os.makedirs(workdir) + container_name = BASE_CONTAINER + docker_requirement = { + 'dockerPull': container_name, + } + main_input = 'docker_pull' + main_cwl, job_file = generate_builder_workflow( + workflow_path, + docker_requirement, + main_input, + ) + workflow = Workflow('docker-pull', workflow_path, + main_cwl=main_cwl, job_file=job_file, workdir=workdir, + containers=[]) + yield [workflow] + check_completed(workflow) + path = os.path.join(workdir, main_input) + check_path_exists(path) + # Check that the image tarball is in the archive + container_archive = bc.get('builder', 'container_archive') + path = os.path.join(container_archive, f'{container_name}.tar.gz') + check_path_exists(path) + os.remove(path) + # Commenting the below out for now; looks like Charliecloud 0.32 isn't + # showing base containers for some reason? 
+    # Check for the image with `ch-image list`
+    # images = ch_image_list()
+    # ci_assert(container_name in images, f'could not find expected container "{container_name}"')
+    # ch_image_delete(container_name)
+
+
+@TEST_RUNNER.add()
+def multiple_workflows(outer_workdir):
+    """Test running three different workflows at the same time."""
+    output_file = 'output_file'
+    workflow_data = []
+    workflows = []
+    for i in range(3):
+        workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
+        os.makedirs(workdir)
+        workflow_path = Path(outer_workdir, uuid.uuid4().hex)
+        main_cwl, job_file = generate_simple_workflow(workflow_path, output_file)
+        workflow = Workflow(f'multi-workflow-{i}', workflow_path,
+                            main_cwl=main_cwl, job_file=job_file,
+                            workdir=workdir, containers=[])
+        workflow_data.append({
+            'workdir': workdir,
+            'workflow_path': workflow_path,
+        })
+        workflows.append(workflow)
+    yield workflows
+    for workflow in workflows:
+        check_completed(workflow)
+    for wfl_info in workflow_data:
+        workdir = wfl_info['workdir']
+        workflow_path = wfl_info['workflow_path']
+        # Each test should have touched this file
+        path = Path(workdir, output_file)
+        check_path_exists(path)
+
+
+@TEST_RUNNER.add(ignore=True)
+def checkpoint_restart(outer_workdir):
+    """Test the clamr-ffmpeg checkpoint restart workflow."""
+    workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
+    os.makedirs(workdir)
+    workflow = Workflow('checkpoint-restart',
+                        'beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint',
+                        main_cwl='clamr_wf.cwl', job_file='clamr_job.yml',
+                        workdir=workdir, containers=[])
+    yield [workflow]
+    check_completed(workflow)
+    # Check for the movie file
+    path = Path(workdir, 'CLAMR_movie.mp4')
+    check_path_exists(path)
+
+
+@TEST_RUNNER.add(ignore=True)
+def checkpoint_restart_failure(outer_workdir):
+    """Test a checkpoint restart workflow that continues past 'num_tries'."""
+    workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
+    os.makedirs(workdir)
+    workflow = Workflow('checkpoint-too-long',
+                        'beeflow/data/cwl/bee_workflows/checkpoint-too-long',
+                        main_cwl='workflow.cwl', job_file='input.yml',
+                        workdir=workdir, containers=[])
+    yield [workflow]
+    ci_assert(workflow.status == 'Failed',
+              f'Workflow did not fail as expected (final status: {workflow.status})')
+
+
+def test_input_callback(arg):
+    """Parse a list of tests separated by commas."""
+    return arg.split(',') if arg is not None else None
+
+
+def main(tests = typer.Option(None, '--tests', '-t',  # noqa (conflict on '=' sign)
+                              callback=test_input_callback,
+                              help='tests run as comma-separated string'),
+         show_tests: bool = typer.Option(False, '--show-tests', '-s',  # noqa (conflict on '=' sign)
+                                         help='show a list of all tests'),
+         timeout: int = typer.Option(TIMEOUT, '--timeout', help='workflow timeout in seconds')):
+    """Launch the integration tests."""
+    if show_tests:
+        print('INTEGRATION TEST CASES:')
+        for test_name, ignore in TEST_RUNNER.test_details():
+            print('*', test_name, '(ignored)' if ignore else '')
+        return
+    TEST_RUNNER.timeout = timeout
+    # Run the workflows and then show completion results
+    ret = TEST_RUNNER.run(tests)
+    # ret = test_workflows(WORKFLOWS)
+    # General clean up
+    os.remove(DOCKER_FILE_PATH)
+    sys.exit(ret)
+# Ignore W0231: This is a user-defined exception and I don't think we need to call
+# __init__ on the base class.
+# pylama:ignore=W0231 diff --git a/ci/integration_test.py b/ci/integration_test.py index 89774b52b..6451ecfce 100755 --- a/ci/integration_test.py +++ b/ci/integration_test.py @@ -1,638 +1,9 @@ #!/usr/bin/env python """CI workflow run script.""" -from contextlib import contextmanager -from pathlib import Path -import os -import shutil -import subprocess -import sys -import time -import traceback -import uuid -import yaml import typer -from beeflow.common.config_driver import BeeConfig as bc -from beeflow.client import bee_client - - -# Max time of each workflow -TIMEOUT = 90 -INTEGRATION_TEST_DIR = os.path.expanduser('~/.beeflow-integration') - - -class CIError(Exception): - """CI error class.""" - - def __init__(self, *args): - """Initialize the error with default args.""" - self.args = args - - -class Container: - """Container CI class for building BEE containers.""" - - def __init__(self, name, dockerfile, tarball): - """BEE CI container constructor.""" - self.name = name - self.dockerfile = dockerfile - self.tarball = tarball - self.done = False - - def build(self): - """Build the containers and save them in the proper location.""" - if not self.done: - try: - subprocess.check_call(['ch-image', 'build', '-f', self.dockerfile, '-t', self.name, - '--force', 'seccomp', os.path.dirname(self.dockerfile)]) - subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', self.name, - self.tarball]) - except subprocess.CalledProcessError as error: - raise CIError( - f'container build: {error}' - ) from None - self.done = True - - -class Workflow: - """Workflow CI class for interacting with BEE.""" - - def __init__(self, name, path, main_cwl, job_file, workdir, containers): - """Workflow constructor.""" - self.name = name - self.path = path - self.main_cwl = main_cwl - self.job_file = job_file - self.workdir = workdir - self.containers = containers - self.wf_id = None - self.tarball = None - - def run(self): - """Attempt to submit and start the workflow.""" - # Build all the containers first - print('Building all containers') - for ctr in self.containers: - ctr.build() - try: - tarball_dir = Path(self.path).parent - tarball = f'{Path(self.path).name}.tgz' - self.tarball = tarball_dir / tarball - print('Creating the workflow tarball', self.tarball) - bee_client.package(Path(self.path), Path(tarball_dir)) - print('Submitting and starting workflow') - self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl, - self.job_file, self.workdir, no_start=False) - except bee_client.ClientError as error: - raise CIError(*error.args) from error - - @property - def running(self): - """Check if the workflow is running or about to run.""" - return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending') - - @property - def status(self): - """Get the status of the workflow.""" - return bee_client.query(self.wf_id)[0] - - def cleanup(self): - """Clean up any leftover workflow data.""" - # Remove the generated tarball - os.remove(self.tarball) - - -class TestRunner: - """Test runner class.""" - - def __init__(self): - """Build a new test runner class.""" - self.test_cases = [] - self.timeout = TIMEOUT - - def add(self, ignore=False): - """Decorate a test case and add it to the runner instance.""" - - def wrap(test_case): - """Wrap the function.""" - # First turn it into a contextmanager - test_case = contextmanager(test_case) - self.test_cases.append((test_case, ignore)) - return test_case - - return wrap - - def test_details(self): - """Return a list of all the test 
details (test_name, ignore).""" - return [(test_case.__name__, ignore) for test_case, ignore in self.test_cases] - - def _run_workflows(self, workflows): - """Run a list of workflows to completion.""" - # Start all workflows at once - for wfl in workflows: - wfl.run() - # Now run until all workflows are complete or until self.timeout is hit - t = 0 - while t < self.timeout: - if all(not wfl.running for wfl in workflows): - # All workflows have completed - break - time.sleep(2) - t += 2 - if t >= self.timeout: - raise CIError('workflow timeout') - - def run(self, test_names=None): - """Test run all test cases.""" - results = {} - for test_case, ignore in self.test_cases: - if test_names is not None: - # Skip tests that aren't required to be run - if test_case.__name__ not in test_names: - continue - elif ignore: - # Skip ignored tests - continue - outer_workdir = os.path.join(INTEGRATION_TEST_DIR, uuid.uuid4().hex) - os.makedirs(outer_workdir) - msg0 = f'Starting test {test_case.__name__}' - msg1 = f'(running in "{outer_workdir}")' - count = max(len(msg0), len(msg1)) - print('#' * count) - print(msg0) - print(msg1) - print('-' * count) - try: - with test_case(outer_workdir) as workflows: - self._run_workflows(workflows) - except CIError as error: - traceback.print_exc() - results[test_case.__name__] = error - print('------') - print('FAILED') - print('------') - else: - results[test_case.__name__] = None - print('------') - print('PASSED') - print('------') - # Only remove the outer_workdir if it passed - try: - shutil.rmtree(outer_workdir) - except OSError as err: - print(f'WARNING: Failed to remove {outer_workdir}') - print(err) - - print('######## WORKFLOW RESULTS ########') - fails = sum(1 if result is not None else 0 for name, result in results.items()) - passes = len(results) - fails - print(f'{passes} passes, {fails} fails') - for name, result in results.items(): - error = result - if error is not None: - print(f'{name}: {error}') - print('##################################') - return 1 if fails > 0 else 0 - - -TEST_RUNNER = TestRunner() - - -# -# Helper methods for running the test cases -# - - -def ch_image_delete(img_name): - """Execute a ch-image delete [img_name].""" - try: - subprocess.check_call(['ch-image', 'delete', img_name]) - except subprocess.CalledProcessError: - raise CIError( - f'failed when calling `ch-image delete {img_name}` to clean up' - ) from None - - -def ch_image_list(): - """Execute a ch-image list and return the images.""" - try: - res = subprocess.run(['ch-image', 'list'], stdout=subprocess.PIPE, check=True) - output = res.stdout.decode(encoding='utf-8') - images = [img for img in output.split('\n') if img] - return images - except subprocess.CalledProcessError as err: - raise CIError(f'failed when calling `ch-image list`: {err}') from err - - -def ci_assert(predicate, msg): - """Assert that the predicate is True, or raise a CIError.""" - if not predicate: - raise CIError(msg) - - -def check_path_exists(path): - """Check that the specified path exists.""" - ci_assert(os.path.exists(path), f'expected file "{path}" does not exist') - - -def check_completed(workflow): - """Ensure the workflow has a completed status.""" - ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') - - -# -# Inline workflow generation code -# - -def yaml_dump(path, data): - """Dump this data as a yaml file at path.""" - with open(path, 'w', encoding='utf-8') as fp: - yaml.dump(data, fp, Dumper=yaml.CDumper) - - -def generate_builder_workflow(output_path, 
docker_requirement, main_input): - """Generate a base workflow to be used for testing the builder.""" - os.makedirs(output_path, exist_ok=True) - main_cwl_file = 'workflow.cwl' - main_cwl_data = { - 'cwlVersion': 'v1.0', - 'class': 'Workflow', - 'inputs': { - 'main_input': 'string', - }, - 'outputs': { - 'main_output': { - 'outputSource': 'step0/step_output', - 'type': 'File', - }, - }, - 'steps': { - 'step0': { - 'run': 'step0.cwl', - 'in': { - 'step_input': 'main_input', - }, - 'out': ['step_output'], - 'hints': { - 'DockerRequirement': docker_requirement, - }, - } - }, - } - yaml_dump(os.path.join(output_path, main_cwl_file), main_cwl_data) - - step0_file = 'step0.cwl' - step0_data = { - 'cwlVersion': 'v1.0', - 'class': 'CommandLineTool', - 'baseCommand': 'touch', - 'inputs': { - 'step_input': { - 'type': 'string', - 'inputBinding': { - 'position': 1, - }, - }, - }, - 'outputs': { - 'step_output': { - 'type': 'stdout', - } - }, - 'stdout': 'output.txt', - } - yaml_dump(os.path.join(output_path, step0_file), step0_data) - - job_file = 'job.yml' - job_data = { - 'main_input': main_input, - } - yaml_dump(os.path.join(output_path, job_file), job_data) - - return (main_cwl_file, job_file) - - -def generate_simple_workflow(output_path, fname): - """Generate a simple workflow.""" - os.makedirs(output_path, exist_ok=True) - - task0_cwl = 'task0.cwl' - main_cwl = 'main.cwl' - main_cwl_file = str(Path(output_path, main_cwl)) - main_cwl_data = { - 'cwlVersion': 'v1.0', - 'class': 'Workflow', - 'requirements': {}, - 'inputs': { - 'in_fname': 'string', - }, - 'outputs': { - 'out': { - 'type': 'File', - 'outputSource': 'task0/out', - }, - }, - 'steps': { - 'task0': { - 'run': task0_cwl, - 'in': { - 'fname': 'in_fname', - }, - 'out': ['out'], - }, - }, - } - yaml_dump(main_cwl_file, main_cwl_data) - - task0_cwl_file = str(Path(output_path, task0_cwl)) - task0_data = { - 'cwlVersion': 'v1.0', - 'class': 'CommandLineTool', - 'baseCommand': 'touch', - 'inputs': { - 'fname': { - 'type': 'string', - 'inputBinding': { - 'position': 1, - }, - }, - }, - 'stdout': 'touch.log', - 'outputs': { - 'out': { - 'type': 'stdout', - } - } - } - yaml_dump(task0_cwl_file, task0_data) - - job_yaml = 'job.yaml' - job_file = str(Path(output_path, job_yaml)) - job_data = { - 'in_fname': fname, - } - yaml_dump(job_file, job_data) - return (main_cwl, job_yaml) - - -BASE_CONTAINER = 'alpine' -SIMPLE_DOCKERFILE = """ -# Dummy docker file that really doesn't do much -FROM alpine - -# Install something that has minimal dependencies -RUN apk update && apk add bzip2 -""" -# Dump the Dockerfile to a path that the later code can reference -DOCKER_FILE_PATH = os.path.join('/tmp', f'Dockerfile-{uuid.uuid4().hex}') -with open(DOCKER_FILE_PATH, 'w', encoding='utf-8') as docker_file_fp: - docker_file_fp.write(SIMPLE_DOCKERFILE) - -# -# Workflows setup -# - -# Remove an existing base container -if BASE_CONTAINER in ch_image_list(): - ch_image_delete(BASE_CONTAINER) - - -@TEST_RUNNER.add() -def copy_container(outer_workdir): - """Prepare, check results of using `beeflow:copyContainer` then do cleanup.""" - # `beeflow:copyContainer` workflow - container_path = f'/tmp/copy_container-{uuid.uuid4().hex}.tar.gz' - container_name = 'copy-container' - workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(workdir) - container = Container(container_name, DOCKER_FILE_PATH, container_path) - workflow_path = os.path.join(outer_workdir, f'bee-cc-workflow-{uuid.uuid4().hex}') - main_input = 'copy_container' - docker_requirement = { - 
'beeflow:copyContainer': container_path, - } - main_cwl, job_file = generate_builder_workflow(workflow_path, docker_requirement, - main_input) - workflow = Workflow('copy-container', workflow_path, - main_cwl=main_cwl, job_file=job_file, workdir=workdir, - containers=[container]) - yield [workflow] - check_completed(workflow) - # Ensure the output file was created - path = os.path.join(workdir, main_input) - check_path_exists(path) - os.remove(path) - # Ensure that the container has been copied into the archive - container_archive = bc.get('builder', 'container_archive') - basename = os.path.basename(container_path) - path = os.path.join(container_archive, basename) - check_path_exists(path) - os.remove(path) - ch_image_delete(container_name) - - -@TEST_RUNNER.add() -def use_container(outer_workdir): - """Prepare, check results of using `beeflow:useContainer` and clean up.""" - # `beeflow:useContainer` workflow - container_path = os.path.join(outer_workdir, f'use_container-{uuid.uuid4().hex}.tar.gz') - container_name = 'use-container' - container_workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(container_workdir) - container = Container(container_name, DOCKER_FILE_PATH, container_path) - workflow_path = os.path.join(outer_workdir, f'bee-use-ctr-workflow-{uuid.uuid4().hex}') - main_input = 'use_ctr' - docker_requirement = { - 'beeflow:useContainer': container_path, - } - main_cwl, job_file = generate_builder_workflow( - workflow_path, - docker_requirement, - main_input, - ) - workflow = Workflow('use-container', workflow_path, - main_cwl=main_cwl, job_file=job_file, - workdir=container_workdir, containers=[container]) - yield [workflow] - check_completed(workflow) - path = os.path.join(container_workdir, main_input) - check_path_exists(path) - # This container should not have been copied into the container archive - container_archive = bc.get('builder', 'container_archive') - basename = os.path.basename(container_path) - path = Path(container_archive, basename) - ci_assert( - not path.exists(), - f'the container "{basename}" was copied into the container archive' - ) - ch_image_delete(container_name) - - -@TEST_RUNNER.add() -def docker_file(outer_workdir): - """Ensure that the `dockerFile` example ran properly and then clean up.""" - # `dockerFile` workflow - workflow_path = os.path.join(outer_workdir, f'bee-df-workflow-{uuid.uuid4().hex}') - # Copy the Dockerfile to the workdir path - os.makedirs(workflow_path) - workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(workdir) - shutil.copy(DOCKER_FILE_PATH, os.path.join(workflow_path, 'Dockerfile')) - container_name = 'docker_file_test' - docker_requirement = { - 'dockerFile': 'Dockerfile', - 'beeflow:containerName': container_name, - } - main_input = 'docker_file' - main_cwl, job_file = generate_builder_workflow( - workflow_path, - docker_requirement, - main_input, - ) - workflow = Workflow('docker-file', workflow_path, - main_cwl=main_cwl, job_file=job_file, workdir=workdir, - containers=[]) - yield [workflow] - check_completed(workflow) - path = os.path.join(workdir, main_input) - check_path_exists(path) - # The container should have been copied into the archive - container_archive = bc.get('builder', 'container_archive') - tarball = f'{container_name}.tar.gz' - path = os.path.join(container_archive, tarball) - check_path_exists(path) - os.remove(path) - # Check that the container is listed - images = ch_image_list() - ci_assert(container_name in images, f'cannot find expected container 
"{container_name}"') - ch_image_delete(container_name) - - -@TEST_RUNNER.add() -def docker_pull(outer_workdir): - """Prepare, then check that the `dockerPull` option was successful.""" - # `dockerPull` workflow - workflow_path = os.path.join(outer_workdir, f'bee-dp-workflow-{uuid.uuid4().hex}') - workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(workdir) - container_name = BASE_CONTAINER - docker_requirement = { - 'dockerPull': container_name, - } - main_input = 'docker_pull' - main_cwl, job_file = generate_builder_workflow( - workflow_path, - docker_requirement, - main_input, - ) - workflow = Workflow('docker-pull', workflow_path, - main_cwl=main_cwl, job_file=job_file, workdir=workdir, - containers=[]) - yield [workflow] - check_completed(workflow) - path = os.path.join(workdir, main_input) - check_path_exists(path) - # Check that the image tarball is in the archive - container_archive = bc.get('builder', 'container_archive') - path = os.path.join(container_archive, f'{container_name}.tar.gz') - check_path_exists(path) - os.remove(path) - # Commenting the below out for now; looks like Charliecloud 0.32 isn't - # showing base containers for some reason? - # Check for the image with `ch-image list` - # images = ch_image_list() - # ci_assert(container_name in images, f'could not find expected container "{container_name}"') - # ch_image_delete(container_name) - - -@TEST_RUNNER.add() -def multiple_workflows(outer_workdir): - """Test running three different workflows at the same time.""" - output_file = 'output_file' - workflow_data = [] - workflows = [] - for i in range(3): - workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(workdir) - workflow_path = Path(outer_workdir, uuid.uuid4().hex) - main_cwl, job_file = generate_simple_workflow(workflow_path, output_file) - workflow = Workflow(f'multi-workflow-{i}', workflow_path, - main_cwl=main_cwl, job_file=job_file, - workdir=workdir, containers=[]) - workflow_data.append({ - 'workdir': workdir, - 'workflow_path': workflow_path, - }) - workflows.append(workflow) - yield workflows - for workflow in workflows: - check_completed(workflow) - for wfl_info in workflow_data: - workdir = wfl_info['workdir'] - workflow_path = wfl_info['workflow_path'] - # Each test should have touched this file - path = Path(workdir, output_file) - check_path_exists(path) - - -@TEST_RUNNER.add(ignore=True) -def checkpoint_restart(outer_workdir): - """Test the clamr-ffmpeg checkpoint restart workflow.""" - workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(workdir) - workflow = Workflow('checkpoint-restart', - 'beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint', - main_cwl='clamr_wf.cwl', job_file='clamr_job.yml', - workdir=workdir, containers=[]) - yield [workflow] - check_completed(workflow) - # Check for the movie file - path = Path(workdir, 'CLAMR_movie.mp4') - check_path_exists(path) - - -@TEST_RUNNER.add(ignore=True) -def checkpoint_restart_failure(outer_workdir): - """Test a checkpoint restart workflow that continues past 'num_retries'.""" - workdir = os.path.join(outer_workdir, uuid.uuid4().hex) - os.makedirs(workdir) - workflow = Workflow('checkpoint-too-long', - 'beeflow/data/cwl/bee_workflows/checkpoint-too-long', - main_cwl='workflow.cwl', job_file='input.yml', - workdir=workdir, containers=[]) - yield [workflow] - ci_assert(workflow.status == 'Failed', - f'Workflow did not fail as expected (final status: {workflow.status})') - - -def test_input_callback(arg): - """Parse a list of tests separated by 
commas.""" - return arg.split(',') if arg is not None else None - - -def main(tests = typer.Option(None, '--tests', '-t', # noqa (conflict on '=' sign) - callback=test_input_callback, - help='tests run as comma-separated string'), - show_tests: bool = typer.Option(False, '--show-tests', '-s', # noqa (conflict on '=' sign) - help='show a list of all tests'), - timeout: int = typer.Option(TIMEOUT, '--timeout', help='workflow timeout in seconds')): - """Launch the integration tests.""" - if show_tests: - print('INTEGRATION TEST CASES:') - for test_name, ignore in TEST_RUNNER.test_details(): - print('*', test_name, '(ignored)' if ignore else '') - return - TEST_RUNNER.timeout = timeout - # Run the workflows and then show completion results - ret = TEST_RUNNER.run(tests) - # ret = test_workflows(WORKFLOWS) - # General clean up - os.remove(DOCKER_FILE_PATH) - sys.exit(ret) +from beeflow.common import integration_test if __name__ == '__main__': - typer.run(main) -# Ignore W0231: This is a user-defined exception and I don't think we need to call -# __init__ on the base class. -# pylama:ignore=W0231 + typer.run(integration_test.main) From e1802d0c3699be42617c77aed517fbebb4f68cbb Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Wed, 20 Dec 2023 12:49:13 -0700 Subject: [PATCH 3/7] Refactor integration tests into different modules --- .../common/integration/generated_workflows.py | 160 ++++++ beeflow/common/integration/utils.py | 236 ++++++++ beeflow/common/integration_test.py | 515 +++--------------- 3 files changed, 465 insertions(+), 446 deletions(-) create mode 100644 beeflow/common/integration/generated_workflows.py create mode 100644 beeflow/common/integration/utils.py diff --git a/beeflow/common/integration/generated_workflows.py b/beeflow/common/integration/generated_workflows.py new file mode 100644 index 000000000..ab6fe675e --- /dev/null +++ b/beeflow/common/integration/generated_workflows.py @@ -0,0 +1,160 @@ +"""Inline workflow generation code.""" +from pathlib import Path +import os +import uuid +import yaml + +from beeflow.common.integration import utils + + +BASE_CONTAINER = 'alpine' +SIMPLE_DOCKERFILE = """ +# Dummy docker file that really doesn't do much +FROM alpine + +# Install something that has minimal dependencies +RUN apk update && apk add bzip2 +""" +# Dump the Dockerfile to a path that the later code can reference +DOCKER_FILE_PATH = os.path.join('/tmp', f'Dockerfile-{uuid.uuid4().hex}') + + +def yaml_dump(path, data): + """Dump this data as a yaml file at path.""" + with open(path, 'w', encoding='utf-8') as fp: + yaml.dump(data, fp, Dumper=yaml.CDumper) + + +def builder_workflow(output_path, docker_requirement, main_input): + """Generate a base workflow to be used for testing the builder.""" + os.makedirs(output_path, exist_ok=True) + main_cwl_file = 'workflow.cwl' + main_cwl_data = { + 'cwlVersion': 'v1.0', + 'class': 'Workflow', + 'inputs': { + 'main_input': 'string', + }, + 'outputs': { + 'main_output': { + 'outputSource': 'step0/step_output', + 'type': 'File', + }, + }, + 'steps': { + 'step0': { + 'run': 'step0.cwl', + 'in': { + 'step_input': 'main_input', + }, + 'out': ['step_output'], + 'hints': { + 'DockerRequirement': docker_requirement, + }, + } + }, + } + yaml_dump(os.path.join(output_path, main_cwl_file), main_cwl_data) + + step0_file = 'step0.cwl' + step0_data = { + 'cwlVersion': 'v1.0', + 'class': 'CommandLineTool', + 'baseCommand': 'touch', + 'inputs': { + 'step_input': { + 'type': 'string', + 'inputBinding': { + 'position': 1, + }, + }, + }, + 'outputs': { + 
'step_output': { + 'type': 'stdout', + } + }, + 'stdout': 'output.txt', + } + yaml_dump(os.path.join(output_path, step0_file), step0_data) + + job_file = 'job.yml' + job_data = { + 'main_input': main_input, + } + yaml_dump(os.path.join(output_path, job_file), job_data) + + return (main_cwl_file, job_file) + + +def simple_workflow(output_path, fname): + """Generate a simple workflow.""" + os.makedirs(output_path, exist_ok=True) + + task0_cwl = 'task0.cwl' + main_cwl = 'main.cwl' + main_cwl_file = str(Path(output_path, main_cwl)) + main_cwl_data = { + 'cwlVersion': 'v1.0', + 'class': 'Workflow', + 'requirements': {}, + 'inputs': { + 'in_fname': 'string', + }, + 'outputs': { + 'out': { + 'type': 'File', + 'outputSource': 'task0/out', + }, + }, + 'steps': { + 'task0': { + 'run': task0_cwl, + 'in': { + 'fname': 'in_fname', + }, + 'out': ['out'], + }, + }, + } + yaml_dump(main_cwl_file, main_cwl_data) + + task0_cwl_file = str(Path(output_path, task0_cwl)) + task0_data = { + 'cwlVersion': 'v1.0', + 'class': 'CommandLineTool', + 'baseCommand': 'touch', + 'inputs': { + 'fname': { + 'type': 'string', + 'inputBinding': { + 'position': 1, + }, + }, + }, + 'stdout': 'touch.log', + 'outputs': { + 'out': { + 'type': 'stdout', + } + } + } + yaml_dump(task0_cwl_file, task0_data) + + job_yaml = 'job.yaml' + job_file = str(Path(output_path, job_yaml)) + job_data = { + 'in_fname': fname, + } + yaml_dump(job_file, job_data) + return (main_cwl, job_yaml) + + +def init(): + """Initialize files and the container runtime.""" + with open(DOCKER_FILE_PATH, 'w', encoding='utf-8') as docker_file_fp: + docker_file_fp.write(SIMPLE_DOCKERFILE) + + # Remove an existing base container + if BASE_CONTAINER in utils.ch_image_list(): + utils.ch_image_delete(BASE_CONTAINER) diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py new file mode 100644 index 000000000..2bf4435e1 --- /dev/null +++ b/beeflow/common/integration/utils.py @@ -0,0 +1,236 @@ +"""Utility code for running the integration tests.""" +from contextlib import contextmanager +from pathlib import Path +import os +import shutil +import subprocess +import time +import traceback +import uuid + +from beeflow.client import bee_client + + +# Max time of each workflow +TIMEOUT = 90 +INTEGRATION_TEST_DIR = os.path.expanduser('~/.beeflow-integration') + + +class CIError(Exception): + """CI error class.""" + + def __init__(self, *args): + """Initialize the error with default args.""" + self.args = args + + +class Container: + """Container CI class for building BEE containers.""" + + def __init__(self, name, dockerfile, tarball): + """BEE CI container constructor.""" + self.name = name + self.dockerfile = dockerfile + self.tarball = tarball + self.done = False + + def build(self): + """Build the containers and save them in the proper location.""" + if not self.done: + try: + subprocess.check_call(['ch-image', 'build', '-f', self.dockerfile, '-t', self.name, + '--force', 'seccomp', os.path.dirname(self.dockerfile)]) + subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', self.name, + self.tarball]) + except subprocess.CalledProcessError as error: + raise CIError( + f'container build: {error}' + ) from None + self.done = True + + +class Workflow: + """Workflow CI class for interacting with BEE.""" + + def __init__(self, name, path, main_cwl, job_file, workdir, containers): + """Workflow constructor.""" + self.name = name + self.path = path + self.main_cwl = main_cwl + self.job_file = job_file + self.workdir = workdir + 
self.containers = containers + self.wf_id = None + self.tarball = None + + def run(self): + """Attempt to submit and start the workflow.""" + # Build all the containers first + print('Building all containers') + for ctr in self.containers: + ctr.build() + try: + tarball_dir = Path(self.path).parent + tarball = f'{Path(self.path).name}.tgz' + self.tarball = tarball_dir / tarball + print('Creating the workflow tarball', self.tarball) + bee_client.package(Path(self.path), Path(tarball_dir)) + print('Submitting and starting workflow') + self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl, + self.job_file, self.workdir, no_start=False) + except bee_client.ClientError as error: + raise CIError(*error.args) from error + + @property + def running(self): + """Check if the workflow is running or about to run.""" + return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending') + + @property + def status(self): + """Get the status of the workflow.""" + return bee_client.query(self.wf_id)[0] + + def cleanup(self): + """Clean up any leftover workflow data.""" + # Remove the generated tarball + os.remove(self.tarball) + + +class TestRunner: + """Test runner class.""" + + def __init__(self): + """Build a new test runner class.""" + self.test_cases = [] + self.timeout = TIMEOUT + + def add(self, ignore=False): + """Decorate a test case and add it to the runner instance.""" + + def wrap(test_case): + """Wrap the function.""" + # First turn it into a contextmanager + test_case = contextmanager(test_case) + self.test_cases.append((test_case, ignore)) + return test_case + + return wrap + + def test_details(self): + """Return a list of all the test details (test_name, ignore).""" + return [(test_case.__name__, ignore) for test_case, ignore in self.test_cases] + + def _run_workflows(self, workflows): + """Run a list of workflows to completion.""" + # Start all workflows at once + for wfl in workflows: + wfl.run() + # Now run until all workflows are complete or until self.timeout is hit + t = 0 + while t < self.timeout: + if all(not wfl.running for wfl in workflows): + # All workflows have completed + break + time.sleep(2) + t += 2 + if t >= self.timeout: + raise CIError('workflow timeout') + + def run(self, test_names=None): + """Test run all test cases.""" + results = {} + for test_case, ignore in self.test_cases: + if test_names is not None: + # Skip tests that aren't required to be run + if test_case.__name__ not in test_names: + continue + elif ignore: + # Skip ignored tests + continue + outer_workdir = os.path.join(INTEGRATION_TEST_DIR, uuid.uuid4().hex) + os.makedirs(outer_workdir) + msg0 = f'Starting test {test_case.__name__}' + msg1 = f'(running in "{outer_workdir}")' + count = max(len(msg0), len(msg1)) + print('#' * count) + print(msg0) + print(msg1) + print('-' * count) + try: + with test_case(outer_workdir) as workflows: + self._run_workflows(workflows) + except CIError as error: + traceback.print_exc() + results[test_case.__name__] = error + print('------') + print('FAILED') + print('------') + else: + results[test_case.__name__] = None + print('------') + print('PASSED') + print('------') + # Only remove the outer_workdir if it passed + try: + shutil.rmtree(outer_workdir) + except OSError as err: + print(f'WARNING: Failed to remove {outer_workdir}') + print(err) + + fails = sum(1 if result is not None else 0 for _, result in results.items()) + self.display_results(results, fails) + return 1 if fails > 0 else 0 + + def display_results(self, results, 
fails): + """Show the workflow results.""" + print('######## WORKFLOW RESULTS ########') + passes = len(results) - fails + print(f'{passes} passes, {fails} fails') + for name, result in results.items(): + error = result + if error is not None: + print(f'{name}: {error}') + print('##################################') + + +# +# Helper methods for running the test cases +# + + +def ch_image_delete(img_name): + """Execute a ch-image delete [img_name].""" + try: + subprocess.check_call(['ch-image', 'delete', img_name]) + except subprocess.CalledProcessError: + raise CIError( + f'failed when calling `ch-image delete {img_name}` to clean up' + ) from None + + +def ch_image_list(): + """Execute a ch-image list and return the images.""" + try: + res = subprocess.run(['ch-image', 'list'], stdout=subprocess.PIPE, check=True) + output = res.stdout.decode(encoding='utf-8') + images = [img for img in output.split('\n') if img] + return images + except subprocess.CalledProcessError as err: + raise CIError(f'failed when calling `ch-image list`: {err}') from err + + +def ci_assert(predicate, msg): + """Assert that the predicate is True, or raise a CIError.""" + if not predicate: + raise CIError(msg) + + +def check_path_exists(path): + """Check that the specified path exists.""" + ci_assert(os.path.exists(path), f'expected file "{path}" does not exist') + + +def check_completed(workflow): + """Ensure the workflow has a completed status.""" + ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') diff --git a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py index 72e0f0ddc..372073e31 100644 --- a/beeflow/common/integration_test.py +++ b/beeflow/common/integration_test.py @@ -1,398 +1,17 @@ -from contextlib import contextmanager +"""BEE integration tests.""" from pathlib import Path import os import shutil -import subprocess import sys -import time -import traceback import uuid -import yaml import typer +from beeflow.common.integration import utils +from beeflow.common.integration import generated_workflows from beeflow.common.config_driver import BeeConfig as bc -from beeflow.client import bee_client -# Max time of each workflow -TIMEOUT = 90 -INTEGRATION_TEST_DIR = os.path.expanduser('~/.beeflow-integration') - - -class CIError(Exception): - """CI error class.""" - - def __init__(self, *args): - """Initialize the error with default args.""" - self.args = args - - -class Container: - """Container CI class for building BEE containers.""" - - def __init__(self, name, dockerfile, tarball): - """BEE CI container constructor.""" - self.name = name - self.dockerfile = dockerfile - self.tarball = tarball - self.done = False - - def build(self): - """Build the containers and save them in the proper location.""" - if not self.done: - try: - subprocess.check_call(['ch-image', 'build', '-f', self.dockerfile, '-t', self.name, - '--force', 'seccomp', os.path.dirname(self.dockerfile)]) - subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', self.name, - self.tarball]) - except subprocess.CalledProcessError as error: - raise CIError( - f'container build: {error}' - ) from None - self.done = True - - -class Workflow: - """Workflow CI class for interacting with BEE.""" - - def __init__(self, name, path, main_cwl, job_file, workdir, containers): - """Workflow constructor.""" - self.name = name - self.path = path - self.main_cwl = main_cwl - self.job_file = job_file - self.workdir = workdir - self.containers = containers - self.wf_id = None - self.tarball = None 
- - def run(self): - """Attempt to submit and start the workflow.""" - # Build all the containers first - print('Building all containers') - for ctr in self.containers: - ctr.build() - try: - tarball_dir = Path(self.path).parent - tarball = f'{Path(self.path).name}.tgz' - self.tarball = tarball_dir / tarball - print('Creating the workflow tarball', self.tarball) - bee_client.package(Path(self.path), Path(tarball_dir)) - print('Submitting and starting workflow') - self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl, - self.job_file, self.workdir, no_start=False) - except bee_client.ClientError as error: - raise CIError(*error.args) from error - - @property - def running(self): - """Check if the workflow is running or about to run.""" - return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending') - - @property - def status(self): - """Get the status of the workflow.""" - return bee_client.query(self.wf_id)[0] - - def cleanup(self): - """Clean up any leftover workflow data.""" - # Remove the generated tarball - os.remove(self.tarball) - - -class TestRunner: - """Test runner class.""" - - def __init__(self): - """Build a new test runner class.""" - self.test_cases = [] - self.timeout = TIMEOUT - - def add(self, ignore=False): - """Decorate a test case and add it to the runner instance.""" - - def wrap(test_case): - """Wrap the function.""" - # First turn it into a contextmanager - test_case = contextmanager(test_case) - self.test_cases.append((test_case, ignore)) - return test_case - - return wrap - - def test_details(self): - """Return a list of all the test details (test_name, ignore).""" - return [(test_case.__name__, ignore) for test_case, ignore in self.test_cases] - - def _run_workflows(self, workflows): - """Run a list of workflows to completion.""" - # Start all workflows at once - for wfl in workflows: - wfl.run() - # Now run until all workflows are complete or until self.timeout is hit - t = 0 - while t < self.timeout: - if all(not wfl.running for wfl in workflows): - # All workflows have completed - break - time.sleep(2) - t += 2 - if t >= self.timeout: - raise CIError('workflow timeout') - - def run(self, test_names=None): - """Test run all test cases.""" - results = {} - for test_case, ignore in self.test_cases: - if test_names is not None: - # Skip tests that aren't required to be run - if test_case.__name__ not in test_names: - continue - elif ignore: - # Skip ignored tests - continue - outer_workdir = os.path.join(INTEGRATION_TEST_DIR, uuid.uuid4().hex) - os.makedirs(outer_workdir) - msg0 = f'Starting test {test_case.__name__}' - msg1 = f'(running in "{outer_workdir}")' - count = max(len(msg0), len(msg1)) - print('#' * count) - print(msg0) - print(msg1) - print('-' * count) - try: - with test_case(outer_workdir) as workflows: - self._run_workflows(workflows) - except CIError as error: - traceback.print_exc() - results[test_case.__name__] = error - print('------') - print('FAILED') - print('------') - else: - results[test_case.__name__] = None - print('------') - print('PASSED') - print('------') - # Only remove the outer_workdir if it passed - try: - shutil.rmtree(outer_workdir) - except OSError as err: - print(f'WARNING: Failed to remove {outer_workdir}') - print(err) - - print('######## WORKFLOW RESULTS ########') - fails = sum(1 if result is not None else 0 for name, result in results.items()) - passes = len(results) - fails - print(f'{passes} passes, {fails} fails') - for name, result in results.items(): - error = result - if 
error is not None: - print(f'{name}: {error}') - print('##################################') - return 1 if fails > 0 else 0 - - -TEST_RUNNER = TestRunner() - - -# -# Helper methods for running the test cases -# - - -def ch_image_delete(img_name): - """Execute a ch-image delete [img_name].""" - try: - subprocess.check_call(['ch-image', 'delete', img_name]) - except subprocess.CalledProcessError: - raise CIError( - f'failed when calling `ch-image delete {img_name}` to clean up' - ) from None - - -def ch_image_list(): - """Execute a ch-image list and return the images.""" - try: - res = subprocess.run(['ch-image', 'list'], stdout=subprocess.PIPE, check=True) - output = res.stdout.decode(encoding='utf-8') - images = [img for img in output.split('\n') if img] - return images - except subprocess.CalledProcessError as err: - raise CIError(f'failed when calling `ch-image list`: {err}') from err - - -def ci_assert(predicate, msg): - """Assert that the predicate is True, or raise a CIError.""" - if not predicate: - raise CIError(msg) - - -def check_path_exists(path): - """Check that the specified path exists.""" - ci_assert(os.path.exists(path), f'expected file "{path}" does not exist') - - -def check_completed(workflow): - """Ensure the workflow has a completed status.""" - ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') - - -# -# Inline workflow generation code -# - -def yaml_dump(path, data): - """Dump this data as a yaml file at path.""" - with open(path, 'w', encoding='utf-8') as fp: - yaml.dump(data, fp, Dumper=yaml.CDumper) - - -def generate_builder_workflow(output_path, docker_requirement, main_input): - """Generate a base workflow to be used for testing the builder.""" - os.makedirs(output_path, exist_ok=True) - main_cwl_file = 'workflow.cwl' - main_cwl_data = { - 'cwlVersion': 'v1.0', - 'class': 'Workflow', - 'inputs': { - 'main_input': 'string', - }, - 'outputs': { - 'main_output': { - 'outputSource': 'step0/step_output', - 'type': 'File', - }, - }, - 'steps': { - 'step0': { - 'run': 'step0.cwl', - 'in': { - 'step_input': 'main_input', - }, - 'out': ['step_output'], - 'hints': { - 'DockerRequirement': docker_requirement, - }, - } - }, - } - yaml_dump(os.path.join(output_path, main_cwl_file), main_cwl_data) - - step0_file = 'step0.cwl' - step0_data = { - 'cwlVersion': 'v1.0', - 'class': 'CommandLineTool', - 'baseCommand': 'touch', - 'inputs': { - 'step_input': { - 'type': 'string', - 'inputBinding': { - 'position': 1, - }, - }, - }, - 'outputs': { - 'step_output': { - 'type': 'stdout', - } - }, - 'stdout': 'output.txt', - } - yaml_dump(os.path.join(output_path, step0_file), step0_data) - - job_file = 'job.yml' - job_data = { - 'main_input': main_input, - } - yaml_dump(os.path.join(output_path, job_file), job_data) - - return (main_cwl_file, job_file) - - -def generate_simple_workflow(output_path, fname): - """Generate a simple workflow.""" - os.makedirs(output_path, exist_ok=True) - - task0_cwl = 'task0.cwl' - main_cwl = 'main.cwl' - main_cwl_file = str(Path(output_path, main_cwl)) - main_cwl_data = { - 'cwlVersion': 'v1.0', - 'class': 'Workflow', - 'requirements': {}, - 'inputs': { - 'in_fname': 'string', - }, - 'outputs': { - 'out': { - 'type': 'File', - 'outputSource': 'task0/out', - }, - }, - 'steps': { - 'task0': { - 'run': task0_cwl, - 'in': { - 'fname': 'in_fname', - }, - 'out': ['out'], - }, - }, - } - yaml_dump(main_cwl_file, main_cwl_data) - - task0_cwl_file = str(Path(output_path, task0_cwl)) - task0_data = { - 'cwlVersion': 'v1.0', - 'class': 
'CommandLineTool', - 'baseCommand': 'touch', - 'inputs': { - 'fname': { - 'type': 'string', - 'inputBinding': { - 'position': 1, - }, - }, - }, - 'stdout': 'touch.log', - 'outputs': { - 'out': { - 'type': 'stdout', - } - } - } - yaml_dump(task0_cwl_file, task0_data) - - job_yaml = 'job.yaml' - job_file = str(Path(output_path, job_yaml)) - job_data = { - 'in_fname': fname, - } - yaml_dump(job_file, job_data) - return (main_cwl, job_yaml) - - -BASE_CONTAINER = 'alpine' -SIMPLE_DOCKERFILE = """ -# Dummy docker file that really doesn't do much -FROM alpine - -# Install something that has minimal dependencies -RUN apk update && apk add bzip2 -""" -# Dump the Dockerfile to a path that the later code can reference -DOCKER_FILE_PATH = os.path.join('/tmp', f'Dockerfile-{uuid.uuid4().hex}') -with open(DOCKER_FILE_PATH, 'w', encoding='utf-8') as docker_file_fp: - docker_file_fp.write(SIMPLE_DOCKERFILE) - -# -# Workflows setup -# - -# Remove an existing base container -if BASE_CONTAINER in ch_image_list(): - ch_image_delete(BASE_CONTAINER) +TEST_RUNNER = utils.TestRunner() @TEST_RUNNER.add() @@ -403,30 +22,31 @@ def copy_container(outer_workdir): container_name = 'copy-container' workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(workdir) - container = Container(container_name, DOCKER_FILE_PATH, container_path) + container = utils.Container(container_name, generated_workflows.DOCKER_FILE_PATH, + container_path) workflow_path = os.path.join(outer_workdir, f'bee-cc-workflow-{uuid.uuid4().hex}') main_input = 'copy_container' docker_requirement = { 'beeflow:copyContainer': container_path, } - main_cwl, job_file = generate_builder_workflow(workflow_path, docker_requirement, - main_input) - workflow = Workflow('copy-container', workflow_path, - main_cwl=main_cwl, job_file=job_file, workdir=workdir, - containers=[container]) + main_cwl, job_file = generated_workflows.builder_workflow(workflow_path, docker_requirement, + main_input) + workflow = utils.Workflow('copy-container', workflow_path, main_cwl=main_cwl, + job_file=job_file, workdir=workdir, + containers=[container]) yield [workflow] - check_completed(workflow) + utils.check_completed(workflow) # Ensure the output file was created path = os.path.join(workdir, main_input) - check_path_exists(path) + utils.check_path_exists(path) os.remove(path) # Ensure that the container has been copied into the archive container_archive = bc.get('builder', 'container_archive') basename = os.path.basename(container_path) path = os.path.join(container_archive, basename) - check_path_exists(path) + utils.check_path_exists(path) os.remove(path) - ch_image_delete(container_name) + utils.ch_image_delete(container_name) @TEST_RUNNER.add() @@ -437,33 +57,34 @@ def use_container(outer_workdir): container_name = 'use-container' container_workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(container_workdir) - container = Container(container_name, DOCKER_FILE_PATH, container_path) + container = utils.Container(container_name, generated_workflows.DOCKER_FILE_PATH, + container_path) workflow_path = os.path.join(outer_workdir, f'bee-use-ctr-workflow-{uuid.uuid4().hex}') main_input = 'use_ctr' docker_requirement = { 'beeflow:useContainer': container_path, } - main_cwl, job_file = generate_builder_workflow( + main_cwl, job_file = generated_workflows.builder_workflow( workflow_path, docker_requirement, main_input, ) - workflow = Workflow('use-container', workflow_path, - main_cwl=main_cwl, job_file=job_file, - workdir=container_workdir, 
containers=[container]) + workflow = utils.Workflow('use-container', workflow_path, main_cwl=main_cwl, + job_file=job_file, workdir=container_workdir, + containers=[container]) yield [workflow] - check_completed(workflow) + utils.check_completed(workflow) path = os.path.join(container_workdir, main_input) - check_path_exists(path) + utils.check_path_exists(path) # This container should not have been copied into the container archive container_archive = bc.get('builder', 'container_archive') basename = os.path.basename(container_path) path = Path(container_archive, basename) - ci_assert( + utils.ci_assert( not path.exists(), f'the container "{basename}" was copied into the container archive' ) - ch_image_delete(container_name) + utils.ch_image_delete(container_name) @TEST_RUNNER.add() @@ -475,35 +96,35 @@ def docker_file(outer_workdir): os.makedirs(workflow_path) workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(workdir) - shutil.copy(DOCKER_FILE_PATH, os.path.join(workflow_path, 'Dockerfile')) + shutil.copy(generated_workflows.DOCKER_FILE_PATH, os.path.join(workflow_path, 'Dockerfile')) container_name = 'docker_file_test' docker_requirement = { 'dockerFile': 'Dockerfile', 'beeflow:containerName': container_name, } main_input = 'docker_file' - main_cwl, job_file = generate_builder_workflow( + main_cwl, job_file = generated_workflows.builder_workflow( workflow_path, docker_requirement, main_input, ) - workflow = Workflow('docker-file', workflow_path, - main_cwl=main_cwl, job_file=job_file, workdir=workdir, - containers=[]) + workflow = utils.Workflow('docker-file', workflow_path, main_cwl=main_cwl, + job_file=job_file, workdir=workdir, containers=[]) yield [workflow] - check_completed(workflow) + utils.check_completed(workflow) path = os.path.join(workdir, main_input) - check_path_exists(path) + utils.check_path_exists(path) # The container should have been copied into the archive container_archive = bc.get('builder', 'container_archive') tarball = f'{container_name}.tar.gz' path = os.path.join(container_archive, tarball) - check_path_exists(path) + utils.check_path_exists(path) os.remove(path) # Check that the container is listed - images = ch_image_list() - ci_assert(container_name in images, f'cannot find expected container "{container_name}"') - ch_image_delete(container_name) + images = utils.ch_image_list() + utils.ci_assert(container_name in images, + f'cannot find expected container "{container_name}"') + utils.ch_image_delete(container_name) @TEST_RUNNER.add() @@ -513,34 +134,34 @@ def docker_pull(outer_workdir): workflow_path = os.path.join(outer_workdir, f'bee-dp-workflow-{uuid.uuid4().hex}') workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(workdir) - container_name = BASE_CONTAINER + container_name = generated_workflows.BASE_CONTAINER docker_requirement = { 'dockerPull': container_name, } main_input = 'docker_pull' - main_cwl, job_file = generate_builder_workflow( + main_cwl, job_file = generated_workflows.builder_workflow( workflow_path, docker_requirement, main_input, ) - workflow = Workflow('docker-pull', workflow_path, - main_cwl=main_cwl, job_file=job_file, workdir=workdir, - containers=[]) + workflow = utils.Workflow('docker-pull', workflow_path, main_cwl=main_cwl, + job_file=job_file, workdir=workdir, containers=[]) yield [workflow] - check_completed(workflow) + utils.check_completed(workflow) path = os.path.join(workdir, main_input) - check_path_exists(path) + utils.check_path_exists(path) # Check that the image tarball is in the 
archive container_archive = bc.get('builder', 'container_archive') path = os.path.join(container_archive, f'{container_name}.tar.gz') - check_path_exists(path) + utils.check_path_exists(path) os.remove(path) # Commenting the below out for now; looks like Charliecloud 0.32 isn't # showing base containers for some reason? # Check for the image with `ch-image list` - # images = ch_image_list() - # ci_assert(container_name in images, f'could not find expected container "{container_name}"') - # ch_image_delete(container_name) + # images = utils.ch_image_list() + # utils.ci_assert(container_name in images, + # f'could not find expected container "{container_name}"') + # utils.ch_image_delete(container_name) @TEST_RUNNER.add() @@ -553,10 +174,10 @@ def multiple_workflows(outer_workdir): workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(workdir) workflow_path = Path(outer_workdir, uuid.uuid4().hex) - main_cwl, job_file = generate_simple_workflow(workflow_path, output_file) - workflow = Workflow(f'multi-workflow-{i}', workflow_path, - main_cwl=main_cwl, job_file=job_file, - workdir=workdir, containers=[]) + main_cwl, job_file = generated_workflows.simple_workflow(workflow_path, output_file) + workflow = utils.Workflow(f'multi-workflow-{i}', workflow_path, + main_cwl=main_cwl, job_file=job_file, + workdir=workdir, containers=[]) workflow_data.append({ 'workdir': workdir, 'workflow_path': workflow_path, @@ -564,13 +185,13 @@ def multiple_workflows(outer_workdir): workflows.append(workflow) yield workflows for workflow in workflows: - check_completed(workflow) + utils.check_completed(workflow) for wfl_info in workflow_data: workdir = wfl_info['workdir'] workflow_path = wfl_info['workflow_path'] # Each test should have touched this file path = Path(workdir, output_file) - check_path_exists(path) + utils.check_path_exists(path) @TEST_RUNNER.add(ignore=True) @@ -578,15 +199,15 @@ def checkpoint_restart(outer_workdir): """Test the clamr-ffmpeg checkpoint restart workflow.""" workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(workdir) - workflow = Workflow('checkpoint-restart', - 'beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint', - main_cwl='clamr_wf.cwl', job_file='clamr_job.yml', - workdir=workdir, containers=[]) + workflow = utils.Workflow('checkpoint-restart', + 'beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint', + main_cwl='clamr_wf.cwl', job_file='clamr_job.yml', + workdir=workdir, containers=[]) yield [workflow] - check_completed(workflow) + utils.check_completed(workflow) # Check for the movie file path = Path(workdir, 'CLAMR_movie.mp4') - check_path_exists(path) + utils.check_path_exists(path) @TEST_RUNNER.add(ignore=True) @@ -594,13 +215,13 @@ def checkpoint_restart_failure(outer_workdir): """Test a checkpoint restart workflow that continues past 'num_retries'.""" workdir = os.path.join(outer_workdir, uuid.uuid4().hex) os.makedirs(workdir) - workflow = Workflow('checkpoint-too-long', - 'beeflow/data/cwl/bee_workflows/checkpoint-too-long', - main_cwl='workflow.cwl', job_file='input.yml', - workdir=workdir, containers=[]) + workflow = utils.Workflow('checkpoint-too-long', + 'beeflow/data/cwl/bee_workflows/checkpoint-too-long', + main_cwl='workflow.cwl', job_file='input.yml', + workdir=workdir, containers=[]) yield [workflow] - ci_assert(workflow.status == 'Failed', - f'Workflow did not fail as expected (final status: {workflow.status})') + utils.ci_assert(workflow.status == 'Failed', + f'Workflow did not fail as expected (final status: {workflow.status})') 
def test_input_callback(arg):
@@ -613,19 +234,21 @@ def main(tests = typer.Option(None, '--tests', '-t',  # noqa (conflict on '=' si
                              help='tests run as comma-separated string'),
          show_tests: bool = typer.Option(False, '--show-tests', '-s',  # noqa (conflict on '=' sign)
                                          help='show a list of all tests'),
-         timeout: int = typer.Option(TIMEOUT, '--timeout', help='workflow timeout in seconds')):
+         timeout: int = typer.Option(utils.TIMEOUT, '--timeout',
+                                     help='workflow timeout in seconds')):
     """Launch the integration tests."""
     if show_tests:
         print('INTEGRATION TEST CASES:')
         for test_name, ignore in TEST_RUNNER.test_details():
             print('*', test_name, '(ignored)' if ignore else '')
         return
+    generated_workflows.init()
     TEST_RUNNER.timeout = timeout
     # Run the workflows and then show completion results
     ret = TEST_RUNNER.run(tests)
     # ret = test_workflows(WORKFLOWS)
     # General clean up
-    os.remove(DOCKER_FILE_PATH)
+    os.remove(generated_workflows.DOCKER_FILE_PATH)
     sys.exit(ret)
 # Ignore W0231: This is a user-defined exception and I don't think we need to call
 # __init__ on the base class.

From 54dd3f88535f268134db041213f6d21aa90315a6 Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Wed, 17 Jan 2024 12:23:29 -0700
Subject: [PATCH 4/7] Add info on running tests to the docs

---
 docs/sphinx/commands.rst    | 2 ++
 docs/sphinx/development.rst | 8 ++++++++
 2 files changed, 10 insertions(+)

diff --git a/docs/sphinx/commands.rst b/docs/sphinx/commands.rst
index 2290332be..9689f6acd 100644
--- a/docs/sphinx/commands.rst
+++ b/docs/sphinx/commands.rst
@@ -1,3 +1,5 @@
+.. _command-line-interface:
+
 Command Line Interface
 **********************
 
diff --git a/docs/sphinx/development.rst b/docs/sphinx/development.rst
index 03f19f8e5..626b0ae40 100644
--- a/docs/sphinx/development.rst
+++ b/docs/sphinx/development.rst
@@ -127,3 +127,11 @@ Additional documentation:
 
 * https://github.com/sdispater/poetry
 
+Running Tests
+-------------
+
+BEE includes unit and integration tests that can be run locally.
+
+To run the unit tests, make sure to install beeflow with ``poetry install -E cloud_extras``; the ``-E cloud_extras`` option forces Poetry to install extra dependencies required for some of the cloud API tests. After loading a shell with ``poetry shell``, you can run the unit tests with ``pytest beeflow/tests``.
+
+For the integration tests, you'll first have to start beeflow with ``beeflow core start`` (see :ref:`command-line-interface`). Then, making sure that you have Charliecloud loaded in your environment, you can run ``./ci/integration_test.py`` to run the tests. The integration test script includes a number of options for running extra tests; details of these and other command line options can be found through ``--help``.
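
Every integration test case registered in these patches follows the same pattern: the function is decorated with ``TEST_RUNNER.add()``, wrapped as a context manager, yields the list of workflows for the runner to execute, and then performs its checks once the runner resumes it. A minimal sketch of a new case under the module layout introduced above (the test name ``touch_file`` and its output file name are hypothetical examples, not part of the patches)::

    @TEST_RUNNER.add()
    def touch_file(outer_workdir):
        """Run one generated workflow and check that its output file exists."""
        # NOTE: the test name and output file name here are hypothetical examples
        workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
        os.makedirs(workdir)
        workflow_path = Path(outer_workdir, uuid.uuid4().hex)
        main_cwl, job_file = generated_workflows.simple_workflow(workflow_path,
                                                                 'touch_file_out')
        workflow = utils.Workflow('touch-file', workflow_path, main_cwl=main_cwl,
                                  job_file=job_file, workdir=workdir, containers=[])
        # Hand control to the runner; execution resumes here once the workflow stops
        yield [workflow]
        utils.check_completed(workflow)
        utils.check_path_exists(os.path.join(workdir, 'touch_file_out'))

A failed check raises ``CIError``, which ``TestRunner.run`` catches and reports in the final results summary; the per-test work directory under ``~/.beeflow-integration`` is removed only when the case passes.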
From 9a4459477ab3331523ef42f415e6595acc6879de Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Wed, 17 Jan 2024 12:31:50 -0700
Subject: [PATCH 5/7] Add extra sentence about .beeflow-integration directory

---
 docs/sphinx/development.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sphinx/development.rst b/docs/sphinx/development.rst
index 626b0ae40..d722cde50 100644
--- a/docs/sphinx/development.rst
+++ b/docs/sphinx/development.rst
@@ -134,4 +134,4 @@ BEE includes unit and integration tests that can be run locally.
 
 To run the unit tests, make sure to install beeflow with ``poetry install -E cloud_extras``; the ``-E cloud_extras`` option forces Poetry to install extra dependencies required for some of the cloud API tests. After loading a shell with ``poetry shell``, you can run the unit tests with ``pytest beeflow/tests``.
 
-For the integration tests, you'll first have to start beeflow with ``beeflow core start`` (see :ref:`command-line-interface`). Then, making sure that you have Charliecloud loaded in your environment, you can run ``./ci/integration_test.py`` to run the tests. The integration test script includes a number of options for running extra tests; details of these and other command line options can be found through ``--help``.
+For the integration tests, you'll first have to start beeflow with ``beeflow core start`` (see :ref:`command-line-interface`). Then, making sure that you have Charliecloud loaded in your environment, you can run ``./ci/integration_test.py`` to run the tests. The integration tests will create a directory ``~/.beeflow-integration``, used for storing temporary files and for inspecting results after a failure. The script itself includes a number of options for running extra tests; details of these and other command line options can be found through ``--help``.

From a98089096c5409eb08697f7a9f36a5ca15ffea60 Mon Sep 17 00:00:00 2001
From: Jake Tronge
Date: Thu, 18 Jan 2024 11:21:21 -0700
Subject: [PATCH 6/7] Move integration workflows to ci/test_workflows

---
 beeflow/common/integration_test.py                         | 4 ++--
 .../test_workflows}/checkpoint-too-long/Dockerfile         | 0
 .../test_workflows}/checkpoint-too-long/input.yml          | 0
 .../test_workflows}/checkpoint-too-long/step0.cwl          | 0
 .../test_workflows}/checkpoint-too-long/workflow.cwl       | 0
 .../clamr-wf-checkpoint/Dockerfile.clamr-ffmpeg            | 0
 .../test_workflows}/clamr-wf-checkpoint/README.md          | 0
 .../test_workflows}/clamr-wf-checkpoint/clamr.cwl          | 0
 .../test_workflows}/clamr-wf-checkpoint/clamr_job.json     | 0
 .../test_workflows}/clamr-wf-checkpoint/clamr_job.yml      | 0
 .../test_workflows}/clamr-wf-checkpoint/clamr_job_long.yml | 0
 .../test_workflows}/clamr-wf-checkpoint/clamr_wf.cwl       | 0
 .../test_workflows}/clamr-wf-checkpoint/ffmpeg.cwl         | 0
 13 files changed, 2 insertions(+), 2 deletions(-)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/checkpoint-too-long/Dockerfile (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/checkpoint-too-long/input.yml (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/checkpoint-too-long/step0.cwl (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/checkpoint-too-long/workflow.cwl (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/Dockerfile.clamr-ffmpeg (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/README.md (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/clamr.cwl (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/clamr_job.json (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/clamr_job.yml (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/clamr_job_long.yml (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/clamr_wf.cwl (100%)
 rename {beeflow/data/cwl/bee_workflows => ci/test_workflows}/clamr-wf-checkpoint/ffmpeg.cwl (100%)

diff --git a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py
index 372073e31..704616c66 100644
--- a/beeflow/common/integration_test.py
+++ b/beeflow/common/integration_test.py
@@ -200,7 +200,7 @@ def checkpoint_restart(outer_workdir):
     workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
     os.makedirs(workdir)
     workflow = utils.Workflow('checkpoint-restart',
-                              'beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint',
+                              'ci/test_workflows/clamr-wf-checkpoint',
                               main_cwl='clamr_wf.cwl', job_file='clamr_job.yml',
                               workdir=workdir, containers=[])
     yield [workflow]
@@ -216,7 +216,7 @@ def checkpoint_restart_failure(outer_workdir):
     workdir = os.path.join(outer_workdir, uuid.uuid4().hex)
     os.makedirs(workdir)
     workflow = utils.Workflow('checkpoint-too-long',
-                              'beeflow/data/cwl/bee_workflows/checkpoint-too-long',
+                              'ci/test_workflows/checkpoint-too-long',
                               main_cwl='workflow.cwl', job_file='input.yml',
                               workdir=workdir, containers=[])
     yield [workflow]
diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/Dockerfile b/ci/test_workflows/checkpoint-too-long/Dockerfile
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/checkpoint-too-long/Dockerfile
rename to ci/test_workflows/checkpoint-too-long/Dockerfile
diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/input.yml b/ci/test_workflows/checkpoint-too-long/input.yml
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/checkpoint-too-long/input.yml
rename to ci/test_workflows/checkpoint-too-long/input.yml
diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/step0.cwl b/ci/test_workflows/checkpoint-too-long/step0.cwl
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/checkpoint-too-long/step0.cwl
rename to ci/test_workflows/checkpoint-too-long/step0.cwl
diff --git a/beeflow/data/cwl/bee_workflows/checkpoint-too-long/workflow.cwl b/ci/test_workflows/checkpoint-too-long/workflow.cwl
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/checkpoint-too-long/workflow.cwl
rename to ci/test_workflows/checkpoint-too-long/workflow.cwl
diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/Dockerfile.clamr-ffmpeg b/ci/test_workflows/clamr-wf-checkpoint/Dockerfile.clamr-ffmpeg
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/Dockerfile.clamr-ffmpeg
rename to ci/test_workflows/clamr-wf-checkpoint/Dockerfile.clamr-ffmpeg
diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/README.md b/ci/test_workflows/clamr-wf-checkpoint/README.md
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/README.md
rename to ci/test_workflows/clamr-wf-checkpoint/README.md
diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr.cwl b/ci/test_workflows/clamr-wf-checkpoint/clamr.cwl
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr.cwl
rename to ci/test_workflows/clamr-wf-checkpoint/clamr.cwl
diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job.json b/ci/test_workflows/clamr-wf-checkpoint/clamr_job.json
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job.json
rename to ci/test_workflows/clamr-wf-checkpoint/clamr_job.json
diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job.yml b/ci/test_workflows/clamr-wf-checkpoint/clamr_job.yml
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job.yml
rename to ci/test_workflows/clamr-wf-checkpoint/clamr_job.yml
diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml b/ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml
similarity index 100%
rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_job_long.yml
rename to ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_wf.cwl b/ci/test_workflows/clamr-wf-checkpoint/clamr_wf.cwl similarity index 100% rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/clamr_wf.cwl rename to ci/test_workflows/clamr-wf-checkpoint/clamr_wf.cwl diff --git a/beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/ffmpeg.cwl b/ci/test_workflows/clamr-wf-checkpoint/ffmpeg.cwl similarity index 100% rename from beeflow/data/cwl/bee_workflows/clamr-wf-checkpoint/ffmpeg.cwl rename to ci/test_workflows/clamr-wf-checkpoint/ffmpeg.cwl From 2fa245195704b1323737a6e6632e4389f209e5f6 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Thu, 18 Jan 2024 15:27:56 -0700 Subject: [PATCH 7/7] Make minor change to documentation --- docs/sphinx/development.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sphinx/development.rst b/docs/sphinx/development.rst index d722cde50..f0ef2aba6 100644 --- a/docs/sphinx/development.rst +++ b/docs/sphinx/development.rst @@ -128,9 +128,9 @@ Additional documentation: * https://github.com/sdispater/poetry Running Tests -------------- +================== -BEE includes unit and integration tests that can be run locally. +BEE includes unit and integration tests that can be run on a local system. To run the unit tests, make sure to install beeflow with ``poetry install -E cloud_extras``; the ``-E cloud_extras`` option forces Poetry to install extra dependencies required for some of the cloud API tests. After loading a shell with ``poetry shell``, you can run the unit tests with ``pytest beeflow/tests``.
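
As a usage illustration of the options documented above, the test-selection and timeout machinery reduces to a few calls on the runner object. A sketch under the module names from these patches (the selected test names and the timeout value are examples only; this assumes beeflow has already been started)::

    # Build the inline CWL/Dockerfile fixtures, then run two registered cases.
    generated_workflows.init()
    TEST_RUNNER.timeout = 300  # seconds; the default is utils.TIMEOUT
    ret = TEST_RUNNER.run(['copy_container', 'docker_file'])
    sys.exit(ret)  # 0 when all selected cases passed, 1 otherwise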
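
Finally, the failure mode exercised by the ``checkpoint-too-long`` workflow comes down to a bounded restart loop: each time the never-ending step hits its ``timeLimit``, BEE restarts it from the newest checkpoint file, and once the number of restarts exceeds ``num_tries`` the workflow is marked ``Failed``. A conceptual sketch of that control flow (illustrative only, not BEE's actual implementation)::

    def run_with_checkpointing(run_step, num_tries):
        """Restart a time-limited step from its checkpoints until tries run out."""
        tries = 0
        while True:
            if run_step() == 'COMPLETED':
                return 'Archived'  # normal completion; the workflow is archived
            tries += 1             # the step hit its time limit and was checkpointed
            if tries > num_tries:
                return 'Failed'    # e.g. checkpoint-too-long with num_tries: 1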