Skip to content

Commit

Permalink
Merge pull request #776 from lanl/issue750/fix-checkpoint-restart
Browse files (browse the repository at this point in the history)
Fix Checkpoint/Restart
  • Loading branch information
pagrubel authored Feb 14, 2024
2 parents (3e03f5e + 9ce7239); commit 228ec58
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 19 deletions.
8 changes: 7 additions & 1 deletion beeflow/common/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,13 @@ def parse_requirements(self, requirements, as_hints=False):
return reqs
if as_hints:
for req in requirements:
items = {k: str(v) for k, v in req.items() if k != "class"}
items = {}
for k, v in req.items():
if k != 'class':
if isinstance(v, (int, float)):
items[k] = v
else:
items[k] = str(v)
# Load in the dockerfile at parse time
if 'dockerFile' in items:
self._read_requirement_file('dockerFile', items)
Expand Down
5 changes: 3 additions & 2 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def restart_task(self, task, checkpoint_file):
:param task: the task to restart
:type task: Task
:param checkpoint_file: the task checkpoint file
:rtype: Task or None
"""
for hint in task.hints:
Expand All @@ -102,8 +103,8 @@ def restart_task(self, task, checkpoint_file):
hint.params["num_tries"] -= 1
hint.params["bee_checkpoint_file__"] = checkpoint_file
break
state = self.get_task_state(task)
self.set_task_state(task, f"FAILED RESTART: {state}")
self.set_task_state(task, "FAILED")
self.set_workflow_state("FAILED")
return None
else:
raise ValueError("invalid task for checkpoint restart")
Expand Down
12 changes: 8 additions & 4 deletions beeflow/task_manager/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,14 @@ def update_jobs():
log.info(f'state: {new_job_state}')
log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}')
if task_checkpoint:
checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir)
task_info = {'checkpoint_file': checkpoint_file, 'restart': True}
update_task_state(task.workflow_id, task.id, new_job_state,
task_info=task_info)
try:
checkpoint_file = utils.get_restart_file(task_checkpoint, task.workdir)
task_info = {'checkpoint_file': checkpoint_file, 'restart': True}
update_task_state(task.workflow_id, task.id, new_job_state,
task_info=task_info)
except utils.CheckpointRestartError as err:
log.error(f'Checkpoint restart failed for {task.name} ({task.id}): {err}')
update_task_state(task.workflow_id, task.id, 'FAILED')
else:
update_task_state(task.workflow_id, task.id, new_job_state)
else:
Expand Down
23 changes: 16 additions & 7 deletions beeflow/task_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,31 @@ def wfm_conn():
return Connection(paths.wfm_socket())


class CheckpointRestartError(Exception):
"""Exception to be thrown on checkpoint-restart failure."""


def get_restart_file(task_checkpoint, task_workdir):
"""Find latest checkpoint file."""
if 'file_regex' not in task_checkpoint:
raise RuntimeError('file_regex is required for checkpointing')
raise CheckpointRestartError('file_regex is required for checkpointing')
if 'file_path' not in task_checkpoint:
raise RuntimeError('file_path is required for checkpointing')
raise CheckpointRestartError('file_path is required for checkpointing')
file_regex = task_checkpoint['file_regex']
file_path = Path(task_workdir, task_checkpoint['file_path'])
regex = re.compile(file_regex)
checkpoint_files = [
Path(file_path, fname) for fname in os.listdir(file_path)
if regex.match(fname)
]
try:
checkpoint_files = [
Path(file_path, fname) for fname in os.listdir(file_path)
if regex.match(fname)
]
except FileNotFoundError:
raise CheckpointRestartError(
f'Checkpoint file_path ("{file_path}") not found'
) from None
checkpoint_files.sort(key=os.path.getmtime)
try:
checkpoint_file = checkpoint_files[-1]
return str(checkpoint_file)
except IndexError:
raise RuntimeError('Missing checkpoint file for task') from None
raise CheckpointRestartError('Missing checkpoint file for task') from None
6 changes: 4 additions & 2 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ def put(self):
task_info = jsonpickle.decode(data['task_info'])
checkpoint_file = task_info['checkpoint_file']
new_task = wfi.restart_task(task, checkpoint_file)
db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING")
if new_task is None:
log.info('No more restarts')
wf_state = wfi.get_task_state(task)
wf_state = wfi.get_workflow_state()
wf_utils.update_wf_status(wf_id, 'Failed')
db.workflows.update_workflow_state(wf_id, 'Failed')
return make_response(jsonify(status=f'Task {task_id} set to {job_state}'))
db.workflows.add_task(new_task.id, wf_id, new_task.name, "WAITING")
# Submit the restart task
tasks = [new_task]
wf_utils.schedule_submit_tasks(wf_id, tasks)
Expand Down
4 changes: 2 additions & 2 deletions ci/test_workflows/checkpoint-too-long/workflow.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ steps:
hints:
beeflow:CheckpointRequirement:
enabled: true
file_path: checkpoint_output
container_path: checkpoint_output
file_path: .
container_path: .
file_regex: backup[0-9]*.crx
restart_parameters: -R
num_tries: 1
Expand Down
2 changes: 1 addition & 1 deletion ci/test_workflows/clamr-wf-checkpoint/clamr_job_long.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"grid_resolution": 32,
"max_levels": 3,
"time_steps": 80000,
"time_steps": 150000,
"steps_between_outputs": 10,
"steps_between_graphics": 25,
"graphics_type": "png",
Expand Down

0 comments on commit 228ec58

Please sign in to comment.