Merge pull request #760 from lanl/checkpoint-restart-tests
Add new checkpoint-restart tests and refactor integration test code
pagrubel authored Jan 18, 2024
2 parents 4d1b14f + 2fa2451 commit 145a596
Showing 18 changed files with 717 additions and 600 deletions.
160 changes: 160 additions & 0 deletions beeflow/common/integration/generated_workflows.py
@@ -0,0 +1,160 @@
"""Inline workflow generation code."""
from pathlib import Path
import os
import uuid
import yaml

from beeflow.common.integration import utils


BASE_CONTAINER = 'alpine'
SIMPLE_DOCKERFILE = """
# Dummy Dockerfile that really doesn't do much
FROM alpine
# Install something that has minimal dependencies
RUN apk update && apk add bzip2
"""
# Unique path where the Dockerfile is written by init() below
DOCKER_FILE_PATH = os.path.join('/tmp', f'Dockerfile-{uuid.uuid4().hex}')


def yaml_dump(path, data):
"""Dump this data as a yaml file at path."""
with open(path, 'w', encoding='utf-8') as fp:
yaml.dump(data, fp, Dumper=yaml.CDumper)


def builder_workflow(output_path, docker_requirement, main_input):
"""Generate a base workflow to be used for testing the builder."""
os.makedirs(output_path, exist_ok=True)
main_cwl_file = 'workflow.cwl'
main_cwl_data = {
'cwlVersion': 'v1.0',
'class': 'Workflow',
'inputs': {
'main_input': 'string',
},
'outputs': {
'main_output': {
'outputSource': 'step0/step_output',
'type': 'File',
},
},
'steps': {
'step0': {
'run': 'step0.cwl',
'in': {
'step_input': 'main_input',
},
'out': ['step_output'],
'hints': {
'DockerRequirement': docker_requirement,
},
}
},
}
yaml_dump(os.path.join(output_path, main_cwl_file), main_cwl_data)

step0_file = 'step0.cwl'
step0_data = {
'cwlVersion': 'v1.0',
'class': 'CommandLineTool',
'baseCommand': 'touch',
'inputs': {
'step_input': {
'type': 'string',
'inputBinding': {
'position': 1,
},
},
},
'outputs': {
'step_output': {
'type': 'stdout',
}
},
'stdout': 'output.txt',
}
yaml_dump(os.path.join(output_path, step0_file), step0_data)

job_file = 'job.yml'
job_data = {
'main_input': main_input,
}
yaml_dump(os.path.join(output_path, job_file), job_data)

return (main_cwl_file, job_file)


def simple_workflow(output_path, fname):
"""Generate a simple workflow."""
os.makedirs(output_path, exist_ok=True)

task0_cwl = 'task0.cwl'
main_cwl = 'main.cwl'
main_cwl_file = str(Path(output_path, main_cwl))
main_cwl_data = {
'cwlVersion': 'v1.0',
'class': 'Workflow',
'requirements': {},
'inputs': {
'in_fname': 'string',
},
'outputs': {
'out': {
'type': 'File',
'outputSource': 'task0/out',
},
},
'steps': {
'task0': {
'run': task0_cwl,
'in': {
'fname': 'in_fname',
},
'out': ['out'],
},
},
}
yaml_dump(main_cwl_file, main_cwl_data)

task0_cwl_file = str(Path(output_path, task0_cwl))
task0_data = {
'cwlVersion': 'v1.0',
'class': 'CommandLineTool',
'baseCommand': 'touch',
'inputs': {
'fname': {
'type': 'string',
'inputBinding': {
'position': 1,
},
},
},
'stdout': 'touch.log',
'outputs': {
'out': {
'type': 'stdout',
}
}
}
yaml_dump(task0_cwl_file, task0_data)

job_yaml = 'job.yaml'
job_file = str(Path(output_path, job_yaml))
job_data = {
'in_fname': fname,
}
yaml_dump(job_file, job_data)
return (main_cwl, job_yaml)


def init():
"""Initialize files and the container runtime."""
with open(DOCKER_FILE_PATH, 'w', encoding='utf-8') as docker_file_fp:
docker_file_fp.write(SIMPLE_DOCKERFILE)

# Remove an existing base container
if BASE_CONTAINER in utils.ch_image_list():
utils.ch_image_delete(BASE_CONTAINER)
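
For orientation, a minimal sketch of how these generators might be invoked directly. It is illustrative only and not part of this diff; the output directory, file names, and the DockerRequirement hint are arbitrary examples, and init() assumes Charliecloud's ch-image is on PATH.

# Illustrative driver for the generators above (not part of this commit).
import os

from beeflow.common.integration import generated_workflows

OUT_DIR = '/tmp/example-workflows'  # arbitrary scratch location

# Write the shared Dockerfile and remove any stale 'alpine' base image.
generated_workflows.init()

# Workflow whose single step carries a DockerRequirement hint (builder tests).
# The hint shown here is a placeholder; the real test cases may pass other keys.
main_cwl, job_file = generated_workflows.builder_workflow(
    os.path.join(OUT_DIR, 'builder'),
    docker_requirement={'dockerPull': generated_workflows.BASE_CONTAINER},
    main_input='builder_output.txt')

# Plain two-file workflow with no container hints.
main_cwl, job_file = generated_workflows.simple_workflow(
    os.path.join(OUT_DIR, 'simple'), fname='simple_output.txt')
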
236 changes: 236 additions & 0 deletions beeflow/common/integration/utils.py
@@ -0,0 +1,236 @@
"""Utility code for running the integration tests."""
from contextlib import contextmanager
from pathlib import Path
import os
import shutil
import subprocess
import time
import traceback
import uuid

from beeflow.client import bee_client


# Max time (in seconds) allowed for each workflow
TIMEOUT = 90
INTEGRATION_TEST_DIR = os.path.expanduser('~/.beeflow-integration')


class CIError(Exception):
"""CI error class."""

def __init__(self, *args):
"""Initialize the error with default args."""
self.args = args


class Container:
"""Container CI class for building BEE containers."""

def __init__(self, name, dockerfile, tarball):
"""BEE CI container constructor."""
self.name = name
self.dockerfile = dockerfile
self.tarball = tarball
self.done = False

def build(self):
"""Build the containers and save them in the proper location."""
if not self.done:
try:
subprocess.check_call(['ch-image', 'build', '-f', self.dockerfile, '-t', self.name,
'--force', 'seccomp', os.path.dirname(self.dockerfile)])
subprocess.check_call(['ch-convert', '-i', 'ch-image', '-o', 'tar', self.name,
self.tarball])
except subprocess.CalledProcessError as error:
raise CIError(
f'container build: {error}'
) from None
self.done = True


class Workflow:
"""Workflow CI class for interacting with BEE."""

def __init__(self, name, path, main_cwl, job_file, workdir, containers):
"""Workflow constructor."""
self.name = name
self.path = path
self.main_cwl = main_cwl
self.job_file = job_file
self.workdir = workdir
self.containers = containers
self.wf_id = None
self.tarball = None

def run(self):
"""Attempt to submit and start the workflow."""
# Build all the containers first
print('Building all containers')
for ctr in self.containers:
ctr.build()
try:
tarball_dir = Path(self.path).parent
tarball = f'{Path(self.path).name}.tgz'
self.tarball = tarball_dir / tarball
print('Creating the workflow tarball', self.tarball)
bee_client.package(Path(self.path), Path(tarball_dir))
print('Submitting and starting workflow')
self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl,
self.job_file, self.workdir, no_start=False)
except bee_client.ClientError as error:
raise CIError(*error.args) from error

@property
def running(self):
"""Check if the workflow is running or about to run."""
return bee_client.query(self.wf_id)[0] in ('Initializing', 'Waiting', 'Running', 'Pending')

@property
def status(self):
"""Get the status of the workflow."""
return bee_client.query(self.wf_id)[0]

def cleanup(self):
"""Clean up any leftover workflow data."""
# Remove the generated tarball
os.remove(self.tarball)


class TestRunner:
"""Test runner class."""

def __init__(self):
"""Build a new test runner class."""
self.test_cases = []
self.timeout = TIMEOUT

def add(self, ignore=False):
"""Decorate a test case and add it to the runner instance."""

def wrap(test_case):
"""Wrap the function."""
# First turn it into a contextmanager
test_case = contextmanager(test_case)
self.test_cases.append((test_case, ignore))
return test_case

return wrap

def test_details(self):
"""Return a list of all the test details (test_name, ignore)."""
return [(test_case.__name__, ignore) for test_case, ignore in self.test_cases]

def _run_workflows(self, workflows):
"""Run a list of workflows to completion."""
# Start all workflows at once
for wfl in workflows:
wfl.run()
# Now run until all workflows are complete or until self.timeout is hit
t = 0
while t < self.timeout:
if all(not wfl.running for wfl in workflows):
# All workflows have completed
break
time.sleep(2)
t += 2
if t >= self.timeout:
raise CIError('workflow timeout')

def run(self, test_names=None):
"""Test run all test cases."""
results = {}
for test_case, ignore in self.test_cases:
if test_names is not None:
# Skip tests that aren't required to be run
if test_case.__name__ not in test_names:
continue
elif ignore:
# Skip ignored tests
continue
outer_workdir = os.path.join(INTEGRATION_TEST_DIR, uuid.uuid4().hex)
os.makedirs(outer_workdir)
msg0 = f'Starting test {test_case.__name__}'
msg1 = f'(running in "{outer_workdir}")'
count = max(len(msg0), len(msg1))
print('#' * count)
print(msg0)
print(msg1)
print('-' * count)
try:
with test_case(outer_workdir) as workflows:
self._run_workflows(workflows)
except CIError as error:
traceback.print_exc()
results[test_case.__name__] = error
print('------')
print('FAILED')
print('------')
else:
results[test_case.__name__] = None
print('------')
print('PASSED')
print('------')
# Only remove the outer_workdir if it passed
try:
shutil.rmtree(outer_workdir)
except OSError as err:
print(f'WARNING: Failed to remove {outer_workdir}')
print(err)

        fails = sum(1 for result in results.values() if result is not None)
self.display_results(results, fails)
return 1 if fails > 0 else 0

def display_results(self, results, fails):
"""Show the workflow results."""
print('######## WORKFLOW RESULTS ########')
passes = len(results) - fails
print(f'{passes} passes, {fails} fails')
for name, result in results.items():
error = result
if error is not None:
print(f'{name}: {error}')
print('##################################')


#
# Helper methods for running the test cases
#


def ch_image_delete(img_name):
"""Execute a ch-image delete [img_name]."""
try:
subprocess.check_call(['ch-image', 'delete', img_name])
except subprocess.CalledProcessError:
raise CIError(
f'failed when calling `ch-image delete {img_name}` to clean up'
) from None


def ch_image_list():
"""Execute a ch-image list and return the images."""
try:
res = subprocess.run(['ch-image', 'list'], stdout=subprocess.PIPE, check=True)
output = res.stdout.decode(encoding='utf-8')
images = [img for img in output.split('\n') if img]
return images
except subprocess.CalledProcessError as err:
raise CIError(f'failed when calling `ch-image list`: {err}') from err


def ci_assert(predicate, msg):
"""Assert that the predicate is True, or raise a CIError."""
if not predicate:
raise CIError(msg)


def check_path_exists(path):
"""Check that the specified path exists."""
ci_assert(os.path.exists(path), f'expected file "{path}" does not exist')


def check_completed(workflow):
"""Ensure the workflow has a completed status."""
ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}')
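
Taken together, a hedged sketch of how a test case might be registered and run with these helpers. It is illustrative only and not part of this diff: the test name and file names are examples, the location of the step's stdout file is an assumption, and actually running it requires a configured BEE installation.

# Illustrative test-case registration (not part of this commit).
import os
import sys

from beeflow.common.integration import generated_workflows, utils

TEST_RUNNER = utils.TestRunner()


@TEST_RUNNER.add()
def simple(outer_workdir):
    """Run one generated workflow to completion and check its results."""
    wf_path = os.path.join(outer_workdir, 'simple-wf')
    workdir = os.path.join(outer_workdir, 'workdir')
    os.makedirs(workdir, exist_ok=True)
    main_cwl, job_file = generated_workflows.simple_workflow(wf_path, 'created-file.txt')
    workflow = utils.Workflow('simple', wf_path, main_cwl=main_cwl, job_file=job_file,
                              workdir=workdir, containers=[])
    # Yield the workflows to run; the runner submits them and waits for completion.
    yield [workflow]
    # Code after the yield runs once the workflows have stopped.
    utils.check_completed(workflow)
    # Where 'touch.log' ends up depends on BEE's output handling; this check is an example.
    utils.check_path_exists(os.path.join(workdir, 'touch.log'))


if __name__ == '__main__':
    generated_workflows.init()
    sys.exit(TEST_RUNNER.run())
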