docs: replace prefect with Hamilton #36

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
docs/configuration/environments.md (10 changes: 3 additions & 7 deletions)
@@ -23,7 +23,7 @@ tasks:
     environment:
       image: docker.io/rootproject/root:6.28.04-ubuntu22.04
       variables: <path to .env>
-    flow: prefect::SequentialTaskRunner
+    flow: sequential::local
   - name: runStatsCode
     type: "fasthep_flow.operators.BashOperator"
     kwargs:
@@ -49,8 +49,8 @@ A `.env` file is a file specifying variables in the format `VARIABLE=VALUE` - on

 The `flow` defines the orchestration of the workflow to use for this task. The
 default orchestration is defined in the global settings, usually set to
-`prefect::DaskTaskRunner`. In this case, we are using the
-`prefect::SequentialTaskRunner` to run the task locally.
+`dask::local`. In this case, we are using `sequential::local` to run the task
+locally.

 ````{note}
 The `flow` setting has to use the same prefix as the global setting and has to match a defined orchestration.
 ````
@@ -69,10 +69,6 @@ The full set of options for `environment` is:
 environment:
   variables: <path to .env> | { <key>: <value>, ... }
   image: <image name>
-  workflow:
-    transform: prefect
-    kwargs:
-      runner: SequentialTaskRunner | DaskTaskRunner | any other supported value
   resources: # see details in global settings
   extra_data: TBC
 ````
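The `flow` values in this diff (`sequential::local`, `dask::local`) all follow a `<backend>::<target>` pattern. As a minimal sketch of that convention only, with a hypothetical parser that is not fasthep-flow's actual implementation:

```python
# Hypothetical illustration of the "<backend>::<target>" convention used by
# the new flow settings; fasthep-flow's real parser may differ.
def parse_flow_setting(flow: str) -> tuple[str, str]:
    """Split e.g. 'sequential::local' into ('sequential', 'local')."""
    backend, separator, target = flow.partition("::")
    if not separator:
        raise ValueError(f"expected '<backend>::<target>', got {flow!r}")
    return backend, target


assert parse_flow_setting("sequential::local") == ("sequential", "local")
assert parse_flow_setting("dask::local") == ("dask", "local")
```

As the note in the diff says, a per-task `flow` can only override the global default with a value whose prefix matches a defined orchestration.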
docs/configuration/global_settings.md (2 changes: 1 addition & 1 deletion)
@@ -12,7 +12,7 @@ global:
   prefix: h_
   folder_rule: from_name | fixed | None
   folder: None
-  flow: prefect::DaskTaskRunner
+  flow: dask::local
 output:
   directory: /path/to/output/dir
   variables: <path to .env> | { <key>: <value>, ... }
docs/examples/hello_world.md (10 changes: 5 additions & 5 deletions)
@@ -25,17 +25,17 @@ step-by-step.
 ## Creating a flow

 The first thing that `fasthep-flow` does is to create a flow. This is done by
-creating a `prefect.Flow` object, and adding a task for each step in the YAML
+creating a `fasthep_flow.Workflow` object, and adding a task for each task in the YAML
 file. The task is created by the `fasthep-flow` operator, and the parameters are
 passed to the task as keyword arguments.

 We can do this ourselves by creating a flow and adding a task to it.

 ```python
-from fasthep.operators import BashOperator
-from prefect import Flow
+from fasthep_flow.operators import BashOperator
+from fasthep_flow import Workflow

-flow = Flow("hello_world")
+flow = Workflow("hello_world")
 task = BashOperator(bash_command="echo 'Hello World!'")
 flow.add_task(task)
 ```
@@ -57,7 +57,7 @@ first, and then running it on the specified cluster (e.g. HTCondor or Google
 Cloud Composer). For now, let's just run it on a local Dask cluster.

 ```bash
-fasthep-flow execute hello_world.yaml --workflow="{'transform':'prefect', 'kwargs':{'runner': 'DaskTaskRunner'}}"
+fasthep-flow execute hello_world.yaml --workflow="{'transform':'hamilton', 'kwargs':{'adapter': 'DaskGraphAdapter'}}"
 ```

 This will start a Dask cluster on your local machine, and run the flow on it.
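The `--workflow` argument above selects Hamilton's Dask integration. As a rough sketch of the equivalent in plain Python, assuming Hamilton's `h_dask` plugin module and a hypothetical `my_dataflow` module of dataflow functions (neither is fasthep-flow code):

```python
# Sketch: run a Hamilton dataflow on a local Dask cluster.
# Assumptions: DaskGraphAdapter lives in hamilton.plugins.h_dask, and
# my_dataflow is a hypothetical module defining the DAG functions.
from dask.distributed import Client
from hamilton import driver
from hamilton.plugins import h_dask

import my_dataflow

client = Client()  # starts a local Dask cluster
adapter = h_dask.DaskGraphAdapter(client)
dr = driver.Driver({}, my_dataflow, adapter=adapter)
# 'greeting' is a hypothetical node defined in my_dataflow
print(dr.execute(["greeting"]))
client.close()
```

Swapping the adapter is what would move execution to a different backend without touching the dataflow itself.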
docs/orchestration.md (40 changes: 6 additions & 34 deletions)
@@ -6,40 +6,12 @@ of tasks. Internally, a workflow is represented as a directed acyclic graph
 workflows and to run on distributed resources, `fasthep-flow` allows for
 conversions to other tools/frameworks to optimize the execution.

-## Prefect
+## Hamilton

-[prefect](https://docs.prefect.io/latest/) uses TaskRunners to execute tasks.
-The default executor is the [SequentialTaskRunner](#sequentialtaskrunner), which
-runs all tasks in sequence on the local machine. A full list of executors can be
-found in the
-[prefect documentation](https://docs.prefect.io/latest/concepts/task-runners/).
+After the internal workflow creation, the workflow is converted into a
+[Hamilton DAG](https://hamilton.dagworks.io/en/latest/). Hamilton is a
+general-purpose framework to write dataflows using regular Python functions.

-Since prefect is not widely used in High Energy Particle Physics, let's go over
-the task runners that are most relevant to us.
+### Work in progress

-### SequentialTaskRunner
-
-The `SequentialTaskRunner` (see
-[prefect docs](https://docs.prefect.io/latest/api-ref/prefect/task-runners/#prefect.task_runners.SequentialTaskRunner))
-runs each task in a separate process on the local machine. This is the default
-executor for `fasthep-flow`.
-
-### ConcurrentTaskRunner
-
-The `ConcurrentTaskRunner` (see
-[prefect docs](https://docs.prefect.io/latest/api-ref/prefect/task-runners/#prefect.task_runners.ConcurrentTaskRunner))
-runs each task in a separate thread on the local machine.
-
-### DaskTaskRunner
-
-The `DaskTaskRunner` (see
-[prefect docs](https://prefecthq.github.io/prefect-dask/)) runs each task in a
-separate process on a Dask cluster. A Dask cluster can be run on a local machine
-or as a distributed cluster using a batch system (e.g. HTCondor, LSF, PBS, SGE,
-SLURM) or other distributed systems such as LHCb's DIRAC. This is the
-recommended executor for running `fasthep-flow` workflows on distributed
-resources.

 ## Custom orchestration

 Documentation on how to create custom orchestration can be found in the
 [developer's corner](devcon/orchestration.md).
+Hamilton allows for scaling execution via Dask, PySpark and Ray.
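Since the new section leans on Hamilton's model of dataflows as regular Python functions, here is a minimal generic illustration (plain Hamilton usage, not taken from the fasthep-flow codebase) of how function parameters wire up the DAG:

```python
# Minimal Hamilton-style dataflow: every function is a DAG node, and a
# parameter named after another function receives that function's output.
# Generic Hamilton illustration, not fasthep-flow code.
def greeting() -> str:
    return "Hello World!"


def shouted_greeting(greeting: str) -> str:
    # depends on greeting() purely through the parameter name
    return greeting.upper()


if __name__ == "__main__":
    import sys

    from hamilton import driver

    this_module = sys.modules[__name__]  # use this file as the dataflow module
    dr = driver.Driver({}, this_module)
    print(dr.execute(["shouted_greeting"]))  # {'shouted_greeting': 'HELLO WORLD!'}
```

An adapter (Dask, PySpark or Ray) can then be handed to the driver to distribute exactly this DAG, which is what the `DaskGraphAdapter` example in `hello_world.md` above does.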