diff --git a/docs/guide/storage.rst b/docs/guide/storage.rst index 39413208..165b9640 100644 --- a/docs/guide/storage.rst +++ b/docs/guide/storage.rst @@ -1,5 +1,106 @@ The GUFE Storage System ======================= +Storage lifetimes +----------------- -Storage docs. +The storage system in GUFE is heavily tied to the GUFE protocol system. The +key concept here is that the different levels of the GUFE protocol system; +campaign, DAG, and unit; inherently imply different lifetimes for the data +that is created. Those different lifetimes define the stages of the GUFE +storage system. + +In an abstract sense, as used by protocol developers, these three levels +correspond to three lifetimes of data: + +* ``scratch``: This is temporary data that is only needed for the lifetime + of a :class:`.ProtocolUnit`. This data is not guaranteed to be available + beyond the single :class:`.ProtocolUnit` where it is created, but may be + reused within that :class:`.ProtocolUnit`. +* ``shared``: This is data that is shared between different units in a + :class:`.ProtocolDAG`. For example, a single equilibration stage might be + shared between multiple production runs. The output snapshot of the + equilibration would be suitable for as something to put in ``shared`` + data. This data is guaranteed to be present from when it is created until + the end of the :class:`.ProtocolDAG`, but is not guaranteed to exist after + the :class:`.ProtocolDAG` terminates. +* ``permanent``: This is the data that will be needed beyond the scope of a + single rough estimate of the calculation. This could include anything that + an extension of the simulation would require, or things that require + network-scale analysis. Anything stored here will be usable after the + calculation has completed. + +The ``scratch`` area is always a local directory. However, ``shared`` and +``permanent`` can be external (remote) resources, using the +:class:`.ExternalResource` API. 
+ +As a practical matter, the GUFE storage system can be handled with a +:class:`.StorageManager`. This automates some aspects of the transfer +between stages of the GUFE storage system, and simplifies the API for +protocol authors. In detail, this provides protocol authors with +``PathLike`` objects for ``scratch``, ``shared``, and ``permanent``. All +three of these objects actually point to special subdirectories of the +local scratch space for a specific unit, but are managed by context +managers at the executor level, which handle the process of moving objects +from local staging directories to the actual ``shared`` and ``permanent`` +locations, which can be external resources. + + +External resource utilities +--------------------------- + +For flexible data storage, GUFE defines the :class:`.ExternalResource` API, +which allows data to be stored/loaded in a way that is agnostic to the +underlying data store, as long as the store can be represented as a +key-value store. Within GUFE, we provide several examples, including +:class:`.FileStorage` and :class:`.MemoryStorage` (primarily useful for +testing). The specific ``shared`` and ``permanent`` resources, as provided +to the executor, can be instances of an :class:`.ExternalResource`. + +.. note:: + + The ``shared`` space must be a resource where an uploaded object is + instantaneously available, otherwise later protocol units may fail if the + shared result is unavailable. This means that files or approaches based + on ``scp`` or ``sftp`` are fine, but things like cloud storage, where the + existence of a new document may take time to propagate through the + network, are not recommended for ``shared``. + + +Details: Management of the Storage Lifetime +-------------------------------------------- + +The concepts of the storage lifetimes are important for protocol authors, +but details of implementation are left to the specific executor. 
In order to +facilitate correct treatment of the storage lifecycle, GUFE provides a few +helpers. The information in this section is mostly of interest to authors of +executors. The helpers are: + +* :class:`.StorageManager`: This is the overall façade interface for + interacting with the rest of the storage lifecycle tools. It provides two + methods to generate context managers; one for the :class:`.ProtocolDAG` + level of the lifecycle, and one for the :class:`.ProtocolUnit` level of the + lifecycle. This class is designed for the use case that the entire DAG is + run in serial within a single process. Subclasses of this can be created + for other execution architectures, where the main logic changes would be + in the methods that return those context managers. +* :class:`.StagingRegistry`: This handles the logic around staging paths + within a :class:`.ProtocolUnit`. Think of this as an abstract + representation of a local directory. Paths within it register with it, and + it handles deletion of the temporary local files when not needed, as well + as the download of remote files when necessary for reading. There are two + important subclasses of this: :class:`.SharedStaging` for a ``shared`` + resource, and :class:`.PermanentStaging` for a ``permanent`` resource. +* :class:`.StagingPath`: This represents a file within the + :class:`.StagingRegistry`. It contains both the key (label) used in the + key-value store, as well as the actual local path to the file. When its + ``__fspath__`` method is called, it registers itself with its + :class:`.StagingRegistry`, which handles managing it over its lifecycle. + +In practice, the executor uses the :class:`.StorageManager` to create a +:class:`.DAGContextManager` at the level of a DAG, and then uses the +:class:`.DAGContextManager` to create a context to run a unit. That context +creates a :class:`.SharedStaging` and a :class:`.PermanentStaging` +associated with the specific unit. 
Those staging directories, with the +scratch directory, are provided to the :class:`.ProtocolUnit`, so that +these are the only objects protocol authors need to interact with. diff --git a/gufe/protocols/protocoldag.py b/gufe/protocols/protocoldag.py index 85978136..41b8f529 100644 --- a/gufe/protocols/protocoldag.py +++ b/gufe/protocols/protocoldag.py @@ -17,6 +17,13 @@ ProtocolUnit, ProtocolUnitResult, ProtocolUnitFailure, Context ) +from ..storage.storagemanager import StorageManager +from ..storage.externalresource.filestorage import FileStorage +from ..storage.externalresource.base import ExternalStorage + +import logging +_logger = logging.getLogger(__name__) + class DAGMixin: _protocol_units: list[ProtocolUnit] @@ -345,9 +352,67 @@ def _from_dict(cls, dct: dict): return cls(**dct) +class ReproduceOldBehaviorStorageManager(StorageManager): + # Default behavior has scratch at {dag_label}/scratch/{unit_label} and + # shared at {dag_label}/{unit_label}. This little class makes changes + # that get us back to the original behavior of this class: scratch at + # {dag_label}/scratch_{unit_label} and shared at + # {dag_label}/shared_{unit_label}. + def _scratch_loc(self, dag_label, unit_label, attempt): + return ( + self.scratch_root + / f"{dag_label}/scratch_{unit_label}_attempt_{attempt}" + ) + + def make_label(self, dag_label, unit_label, attempt): + return f"{dag_label}/shared_{unit_label}_attempt_{attempt}" + + @classmethod + def from_old_args( + cls, + shared_basedir: PathLike, + scratch_basedir: PathLike, *, + keep_shared: bool = False, + keep_scratch: bool = False, + ): + """ + Create an new storage manager based on the old execute_DAG args. + + Parameters + ---------- + shared_basedir : Path + Filesystem path to use for shared space that persists across whole DAG + execution. Used by a `ProtocolUnit` to pass file contents to dependent + class:``ProtocolUnit`` instances. + scratch_basedir : Path + Filesystem path to use for `ProtocolUnit` `scratch` space. 
+ keep_shared : bool + If True, don't remove shared directories for `ProtocolUnit`s after + the `ProtocolDAG` is executed. + keep_scratch : bool + If True, don't remove scratch directories for a `ProtocolUnit` after + it is executed. + """ + # doing this here makes it easier to test than putting in + # execute_DAG + shared_basedir = Path(shared_basedir) + shared = FileStorage(shared_basedir.parent) + storage_manager = cls( + scratch_root=scratch_basedir, + shared_root=shared, + permanent_root=shared, + keep_scratch=keep_scratch, + keep_shared=keep_shared, + keep_staging=True, + keep_empty_dirs=True, + staging=Path(""), # use the actual directories as the staging + ) + return storage_manager + + def execute_DAG(protocoldag: ProtocolDAG, *, - shared_basedir: Path, - scratch_basedir: Path, + shared_basedir: PathLike, + scratch_basedir: PathLike, keep_shared: bool = False, keep_scratch: bool = False, raise_error: bool = True, @@ -385,52 +450,92 @@ def execute_DAG(protocoldag: ProtocolDAG, *, The result of executing the `ProtocolDAG`. """ + # the directory given as shared_root is actually the directory for this + # DAG; the "shared_root" for the storage manager is the parent. We'll + # force permanent to be the same. + storage_manager = ReproduceOldBehaviorStorageManager.from_old_args( + shared_basedir=shared_basedir, + scratch_basedir=scratch_basedir, + keep_shared=keep_shared, + keep_scratch=keep_scratch + ) + dag_label = shared_basedir.name + return new_execute_DAG(protocoldag, dag_label, storage_manager, + raise_error, n_retries) + + +def new_execute_DAG( # TODO: this is a terrible name + protocoldag: ProtocolDAG, + dag_label: str, + storage_manager: StorageManager, + raise_error: bool = False, + n_retries: int = 0 +) -> ProtocolDAGResult: + """ + Locally execute a full :class:`ProtocolDAG` in serial and in-process. + + Alternate input signature to generalize execute_DAG + + Parameters + ---------- + protocoldag : ProtocolDAG + The :class:``ProtocolDAG`` to execute. 
+ dag_label : str + Label to use for the DAG + storage_manager : StorageManager + The :class:`.StorageManager` to handle storing files. + raise_error : bool + If True, raise an exception if a ProtocolUnit fails, default True + if False, any exceptions will be stored as `ProtocolUnitFailure` + objects inside the returned `ProtocolDAGResult` + n_retries : int + the number of times to attempt, default 0, i.e. try once and only once + + Returns + ------- + ProtocolDAGResult + The result of executing the `ProtocolDAG`. + """ + # this simplifies setup of execute_DAG by allowing you to directly + # provide the storage_manager; the extra options in the old one just + # configure the storage_manager if n_retries < 0: raise ValueError("Must give positive number of retries") # iterate in DAG order results: dict[GufeKey, ProtocolUnitResult] = {} all_results = [] # successes AND failures - shared_paths = [] - for unit in protocoldag.protocol_units: - # translate each `ProtocolUnit` in input into corresponding - # `ProtocolUnitResult` - inputs = _pu_to_pur(unit.inputs, results) - - attempt = 0 - while attempt <= n_retries: - shared = shared_basedir / f'shared_{str(unit.key)}_attempt_{attempt}' - shared_paths.append(shared) - shared.mkdir() - - scratch = scratch_basedir / f'scratch_{str(unit.key)}_attempt_{attempt}' - scratch.mkdir() - - context = Context(shared=shared, - scratch=scratch) - - # execute - result = unit.execute( - context=context, - raise_error=raise_error, - **inputs) - all_results.append(result) - - if not keep_scratch: - shutil.rmtree(scratch) - - if result.ok(): - # attach result to this `ProtocolUnit` - results[unit.key] = result - break - attempt += 1 - if not result.ok(): - break - - if not keep_shared: - for shared_path in shared_paths: - shutil.rmtree(shared_path) + with storage_manager.running_dag(dag_label) as dag_ctx: + for unit in protocoldag.protocol_units: + # import pdb; pdb.set_trace() + attempt = 0 + while attempt <= n_retries: + # translate each 
`ProtocolUnit` in input into corresponding + # `ProtocolUnitResult` + inputs = _pu_to_pur(unit.inputs, results) + + label = storage_manager.make_label(dag_label, unit.key, + attempt=attempt) + with dag_ctx.running_unit( + dag_label, unit.key, attempt=attempt + ) as context: + _logger.info(f"Starting unit {label}") + _logger.info(context) + result = unit.execute( + context=context, + raise_error=raise_error, + **inputs) + all_results.append(result) + + if result.ok(): + # attach result to this `ProtocolUnit` + results[unit.key] = result + break + attempt += 1 + + if not result.ok(): + break return ProtocolDAGResult( name=protocoldag.name, diff --git a/gufe/protocols/protocolunit.py b/gufe/protocols/protocolunit.py index 73728bdf..8e99e97a 100644 --- a/gufe/protocols/protocolunit.py +++ b/gufe/protocols/protocolunit.py @@ -23,6 +23,8 @@ GufeTokenizable, GufeKey, TOKENIZABLE_REGISTRY ) +from ..storage.stagingregistry import StagingPath + @dataclass class Context: @@ -31,7 +33,8 @@ class Context: """ scratch: PathLike - shared: PathLike + shared: StagingPath + permanent: StagingPath def _list_dependencies(inputs, cls): diff --git a/gufe/storage/stagingregistry.py b/gufe/storage/stagingregistry.py new file mode 100644 index 00000000..24e4a1e1 --- /dev/null +++ b/gufe/storage/stagingregistry.py @@ -0,0 +1,357 @@ +from __future__ import annotations + +from typing import Union, Optional +from pathlib import Path +from os import PathLike, rmdir, remove +from .externalresource import ExternalStorage, FileStorage +from contextlib import contextmanager + +from gufe.utils import delete_empty_dirs + +import logging +_logger = logging.getLogger(__name__) + + +def _safe_to_delete_file( + external: ExternalStorage, + path: PathLike +) -> bool: + """Check that deleting this file will not remove it from external""" + # kind of brittle: deals with internals of FileStorage + if isinstance(external, FileStorage): + root = external.root_dir + else: + return True + + p = Path(path) + try: 
+ label = str(p.relative_to(root)) + except ValueError: + return True + return not external.exists(label) + + +class StagingRegistry: + """Local representation of an :class:`.ExternalStorage`. + + This connects objects on a local filesystem to the key-value store of a + (possibly remote) :class:`.ExternalStorage`. It presents a FileLike + interface to users, but internally (via the :class:`.StagingPath` objects + it contains in its registry) maps local filenames to the keys (labels) + for the key-value store. + + 1. If a local path is requested that corresponds to an existing label in + the :class:`.ExternalStorage`, this object will "download" the + contents of that key to that local path. + + 2. When requested, it transfers any newly created files to the + :class:`.ExternalStorage`. + + 3. It can delete all of the files it manages. + + Parameters + ---------- + scratch : PathLike + the scratch directory shared by all objects on this host + external : :class:`.ExternalStorage` + external storage resource where objects should eventualy go + staging : PathLike + name of the subdirectory of scratch where staged results are + temporarily stored; default is '.staging'. This must be the same for + all units within a DAG. 
+ delete_staging : bool + whether to delete the contents of the $SCRATCH/$STAGING + directory when this object is deleted + """ + def __init__( + self, + scratch: PathLike, + external: ExternalStorage, + *, + staging: PathLike = Path(".staging"), + delete_staging: bool = True, + keep_empty_dirs: bool = False, + ): + self.external = external + self.scratch = Path(scratch) + self.delete_staging = delete_staging + self.keep_empty_dirs = keep_empty_dirs + self.staging = staging + + self.registry: set[StagingPath] = set() + self.preexisting: set[StagingPath] = set() + self.staging_dir = self.scratch / staging + self.staging_dir.mkdir(exist_ok=True, parents=True) + + def _delete_file_safe(self, file): + """Check if deleting this file will remove it from external.""" + return _safe_to_delete_file( + external=self.external, + path=file + ) + + def transfer_single_file_to_external(self, held_file: StagingPath): + """Transfer a given file from staging into external storage + """ + path = held_file.as_path() + if not path.exists(): + _logger.info(f"Found nonexistent path {path}, not " + "transfering to external storage") + elif path.is_dir(): + _logger.debug(f"Found directory {path}, not " + "transfering to external storage") + else: + _logger.info(f"Transfering {path} to external storage") + self.external.store_path(held_file.label, path) + return held_file + + return None # no transfer + + def transfer_staging_to_external(self): + """Transfer all objects in the registry to external storage + + """ + return [ + transferred + for file in self.registry + if (transferred := self.transfer_single_file_to_external(file)) + ] + + def _delete_file(self, file: StagingPath): + path = file.as_path() + if path.exists(): + if not path.is_dir(): + _logger.debug(f"Removing file '{file}'") + path.unlink() + else: + _logger.debug( + f"During staging cleanup, the directory '{file}' was " + "found as a staged path. This will be deleted only if " + "`keep_empty` is False." 
+ ) + self.registry.remove(file) + else: + _logger.warning( + f"During staging cleanup, file '{file}' was marked for " + "deletion, but can not be found on disk." + ) + + def cleanup(self): + """Perform end-of-lifecycle cleanup. + """ + if self.delete_staging: + for file in self.registry - self.preexisting: + if self._delete_file_safe(file): + self._delete_file(file) + + if not self.keep_empty_dirs: + delete_empty_dirs(self.staging_dir) + + def register_path(self, staging_path: StagingPath): + """ + Register a :class:`.StagingPath` with this :class:`.StagingRegistry`. + + This marks a given path as something for this object to manage, by + loading it into the ``registry``. This way it is tracked such that + its contents can be transferred to the :class:`.ExternalStorage` and + such that the local copy can be deleted when it is no longer needed. + + If this object's :class:`.ExternalStorage` already has data for the + label associated with the provided :class:`.StagingPath`, then the + contents of that should be copied to the local path so that it can be + read by the user. 
+ + Parameters + ---------- + staging_path: :class:`.StagingPath` + the path to track + """ + label_exists = self.external.exists(staging_path.label) + fspath = staging_path.as_path() + + if not fspath.parent.exists(): + fspath.parent.mkdir(parents=True, exist_ok=True) + + self.registry.add(staging_path) + + # if this is a file that exists, bring it into our subdir + # NB: this happens even if you're intending to overwrite the path, + # which is kind of wasteful + if label_exists: + self._load_file_from_external(self.external, staging_path) + + def _load_file_from_external(self, external: ExternalStorage, + staging_path: StagingPath): + # import pdb; pdb.set_trace() + scratch_path = self.staging_dir / staging_path.path + # TODO: switch this to using `get_filename` and `store_path` + if scratch_path.exists(): + self.preexisting.add(staging_path) + + with external.load_stream(staging_path.label) as f: + external_bytes = f.read() + ... # TODO: check that the bytes are the same if preexisting? + + scratch_path.parent.mkdir(exist_ok=True, parents=True) + with open(scratch_path, mode='wb') as f: + f.write(external_bytes) + + def __truediv__(self, path: Union[PathLike, str]): + return StagingPath(root=self, path=path) + + def __repr__(self): + return ( + f"{self.__class__.__name__}('{self.scratch}', {self.external})" + ) + + def __del__(self): # -no-cov- + # in case someone doesn't use this within a context manager + if self.staging_dir.exists(): + self.cleanup() + + +class SharedStaging(StagingRegistry): + """Staging for shared external storage. + + This enables read-only versions to be loaded from other units. 
+ """ + def __init__( + self, + scratch: PathLike, + external: ExternalStorage, + *, + staging: PathLike = Path(".staging"), + delete_staging: bool = True, + keep_empty_dirs: bool = False, + read_only: bool = False, + ): + super().__init__(scratch, external, staging=staging, + delete_staging=delete_staging, + keep_empty_dirs=keep_empty_dirs) + self.read_only = read_only + + def transfer_single_file_to_external(self, held_file: StagingPath): + if self.read_only: + _logger.debug("Read-only: Not transfering to external storage") + return # early exit + + return super().transfer_single_file_to_external(held_file) + + def transfer_staging_to_external(self): + if self.read_only: + _logger.debug("Read-only: Not transfering to external storage") + return # early exit + + return super().transfer_staging_to_external() + + def register_path(self, staging_path: StagingPath): + label_exists = self.external.exists(staging_path.label) + + if self.read_only and not label_exists: + raise IOError(f"Unable to create '{staging_path.label}'. File " + "does not exist in external storage, and this " + "staging path is read-only.") + + super().register_path(staging_path) + + +class PermanentStaging(StagingRegistry): + """Staging directory for the permanent storage. + + This allows files to be downloaded from a shared + :class:`.ExternalStorage`. 
+ """ + def __init__( + self, + scratch: PathLike, + external: ExternalStorage, + shared: ExternalStorage, + *, + staging: PathLike = Path(".staging"), + delete_staging: bool = True, + keep_empty_dirs: bool = False, + ): + super().__init__(scratch, external, staging=staging, + delete_staging=delete_staging, + keep_empty_dirs=keep_empty_dirs) + self.shared = shared + + def _delete_file_safe(self, file): + shared_safe = _safe_to_delete_file( + external=self.shared, + path=file + ) + return shared_safe and super()._delete_file_safe(file) + + def transfer_single_file_to_external(self, held_file: StagingPath): + # if we can't find it locally, we load it from shared storage + path = held_file.as_path() + if not path.exists(): + self._load_file_from_external(self.shared, held_file) + + super().transfer_single_file_to_external(held_file) + + +class StagingPath: + """PathLike object linking local path with label for external storage. + + On creation, this registers with a :class:`.StagingRegistry` that will + manage the local path and transferring data with its + :class:`.ExternalStorage`. + + This object can always be used as a FileLike (using, e.g., the standard + ``open`` builtin). This requires that a staged path that exists on an + external resource be downloaded into a local file when it is referenced. + + For a representation of a file that does not require the download (for + example, when deserializing results that point to files) instead use + :class:`.ExternalFile`. + """ + def __init__(self, root: StagingRegistry, + path: Union[PathLike, str]): + self.root = root + self.path = Path(path) + + def register(self): + """Register this path with its StagingRegistry. + + If a file associated with this path exists in an external storage, + it will be downloaded to the staging area as part of registration. 
+ """ + self.root.register_path(self) + + def __truediv__(self, path: Union[PathLike, str]): + return StagingPath(self.root, self.path / path) + + def __eq__(self, other): + return (isinstance(other, StagingPath) + and self.root == other.root + and self.path == other.path) + + def __hash__(self): + return hash((self.root, self.path)) + + def as_path(self): + """Return the pathlib.Path where this is staged""" + return Path(self._fspath) + + @property + def _fspath(self): + return str(self.root.staging_dir / self.path) + + def __fspath__(self): + self.register() + return self._fspath + + @property + def label(self) -> str: + """Label used in :class:`.ExternalStorage` for this path""" + return str(self.path) + + def __repr__(self): + return f"StagingPath('{self._fspath}')" + + # TODO: how much of the pathlib.Path interface do we want to wrap? + # although edge cases may be a pain, we can get most of it with, e.g.: + # def exists(self): return Path(self).exists() + # but also, can do pathlib.Path(staging_path) and get hte whole thing diff --git a/gufe/storage/stagingserialization.py b/gufe/storage/stagingserialization.py new file mode 100644 index 00000000..c4db2df7 --- /dev/null +++ b/gufe/storage/stagingserialization.py @@ -0,0 +1,154 @@ +from gufe.tokenization import JSON_HANDLER +from gufe.custom_json import JSONCodec, JSONSerializerDeserializer +from .stagingregistry import StagingPath + + +class StagingPathSerialization: + """Class for managing serialization of a :class:`.StagingPath`. + + This class is only created internally. Developers of executors will + interface with this indirectly through the :class:`.StorageManager`; the + expectation is that the only thing they will need is access to the + ``encoder`` and ``decoder`` properties. 
+ + Serialization of a :class:`.StagingPath` needs to strip the specific + storage context (path to external files storage) because we should be able + to change that out-of-process (e.g., move the directory containing + results) and still be able to deserialize correctly. This class is + responsible for abstracting/injecting the storage context when a + :class:`.StagingPath` is serialized/deserialized. + """ + # TODO: this long comment should probably go somewhere where it will + # show up in docs as well? Maybe just bump it into the class docstring? + # + # Serializing staging paths + # ------------------------- + # + # 1. I am loading my results object, and I will want to use the + # associated files. This should be transparent, regardless of where + # the permanent storage is located. + # 2. I am loading my results object, but I do not need the large stored + # files. I do not want to download them when they aren't needed. + # 3. My permanent storage was a directory on my file system, but I have + # moved that directory (with use cases of (a) I moved the absolute + # path; (b) it is at a different relative path with respect to my + # pwd). + # 4. I'm working with files from two different permanent storages. I + # need to be able to load from both in the same Python process. + # 5. I have generated data in one backend, and I transferred it to + # another backend. It needs to be readable from the other backend. + # (Use case: data is in long-term cloud storage that requires + # credentials, but I want to share some part of that data with + # someone else by transferring it to a disk.) + # 6. I am interfacing with a package that adds serialization types to + # the gufe JSON_HANDLER via an external JSONCodec. Maybe, in the + # worst case, the external codec gets added *after* I've created my + # serialization object. I need to be able to serialize those custom + # types. + # + # Outputs from a protocol may contain a :class:`.StagingPath`. 
Note that + # a :class:`.StagingPath` is inherently not a complete description of + # how to obtain the associated data: in almost every situation, there is + # some additional context required. This can include the credentials to + # an external server, or a base path that the file can be found relative + # to (which may have changed if the user moves it.) Because of this, we + # need to inject that context in to the deserialization. + # + # This object injects the relevant context, provided by the + # :class:`.StorageManager`. It creates a JSONSerializerDeserializer + # based on the one being used by gufe in this process, using all the + # installed codecs plus an additional codec specific to this context. + # + # User stories 1 and 2 are handled by the nature of the + # :class:`.StagingPath` object. The external file downloads as part of + # the ``__fspath__`` method. This means that when using the ``open`` + # builtin, you will automatically download the file to a local staging + # directory. However, the reference to the file can exist in the results + # object without downloading the file. + # + # User stories 3--6 are handled by this + # :class:`.StagingPathSerialization` class. Story 3 is handled by + # allowing the appropriate context (in the form of a + # :class:`.StorageManager`) to be injected into the deserialization + # process. Story 4 can be handled by using more than one + # :class:`.StagingPathSerialization` context (associated with different + # :class:`.StorageManager` objects. Story 5 is handled by injecting + # the appropriate context (and, in principle, is a variant of story 3.) + # Story 6 is handled by doing a just-in-time generation of the + # JSONSerializerDeserializer that we use for this class. 
+ def __init__(self, manager): + self.manager = manager + self.codec = JSONCodec( + cls=StagingPath, + to_dict=self.to_dict, + from_dict=self.from_dict, + ) + self.refresh_handler() + + def refresh_handler(self): + """Ensure that the current handler includes all registered codecs""" + codecs = JSON_HANDLER.codecs + [self.codec] + self.json_handler = JSONSerializerDeserializer(codecs) + + @property + def encoder(self): + """ + JSONEncoder class to use when serializing a :class:`.StagingPath` + """ + self.refresh_handler() + return self.json_handler.encoder + + @property + def decoder(self): + """ + JSONdecoder class to use when deserializing a :class:`.StagingPath` + """ + self.refresh_handler() + return self.json_handler.decoder + + def to_dict(self, path: StagingPath): + """ + Dict representation of a StagingPath, abstracting specific context. + + This provides a JSON-serializable representation of a StagingPath + where the specific context of the StagingPath (the specific storage + backend where it is located) is replaced by a generic representation + of 'scratch', 'shared', or 'permanent', allowing a new specific + context to be injected on deserialization. + """ + # scratch, shared, permanent may form nested with progressively + # smaller contexts, so the last of those it is in is where it should + # be labelled. TODO: opportunity for performance improvement if + # needed + loc = None + if path.label in self.manager.scratch_root.iterdir(): + # TODO: does this happen? we should only trigger this function + # on a StagingPath, and anything in scratch will only be + # pathlib.Path, right? + loc = "scratch" + if path.label in self.manager.shared_root.iter_contents(): + loc = "shared" + if path.label in self.manager.permanent_root.iter_contents(): + loc = "permanent" + + if loc is None: + raise RuntimeError( + f"Unable to serialize {path}: it does not appear to be " + "associated with storage managed by the context manager " + f"{self.manager}." 
+ ) + + return { + ':container:': loc, + ':label:': path.label, + } + + def from_dict(self, dct: dict) -> StagingPath: + """Recreate a StagingPath from its dict representation. + + This undoes the process from :method:`.to_dict`. It injects the + storage context in ``self.storage_manager`` into the deserialized + :class:`.StagingPath` instance. + """ + staging = getattr(self.manager, f"{dct[':container:']}_staging") + return staging / dct[':label:'] diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py new file mode 100644 index 00000000..ef95286a --- /dev/null +++ b/gufe/storage/storagemanager.py @@ -0,0 +1,154 @@ +from __future__ import annotations +from os import PathLike +from pathlib import Path +from contextlib import contextmanager +import shutil + +from gufe.utils import delete_empty_dirs + +from typing import Type + +from .externalresource import ExternalStorage, FileStorage +from .stagingregistry import SharedStaging, PermanentStaging +from .stagingserialization import StagingPathSerialization +from .stagingregistry import StagingPath # typing + +from gufe.protocols.protocolunit import Context + + +class StorageManager: + def __init__( + self, + scratch_root: PathLike, + shared_root: ExternalStorage, + permanent_root: ExternalStorage, + *, + keep_scratch: bool = False, + keep_staging: bool = False, + keep_shared: bool = False, + keep_empty_dirs: bool = False, + staging: PathLike = Path(".staging"), + ): + self.scratch_root = Path(scratch_root) + self.shared_root = shared_root + self.permanent_root = permanent_root + self.keep_scratch = keep_scratch + self.keep_staging = keep_staging + self.keep_shared = keep_shared + self.staging = staging + self.keep_empty_dirs = keep_empty_dirs + + # these are used to track what files can be deleted from shared if + # keep_shared is False + self.shared_xfer: set[StagingPath] = set() + self.permanent_xfer: set[StagingPath] = set() + + self.permanent_staging = PermanentStaging( + 
scratch=self.scratch_root, + external=self.permanent_root, + shared=self.shared_root, + staging=self.staging, + keep_empty_dirs=keep_empty_dirs, + ) + + self.shared_staging = SharedStaging( + scratch=self.scratch_root, + external=self.shared_root, + staging=self.staging, + keep_empty_dirs=keep_empty_dirs, + ) + + self._serialization = StagingPathSerialization(self) + + @property + def json_encoder(self): + return self._serialization.encoder + + @property + def json_decoder(self): + return self._serialization.decoder + + def make_label(self, dag_label, unit_label, attempt, **kwargs): + """ + + The specific executor may change this by making a very simple + adapter subclass and overriding this method, which can take + arbitrary additional kwargs that may tie it to a specific executor. + """ + return f"{dag_label}/{unit_label}_attempt_{attempt}" + + @property + def _scratch_base(self): + return self.scratch_root / "scratch" + + def _scratch_loc(self, dag_label, unit_label, attempt, **kwargs): + label = self.make_label(dag_label, unit_label, attempt) + return self._scratch_base / label + + @contextmanager + def running_dag(self, dag_label): + # TODO: remove (or use) dag_label + try: + yield self + finally: + # import pdb; pdb.set_trace() + # clean up after DAG completes + self.permanent_staging.transfer_staging_to_external() + + if not self.keep_staging: + self.permanent_staging.cleanup() + + if not self.keep_shared: + # we'd like to do something like loop over + # self.shared_xfer - self.permanent_xfer; however, + # StagedPaths have different staging registries. 
This gives + # the set of paths we do want to delete + perm_xfer_paths = {p.as_path() for p in self.permanent_xfer} + shared_xfer_to_delete = { + p for p in self.shared_xfer + if p.as_path() not in perm_xfer_paths + } + + for file in shared_xfer_to_delete: + self.shared_root.delete(file.label) + + for file in self.permanent_xfer: + if self.shared_root != self.permanent_root: + self.shared_root.delete(file.label) + + if not self.keep_empty_dirs: + delete_empty_dirs(self._scratch_base, delete_root=False) + + @contextmanager + def running_unit(self, dag_label, unit_label, **kwargs): + scratch = self._scratch_loc(dag_label, unit_label, **kwargs) + label = self.make_label(dag_label, unit_label, **kwargs) + scratch.mkdir(parents=True, exist_ok=True) + shared = self.shared_staging / label + permanent = self.permanent_staging / label + context = Context( + scratch=scratch, + shared=shared, + permanent=permanent + ) + try: + yield context + finally: + # import pdb; pdb.set_trace() + # clean up after unit + + # track the files that were in shared so that we can delete them + # at the end of the DAG if requires + shared_xfers = self.shared_staging.transfer_staging_to_external() + self.shared_xfer.update(set(shared_xfers)) + + # everything in permanent should also be in shared + for file in self.permanent_staging.registry: + self.shared_staging.transfer_single_file_to_external(file) + self.permanent_xfer.add(file) + + if not self.keep_scratch: + shutil.rmtree(scratch) + + if not self.keep_staging: + self.shared_staging.cleanup() diff --git a/gufe/tests/storage/test_stagingregistry.py b/gufe/tests/storage/test_stagingregistry.py new file mode 100644 index 00000000..d5e5d71e --- /dev/null +++ b/gufe/tests/storage/test_stagingregistry.py @@ -0,0 +1,366 @@ +import pytest +from unittest import mock +import logging + +import os +import pathlib + +from gufe.storage.externalresource import MemoryStorage, FileStorage +from gufe.storage.stagingregistry import ( + SharedStaging, 
PermanentStaging, _safe_to_delete_file, + delete_empty_dirs, # TODO: move to appropriate place +) + + +@pytest.fixture +def root(tmp_path): + external = MemoryStorage() + external.store_bytes("old_unit/data.txt", b"foo") + root = SharedStaging( + scratch=tmp_path, + external=external, + delete_staging=False + ) + return root + + +@pytest.fixture +def root_with_contents(root): + # file staged but not yet shipped to external + with open(root / "new_unit/data.txt", mode='wb') as f: + f.write(b"bar") + + return root + + +@pytest.fixture +def read_only_with_overwritten(root_with_contents): + read_only = SharedStaging( + scratch=root_with_contents.scratch, + external=root_with_contents.external, + staging=root_with_contents.staging, + delete_staging=root_with_contents.delete_staging, + read_only=True + ) + filename = (read_only / "old_unit/data.txt").as_path() + assert not filename.exists() + staged = read_only / "old_unit/data.txt" + assert not filename.exists() + staged.__fspath__() + assert filename.exists() + with open(staged, mode='w') as f: + f.write("changed") + + return read_only, staged + + +@pytest.fixture +def permanent(tmp_path): + shared = MemoryStorage() + shared.store_bytes("old_unit/data.txt", b"foo") + perm = PermanentStaging( + scratch=tmp_path / "final", + external=MemoryStorage(), + shared=shared, + delete_staging=True + ) + return perm + + +@pytest.mark.parametrize('rel_path', [ + ("bar"), ("baz"), ("../bar") +]) +def test_safe_to_delete_file(tmp_path, rel_path): + external = FileStorage(tmp_path / "foo") + external.store_bytes("bar", b"") + ext_loc = tmp_path / "foo" / "bar" + assert ext_loc.exists() + + staged = external.root_dir / rel_path + is_safe = (rel_path != "bar") + assert _safe_to_delete_file(external, staged) is is_safe + + +def test_safe_to_delete_file_not_filestorage(tmp_path): + external = MemoryStorage() + external.store_bytes("bar", b"") + staging = tmp_path / "bar" + assert _safe_to_delete_file(external, staging) + + +def 
test_delete_empty_dirs(tmp_path): + base = tmp_path / "tmp" + paths = [ + base / "foo" / "qux" / "qux.txt", + + ] + dirs = [ + base / "foo" / "bar" / "baz", + base / "quux", + ] + for directory in dirs: + directory.mkdir(parents=True, exist_ok=True) + + for path in paths: + path.parent.mkdir(parents=True, exist_ok=True) + path.touch() + + delete_empty_dirs(base) + for path in paths: + assert path.exists() + + for directory in dirs: + assert not directory.exists() + + assert not (base / "foo" / "bar").exists() + + +@pytest.mark.parametrize('delete_root', [True, False]) +def test_delete_empty_dirs_delete_root(tmp_path, delete_root): + base = tmp_path / "tmp" + dirs = [ + base / "foo" / "bar" / "baz", + base / "quux", + ] + for directory in dirs: + directory.mkdir(parents=True, exist_ok=True) + + delete_empty_dirs(base, delete_root=delete_root) + + for directory in dirs: + assert not directory.exists() + + assert not (base / "foo" / "bar").exists() + assert base.exists() is not delete_root + + +class TestSharedStaging: + def test_repr(self, root): + r = repr(root) + assert r.startswith("SharedStaging") + assert "MemoryStorage" in r + + def test_fspath_fail(self, root): + # ensure that we get an error on os.path.join (or really, anything + # that hits os.fspath) + with pytest.raises(TypeError): + os.path.join(root, "filename.txt") + + @pytest.mark.parametrize('pathlist', [ + ['file.txt'], ['dir', 'file.txt'] + ]) + def test_path(self, root, pathlist): + path = root + for p in pathlist: + path = path / p + + inner_path = os.sep.join(pathlist) + actual_path = root.staging_dir / inner_path + + assert pathlib.Path(path) == actual_path + + def test_read_old(self, root): + # When the file doesn't exist locally, it should be pulled down the + # first time that we register the path. 
+ + # initial conditions, without touching StagingRegistry/StagingPath + label = "old_unit/data.txt" + on_filesystem = root.scratch / root.staging / "old_unit/data.txt" + assert not on_filesystem.exists() + assert root.external.exists(label) + + # when we create the specific StagingPath, it registers and + # "downloads" the file + filepath = root / "old_unit/data.txt" + assert filepath.as_path() == on_filesystem + + assert not on_filesystem.exists() + filepath.register() + assert on_filesystem.exists() + + # let's just be sure we can read in the data as desired + with open(filepath, mode='rb') as f: + assert f.read() == b"foo" + + def test_write_new(self, root): + label = "new_unit/somefile.txt" + on_filesystem = root.scratch / root.staging / "new_unit/somefile.txt" + assert not on_filesystem.exists() + with open(root / "new_unit/somefile.txt", mode='wb') as f: + f.write(b"testing") + + # this has been written to disk in scratch, but not yet saved to + # external storage + assert on_filesystem.exists() + assert not root.external.exists(label) + + @pytest.mark.xfail # Need test that read-only errors on new files + def test_write_old_fail(self, root): + old_staging = root._get_other_shared("old_unit") + staged = old_staging / "foo.txt" + with pytest.raises(IOError, match="read-only"): + staged.__fspath__() + + def test_transfer_to_external(self, root_with_contents): + path = list(root_with_contents.registry)[0] # only 1 + assert not root_with_contents.external.exists(path.label) + + root_with_contents.transfer_staging_to_external() + assert root_with_contents.external.exists(path.label) + + with root_with_contents.external.load_stream(path.label) as f: + assert f.read() == b"bar" + + def test_transfer_to_external_no_file(self, root, caplog): + with mock.patch.object(root, 'register_path'): + nonfile = root / "old_unit/does_not_exist.txt" + # ensure that we've set this up correctly + assert nonfile not in root.registry + logger_name = "gufe.storage.stagingregistry" + 
caplog.set_level(logging.INFO, logger=logger_name) + root.transfer_single_file_to_external(nonfile) + assert len(caplog.records) == 1 + record = caplog.records[0] + assert "nonexistent" in record.msg + + def test_transfer_to_external_directory(self, root, caplog): + directory = root / "old_unit/directory" + with open(directory / "file.txt", mode='w') as f: + f.write("foo") + + logger_name = "gufe.storage.stagingregistry" + caplog.set_level(logging.DEBUG, logger=logger_name) + root.transfer_single_file_to_external(directory) + assert len(caplog.records) == 1 + record = caplog.records[0] + assert "Found directory" in record.msg + assert "not transfering" in record.msg + + def test_single_file_transfer_read_only(self, + read_only_with_overwritten, + caplog): + read_only, staged = read_only_with_overwritten + with read_only.external.load_stream("old_unit/data.txt") as f: + old_contents = f.read() + + assert old_contents == b"foo" + logger_name = "gufe.storage.stagingregistry" + caplog.set_level(logging.DEBUG, logger=logger_name) + read_only.transfer_single_file_to_external(staged) + assert len(caplog.records) == 1 + record = caplog.records[0] + assert "Read-only:" in record.msg + with read_only.external.load_stream("old_unit/data.txt") as f: + new_contents = f.read() + assert old_contents == new_contents + + def test_transfer_read_only(self, read_only_with_overwritten, caplog): + read_only, staged = read_only_with_overwritten + with read_only.external.load_stream("old_unit/data.txt") as f: + old_contents = f.read() + + assert old_contents == b"foo" + logger_name = "gufe.storage.stagingregistry" + caplog.set_level(logging.DEBUG, logger=logger_name) + read_only.transfer_staging_to_external() + assert len(caplog.records) == 1 + record = caplog.records[0] + assert "Read-only:" in record.msg + with read_only.external.load_stream("old_unit/data.txt") as f: + new_contents = f.read() + assert old_contents == new_contents + + def test_cleanup(self, root_with_contents): + 
root_with_contents.delete_staging = True # slightly naughty + path = (root_with_contents / "new_unit/data.txt").as_path() + assert path.exists() + root_with_contents.cleanup() + assert not path.exists() + + def test_cleanup_missing(self, root, caplog): + root.delete_staging = True + file = root / "old_unit/foo.txt" + file.register() + assert file in root.registry + assert not pathlib.Path(file).exists() + logger_name = "gufe.storage.stagingregistry" + caplog.set_level(logging.WARNING, logger=logger_name) + root.cleanup() + assert len(caplog.records) == 1 + record = caplog.records[0] + assert "can not be found on disk" in record.msg + + def test_cleanup_directory(self, root, caplog): + root.delete_staging = True + dirname = root / "old_unit" + assert dirname not in root.registry + dirname.register() + assert dirname in root.registry + + assert not pathlib.Path(dirname).exists() + file = dirname / "foo.txt" + file.register() + # directory is created when something in the directory registered + assert pathlib.Path(dirname).exists() + logger_name = "gufe.storage.stagingregistry" + caplog.set_level(logging.DEBUG, logger=logger_name) + root.cleanup() + assert "During staging cleanup, the directory" in caplog.text + + def test_register_cleanup_preexisting_file(self, root): + filename = (root / "new_unit/foo.txt").as_path() + filename.parent.mkdir(parents=True, exist_ok=True) + filename.touch() + root.external.store_bytes("new_unit/foo.txt", b"") + assert len(root.registry) == 0 + assert len(root.preexisting) == 0 + staging = root / "new_unit/foo.txt" + assert staging.label == "new_unit/foo.txt" + assert len(root.registry) == 0 + assert len(root.preexisting) == 0 + staging.__fspath__() + assert len(root.registry) == 1 + assert len(root.preexisting) == 1 + + assert filename.exists() + root.cleanup() + assert filename.exists() + + +class TestPermanentStaging: + @pytest.mark.parametrize('is_safe', [True, False]) + def test_delete_file_safe(self, tmp_path, is_safe): + staging 
= ".staging" if is_safe else "" + scratch_root_dir = tmp_path / "final" + + # create a file in the external storage + external = FileStorage(scratch_root_dir) + external.store_bytes("foo.txt", b"foo") + external_file_loc = external.root_dir / "foo.txt" + assert external_file_loc.exists() + + permanent = PermanentStaging( + scratch=scratch_root_dir, + external=MemoryStorage(), + shared=external, + staging=staging, + delete_staging=True + ) + my_file = permanent / "foo.txt" + + # double check that we set things up correctly + assert (str(external_file_loc) != my_file._fspath) is is_safe + + # test the code + assert permanent._delete_file_safe(my_file) is is_safe + + def test_load_missing_for_transfer(self, permanent): + fname = (permanent / "old_unit/data.txt").as_path() + assert not fname.exists() + staging = permanent / "old_unit/data.txt" + staging.__fspath__() + assert not fname.exists() + assert permanent.external._data == {} + permanent.transfer_staging_to_external() + assert fname.exists() + assert permanent.external._data == {"old_unit/data.txt": b"foo"} diff --git a/gufe/tests/storage/test_stagingserialization.py b/gufe/tests/storage/test_stagingserialization.py new file mode 100644 index 00000000..f36de19a --- /dev/null +++ b/gufe/tests/storage/test_stagingserialization.py @@ -0,0 +1,358 @@ +import pytest +from gufe.storage.stagingserialization import StagingPathSerialization + +from gufe.storage.stagingregistry import StagingPath +from gufe.storage.storagemanager import StorageManager +from gufe.storage.externalresource import MemoryStorage, FileStorage + +from gufe.tokenization import GufeTokenizable, from_dict +from gufe.custom_json import JSONCodec + +import json +import pathlib +import shutil + + +@pytest.fixture +def storage_manager(tmp_path): + return StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + + +@pytest.fixture +def shared_path(storage_manager): + label = 
storage_manager.make_label("dag", "unit", attempt=0) + path = storage_manager.shared_staging / label / "file.txt" + with open(path, mode='w') as f: + f.write("contents here") + + storage_manager.shared_staging.transfer_staging_to_external() + return path + + +@pytest.fixture +def permanent_path(storage_manager): + label = storage_manager.make_label("dag", "unit", attempt=0) + path = storage_manager.permanent_staging / label / "file.txt" + with open(path, mode='w') as f: + f.write("contents here") + + storage_manager.permanent_staging.transfer_staging_to_external() + return path + + +@pytest.fixture +def scratch_path(storage_manager): + scratch_dir = storage_manager._scratch_loc("dag", "unit", attempt=0) + path = scratch_dir / "file.txt" + return path + + +@pytest.fixture +def serialization_handler(storage_manager): + return StagingPathSerialization(storage_manager) + + +class NewType: + # used in new codec test (putting as a nested class required fancier + # serialization approaches, easier to put it at module level) + # class where any instance is equivalent (carries no data) + def __eq__(self, other): + return isinstance(other, self.__class__) + + + +class TestStagingPathSerialization: + @pytest.mark.parametrize('pathtype', ['scratch', 'shared', 'permanent']) + def test_round_trip(self, serialization_handler, pathtype, request): + # NB: scratch is a pathlib.Path, not a StagingPath. 
It is tested + # here to ensure round-trips as part of the overall user story for + # this, but it doesn't invoke the machinery of the + # StagingPathSerialization object + path = request.getfixturevalue(f"{pathtype}_path") + as_json = json.dumps( + path, + cls=serialization_handler.json_handler.encoder + ) + reloaded = json.loads( + as_json, + cls=serialization_handler.json_handler.decoder + ) + + assert path == reloaded + + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_to_dict(self, serialization_handler, pathtype, request): + path = request.getfixturevalue(f"{pathtype}_path") + dct = serialization_handler.to_dict(path) + assert dct == { + ':container:': pathtype, + ':label:': "dag/unit_attempt_0/file.txt", + } + + # tests for specific user stories + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_reload_file_contents(self, pathtype, request): + # USER STORY: I am loading my results object, and I will want to use + # the associated files. This should be transparent, regardless of + # where the storage is located (e.g., not local storage). (This is + # actually a test of the staging tools, but we include it here for + # completeness of the user stories.) + path = request.getfixturevalue(f"{pathtype}_path") + + # remove the file (remains in the MemoryStorage) + p = path.as_path() + assert p.exists() + p.unlink() + assert not p.exists() + + # reload the file (NB: nothing special done here; download is + # transparent to user) + with open(path, mode='r') as f: + contents = f.read() + + assert p.exists() + assert contents == "contents here" + + @pytest.mark.parametrize('pathtype', ['shared', 'permanent']) + def test_load_results_object_file_not_downloaded(self, + serialization_handler, + pathtype, request): + # USER STORY: I am loading my results object, but I do not need the + # large stored files. I do not want to download them when they + # aren't needed. 
+ path = request.getfixturevalue(f"{pathtype}_path") + # serialize the path object + json_str = json.dumps(path, cls=serialization_handler.encoder) + + # delete the path from the directory + p = pathlib.Path(path) + assert p.exists() + p.unlink() + assert not p.exists() + + # reload the serialized form of the object + reloaded = json.loads(json_str, cls=serialization_handler.decoder) + + # check that the deserialized version has the path, but that the + # path does not exist on the filesystem + assert isinstance(reloaded, StagingPath) + assert reloaded.label == path.label + assert reloaded.path == path.path + assert not p.exists() + # NOTE: as soon as you call `__fspath__`, the file will download + + @pytest.mark.parametrize('move', ['relative', 'absolute']) + def test_permanent_storage_moved(self, move, tmp_path, monkeypatch): + # USER STORY: My permanent storage was a directory on my file + # system, but I have moved that directory (with use cases of (a) I + # moved the absolute path; (b) it is at a different relative path + # with respect to my pwd). 
+ monkeypatch.chdir(tmp_path) + old_manager = StorageManager( + scratch_root="old/scratch", + shared_root=FileStorage("old/shared"), + permanent_root=FileStorage("old/permanent") + ) + old_path = old_manager.permanent_staging / "dag/unit/result.txt" + with open(old_path, mode='w') as f: + f.write("contents here") + + old_manager.permanent_staging.transfer_staging_to_external() + perm_p = pathlib.Path(tmp_path / "old/permanent/dag/unit/result.txt") + assert perm_p.exists() + + # serialize the path object + json_str = json.dumps(old_path, cls=old_manager.json_encoder) + + # move the storage subdirectory; create a new, associated storage + # manager/serialization handler + if move == "relative": + # change to within t + monkeypatch.chdir(tmp_path / "old") + new_manager = StorageManager( + scratch_root="scratch", + shared_root=FileStorage("shared"), + permanent_root=FileStorage("permanent") + ) + expected_path = tmp_path / "old/permanent/dag/unit/result.txt" + elif move == "absolute": + shutil.move(tmp_path / "old", tmp_path / "new") + new_manager = StorageManager( + scratch_root="new/scratch", + shared_root=FileStorage("new/shared"), + permanent_root=FileStorage("new/permanent") + ) + expected_path = tmp_path / "new/permanent/dag/unit/result.txt" + else: # -no-cov- + raise RuntimeWarning(f"Bad test parameter '{move}': should be " + "'relative' or 'absolute'") + + # deserialize the path using the new serialization handler + reloaded = json.loads(json_str, cls=new_manager.json_decoder) + + # ensure that the path exists and that the data can be reloaded + assert isinstance(reloaded, StagingPath) + assert reloaded.label == old_path.label + assert pathlib.Path(expected_path).exists() + + with open(reloaded, mode='r') as f: + contents = f.read() + + assert contents == "contents here" + + def test_two_different_permanent_storages(self, tmp_path): + # USER STORY: I'm working with files from two different permanent + # storages. 
I need to be able to load from both in the same Python + # process. (NOTE: this user story is primarily to prevent us from + # changing to a solution based on global/class vars to set context.) + manager1 = StorageManager( + scratch_root=tmp_path / "working1", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + manager2 = StorageManager( + scratch_root=tmp_path / "working2", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + + path1 = manager1.permanent_staging / "file1.txt" + with open(path1, mode='w') as f: + f.write("contents 1") + manager1.permanent_staging.transfer_staging_to_external() + + path2 = manager2.permanent_staging / "file2.txt" + with open(path2, mode='w') as f: + f.write("contents 2") + manager2.permanent_staging.transfer_staging_to_external() + + # serialize the paths + json_str1 = json.dumps(path1, cls=manager1.json_encoder) + json_str2 = json.dumps(path2, cls=manager2.json_encoder) + + # delete all staged files + assert path1.as_path().exists() + manager1.permanent_staging.cleanup() + assert not path1.as_path().exists() + + assert path2.as_path().exists() + manager2.permanent_staging.cleanup() + assert not path2.as_path().exists() + + # reload and check contents of both permanent files + reloaded1 = json.loads(json_str1, cls=manager1.json_decoder) + reloaded2 = json.loads(json_str2, cls=manager2.json_decoder) + + assert isinstance(reloaded1, StagingPath) + assert reloaded1.label == path1.label + assert not reloaded1.as_path().exists() + with open(reloaded1, mode='r') as f: + assert f.read() == "contents 1" + + assert isinstance(reloaded2, StagingPath) + assert reloaded2.label == path2.label + assert not reloaded2.as_path().exists() + with open(reloaded2, mode='r') as f: + assert f.read() == "contents 2" + + def test_change_storage_backend(self, tmp_path): + # USER STORY: I have generated data in one backend, and I tranferred + # it to another backend. It needs to be readable from the other + # backend. 
(Use case: data is in long-term cloud storage that + # requires credentials, but I want to share some part of that data + # with someone else by transferring it to a disk.) + cloud_manager = StorageManager( + scratch_root=tmp_path / "cloud", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + + local_manager = StorageManager( + scratch_root=tmp_path / "local_scratch", + shared_root=MemoryStorage(), + permanent_root=FileStorage(tmp_path / "local_perm"), + ) + + # TODO: maybe add some more safety asserts in here? that each step + # goes as expected, to better diagnose potential failures? + # load data into the cloud storage + with cloud_manager.running_dag("dag") as dag_ctx: + with dag_ctx.running_unit("dag", "unit", attempt=0) as ctx: + cloud_path = ctx.permanent / "data.txt" + with open(cloud_path, mode='w') as f: + f.write("will store on cloud") + + # serialize the cloud_path (assume it is saved somewhere) + serialized = json.dumps(cloud_path, cls=cloud_manager.json_encoder) + + # transfer from cloud storage to the local_manager + for label in cloud_manager.permanent_root.iter_contents("dag/unit"): + with cloud_manager.permanent_root.load_stream(label) as f: + local_manager.permanent_root.store_bytes(label, f.read()) + + # ensure that we can reload objects from the local manager + local_path = json.loads(serialized, cls=local_manager.json_decoder) + + assert local_path != cloud_path + + with open(local_path, mode='r') as f: + contents = f.read() + + assert contents == "will store on cloud" + + def test_requires_new_codec(self, tmp_path): + # USER STORY: I am interfacing with a package that adds + # serialization types to the gufe JSON_HANDLER via an external + # JSONCodec. Maybe, in the worst case, the external codec gets added + # *after* I've created my serialization object. I need to be able to + # serialize those custom types. 
(NOTE: A better solution here is to + # have JSONCodecs also include some codec identifier in their + # `:is_custom:` field. That would allow us to dynamically add any + # missing codec, and only to do so when deserialization is needed. + # This is a change to the custom JSON stuff which hasn't been made + # yet. This might also allow faster deserialization by having + # :is_custom: map to something that can be used in a dispatch table.) + manager = StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + + # add a new custom codec for serialization + new_type_codec = JSONCodec( + cls=NewType, + to_dict=lambda obj: {}, + from_dict=lambda dct: NewType(), + ) + + # Create a dict to serialize; this represents the output dict that + # might come from a unit_result object. NB: including the file stuff + # here is actually extraneous (unless implementation changes + # significantly). + with manager.running_dag("dag") as dag_ctx: + with manager.running_unit("dag", "unit", attempt=0) as context: + file = context.permanent / "dag/unit/file.txt" + with open(file, mode='w') as f: + f.write("contents") + + output_dict = { + 'new_type_result': NewType(), + 'file_result': file, + } + + # before codec registration, error as not JSON serializable + with pytest.raises(TypeError, match="not JSON serializable"): + _ = json.dumps(output_dict, cls=manager.json_encoder) + + # register codec and it works + from gufe.tokenization import JSON_HANDLER + JSON_HANDLER.add_codec(new_type_codec) + dumped = json.dumps(output_dict, cls=manager.json_encoder) + + reloaded = json.loads(dumped, cls=manager.json_decoder) + + assert reloaded == output_dict diff --git a/gufe/tests/storage/test_storage_demo.py b/gufe/tests/storage/test_storage_demo.py new file mode 100644 index 00000000..a78b1dca --- /dev/null +++ b/gufe/tests/storage/test_storage_demo.py @@ -0,0 +1,452 @@ +import pytest + +import pathlib + +import gufe +from 
gufe.storage.externalresource import MemoryStorage, FileStorage +from gufe.storage.storagemanager import StorageManager +from gufe.storage.stagingregistry import StagingPath +from gufe.protocols.protocoldag import new_execute_DAG + +""" +This module contains complete integration tests for the storage lifecycle, +using an actual protocol as an example. + +These tests are largely redundant from the perspective of unit testing, but +the :class:`.StoragedDemoProtocol` is useful as an example for +implementation. Furthermore, as integration tests, they ensure that the +whole setup works together. +""" + + +class Unit1(gufe.ProtocolUnit): + def _execute(self, ctx): + share_file = ctx.shared / "shared.txt" + with open(share_file, mode='w') as f: + f.write("I can be shared") + + nested_file = ctx.shared / "nested" / "shared.txt" + with open(nested_file, mode='w') as f: + f.write("Nested files work as well") + + implicit_nested_file = ctx.shared / "implicit/nested.txt" + with open(implicit_nested_file, mode='w') as f: + f.write("Even if the new diretory is implicit") + + perm_file = ctx.permanent / "permanent.txt" + with open(perm_file, mode='w') as f: + f.write("I'm permanent (but I can be shared)") + + scratch_file = ctx.scratch / "scratch.txt" + with open(scratch_file, mode='w') as f: + f.write("This is scratch -- can't be shared") + + return {'share_file': share_file, + 'perm_file': perm_file, + 'scratch_file': scratch_file} + + +class Unit2(gufe.ProtocolUnit): + def _execute(self, ctx, unit1_result): + u1_outputs = unit1_result.outputs + + outputs = {} + for file_label, file in unit1_result.outputs.items(): + # import pdb; pdb.set_trace() + # labels are, e.g., share_file; file is StagingPath + key = f"{file_label}_contents" + try: + with open(file, mode='r') as f: + outputs[key] = f.read() + except FileNotFoundError: + outputs[key] = "File not found" + + return outputs + + +class StorageDemoProtocol(gufe.Protocol): + @classmethod + def _default_settings(cls): + return 
{} + + @classmethod + def _defaults(cls): + return {} + + def _create(self, stateA, stateB, mapping, extends): + u1 = Unit1() + u2 = Unit2(unit1_result=u1) + return [u1, u2] + + def _gather(self, protocol_dag_results): + return {} + + +@pytest.fixture +def demo_dag(solvated_ligand, solvated_complex): + transformation = gufe.Transformation( + solvated_ligand, + solvated_complex, + protocol=StorageDemoProtocol(StorageDemoProtocol.default_settings()), + mapping=None + ) + dag = transformation.create() + return dag + + +class ExecutionStorageDemoTest: + """ + Template method pattern ABC for tests of StorageDemoProtocol execution. + + Using template method here because it ensures that all aspects get + tested for all implementations, even though individual aspects may + differ between different setups. + """ + def get_shared_and_permanent(self): + raise NotImplementedError() + + @staticmethod + def _parse_keep(keep): + return ( + 'scratch' in keep, + 'staging' in keep, + 'shared' in keep, + 'empties' in keep + ) + + def assert_dag_result(self, result, demo_dag, storage_manager): + """Test that the ProtocolDAGResult has the expected contents. + + This should be preserved across all execution methods. 
+ """ + u1_label = self.u1_label(demo_dag) + keep_scratch = storage_manager.keep_scratch + + assert result.ok + assert len(result.protocol_unit_results) == 2 + res1, res2 = result.protocol_unit_results + assert set(res1.outputs) == {'share_file', 'perm_file', 'scratch_file'} + assert isinstance(res1.outputs['scratch_file'], pathlib.Path) + assert isinstance(res1.outputs['share_file'], StagingPath) + assert isinstance(res1.outputs['perm_file'], StagingPath) + + if keep_scratch: + scratch_res2 = "This is scratch -- can't be shared" + else: + scratch_res2 = "File not found" + + assert res2.outputs == { + 'share_file_contents': "I can be shared", + 'perm_file_contents': "I'm permanent (but I can be shared)", + 'scratch_file_contents': scratch_res2 + } + + def assert_shared_and_permanent(self, storage_manager, dag): + """Check the final status of the shared and permanent containers. + + The can depend on the relation between the shared and permanent + external storage containers. For example, if they are the same + object, the final contents of permament will also include the final + contents of shared (and vice versa). + + Default behavior here is for the case of distinct backends. 
+ """ + shared = storage_manager.shared_root + permanent = storage_manager.permanent_root + u1_label = self.u1_label(dag) + keep_shared = storage_manager.keep_shared + + perm_file = f"{u1_label}/permanent.txt" + shared_file = f"{u1_label}/shared.txt" + nested_file = f"{u1_label}/nested/shared.txt" + implicit_nested_file = f"{u1_label}/implicit/nested.txt" + + assert list(permanent.iter_contents()) == [perm_file] + with permanent.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + + if keep_shared: + assert set(shared.iter_contents()) == { + shared_file, perm_file, nested_file, implicit_nested_file + } + with shared.load_stream(shared_file) as f: + assert f.read() == b"I can be shared" + with shared.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + else: + assert list(shared.iter_contents()) == [] + + def assert_scratch(self, storage_manager): + """Check the final status of the scratch directory. + + This will change if the scratch is within the staging root directory + (for cases where we want to keep one of staging/scratch and not the + other; empty directories might get deleted in one case). + """ + scratch = storage_manager.scratch_root + keep_scratch = storage_manager.keep_scratch + del_empty_dirs = not storage_manager.keep_empty_dirs + assert scratch.is_dir() + + if keep_scratch: + n_expected = 1 if del_empty_dirs else 2 + dag_dir = scratch / "scratch/dag" + assert len(list(dag_dir.iterdir())) == n_expected + else: + assert 'scratch' not in list(scratch.iterdir()) + + def assert_staging(self, storage_manager, dag): + """Check the final status of the staging directory. + + Behavior here will change if staging overlaps with a FileStorage for + either shared or permanent. 
+ """ + keep_staging = storage_manager.keep_staging + u1_label = self.u1_label(dag) + scratch_root = storage_manager.scratch_root + u1_staging = scratch_root / ".staging" / u1_label + + if keep_staging: + assert (u1_staging / "shared.txt").exists() + assert (u1_staging / "permanent.txt").exists() + assert (u1_staging / "nested/shared.txt").exists() + assert (u1_staging / "implicit/nested.txt").exists() + else: + assert ".staging" not in list(scratch_root.iterdir()) + + def assert_empty_directories(self, storage_manager, dag): + """Check the final status for empty directories.""" + u1_label = self.u1_label(dag) + staging = storage_manager.scratch_root / storage_manager.staging + + directories = [ + staging / u1_label / "nested", + staging / u1_label / "implicit", + ] + + # name these conditions so the logic takes less thought + expected_empty = (storage_manager.keep_empty_dirs + and not storage_manager.keep_staging) + dir_exists = expected_empty or storage_manager.keep_staging + + for directory in directories: + assert directory.exists() == dir_exists + if dir_exists: + assert directory.is_dir() + actual_empty = len(list(directory.iterdir())) == 0 + assert actual_empty == expected_empty + + @staticmethod + def u1_label(dag): + """Unit 1 label""" + return f"dag/{dag.protocol_units[0].key}_attempt_0" + + @staticmethod + def u2_label(dag): + """Unit 2 label""" + return f"dag/{dag.protocol_units[1].key}_attempt_0" + + def get_storage_manager(self, keep, tmp_path): + keep_scr, keep_sta, keep_sha, empties = self._parse_keep(keep) + shared, permanent = self.get_shared_and_permanent() + + storage_manager = StorageManager( + scratch_root=tmp_path, + shared_root=shared, + permanent_root=permanent, + keep_scratch=keep_scr, + keep_staging=keep_sta, + keep_shared=keep_sha, + keep_empty_dirs=empties, + ) + return storage_manager + + def execute(self, storage_manager, dag, dag_label): + result = new_execute_DAG(dag, dag_label, storage_manager, + raise_error=True, n_retries=2) + 
return result + + @pytest.mark.parametrize('keep', [ + 'nothing', 'scratch', 'staging', 'shared', 'scratch,staging', + 'scratch,shared', 'staging,shared', 'scratch,staging,shared', + 'scratch,empties', 'scratch,shared,empties', 'staging,empties', + ]) + def test_execute_dag(self, demo_dag, keep, tmp_path): + storage_manager = self.get_storage_manager(keep, tmp_path) + + dag_label = "dag" + result = self.execute(storage_manager, demo_dag, dag_label) + + self.assert_dag_result(result, demo_dag, storage_manager) + self.assert_empty_directories(storage_manager, demo_dag) + self.assert_shared_and_permanent(storage_manager, demo_dag) + self.assert_scratch(storage_manager) + self.assert_staging(storage_manager, demo_dag) + + +class TestExecuteStorageDemoDiffBackends(ExecutionStorageDemoTest): + """ + Test execution when permanent and shared are different MemoryStorages. + + This is considered the standard base case; this should be easiest to + pass, as there should be no special case code that needs to be invoked. + """ + def get_shared_and_permanent(self): + return MemoryStorage(), MemoryStorage() + + +class TestExecuteStorageDemoSameBackend(ExecutionStorageDemoTest): + """ + Test execution when permanent and shared are the same MemoryStorage. 
+ """ + def get_shared_and_permanent(self): + backend = MemoryStorage() + return backend, backend + + def assert_shared_and_permanent(self, storage_manager, dag): + shared = storage_manager.shared_root + permanent = storage_manager.permanent_root + u1_label = self.u1_label(dag) + keep_shared = storage_manager.keep_shared + + perm_file = f"{u1_label}/permanent.txt" + shared_file = f"{u1_label}/shared.txt" + nested_file = f"{u1_label}/nested/shared.txt" + implicit_nested_file = f"{u1_label}/implicit/nested.txt" + + assert shared is permanent + # we'll test everything in permanent, because shared is identical + + if keep_shared: + expected = {perm_file, shared_file, nested_file, + implicit_nested_file} + else: + expected = {perm_file} + + assert set(permanent.iter_contents()) == expected + with permanent.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + + if keep_shared: + with permanent.load_stream(shared_file) as f: + assert f.read() == b"I can be shared" + + +class TestExecuteStorageDemoStagingOverlap(TestExecuteStorageDemoSameBackend): + """ + Test execution when permanent and shared overlap with staging. + + This represents the approach we will probably actually use. In this + case, we use identical FileStorage for shared and permanent, and those + overlap with the staging directory. The result is that file locations + don't actually change. + """ + def get_shared_and_permanent(self): + ... 
# override the need for this; not the prettiest, but it works + + def get_storage_manager(self, keep, tmp_path): + keep_scr, keep_sta, keep_sha, empties = self._parse_keep(keep) + backend = FileStorage(tmp_path) + storage_manager = StorageManager( + scratch_root=tmp_path, + shared_root=backend, + permanent_root=backend, + keep_scratch=keep_scr, + keep_staging=keep_sta, + keep_shared=keep_sha, + keep_empty_dirs=empties, + staging="", + ) + return storage_manager + + def test_overlap_directories(self, tmp_path): + # test that the staging and shared/permanent backends overlap as + # expected; basic idea is that creating a file in staging + # automatically creates a file in the shared/permanent external + # resources + storage_manager = self.get_storage_manager(keep="nothing", + tmp_path=tmp_path) + shared = storage_manager.shared_root + perm = storage_manager.permanent_root + stage1 = storage_manager.shared_staging / "foo/file.txt" + assert len(storage_manager.shared_staging.registry) == 0 + assert list(shared.iter_contents()) == [] + assert list(perm.iter_contents()) == [] + pathlib.Path(stage1).touch() + assert len(storage_manager.shared_staging.registry) == 1 + # because iter_contents actually loops over the directory, these + # have effectively been automatically added to the external storage + # resources + assert list(shared.iter_contents()) == ["foo/file.txt"] + assert list(perm.iter_contents()) == ["foo/file.txt"] + + def assert_shared_and_permanent(self, storage_manager, dag): + shared = storage_manager.shared_root + permanent = storage_manager.permanent_root + u1_label = self.u1_label(dag) + keep_shared = storage_manager.keep_shared + keep_scratch = storage_manager.keep_scratch + + perm_file = f"{u1_label}/permanent.txt" + shared_file = f"{u1_label}/shared.txt" + nested_file = f"{u1_label}/nested/shared.txt" + implicit_nested_file = f"{u1_label}/implicit/nested.txt" + scratch_file = f"scratch/{u1_label}/scratch.txt" + + assert shared is permanent + # we'll 
test everything in permanent, because shared is identical + + expected = {perm_file} + + if keep_shared: + expected.update({shared_file, nested_file, + implicit_nested_file}) + + if keep_scratch: + expected.add(scratch_file) + + assert set(permanent.iter_contents()) == expected + with permanent.load_stream(perm_file) as f: + assert f.read() == b"I'm permanent (but I can be shared)" + + if keep_shared: + with permanent.load_stream(shared_file) as f: + assert f.read() == b"I can be shared" + + if keep_scratch: + with permanent.load_stream(scratch_file) as f: + assert f.read() == b"This is scratch -- can't be shared" + + def assert_staging(self, storage_manager, dag): + # in this case, keep_staging is ignored in favor of the behavior of + # keep_shared + keep_shared = storage_manager.keep_shared + u1_label = self.u1_label(dag) + scratch_root = storage_manager.scratch_root + u1_staging = scratch_root / u1_label + + assert (u1_staging / "permanent.txt").exists() + + if keep_shared: + assert (u1_staging / "shared.txt").exists() + + def assert_empty_directories(self, storage_manager, dag): + # in this case, the staging directories should not have been cleaned + # (because they overlap with storage), so all should exist. They + # will only be empty if `keep_shared is False`. 
NOTE: I don't think + # it would be an API break here if code changed to ensure that all + # empty directories were cleaned out + u1_label = self.u1_label(dag) + staging = storage_manager.scratch_root / storage_manager.staging + + directories = [ + staging / u1_label / "nested", + staging / u1_label / "implicit", + ] + + for directory in directories: + assert directory.exists() + assert directory.is_dir() + expected_empty = not storage_manager.keep_shared + actual_empty = len(list(directory.iterdir())) == 0 + assert actual_empty == expected_empty diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py new file mode 100644 index 00000000..9be3b3af --- /dev/null +++ b/gufe/tests/storage/test_storagemanager.py @@ -0,0 +1,300 @@ +import pytest +from gufe.storage.storagemanager import StorageManager +from gufe.storage.externalresource import MemoryStorage, FileStorage +from pathlib import Path + + +@pytest.fixture +def storage_manager_std(tmp_path): + return StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + ) + + +@pytest.fixture +def dag_units(): + class Unit1: + key = "unit1" + + def run(self, scratch, shared, permanent): + (scratch / "foo.txt").touch() + with open(shared / "bar.txt", mode='w') as f: + f.write("bar was written") + with open(permanent / "baz.txt", mode='w') as f: + f.write("baz was written") + + return "done 1" + + class Unit2: + key = "unit2" + + def run(self, scratch, shared, permanent): + (scratch / "foo2.txt").touch() + # TODO: this will change; the inputs should include a way to get + # the previous shared unit label + prev_shared = shared.root / "dag/unit1_attempt_0" + with open(prev_shared / "bar.txt", mode='r') as f: + bar = f.read() + + # note that you can open a file from permanent as if it was + # from shared -- everything in permanent is in shared + with open(prev_shared / "baz.txt", mode='r') as f: + baz = f.read() + + return 
{"bar": bar, "baz": baz} + + return [Unit1(), Unit2()] + + +class LifecycleHarness: + @pytest.fixture + def storage_manager(self, tmp_path): + raise NotImplementedError() + + @staticmethod + def get_files_dict(storage_manager): + root = storage_manager.scratch_root + staging = storage_manager.staging + return { + "foo": root / "scratch/dag/unit1_attempt_0/foo.txt", + "foo2": root / "scratch/dag/unit2_attempt_0/foo2.txt", + "bar": root / staging / "dag/unit1_attempt_0/bar.txt", + "baz": root / staging / "dag/unit1_attempt_0/baz.txt", + } + + def test_lifecycle(self, storage_manager, dag_units, tmp_path): + results = [] + dag_label = "dag" + with storage_manager.running_dag(dag_label) as dag_ctx: + for unit in dag_units: + label = f"{dag_label}/{unit.key}" + with dag_ctx.running_unit(dag_label, unit.key, attempt=0) as ( + context + ): + # import pdb; pdb.set_trace() + scratch = context.scratch + shared = context.shared + perm = context.permanent + results.append(unit.run(scratch, shared, perm)) + self.in_unit_asserts(storage_manager, label) + self.after_unit_asserts(storage_manager, label) + self.after_dag_asserts(storage_manager) + assert results == [ + "done 1", + {"bar": "bar was written", "baz": "baz was written"} + ] + + def _in_unit_existing_files(self, unit_label): + raise NotImplementedError() + + def _after_unit_existing_files(self, unit_label): + raise NotImplementedError() + + def _after_dag_existing_files(self): + raise NotImplementedError() + + @staticmethod + def assert_existing_files(files_dict, existing): + for file in existing: + assert files_dict[file].exists() + + for file in set(files_dict) - existing: + assert not files_dict[file].exists() + + def _in_staging_shared(self, unit_label, in_after): + """ + This is to include things when a shared staging directory reports + that files exist in it. 
+ """ + return set() + + def _in_staging_permanent(self, unit_label, in_after): + """ + This is to include things when a permanent staging directory reports + that files exist in it. + """ + return set() + + def in_unit_asserts(self, storage_manager, unit_label): + # check that shared and permanent are correct + shared_root = storage_manager.shared_root + permanent_root = storage_manager.permanent_root + expected_in_shared = { + "dag/unit1": set(), + "dag/unit2": {"dag/unit1_attempt_0/bar.txt", + "dag/unit1_attempt_0/baz.txt"} + }[unit_label] | self._in_staging_shared(unit_label, "in") + assert set(shared_root.iter_contents()) == expected_in_shared + + expected_in_permanent = self._in_staging_permanent(unit_label, "in") + assert set(permanent_root.iter_contents()) == expected_in_permanent + + # manager-specific check for files + files_dict = self.get_files_dict(storage_manager) + existing = self._in_unit_existing_files(unit_label) + self.assert_existing_files(files_dict, existing) + + def after_unit_asserts(self, storage_manager, unit_label): + shared_root = storage_manager.shared_root + permanent_root = storage_manager.permanent_root + shared_extras = self._in_staging_shared(unit_label, "after") + permanent_extras = self._in_staging_permanent(unit_label, "after") + expected_in_shared = {"dag/unit1_attempt_0/bar.txt", + "dag/unit1_attempt_0/baz.txt"} + expected_in_shared |= shared_extras + assert set(shared_root.iter_contents()) == expected_in_shared + assert set(permanent_root.iter_contents()) == permanent_extras + + # manager-specific check for files + files_dict = self.get_files_dict(storage_manager) + existing = self._after_unit_existing_files(unit_label) + self.assert_existing_files(files_dict, existing) + + def after_dag_asserts(self, storage_manager): + permanent_root = storage_manager.permanent_root + permanent_extras = self._in_staging_permanent('dag/unit2', "after") + # shared still contains everything it had; but this isn't something + # we guarantee, so 
we don't actually test for it, but we could with + # this: + # shared_root = storage_manager.shared_root + # shared_extras = self._in_staging_shared('dag/unit2', "after") + # expected_in_shared = {"dag/unit1/bar.txt", "dag/unit1/baz.txt"} + # expected_in_shared |= shared_extras + # assert set(shared_root.iter_contents()) == expected_in_shared + expected_in_permanent = ({"dag/unit1_attempt_0/baz.txt"} + | permanent_extras) + assert set(permanent_root.iter_contents()) == expected_in_permanent + + # manager-specific check for files + files_dict = self.get_files_dict(storage_manager) + existing = self._after_dag_existing_files() + self.assert_existing_files(files_dict, existing) + + +class TestStandardStorageManager(LifecycleHarness): + @pytest.fixture + def storage_manager(self, storage_manager_std): + return storage_manager_std + + def _in_unit_existing_files(self, unit_label): + return { + "dag/unit1": {'bar', 'baz', 'foo'}, + "dag/unit2": {'foo2', 'baz', 'bar'}, + # bar was deleted, but gets brought back in unit2 + }[unit_label] + + def _after_unit_existing_files(self, unit_label): + # Same for both units because unit2 doesn't add anything to + # shared/permanent; in this one, only files staged for permanent + # should remain + return {'baz'} + + def _after_dag_existing_files(self): + return set() + + +class TestKeepScratchAndStagingStorageManager(LifecycleHarness): + @pytest.fixture + def storage_manager(self, tmp_path): + return StorageManager( + scratch_root=tmp_path / "working", + shared_root=MemoryStorage(), + permanent_root=MemoryStorage(), + keep_scratch=True, + keep_staging=True + ) + + @staticmethod + def files_after_unit(unit_label): + unit1 = {'bar', 'baz', 'foo'} + unit2 = {'foo2', 'baz'} + return { + 'dag/unit1': unit1, + 'dag/unit2': unit1 | unit2 + }[unit_label] + + def _in_unit_existing_files(self, unit_label): + return self.files_after_unit(unit_label) + + def _after_unit_existing_files(self, unit_label): + return self.files_after_unit(unit_label) 
+ + def _after_dag_existing_files(self): + return self.files_after_unit('dag/unit2') + + +class TestStagingOverlapsSharedStorageManager(LifecycleHarness): + @pytest.fixture + def storage_manager(self, tmp_path): + root = tmp_path / "working" + return StorageManager( + scratch_root=root, + shared_root=FileStorage(root), + permanent_root=MemoryStorage(), + staging="", + ) + + def _in_unit_existing_files(self, unit_label): + return { + "dag/unit1": {'foo', 'bar', 'baz'}, + "dag/unit2": {'foo2', 'bar', 'baz'}, + }[unit_label] + + def _after_unit_existing_files(self, unit_label): + # same for both; all files come from unit 1 + return {"bar", "baz"} + + def _after_dag_existing_files(self): + # these get deleted because we don't keep shared here + return set() + + def _in_staging_shared(self, unit_label, in_after): + bar = "dag/unit1_attempt_0/bar.txt" + baz = "dag/unit1_attempt_0/baz.txt" + foo = "scratch/dag/unit1_attempt_0/foo.txt" + foo2 = "scratch/dag/unit2_attempt_0/foo2.txt" + return { + ("dag/unit1", "in"): {bar, baz, foo}, + ("dag/unit1", "after"): {bar, baz}, + ("dag/unit2", "in"): {bar, baz, foo2}, + ("dag/unit2", "after"): {baz} + }[unit_label, in_after] + + +class TestStagingOverlapsPermanentStorageManager(LifecycleHarness): + @pytest.fixture + def storage_manager(self, tmp_path): + root = tmp_path / "working" + return StorageManager( + scratch_root=root, + permanent_root=FileStorage(root), + shared_root=MemoryStorage(), + staging="", + ) + + def _in_unit_existing_files(self, unit_label): + return { + "dag/unit1": {'foo', 'bar', 'baz'}, + "dag/unit2": {"foo2", "baz", "bar"}, # bar is resurrected + }[unit_label] + + def _after_dag_existing_files(self): + return {"baz"} + + def _in_staging_permanent(self, unit_label, in_after): + bar = "dag/unit1_attempt_0/bar.txt" + baz = "dag/unit1_attempt_0/baz.txt" + foo = "scratch/dag/unit1_attempt_0/foo.txt" + foo2 = "scratch/dag/unit2_attempt_0/foo2.txt" + return { + ("dag/unit1", "in"): {bar, baz, foo}, + ("dag/unit1", 
"after"): {baz}, + ("dag/unit2", "in"): {baz, foo2, bar}, # bar is resurrected + ("dag/unit2", "after"): {baz} + }[unit_label, in_after] + + def _after_unit_existing_files(self, unit_label): + # same for both; all files come from unit 1 + return {"baz"} diff --git a/gufe/tests/test_protocol.py b/gufe/tests/test_protocol.py index f700fa45..6a468bfd 100644 --- a/gufe/tests/test_protocol.py +++ b/gufe/tests/test_protocol.py @@ -696,7 +696,8 @@ def test_execute_DAG_retries(solvated_ligand, vacuum_ligand, tmpdir): # we did 3 retries, so 4 total failures assert len(r.protocol_unit_results) == 5 assert len(r.protocol_unit_failures) == 4 - assert len(list(shared.iterdir())) == 5 + assert len(list(shared.iterdir())) == 0 + # assert len(list(shared.iterdir())) == 5 def test_execute_DAG_bad_nretries(solvated_ligand, vacuum_ligand, tmpdir): diff --git a/gufe/tests/test_protocoldag.py b/gufe/tests/test_protocoldag.py index a4a08a7b..e5fe8752 100644 --- a/gufe/tests/test_protocoldag.py +++ b/gufe/tests/test_protocoldag.py @@ -7,6 +7,7 @@ import gufe from gufe.protocols import execute_DAG +from gufe.protocols.protocoldag import ReproduceOldBehaviorStorageManager class WriterUnit(gufe.ProtocolUnit): @@ -14,9 +15,9 @@ class WriterUnit(gufe.ProtocolUnit): def _execute(ctx, **inputs): my_id = inputs['identity'] - with open(os.path.join(ctx.shared, f'unit_{my_id}_shared.txt'), 'w') as out: + with open(ctx.shared / f'unit_{my_id}_shared.txt', 'w') as out: out.write(f'unit {my_id} existed!\n') - with open(os.path.join(ctx.scratch, f'unit_{my_id}_scratch.txt'), 'w') as out: + with open(ctx.scratch / f'unit_{my_id}_scratch.txt', 'w') as out: out.write(f'unit {my_id} was here\n') return { @@ -72,6 +73,40 @@ def writefile_dag(): return p.create(stateA=s1, stateB=s2, mapping={}) +class TestReproduceOldBehaviorStorageManager: + def test_context(self, tmp_path): + # check that the paths are the ones we expect + base = tmp_path / "working" + manager = 
ReproduceOldBehaviorStorageManager.from_old_args( + scratch_basedir=base, + shared_basedir=base + ) + dag_label = "dag" + unit_label = "unit" + expected_scratch = "working/dag/scratch_unit_attempt_0/scratch.txt" + expected_shared = "working/dag/shared_unit_attempt_0/shared.txt" + expected_perm = "working/dag/shared_unit_attempt_0/perm.txt" + with manager.running_dag(dag_label) as dag_ctx: + with dag_ctx.running_unit( + dag_label, unit_label, attempt=0 + ) as ctx: + scratch_f = ctx.scratch / "scratch.txt" + shared_f = ctx.shared / "shared.txt" + perm_f = ctx.permanent / "perm.txt" + + found_scratch = pathlib.Path(scratch_f).relative_to(tmp_path) + found_shared = pathlib.Path(shared_f.fspath).relative_to(tmp_path) + found_perm = pathlib.Path(perm_f.fspath).relative_to(tmp_path) + + assert str(found_scratch) == expected_scratch + assert str(found_shared) == expected_shared + assert str(found_perm) == expected_perm + # the label is the relative path to the base directory for a + # FileStorage + assert "working/" + shared_f.label == expected_shared + assert "working/" + perm_f.label == expected_perm + + @pytest.mark.parametrize('keep_shared', [False, True]) @pytest.mark.parametrize('keep_scratch', [False, True]) def test_execute_dag(tmpdir, keep_shared, keep_scratch, writefile_dag): @@ -94,12 +129,12 @@ def test_execute_dag(tmpdir, keep_shared, keep_scratch, writefile_dag): # will have produced 4 files in scratch and shared directory for pu in writefile_dag.protocol_units: identity = pu.inputs['identity'] - shared_file = os.path.join(shared, - f'shared_{str(pu.key)}_attempt_0', - f'unit_{identity}_shared.txt') - scratch_file = os.path.join(scratch, - f'scratch_{str(pu.key)}_attempt_0', - f'unit_{identity}_scratch.txt') + shared_file = (shared + / f'shared_{str(pu.key)}_attempt_0' + / f'unit_{identity}_shared.txt') + scratch_file = (scratch + / f'scratch_{str(pu.key)}_attempt_0' + / f'unit_{identity}_scratch.txt') if keep_shared: assert os.path.exists(shared_file) else: 
diff --git a/gufe/tests/test_protocolunit.py b/gufe/tests/test_protocolunit.py index 9896f856..2daaa687 100644 --- a/gufe/tests/test_protocolunit.py +++ b/gufe/tests/test_protocolunit.py @@ -57,8 +57,12 @@ def test_execute(self, tmpdir): scratch = Path('scratch') / str(unit.key) scratch.mkdir(parents=True) - ctx = Context(shared=shared, scratch=scratch) - + permanent = Path('permanent') / str(unit.key) + permanent.mkdir(parents=True) + + ctx = Context(shared=shared, scratch=scratch, + permanent=permanent) + u: ProtocolUnitFailure = unit.execute(context=ctx, an_input=3) assert u.exception[0] == "ValueError" @@ -70,8 +74,12 @@ def test_execute(self, tmpdir): scratch = Path('scratch') / str(unit.key) scratch.mkdir(parents=True) - ctx = Context(shared=shared, scratch=scratch) - + permanent = Path('permanent') / str(unit.key) + permanent.mkdir(parents=True) + + ctx = Context(shared=shared, scratch=scratch, + permanent=permanent) + # now try actually letting the error raise on execute with pytest.raises(ValueError, match="should always be 2"): unit.execute(context=ctx, raise_error=True, an_input=3) @@ -87,8 +95,12 @@ def test_execute_KeyboardInterrupt(self, tmpdir): scratch = Path('scratch') / str(unit.key) scratch.mkdir(parents=True) - ctx = Context(shared=shared, scratch=scratch) - + permanent = Path('permanent') / str(unit.key) + permanent.mkdir(parents=True) + + ctx = Context(shared=shared, scratch=scratch, + permanent=permanent) + with pytest.raises(KeyboardInterrupt): unit.execute(context=ctx, an_input=3) diff --git a/gufe/utils.py b/gufe/utils.py index f9d3b0ff..519835d5 100644 --- a/gufe/utils.py +++ b/gufe/utils.py @@ -4,6 +4,12 @@ import io import warnings +from os import PathLike, rmdir +import pathlib + +import logging +_logger = logging.getLogger(__name__) + class ensure_filelike: """Context manager to convert pathlike or filelike to filelike. 
@@ -52,3 +58,24 @@ def __exit__(self, type, value, traceback): if self.do_close: self.context.close() + +def delete_empty_dirs(root: PathLike, delete_root: bool = True): + """Delete all empty directories. + + Repeats so that directories that only contained empty directories also + get deleted. + """ + root = pathlib.Path(root) + + def find_empty_dirs(directory): + if not (paths := list(directory.iterdir())): + return [directory] + directories = [p for p in paths if p.is_dir()] + return sum([find_empty_dirs(d) for d in directories], []) + + while root.exists() and (empties := find_empty_dirs(root)): + if empties == [root] and not delete_root: + return + for directory in empties: + _logger.debug(f"Removing '{directory}'") + rmdir(directory)