From d2dd3e4fdc59621abc75009edc5ab9796ffceeca Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Mon, 13 Nov 2023 18:49:03 +0800 Subject: [PATCH 1/2] Append to file when opened with mode `a` Relates to [ML-4910](https://jira.iguazeng.com/browse/ML-4910). --- tests/test_file.py | 16 ++++++++++++++++ v3iofs/file.py | 3 ++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/test_file.py b/tests/test_file.py index 23f25d3..3fb517f 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -37,6 +37,22 @@ def test_upload_chunk(fs: V3ioFS, tmp_obj): assert expected == data, "bad data" +def test_write_truncate_and_append(fs: V3ioFS, tmp_obj): + with V3ioFile(fs, tmp_obj.path, "wb") as v3f: + v3f.write(b"123") + + with fs.open(tmp_obj.path, "rb") as fp: + data = fp.read() + assert data == b"123" + + with V3ioFile(fs, tmp_obj.path, "ab") as v3f: + v3f.write(b"456") + + with fs.open(tmp_obj.path, "rb") as fp: + data = fp.read() + assert data == b"123456" + + def test_initiate_upload(fs: V3ioFS, tmp_obj): fs.touch(tmp_obj.path) assert fs.exists(tmp_obj.path) diff --git a/v3iofs/file.py b/v3iofs/file.py index b1cf820..3ef2f07 100644 --- a/v3iofs/file.py +++ b/v3iofs/file.py @@ -62,4 +62,5 @@ def _upload_chunk(self, final=False): def _initiate_upload(self): """Create remote file/upload""" - self.fs.rm_file(self.path) + if "a" not in self.mode: + self.fs.rm_file(self.path) From 52fee3667e35792d36dda0c1866c339a07135854 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Tue, 14 Nov 2023 14:16:34 +0800 Subject: [PATCH 2/2] Fix cache to fix transient test failure, add debug flag --- tests/test_file.py | 12 ++++++------ v3iofs/fs.py | 18 +++++++++++++++--- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/tests/test_file.py b/tests/test_file.py index 3fb517f..fc14954 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from v3iofs import V3ioFile, V3ioFS +from v3iofs import V3ioFS def test_fetch_range(fs: V3ioFS, tmp_obj): - v3f = V3ioFile(fs, tmp_obj.path) + v3f = fs.open(tmp_obj.path) start, end = 3, len(tmp_obj.data) - 3 data = v3f._fetch_range(start, end) expected = tmp_obj.data[start:end] @@ -24,7 +24,7 @@ def test_fetch_range(fs: V3ioFS, tmp_obj): def test_upload_chunk(fs: V3ioFS, tmp_obj): - v3f = V3ioFile(fs, tmp_obj.path, "ab") + v3f = fs.open(tmp_obj.path, "ab") chunk = b"::chunk of data" v3f.buffer.write(chunk) v3f._upload_chunk() @@ -38,14 +38,14 @@ def test_upload_chunk(fs: V3ioFS, tmp_obj): def test_write_truncate_and_append(fs: V3ioFS, tmp_obj): - with V3ioFile(fs, tmp_obj.path, "wb") as v3f: + with fs.open(tmp_obj.path, "wb") as v3f: v3f.write(b"123") with fs.open(tmp_obj.path, "rb") as fp: data = fp.read() assert data == b"123" - with V3ioFile(fs, tmp_obj.path, "ab") as v3f: + with fs.open(tmp_obj.path, "ab") as v3f: v3f.write(b"456") with fs.open(tmp_obj.path, "rb") as fp: @@ -56,7 +56,7 @@ def test_write_truncate_and_append(fs: V3ioFS, tmp_obj): def test_initiate_upload(fs: V3ioFS, tmp_obj): fs.touch(tmp_obj.path) assert fs.exists(tmp_obj.path) - v3f = V3ioFile(fs, tmp_obj.path, "wb") + v3f = fs.open(tmp_obj.path, "wb") v3f._initiate_upload() assert not fs.exists(tmp_obj.path) # should not fail even if the file does not exist diff --git a/v3iofs/fs.py b/v3iofs/fs.py index 3f5a323..70fd9d8 100644 --- a/v3iofs/fs.py +++ b/v3iofs/fs.py @@ -96,16 +96,20 @@ class V3ioFS(AbstractFileSystem): cache_capacity: int | str | None limits the size of the cache. If cache_validity_seconds is not set, this parameter has no effect. Default is 128. + debug: bool + Turn on transport debug logs. Default is False. **kw: Passed to fsspec.AbstractFileSystem """ protocol = "v3io" - def __init__(self, v3io_api=None, v3io_access_key=None, cache_validity_seconds=None, cache_capacity=None, **kw): + def __init__( + self, v3io_api=None, v3io_access_key=None, cache_validity_seconds=None, cache_capacity=None, debug=False, **kw + ): # TODO: Support storage options for creds (in kw) super().__init__(**kw) - self._client = _new_client(v3io_api, v3io_access_key) + self._client = _new_client(v3io_api, v3io_access_key, debug) self._cache = None if cache_validity_seconds is None: cache_validity_seconds = 2 @@ -338,6 +342,8 @@ def _open( cache_options=None, **kw, ): + if mode != "rb": + self._cache.delete_if_exists(path) return V3ioFile( fs=self, path=path, @@ -450,11 +456,17 @@ def _has_data(resp): return hasattr(out, "common_prefixes") or hasattr(out, "contents") -def _new_client(v3io_api=None, v3io_access_key=None) -> Client: +def _new_client(v3io_api=None, v3io_access_key=None, debug=False) -> Client: v3io_api = v3io_api or environ.get("V3IO_API") v3io_access_key = v3io_access_key or environ.get("V3IO_ACCESS_KEY") + client_kwargs = {} + if debug: + client_kwargs["logger_verbosity"] = "DEBUG" + client_kwargs["transport_verbosity"] = "DEBUG" + return Client( endpoint=v3io_api, access_key=v3io_access_key, + **client_kwargs, )