diff --git a/python/kvikio/kvikio/_lib/arr.pxd b/python/kvikio/kvikio/_lib/arr.pxd
index 47bad21a3b..52ca5d20f6 100644
--- a/python/kvikio/kvikio/_lib/arr.pxd
+++ b/python/kvikio/kvikio/_lib/arr.pxd
@@ -31,6 +31,6 @@ cdef class Array:
 cpdef Array asarray(obj)
 
 
-cdef pair[uintptr_t, size_t] parse_buffer_argument(
-    buf, size, bint accept_host_buffer
+cdef pair[uintptr_t, Py_ssize_t] parse_buffer_argument(
+    buf, Py_ssize_t nbytes, bint accept_host_buffer
 ) except *
diff --git a/python/kvikio/kvikio/_lib/arr.pyx b/python/kvikio/kvikio/_lib/arr.pyx
index 45c7430313..e4b147bdae 100644
--- a/python/kvikio/kvikio/_lib/arr.pyx
+++ b/python/kvikio/kvikio/_lib/arr.pyx
@@ -1,6 +1,7 @@
 # Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
 # See file LICENSE for terms.
 
+# distutils: language = c++
 # cython: language_level=3
 
 
@@ -13,6 +14,7 @@ from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
 from cython cimport auto_pickle, boundscheck, initializedcheck, nonecheck, wraparound
 from libc.stdint cimport uintptr_t
 from libc.string cimport memcpy
+from libcpp.utility cimport pair
 
 try:
     from numpy import dtype as numpy_dtype
@@ -305,22 +307,21 @@ cpdef Array asarray(obj):
     return Array(obj)
 
 
-cdef pair[uintptr_t, size_t] parse_buffer_argument(
-    buf, size, bint accept_host_buffer
+cdef pair[uintptr_t, Py_ssize_t] parse_buffer_argument(
+    buf, Py_ssize_t nbytes, bint accept_host_buffer
 ) except *:
-    """Parse `buf` and `size` argument and return a pointer and nbytes"""
-    if not isinstance(buf, Array):
-        buf = Array(buf)
-    cdef Array arr = buf
+    """Parse `buf` and `nbytes` (negative means the whole buffer) and return a pointer and nbytes"""
+    cdef Array arr = asarray(buf)
+
     if not arr._contiguous():
         raise ValueError("Array must be contiguous")
     if not accept_host_buffer and not arr.cuda:
         raise ValueError("Non-CUDA buffers not supported")
-    cdef size_t nbytes
-    if size is None:
-        nbytes = arr.nbytes
-    elif size > arr.nbytes:
+
+    cdef Py_ssize_t arr_nbytes = arr._nbytes()
+    if nbytes < 0:
+        nbytes = arr_nbytes
+    elif nbytes > arr_nbytes:
         raise ValueError("Size is greater than the size of the buffer")
-    else:
-        nbytes = size
-    return pair[uintptr_t, size_t](arr.ptr, nbytes)
+
+    return pair[uintptr_t, Py_ssize_t](arr.ptr, nbytes)
diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx
index 7a8de368ef..c606ea2621 100644
--- a/python/kvikio/kvikio/_lib/file_handle.pyx
+++ b/python/kvikio/kvikio/_lib/file_handle.pyx
@@ -9,6 +9,7 @@ from typing import Optional
 
 from posix cimport fcntl
 
+from cython cimport bint
 from libc.stdint cimport uintptr_t
 from libcpp cimport bool
 from libcpp.string cimport string
@@ -89,9 +90,6 @@ cdef extern from "" namespace "kvikio" nogil:
 
 
 cdef class CuFile:
-    """File handle for GPUDirect Storage (GDS)"""
-    cdef FileHandle _handle
-
     def __init__(self, file_path, flags="r"):
         self._handle = move(
             FileHandle(
@@ -100,10 +98,10 @@ cdef class CuFile:
             )
         )
 
-    def close(self) -> None:
+    cpdef close(self) -> None:
         self._handle.close()
 
-    def closed(self) -> bool:
+    cpdef closed(self) -> bint:
         return self._handle.closed()
 
     def fileno(self) -> int:
@@ -112,8 +110,8 @@ cdef class CuFile:
     def open_flags(self) -> int:
         return self._handle.fd_open_flags()
 
-    def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
+    def pread(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0, task_size: Py_ssize_t = 0) -> IOFuture:
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
         return _wrap_io_future(
             self._handle.pread(
                 info.first,
@@ -123,8 +121,8 @@ cdef class CuFile:
             )
         )
 
-    def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
+    def pwrite(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0, task_size: Py_ssize_t = 0) -> IOFuture:
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
         return _wrap_io_future(
             self._handle.pwrite(
                 info.first,
@@ -134,8 +132,8 @@ cdef class CuFile:
             )
         )
 
-    def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
+    def read(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0, dev_offset: Py_ssize_t = 0) -> int:
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
         return self._handle.read(
             info.first,
             info.second,
@@ -143,8 +141,8 @@ cdef class CuFile:
             dev_offset,
         )
 
-    def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
+    def write(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0, dev_offset: Py_ssize_t = 0) -> int:
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
         return self._handle.write(
             info.first,
             info.second,
@@ -152,10 +150,10 @@ cdef class CuFile:
             dev_offset,
         )
 
-    def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
-                   st: uintptr_t) -> IOFutureStream:
+    def read_async(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0, dev_offset: Py_ssize_t = 0,
+                   st: uintptr_t = 0) -> IOFutureStream:
         stream = st
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
         return _wrap_stream_future(self._handle.read_async(
             info.first,
             info.second,
@@ -164,10 +162,10 @@ cdef class CuFile:
             stream,
         ))
 
-    def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
-                    st: uintptr_t) -> IOFutureStream:
+    def write_async(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0, dev_offset: Py_ssize_t = 0,
+                    st: uintptr_t = 0) -> IOFutureStream:
         stream = st
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
         return _wrap_stream_future(self._handle.write_async(
             info.first,
             info.second,
diff --git a/python/kvikio/kvikio/_lib/remote_handle.pxd b/python/kvikio/kvikio/_lib/remote_handle.pxd
new file mode 100644
index 0000000000..f3e6b77698
--- /dev/null
+++ b/python/kvikio/kvikio/_lib/remote_handle.pxd
@@ -0,0 +1,27 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+# distutils: language = c++
+# cython: language_level=3
+
+from cython cimport Py_ssize_t
+from libcpp.memory cimport unique_ptr
+
+from kvikio._lib.future cimport IOFuture
+
+
+cdef class RemoteFile:
+    cdef unique_ptr[cpp_RemoteHandle] _handle
+
+    @classmethod
+    cpdef open_http(
+        cls,
+        url: str,
+        nbytes: Py_ssize_t = -1,
+    ) -> RemoteFile
+
+    cpdef nbytes(self) -> Py_ssize_t
+
+    cpdef read(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0) -> Py_ssize_t
+
+    cpdef pread(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0) -> IOFuture
diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx
index 93c6ac398a..46a8cfc64d 100644
--- a/python/kvikio/kvikio/_lib/remote_handle.pyx
+++ b/python/kvikio/kvikio/_lib/remote_handle.pyx
@@ -4,8 +4,7 @@
 # distutils: language = c++
 # cython: language_level=3
 
-from typing import Optional
-
+from cython cimport Py_ssize_t
 from cython.operator cimport dereference as deref
 from libc.stdint cimport uintptr_t
 from libcpp.memory cimport make_unique, unique_ptr
@@ -28,7 +27,7 @@ cdef extern from "" nogil:
             unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes
         ) except +
         cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except +
-        int nbytes() except +
+        size_t nbytes() except +
         size_t read(
             void* buf,
             size_t size,
@@ -50,38 +49,36 @@ cdef string _to_string(str s):
 
 
 cdef class RemoteFile:
-    cdef unique_ptr[cpp_RemoteHandle] _handle
-
     @classmethod
-    def open_http(
+    cpdef open_http(
         cls,
         url: str,
-        nbytes: Optional[int],
-    ):
+        nbytes: Py_ssize_t = -1,
+    ) -> RemoteFile:
         cdef RemoteFile ret = RemoteFile()
         cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint](
             _to_string(url)
         )
-        if nbytes is None:
+        # A negative `nbytes` is the "not given" sentinel that replaces the old `None`.
+        if nbytes < 0:
             ret._handle = make_unique[cpp_RemoteHandle](move(ep))
             return ret
-        cdef size_t n = nbytes
-        ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
+        ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes)
         return ret
 
