Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use StorageManager in execute_DAG #258

Draft
wants to merge 95 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
ffdb901
PathLike wrapper/cache for ExternalStorage
dwhswenson Apr 26, 2023
4ad7fb1
Merge branch 'main' into shared-object-v2
dwhswenson Apr 26, 2023
eb19e0f
mypy
dwhswenson Apr 26, 2023
8e6a78c
Merge branch 'shared-object-v2' of github.com:OpenFreeEnergy/gufe int…
dwhswenson Apr 26, 2023
3be13c0
docstrings
dwhswenson Apr 27, 2023
dda02e2
Merge branch 'main' of github.com:OpenFreeEnergy/gufe into shared-obj…
dwhswenson May 17, 2023
472151e
Add StorageManager code
dwhswenson May 30, 2023
b692c2f
rename to Stagine
dwhswenson May 30, 2023
7432ff4
Add tests for _delete_empty_dirs
dwhswenson May 30, 2023
319d0d0
Storage docs
dwhswenson May 31, 2023
5650609
outline of storage manager tests
dwhswenson May 31, 2023
ddbbd19
minor improvements on staging directory
dwhswenson May 31, 2023
c5ce48a
first storage lifecycle test works
dwhswenson May 31, 2023
236b263
Merge branch 'main' of github.com:OpenFreeEnergy/gufe into shared-obj…
dwhswenson May 31, 2023
b805aac
cleanup mypy
dwhswenson May 31, 2023
ed5e83c
change to unit taking in the label
dwhswenson May 31, 2023
6181039
lots of updates; switched to harness for tests
dwhswenson Jun 5, 2023
1880f73
Big reorg for shared overlapping staging
dwhswenson Jun 6, 2023
1e4ca2c
remove _storage_path_conflict
dwhswenson Jun 6, 2023
a6d26f3
docs & types
dwhswenson Jun 6, 2023
aabbc33
docs, types, logging
dwhswenson Jun 6, 2023
b4d73b3
finish TestHoldingOverlapsPermanentStorageManager
dwhswenson Jun 6, 2023
7af006e
mypy
dwhswenson Jun 6, 2023
58a58bc
test_repr
dwhswenson Jun 6, 2023
8e429f5
renaming around DAGContextManager
dwhswenson Jun 6, 2023
b70df48
holding => staging
dwhswenson Jun 7, 2023
08e3ac2
finish docs (I think?)
dwhswenson Jun 7, 2023
ca7871b
remove completed TODO
dwhswenson Jun 7, 2023
2aa0616
start to testing edge case logging
dwhswenson Jun 9, 2023
7c03dcd
Update stagingdirectory.py
richardjgowers Jun 12, 2023
383075e
tests for single_file_transfer logging
dwhswenson Jun 12, 2023
6365398
tests for read-only transfers
dwhswenson Jun 16, 2023
d35bd60
fix repr and cleanup tests
dwhswenson Jun 16, 2023
7cc10f9
test for permanent transfer to external
dwhswenson Jun 16, 2023
ab025f1
test for Permanent delete staging
dwhswenson Jun 17, 2023
cd70ab2
Add test for missing file on cleanup
dwhswenson Jun 17, 2023
ac1b1d0
Merge branch 'shared-object-v2' of github.com:OpenFreeEnergy/gufe int…
dwhswenson Jun 17, 2023
ea054be
Merge branch 'main' of github.com:OpenFreeEnergy/gufe into shared-obj…
dwhswenson Jun 22, 2023
90f2597
get_other_shared to private
dwhswenson Jun 22, 2023
a2e05b2
Merge branch 'main' into shared-object-v2
dwhswenson Jul 6, 2023
80eccc4
Merge branch 'main' into shared-object-v2
dwhswenson Aug 28, 2023
e9ed7a8
Merge branch 'main' of github.com:OpenFreeEnergy/gufe into shared-obj…
dwhswenson Sep 8, 2023
b4e1d42
Merge branch 'main' into shared-object-v2
dotsdl Sep 8, 2023
2b070ca
Merge branch 'shared-object-v2' of github.com:OpenFreeEnergy/gufe int…
dwhswenson Sep 9, 2023
4fd4a66
Merge branch 'main' into shared-object-v2
dotsdl Sep 12, 2023
5e56461
Merge branch 'main' into shared-object-v2
dotsdl Sep 19, 2023
73d3a1e
Merge branch 'main' into shared-object-v2
dwhswenson Nov 3, 2023
524cc6e
Merge branch 'main' of github.com:OpenFreeEnergy/gufe into shared-obj…
dwhswenson Dec 1, 2023
dd1b6dc
updates from other branch
dwhswenson Dec 1, 2023
a575dd3
make mypy happy
dwhswenson Dec 1, 2023
24d3820
Use StorageManager/StagingPath in execute_DAG
dwhswenson Dec 1, 2023
0c973b7
fix up protocol tests
dwhswenson Dec 1, 2023
9260a2d
Add __eq__ for ext resources
dwhswenson Dec 1, 2023
d97fca4
Test: FileStorage.store_path for nested target
dwhswenson Dec 1, 2023
a01feaf
add tests for ext resource __eq__
dwhswenson Dec 1, 2023
ba6fcff
Merge branch 'main' into shared-object-v2
dwhswenson Dec 1, 2023
11810eb
make mypy happy
dwhswenson Dec 1, 2023
908329c
some docstrings
dwhswenson Dec 1, 2023
d8507d4
Merge branch 'ext-resource-cleanup' into staging-execute-dag
dwhswenson Dec 1, 2023
a659afa
staging serialization
dwhswenson Dec 1, 2023
32b1fe4
tests for staging serialization
dwhswenson Dec 1, 2023
5d0df5f
pep8
dwhswenson Dec 1, 2023
1418aee
Merge branch 'shared-object-v2' of github.com:OpenFreeEnergy/gufe int…
dwhswenson Dec 1, 2023
e057332
pep8
dwhswenson Dec 1, 2023
bdb37bb
pep8
dwhswenson Dec 1, 2023
e39a790
pep8
dwhswenson Dec 1, 2023
1cfe910
StagingDirectory -> StagingRegistry
dwhswenson Dec 4, 2023
78e003b
remove prefix; remove get_other_shared
dwhswenson Dec 7, 2023
265e786
delete_empty_dirs => keep_empty_dirs
dwhswenson Dec 7, 2023
006d787
Merge branch 'main' into shared-object-v2
dwhswenson Dec 11, 2023
ce12326
Add logging to not clean up registered directory
dwhswenson Dec 11, 2023
9afeb2f
Merge branch 'shared-object-v2' of github.com:OpenFreeEnergy/gufe int…
dwhswenson Dec 11, 2023
b3fea47
Merge branch 'shared-object-v2' into staging-execute-dag
dwhswenson Dec 11, 2023
2420f6c
remove unneeded comment
dwhswenson Dec 11, 2023
fe64baa
yield Context instead of tuple
dwhswenson Dec 11, 2023
288e6cc
ReproduceOldBehaviorStorageManager.from_old_args
dwhswenson Dec 11, 2023
bc1f36f
tests for ReproduceOldBehaviorStorageManager.from_old_args
dwhswenson Dec 11, 2023
aaa2aab
StagingPath.fspath => StagingPath.as_path()
dwhswenson Dec 13, 2023
cf60d1b
pep8
dwhswenson Dec 13, 2023
d0af5cf
Merge branch 'shared-object-v2' into staging-execute-dag
dwhswenson Dec 14, 2023
4861b3e
Add tests for nested files in shared
dwhswenson Dec 14, 2023
c0a37b3
tests for empty directories
dwhswenson Dec 14, 2023
3d7ca7b
tests for the directories in the overlap example
dwhswenson Dec 14, 2023
529b8a6
Merge branch 'staging-execute-dag' into staging-serialization
dwhswenson Dec 14, 2023
c412611
update for changes in preceding PRs
dwhswenson Dec 14, 2023
bf49c34
First version of the change storage backend story
dwhswenson Dec 14, 2023
10a40eb
docstrings; sketch out yet ANOTHER user story
dwhswenson Dec 15, 2023
4b2510c
minor docs update
dwhswenson Dec 15, 2023
3390bb0
test new codec added in staging serialization
dwhswenson Dec 21, 2023
da9955e
remove fspath from StagingRegistry
dwhswenson Dec 22, 2023
b72e6f9
Merge branch 'shared-object-v2' into staging-execute-dag
dwhswenson Dec 22, 2023
409ec06
Merge branch 'staging-execute-dag' into staging-serialization
dwhswenson Dec 22, 2023
b22445b
Attach StorageSerialization to StorageManager
dwhswenson Dec 22, 2023
8534d71
update for some review comments
dwhswenson Jan 17, 2024
01db5b1
Merge branch 'main' into staging-execute-dag
dwhswenson Jan 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion docs/guide/storage.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,106 @@
The GUFE Storage System
=======================

