Skip to content

Commit

Permalink
Merge pull request #697 from lanl/client/issue667
Browse files Browse the repository at this point in the history
Move parser to client
  • Loading branch information
pagrubel authored Jul 28, 2023
2 parents 3dd2c8d + ed00a4e commit 3bade1c
Show file tree
Hide file tree
Showing 8 changed files with 521 additions and 314 deletions.
49 changes: 46 additions & 3 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common.cli import NaturalOrderGroup
from beeflow.common.connection import Connection
from beeflow.common.parser import CwlParser
from beeflow.common.wf_data import generate_workflow_id


# Length of a shortened workflow ID
Expand Down Expand Up @@ -195,6 +197,13 @@ def is_parent(parent, path):
if not yaml_path.exists():
error_exit(f'YAML file {yaml} does not exist')

# Parse workflow
parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
job=str(yaml_path))
tasks = [jsonpickle.encode(task) for task in tasks]

cwl_indir = is_parent(wf_path, main_cwl_path)
yaml_indir = is_parent(wf_path, yaml_path)
# The CWL and YAML file are already in the workflow directory
Expand All @@ -214,6 +223,18 @@ def is_parent(parent, path):
wf_tarball = open(package_path, 'rb')
shutil.rmtree(tempdir_path)
else:
# Untar and parse workflow
tempdir_path = pathlib.Path(tempfile.mkdtemp())
tempdir_wf_path = unpackage(wf_path, tempdir_path)
main_cwl_path = pathlib.Path(tempdir_wf_path / main_cwl).resolve()
yaml_path = pathlib.Path(tempdir_wf_path / yaml).resolve()

parser = CwlParser()
workflow_id = generate_workflow_id()
workflow, tasks = parser.parse_workflow(workflow_id, str(main_cwl_path),
job=str(yaml_path))