-    def nbytes(self) -> int:
+    cpdef nbytes(self) -> Py_ssize_t:
         return deref(self._handle).nbytes()
 
-    def read(self, buf, size: Optional[int], file_offset: int) -> int:
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
+    cpdef read(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0) -> Py_ssize_t:
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
         return deref(self._handle).read(
             info.first,
             info.second,
             file_offset,
         )
 
-    def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture:
-        cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
+    cpdef pread(self, buf, size: Py_ssize_t = -1, file_offset: Py_ssize_t = 0) -> IOFuture:
+        cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
         return _wrap_io_future(
             deref(self._handle).pread(
                 info.first,
diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py
index 52bbe8010f..b107876124 100644
--- a/python/kvikio/kvikio/remote_file.py
+++ b/python/kvikio/kvikio/remote_file.py
@@ -4,7 +4,6 @@
 from __future__ import annotations
 
 import functools
-from typing import Optional
 
 from kvikio.cufile import IOFuture
 
@@ -54,7 +53,7 @@ def __init__(self, handle):
     def open_http(
         cls,
         url: str,
-        nbytes: Optional[int] = None,
+        nbytes: int = -1,
     ) -> RemoteFile:
         """Open a http file.
 
@@ -89,7 +88,7 @@ def nbytes(self) -> int:
         """
         return self._handle.nbytes()
 
-    def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
+    def read(self, buf, size: int = -1, file_offset: int = 0) -> int:
         """Read from remote source into buffer (host or device memory) in parallel.
 
         Parameters
@@ -107,7 +106,7 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
         """
         return self.pread(buf, size, file_offset).get()
 
-    def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture:
+    def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture:
         """Read from remote source into buffer (host or device memory) in parallel.
 
         Parameters