Storage lifetimes
-----------------

Storage docs.
The storage system in GUFE is heavily tied to the GUFE protocol system. The
key concept here is that the different levels of the GUFE protocol system;
campaign, DAG, and unit; inherently imply different lifetimes for the data
that is created. Those different lifetimes define the stages of the GUFE
storage system.

In an abstract sense, as used by protocol developers, these three levels
correspond to three lifetimes of data:

* ``scratch``: This is temporary data that is only needed for the lifetime
of a :class:`.ProtocolUnit`. This data is not guaranteed to be available
beyond the single :class:`.ProtocolUnit` where it is created, but may be
reused within that :class:`.ProtocolUnit`.
* ``shared``: This is data that is shared between different units in a
:class:`.ProtocolDAG`. For example, a single equilibration stage might be
shared between multiple production runs. The output snapshot of the
equilibration would be suitable for as something to put in ``shared``
data. This data is guaranteed to be present from when it is created until
the end of the :class:`.ProtocolDAG`, but is not guaranteed to exist after
the :class:`.ProtocolDAG` terminates.
* ``permanent``: This is the data that will be needed beyond the scope of a
single rough estimate of the calculation. This could include anything that
an extension of the simulation would require, or things that require
network-scale analysis. Anything stored here will be usable after the
calculation has completed.

