first stab at caching metadata
msmitherdc authored Aug 27, 2024
1 parent 3f10e03 commit 5696d3a
Showing 1 changed file with 49 additions and 4 deletions.
53 changes: 49 additions & 4 deletions cloudpathlib/s3/s3client.py
@@ -1,7 +1,19 @@
import dataclasses
import mimetypes
import os
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    MutableMapping,
    Optional,
    TYPE_CHECKING,
    Tuple,
    Union,
)
from weakref import WeakKeyDictionary

from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
@@ -18,6 +30,10 @@
except ModuleNotFoundError:
implementation_registry["s3"].dependencies_loaded = False

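# Everything we cache about a path: "file", "dir", or None once the path is
# known not to exist.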
@dataclasses.dataclass
class PathMetadata:
is_file_or_dir: Optional[str]


@register_client_class("s3")
class S3Client(Client):
@@ -126,6 +142,8 @@ def __init__(
for k in ["RequestPayer", "ExpectedBucketOwner"]
if k in self._extra_args
}

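        # weak keys let entries disappear when the S3Path object is garbage
        # collected, so the cache cannot grow without bound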
self._metadata_cache: MutableMapping[S3Path, PathMetadata] = WeakKeyDictionary()

super().__init__(
local_cache_dir=local_cache_dir,
@@ -138,7 +156,7 @@ def _get_metadata(self, cloud_path: S3Path) -> Dict[str, Any]:
data = self.s3.ObjectSummary(cloud_path.bucket, cloud_path.key).get(
**self.boto3_dl_extra_args
)

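        # a successful metadata fetch proves this key exists as a file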
self._set_metadata_cache(cloud_path, "file")
return {
"last_modified": data["LastModified"],
"size": data["ContentLength"],
@@ -252,8 +270,10 @@ def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Path, bool]]:
for parent in PurePosixPath(o_relative_path).parents:
parent_canonical = prefix + str(parent).rstrip("/")
if parent_canonical not in yielded_dirs and str(parent) != ".":
path = self.CloudPath(f"s3://{cloud_path.bucket}/{parent_canonical}")
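                            # any ancestor of a listed key must exist as a directory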
self._set_metadata_cache(path, "dir")
yield (
self.CloudPath(f"s3://{cloud_path.bucket}/{parent_canonical}"),
path,
True,
)
yielded_dirs.add(parent_canonical)
@@ -265,8 +285,10 @@ def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Path, bool]]:

# s3 fake directories have 0 size and end with "/"
if result_key.get("Key").endswith("/") and result_key.get("Size") == 0:
path = self.CloudPath(f"s3://{cloud_path.bucket}/{canonical}")
                    self._set_metadata_cache(path, "dir")
yield (
self.CloudPath(f"s3://{cloud_path.bucket}/{canonical}"),
path,
True,
)
yielded_dirs.add(canonical)
@@ -298,12 +320,14 @@ def _move_file(self, src: S3Path, dst: S3Path, remove_src: bool = True) -> S3Path:
)

if remove_src:
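            # the source object is about to be deleted, so drop its cached state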
self._set_metadata_cache(src, None)
self._remove(src)
return dst

def _remove(self, cloud_path: S3Path, missing_ok: bool = True) -> None:
file_or_dir = self._is_file_or_dir(cloud_path=cloud_path)
if file_or_dir == "file":
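            # forget the file before issuing the delete; its state is about to change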
self._set_metadata_cache(cloud_path, None)
resp = self.s3.Object(cloud_path.bucket, cloud_path.key).delete(
**self.boto3_list_extra_args
)
@@ -323,6 +347,13 @@ def _remove(self, cloud_path: S3Path, missing_ok: bool = True) -> None:
            # snapshot the file paths under this prefix before deleting them,
            # since the listing is empty once the objects are gone
            files = [
                path for path, is_dir in self._list_dir(cloud_path, recursive=True) if not is_dir
            ]

            resp = bucket.objects.filter(Prefix=prefix, **self.boto3_list_extra_args).delete(
                **self.boto3_list_extra_args
            )

            for path in files:
                self._set_metadata_cache(path, None)

if resp[0].get("ResponseMetadata").get("HTTPStatusCode") not in (204, 200):
raise CloudPathException(
f"Delete operation failed for {cloud_path} with response: {resp}"
@@ -348,6 +379,20 @@ def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: S3Path) -> S3Path:

obj.upload_file(str(local_path), Config=self.boto3_transfer_config, ExtraArgs=extra_args)
return cloud_path

def _set_metadata_cache(self, cloud_path: S3Path, is_file_or_dir: Optional[str]) -> None:
if is_file_or_dir is None:
self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir)
# If a file/dir is now known to not exist, its parent directories may no longer exist
# either, since cloud directories only exist if they have a file in them. Since their
# state is no longer known we remove them from the cache.
for parent in cloud_path.parents:
if parent in self._metadata_cache:
del self._metadata_cache[parent]
else:
self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir)

def clear_metadata_cache(self) -> None:
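        """Drop all cached file/dir existence info, forcing fresh S3 lookups."""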
self._metadata_cache.clear()

S3Client.S3Path = S3Client.CloudPath # type: ignore
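
A minimal standalone sketch of the invalidation rule that _set_metadata_cache implements: clearing a path also clears its cached ancestors, because S3 "directories" only exist while at least one key sits under them. PurePosixPath and a plain dict stand in for S3Path and the WeakKeyDictionary here; set_metadata is a hypothetical stand-in, not part of cloudpathlib.

    from pathlib import PurePosixPath
    from typing import Dict, Optional

    cache: Dict[PurePosixPath, Optional[str]] = {}

    def set_metadata(path: PurePosixPath, is_file_or_dir: Optional[str]) -> None:
        # record what we know: "file", "dir", or None (known not to exist)
        cache[path] = is_file_or_dir
        if is_file_or_dir is None:
            # once a child is gone, each ancestor's existence is unknown again
            for parent in path.parents:
                cache.pop(parent, None)

    set_metadata(PurePosixPath("bucket/dir"), "dir")
    set_metadata(PurePosixPath("bucket/dir/key.txt"), "file")
    set_metadata(PurePosixPath("bucket/dir/key.txt"), None)  # file removed
    assert PurePosixPath("bucket/dir") not in cache  # ancestor invalidated too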