shutil.rmtree(tempdir_path)
wf_tarball = open(wf_path, 'rb')
else:
error_exit(f'Workflow tarball {wf_path} cannot be found')
Expand All @@ -228,9 +249,9 @@ def is_parent(parent, path):
data = {
'wf_name': wf_name.encode(),
'wf_filename': os.path.basename(wf_path).encode(),
'main_cwl': os.path.basename(main_cwl),
'yaml': os.path.basename(yaml),
'workdir': workdir
'workdir': workdir,
'workflow': jsonpickle.encode(workflow),
'tasks': jsonpickle.encode(tasks, warn=True)
}
files = {
'workflow_archive': wf_tarball
Expand Down Expand Up @@ -322,6 +343,28 @@ def package(wf_path: pathlib.Path = typer.Argument(...,
return package_path


def unpackage(package_path, dest_path):
    """Unpackage a workflow tarball for parsing.

    :param package_path: path to the workflow tarball; its name must end in ``.tgz``
    :type package_path: pathlib.Path
    :param dest_path: existing directory into which the tarball is extracted
    :type dest_path: pathlib.Path
    :return: ``dest_path`` joined with the tarball's file name minus ``.tgz``
    :rtype: pathlib.Path
    """
    package_str = str(package_path)
    # Use only the final path component: joining dest_path with a name derived
    # from the full (possibly absolute) path would discard dest_path entirely.
    package_name = package_path.name
    package_path = package_path.resolve()

    if not package_name.endswith('.tgz'):
        # No cleanup, maybe we should rm dest_path?
        error_exit("Invalid package name, please use the beeclient package command")
    # NOTE(review): assumes the tarball's top-level directory is named after
    # the archive (without the extension) -- confirm against the package command.
    wf_dir = package_name[:-len('.tgz')]

    # check=False so a failing tar is reported through error_exit below rather
    # than surfacing as an uncaught CalledProcessError.
    result = subprocess.run(['tar', '-C', str(dest_path), '-xf', str(package_path)],
                            check=False)
    if result.returncode != 0:
        # No cleanup, maybe we should rm dest_path?
        error_exit("Unpackage failed")
    else:
        print(f"Package {package_str} unpackaged successfully")

    return dest_path / wf_dir


@app.command()
def listall():
"""List all worklfows."""
Expand Down
45 changes: 25 additions & 20 deletions beeflow/common/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
from schema_salad.exceptions import ValidationException # noqa (pylama can't find the exception)

from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common.wf_data import (InputParameter,
from beeflow.common.wf_data import (Workflow,
Task,
InputParameter,
OutputParameter,
StepInput,
StepOutput,
Hint,
Requirement,
generate_workflow_id)
from beeflow.wf_manager.resources import wf_utils


if not bc.ready():
Expand Down Expand Up @@ -52,7 +53,7 @@ def __init__(self, *args):
class CwlParser:
"""Class for parsing CWL files."""

def __init__(self, wfi):
def __init__(self):
"""Initialize the CWL parser interface.
Sets the workflow interface for communication with the graph database.
Expand All @@ -61,7 +62,6 @@ def __init__(self, wfi):
self.path = None
self.steps = []
self.params = None
self._wfi = wfi

def parse_workflow(self, workflow_id, cwl_path, job=None):
"""Parse a CWL Workflow file and load it into the graph database.
Expand Down Expand Up @@ -99,7 +99,7 @@ def resolve_input(input_, type_):
:type input_: WorkflowInputParameter
:param type_: the workflow input type
:type type_: str
:rtype: str or int or float
:rtype: (Workflow, list of Task)
"""
# Use parsed input parameter for input value if it exists
input_id = _shortname(input_.id)
Expand All @@ -120,21 +120,23 @@ def resolve_input(input_, type_):
workflow_hints = self.parse_requirements(self.cwl.hints, as_hints=True)
workflow_requirements = self.parse_requirements(self.cwl.requirements)

self._wfi.initialize_workflow(workflow_id, workflow_name, workflow_inputs,
workflow_outputs, workflow_requirements, workflow_hints)
for step in self.cwl.steps:
self.parse_step(step)
workflow = Workflow(workflow_name, workflow_hints, workflow_requirements, workflow_inputs,
workflow_outputs, workflow_id)
tasks = [self.parse_step(step, workflow_id) for step in self.cwl.steps]

return self._wfi
return workflow, tasks

def parse_step(self, step):
def parse_step(self, step, workflow_id):
"""Parse a CWL step object.
Calling this to parse a CommandLineTool file without a corresponding
Workflow file will fail.
:param step: the CWL step object
:type step: WorkflowStep
:param workflow_id: the workflow ID
:type workflow_id: str
:rtype: Task
"""
# Parse CWL file specified by run field, else parse run field as inline CommandLineTool
if isinstance(step.run, str):
Expand All @@ -160,9 +162,8 @@ def parse_step(self, step):
step_stdout = step_cwl.stdout
step_stderr = step_cwl.stderr

self._wfi.add_task(step_name, base_command=step_command, inputs=step_inputs,
outputs=step_outputs, requirements=step_requirements,
hints=step_hints, stdout=step_stdout, stderr=step_stderr)
return Task(step_name, step_command, step_hints, step_requirements, step_inputs,
step_outputs, step_stdout, step_stderr, workflow_id)

def parse_job(self, job):
"""Parse a CWL input job file.
Expand Down Expand Up @@ -306,16 +307,16 @@ def parse_requirements(self, requirements, as_hints=False):
return reqs
if as_hints:
for req in requirements:
items = {k: v for k, v in req.items() if k != "class"}
# Load in files at parse time
items = {k: str(v) for k, v in req.items() if k != "class"}
# Load in the dockerfile at parse time
if 'dockerFile' in items:
self._read_requirement_file('dockerFile', items)
if 'beeflow:bindMounts' in items:
self._read_requirement_file('beeflow:bindMounts', items)
reqs.append(Hint(req['class'], items))
else:
for req in requirements:
reqs.append(Requirement(req.class_, {k: v for k, v in vars(req).items()
reqs.append(Requirement(req.class_, {k: str(v) for k, v in vars(req).items()
if k not in ("extension_fields",
"loadingOptions", "class_")
and v is not None}))
Expand Down Expand Up @@ -356,10 +357,14 @@ def parse_args(args=None):
def main():
    """Run the parser on a CWL Workflow and job file directly.

    Generates a fresh workflow ID, parses the workflow file and inputs given
    on the command line, and prints the resulting workflow and tasks.
    """
    wf_id = generate_workflow_id()
    # The parser no longer takes a workflow interface; it only builds objects.
    parser = CwlParser()
    args = parse_args()
    workflow, tasks = parser.parse_workflow(wf_id, args.wf_file, args.inputs)
    print("Parsed workflow:")
    print(workflow)

    print("Parsed tasks:")
    print(tasks)


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions beeflow/common/wf_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ def __init__(self, name, base_command, hints, requirements, inputs, outputs, std
:param workflow_id: the workflow ID
:type workflow_id: str
:param task_id: the task ID
:type task_id: str
:type task_id: str, optional
:param workdir: the working directory from which to get and store data
:type workdir: path
:type workdir: path, optional
"""
self.name = name
self.base_command = base_command
Expand Down
75 changes: 20 additions & 55 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import re

from beeflow.common.wf_data import Workflow, Task


class WorkflowInterface:
"""Interface for manipulating workflows."""
Expand All @@ -28,36 +26,22 @@ def reconnect(self):
"""Reconnect to the graph database using stored credentials."""
raise NotImplementedError()

def initialize_workflow(self, workflow):
    """Begin construction of a BEE workflow.

    ``None`` requirements or hints on the workflow are replaced in place with
    empty lists before the workflow is loaded into the graph database.

    :param workflow: the workflow object
    :type workflow: Workflow
    :return: the same workflow object that was passed in
    :rtype: Workflow
    :raises RuntimeError: if ``self.workflow_loaded()`` reports an existing workflow
    """
    if self.workflow_loaded():
        raise RuntimeError("attempt to re-initialize existing workflow")
    if workflow.requirements is None:
        workflow.requirements = []
    if workflow.hints is None:
        workflow.hints = []

    self._workflow_id = workflow.id
    # Load the new workflow into the graph database
    self._gdb_interface.initialize_workflow(workflow)
    # Returned for callers that relied on the previous return value.
    return workflow

def execute_workflow(self):
"""Begin execution of a BEE workflow."""
Expand All @@ -82,43 +66,24 @@ def finalize_workflow(self):
self._workflow_id = None
self._gdb_interface.cleanup()

def add_task(self, task):
    """Add a new task to a BEE workflow.

    Any of the task's inputs, outputs, requirements or hints that are ``None``
    are replaced in place with empty lists before the task is loaded into the
    graph database.

    :param task: the task to add to the workflow
    :type task: Task
    :return: the same task object that was passed in
    :rtype: Task
    """
    # Normalize missing collections to empty lists
    for attr in ("inputs", "outputs", "requirements", "hints"):
        if getattr(task, attr) is None:
            setattr(task, attr, [])

    # Load the new task into the graph database
    self._gdb_interface.load_task(task)
    # Returned for callers that relied on the previous return value.
    return task

def restart_task(self, task, checkpoint_file):
"""Restart a failed BEE workflow task.
Expand Down
Loading

0 comments on commit 3bade1c

Please sign in to comment.