The ``scratch`` area is always a local directory. However, ``shared`` and
``permanent`` can be external (remote) resources, using the
:class:`.ExternalResource` API.

As a practical matter, the GUFE storage system can be handled with a
:class:`.StorageManager`. This automates some aspects of the transfer
between stages of the GUFE storage system, and simplifies the API for
protocol authors. In detail, this provides protocol authors with
``PathLike`` objects for ``scratch``, ``shared``, and ``permanent``. All
three of these objects actually point to special subdirectories of the
local scratch space for a specific unit, but are managed by context
managers at the executor level, which handle the process of moving objects
from local staging directories to the actual ``shared`` and ``permanent``
locations, which can be external resources.


External resource utilities
---------------------------

For flexible data storage, GUFE defines the :class:`.ExternalResource` API,
which allows data be stored/loaded in a way that is agnostic to the
underlying data store, as long as the store can be represented as a
key-value store. Withing GUFE, we provide several examples, including
:class:`.FileStorage` and :class:`.MemoryStorage` (primarily useful for
testing.) The specific ``shared`` and ``permanent`` resources, as provided
to the executor, can be instances of an :class:`.ExternalResource`.

.. note::

The ``shared`` space must be a resource where an uploaded object is
instantaneously available, otherwise later protocol units may fail if the
shared result is unavailable. This means that files or approaches based
on ``scp`` or ``sftp`` are fine, but things like cloud storage, where the
existence of a new document may take time to propagate through the
network, are not recommended for ``shared``.


Details: Manangement of the Storage Lifetime
--------------------------------------------

The concepts of the storage lifetimes are important for protocol authors,
but details of implementation are left to the specific executor. In order to
facilitate correct treatment of the storage lifecycle, GUFE provides a few
helpers. The information in this section is mostly of interest to authors of
executors. The helpers are:

* :class:`.StorageManager`: This is the overall façade interface for
interacting with the rest of the storage lifecycle tools. It provides two
methods to generate context managers; one for the :class:`.ProtocolDAG`
level of the lifecycle, and one for the :class:`.ProtocoUnit` level of the
lifecycle. This class is designed for the use case that the entire DAG is
run in serial within a single process. Subclasses of this can be created
for other execution architectures, where the main logic changes would be
in the methods that return those context managers.
* :class:`.StagingRegistry`: This handles the logic around staging paths
within a :class:`.ProtocolUnit`. Think of this as an abstract
representation of a local directory. Paths within it register with it, and
it handles deletion of the temporary local files when not needed, as well
as the download of remote files when necessary for reading. There are two
important subclasses of this: :class:`.SharedStaging` for a ``shared``
resource, and :class:`.PermanentStaging` for a ``permanent`` resource.
* :class:`.StagingPath`: This represents a file within the
:class:`.StagingRegistry`. It contains both the key (label) used in the
key-value store, as well as the actual local path to the file. When its
``__fspath__`` method is called, it registers itself with its
:class:`.StagingRegistry`, which handles managing it over its lifecycle.

In practice, the executor uses the :class:`.StorageManager` to create a
:class:`.DAGContextManager` at the level of a DAG, and then uses the
:class:`.DAGContextManager` to create a context to run a unit. That context
creates a :class:`.SharedStaging` and a :class:`.PermanentStaging`
associated with the specific unit. Those staging directories, with the
scratch directory, are provided to the :class:`.ProtocolUnit`, so that
these are the only objects protocol authors need to interact with.
187 changes: 146 additions & 41 deletions gufe/protocols/protocoldag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
ProtocolUnit, ProtocolUnitResult, ProtocolUnitFailure, Context
)

from ..storage.storagemanager import StorageManager
from ..storage.externalresource.filestorage import FileStorage
from ..storage.externalresource.base import ExternalStorage
Comment on lines +20 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we fix the namespace of storage so these are all available at the same level? from ..storage import StorageManager, FileStorage, etc


import logging
_logger = logging.getLogger(__name__)


class DAGMixin:
_protocol_units: list[ProtocolUnit]
Expand Down Expand Up @@ -345,9 +352,67 @@ def _from_dict(cls, dct: dict):
return cls(**dct)


class ReproduceOldBehaviorStorageManager(StorageManager):
# Default behavior has scratch at {dag_label}/scratch/{unit_label} and
# shared at {dag_label}/{unit_label}. This little class makes changes
# that get us back to the original behavior of this class: scratch at
# {dag_label}/scratch_{unit_label} and shared at
# {dag_label}/shared_{unit_label}.
def _scratch_loc(self, dag_label, unit_label, attempt):
return (
self.scratch_root
/ f"{dag_label}/scratch_{unit_label}_attempt_{attempt}"
)

def make_label(self, dag_label, unit_label, attempt):
return f"{dag_label}/shared_{unit_label}_attempt_{attempt}"

@classmethod
def from_old_args(
cls,
shared_basedir: PathLike,
scratch_basedir: PathLike, *,
keep_shared: bool = False,
keep_scratch: bool = False,
):
"""
Create an new storage manager based on the old execute_DAG args.

Parameters
----------
shared_basedir : Path
Filesystem path to use for shared space that persists across whole DAG
execution. Used by a `ProtocolUnit` to pass file contents to dependent
class:``ProtocolUnit`` instances.
scratch_basedir : Path
Filesystem path to use for `ProtocolUnit` `scratch` space.
keep_shared : bool
If True, don't remove shared directories for `ProtocolUnit`s after
the `ProtocolDAG` is executed.
keep_scratch : bool
If True, don't remove scratch directories for a `ProtocolUnit` after
it is executed.
"""
# doing this here makes it easier to test than putting in
# execute_DAG
shared_basedir = Path(shared_basedir)
shared = FileStorage(shared_basedir.parent)
storage_manager = cls(
scratch_root=scratch_basedir,
shared_root=shared,
permanent_root=shared,
keep_scratch=keep_scratch,
keep_shared=keep_shared,
keep_staging=True,
keep_empty_dirs=True,
staging=Path(""), # use the actual directories as the staging
)
return storage_manager


def execute_DAG(protocoldag: ProtocolDAG, *,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you keeping this for comparison for the PR? I don't think we need to keep the old behaviour around

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we told Joe that the API was stable enough that he could use it in workflows. So yeah, I spent a lot of time ensuring that there would be a code path that doesn't break the current API.

shared_basedir: Path,
scratch_basedir: Path,
shared_basedir: PathLike,
scratch_basedir: PathLike,
keep_shared: bool = False,
keep_scratch: bool = False,
raise_error: bool = True,
Expand Down Expand Up @@ -385,52 +450,92 @@ def execute_DAG(protocoldag: ProtocolDAG, *,
The result of executing the `ProtocolDAG`.

"""
# the directory given as shared_root is actually the directory for this
# DAG; the "shared_root" for the storage manager is the parent. We'll
# force permanent to be the same.
storage_manager = ReproduceOldBehaviorStorageManager.from_old_args(
shared_basedir=shared_basedir,
scratch_basedir=scratch_basedir,
keep_shared=keep_shared,
keep_scratch=keep_scratch
)
dag_label = shared_basedir.name
return new_execute_DAG(protocoldag, dag_label, storage_manager,
raise_error, n_retries)


def new_execute_DAG( # TODO: this is a terrible name
protocoldag: ProtocolDAG,
dag_label: str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is a DAG label now required?

and why can't the label just be taken off the input DAG?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaks current behavior, where the equivalent of the DAG label is given by the user as the -o from quickrun.

storage_manager: StorageManager,
raise_error: bool = False,
n_retries: int = 0
) -> ProtocolDAGResult:
"""
Locally execute a full :class:`ProtocolDAG` in serial and in-process.

Alternate input signature to generalize execute_DAG

Parameters
----------
protocoldag : ProtocolDAG
The :class:``ProtocolDAG`` to execute.
dag_label : str
Label to use for the DAG
storage_manager : StorageManager
The :class:`.StorageManager` to handle storing files.
raise_error : bool
If True, raise an exception if a ProtocolUnit fails, default True
if False, any exceptions will be stored as `ProtocolUnitFailure`
objects inside the returned `ProtocolDAGResult`
n_retries : int
the number of times to attempt, default 0, i.e. try once and only once

Returns
-------
ProtocolDAGResult
The result of executing the `ProtocolDAG`.
"""
# this simplifies setup of execute_DAG by allowing you to directly
# provide the storage_manager; the extra options in the old one just
# configure the storage_manager
if n_retries < 0:
raise ValueError("Must give positive number of retries")

# iterate in DAG order
results: dict[GufeKey, ProtocolUnitResult] = {}
all_results = [] # successes AND failures
shared_paths = []
for unit in protocoldag.protocol_units:
# translate each `ProtocolUnit` in input into corresponding
# `ProtocolUnitResult`
inputs = _pu_to_pur(unit.inputs, results)

attempt = 0
while attempt <= n_retries:
shared = shared_basedir / f'shared_{str(unit.key)}_attempt_{attempt}'
shared_paths.append(shared)
shared.mkdir()

scratch = scratch_basedir / f'scratch_{str(unit.key)}_attempt_{attempt}'
scratch.mkdir()

context = Context(shared=shared,
scratch=scratch)

# execute
result = unit.execute(
context=context,
raise_error=raise_error,
**inputs)
all_results.append(result)

if not keep_scratch:
shutil.rmtree(scratch)

if result.ok():
# attach result to this `ProtocolUnit`
results[unit.key] = result
break
attempt += 1

if not result.ok():
break

if not keep_shared:
for shared_path in shared_paths:
shutil.rmtree(shared_path)
with storage_manager.running_dag(dag_label) as dag_ctx:
for unit in protocoldag.protocol_units:
# import pdb; pdb.set_trace()
attempt = 0
while attempt <= n_retries:
# translate each `ProtocolUnit` in input into corresponding
# `ProtocolUnitResult`
inputs = _pu_to_pur(unit.inputs, results)

label = storage_manager.make_label(dag_label, unit.key,
attempt=attempt)
with dag_ctx.running_unit(
dag_label, unit.key, attempt=attempt
) as context:
_logger.info(f"Starting unit {label}")
_logger.info(context)
result = unit.execute(
context=context,
raise_error=raise_error,
**inputs)
all_results.append(result)

if result.ok():
# attach result to this `ProtocolUnit`
results[unit.key] = result
break
attempt += 1

if not result.ok():
break

return ProtocolDAGResult(
name=protocoldag.name,
Expand Down
5 changes: 4 additions & 1 deletion gufe/protocols/protocolunit.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
GufeTokenizable, GufeKey, TOKENIZABLE_REGISTRY
)

from ..storage.stagingregistry import StagingPath


@dataclass
class Context:
Expand All @@ -31,7 +33,8 @@ class Context:

"""
scratch: PathLike
shared: PathLike
shared: StagingPath
permanent: StagingPath


def _list_dependencies(inputs, cls):
Expand Down
Loading
Loading