diff --git a/python/kvikio/kvikio/_lib/arr.pxd b/python/kvikio/kvikio/_lib/arr.pxd index 47bad21a3b..88b711dac7 100644 --- a/python/kvikio/kvikio/_lib/arr.pxd +++ b/python/kvikio/kvikio/_lib/arr.pxd @@ -1,12 +1,10 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. -# distutils: language = c++ # cython: language_level=3 from libc.stdint cimport uintptr_t -from libcpp.utility cimport pair cdef class Array: @@ -31,6 +29,11 @@ cdef class Array: cpdef Array asarray(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef struct mem_ptr_nbytes: + uintptr_t ptr + Py_ssize_t nbytes + + +cdef mem_ptr_nbytes parse_buffer_argument( + buf, Py_ssize_t nbytes, bint accept_host_buffer ) except * diff --git a/python/kvikio/kvikio/_lib/arr.pyx b/python/kvikio/kvikio/_lib/arr.pyx index 45c7430313..10a177eebf 100644 --- a/python/kvikio/kvikio/_lib/arr.pyx +++ b/python/kvikio/kvikio/_lib/arr.pyx @@ -305,22 +305,21 @@ cpdef Array asarray(obj): return Array(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef mem_ptr_nbytes parse_buffer_argument( + buf, Py_ssize_t nbytes, bint accept_host_buffer ) except *: """Parse `buf` and `size` argument and return a pointer and nbytes""" - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf + cdef Array arr = asarray(buf) + if not arr._contiguous(): raise ValueError("Array must be contiguous") if not accept_host_buffer and not arr.cuda: raise ValueError("Non-CUDA buffers not supported") - cdef size_t nbytes - if size is None: - nbytes = arr.nbytes - elif size > arr.nbytes: + + cdef Py_ssize_t arr_nbytes = arr._nbytes() + if nbytes < 0: + nbytes = arr_nbytes + elif nbytes > arr_nbytes: raise ValueError("Size is greater than the size of the buffer") - else: - nbytes = size - return pair[uintptr_t, size_t](arr.ptr, nbytes) + + return mem_ptr_nbytes(ptr=arr.ptr, nbytes=nbytes) diff --git a/python/kvikio/kvikio/_lib/file_handle.pxd b/python/kvikio/kvikio/_lib/file_handle.pxd new file mode 100644 index 0000000000..e7f4c425a6 --- /dev/null +++ b/python/kvikio/kvikio/_lib/file_handle.pxd @@ -0,0 +1,101 @@ +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from posix cimport fcntl + +from libc.stdint cimport uintptr_t +from libcpp cimport bool as cpp_bool +from libcpp.string cimport string + +from kvikio._lib import defaults +from kvikio._lib.future cimport IOFuture, IOFutureStream, cpp_StreamFuture, future + +ctypedef int c_int + + +cdef extern from "cuda.h": + ctypedef void* CUstream + + +cdef extern from "" namespace "kvikio" nogil: + cdef cppclass FileHandle: + FileHandle() except + + FileHandle(c_int fd) except + + FileHandle( + string file_path, + string flags, + ) except + + FileHandle( + string file_path, + string flags, + fcntl.mode_t mode + ) except + + void close() + cpp_bool closed() + c_int fd() + c_int fd_open_flags() except + + future[size_t] pread( + void* devPtr, + size_t size, + size_t file_offset, + size_t task_size + ) except + + future[size_t] pwrite( + void* devPtr, + size_t size, + size_t file_offset, + size_t task_size + ) except + + size_t read( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset + ) except + + size_t write( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset + ) except + + cpp_StreamFuture read_async( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset, + CUstream stream + ) except + + cpp_StreamFuture write_async( + void* devPtr_base, + size_t size, + size_t file_offset, + size_t devPtr_offset, + CUstream stream + ) except + + + +cdef class CuFile: + """File handle for GPUDirect Storage (GDS)""" + cdef FileHandle _handle + + cpdef close(self) + cpdef cpp_bool closed(self) + cpdef c_int fileno(self) + cpdef c_int open_flags(self) + cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef IOFuture pwrite(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef Py_ssize_t write(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) + cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx index 7a8de368ef..e3b0c18fc6 100644 --- a/python/kvikio/kvikio/_lib/file_handle.pyx +++ b/python/kvikio/kvikio/_lib/file_handle.pyx @@ -5,93 +5,23 @@ # cython: language_level=3 import pathlib -from typing import Optional - -from posix cimport fcntl from libc.stdint cimport uintptr_t -from libcpp cimport bool -from libcpp.string cimport string -from libcpp.utility cimport move, pair +from libcpp cimport bool as cpp_bool +from libcpp.utility cimport move -from kvikio._lib.arr cimport parse_buffer_argument +from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument from kvikio._lib.future cimport ( IOFuture, IOFutureStream, _wrap_io_future, _wrap_stream_future, - cpp_StreamFuture, - future, ) from kvikio._lib import defaults -cdef extern from "cuda.h": - ctypedef void* CUstream - - -cdef extern from "" namespace "kvikio" nogil: - cdef cppclass FileHandle: - FileHandle() except + - FileHandle(int fd) except + - FileHandle( - string file_path, - string flags, - ) except + - FileHandle( - string file_path, - string flags, - fcntl.mode_t mode - ) except + - void close() - bool closed() - int fd() - int fd_open_flags() except + - future[size_t] pread( - void* devPtr, - size_t size, - size_t file_offset, - size_t task_size - ) except + - future[size_t] pwrite( - void* devPtr, - size_t size, - size_t file_offset, - size_t task_size - ) except + - size_t read( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset - ) except + - size_t write( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset - ) except + - cpp_StreamFuture read_async( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset, - CUstream stream - ) except + - cpp_StreamFuture write_async( - void* devPtr_base, - size_t size, - size_t file_offset, - size_t devPtr_offset, - CUstream stream - ) except + - - cdef class CuFile: - """File handle for GPUDirect Storage (GDS)""" - cdef FileHandle _handle - def __init__(self, file_path, flags="r"): self._handle = move( FileHandle( @@ -100,78 +30,83 @@ cdef class CuFile: ) ) - def close(self) -> None: + cpdef close(self): self._handle.close() - def closed(self) -> bool: + cpdef cpp_bool closed(self): return self._handle.closed() - def fileno(self) -> int: + cpdef c_int fileno(self): return self._handle.fd() - def open_flags(self) -> int: + cpdef c_int open_flags(self): return self._handle.fd_open_flags() - def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pread( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, task_size if task_size else defaults.task_size() ) ) - def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pwrite(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pwrite( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, task_size if task_size else defaults.task_size() ) ) - def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return self._handle.read( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, ) - def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t write(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return self._handle.write( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, ) - def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.read_async( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, - stream, + stream, )) - def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.write_async( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, - stream, + stream, )) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pxd b/python/kvikio/kvikio/_lib/remote_handle.pxd new file mode 100644 index 0000000000..26f6e23b57 --- /dev/null +++ b/python/kvikio/kvikio/_lib/remote_handle.pxd @@ -0,0 +1,43 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from libcpp.memory cimport unique_ptr + +from kvikio._lib.future cimport IOFuture, future + + +cdef extern from "" nogil: + cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint": + pass + + cdef cppclass cpp_RemoteHandle "kvikio::RemoteHandle": + cpp_RemoteHandle( + unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes + ) except + + cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except + + size_t nbytes() except + + size_t read( + void* buf, + size_t size, + size_t file_offset + ) except + + future[size_t] pread( + void* buf, + size_t size, + size_t file_offset + ) except + + + +cdef class RemoteFile: + cdef unique_ptr[cpp_RemoteHandle] _handle + + @classmethod + cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=*) + + cpdef Py_ssize_t nbytes(self) + + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) + cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx index 93c6ac398a..8675dd529f 100644 --- a/python/kvikio/kvikio/_lib/remote_handle.pyx +++ b/python/kvikio/kvikio/_lib/remote_handle.pyx @@ -4,42 +4,20 @@ # distutils: language = c++ # cython: language_level=3 -from typing import Optional - +from cython cimport Py_ssize_t from cython.operator cimport dereference as deref -from libc.stdint cimport uintptr_t from libcpp.memory cimport make_unique, unique_ptr from libcpp.string cimport string -from libcpp.utility cimport move, pair +from libcpp.utility cimport move -from kvikio._lib.arr cimport parse_buffer_argument -from kvikio._lib.future cimport IOFuture, _wrap_io_future, future +from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument +from kvikio._lib.future cimport IOFuture, _wrap_io_future cdef extern from "" nogil: - cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint": - pass - cdef cppclass cpp_HttpEndpoint "kvikio::HttpEndpoint": cpp_HttpEndpoint(string url) except + - cdef cppclass cpp_RemoteHandle "kvikio::RemoteHandle": - cpp_RemoteHandle( - unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes - ) except + - cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except + - int nbytes() except + - size_t read( - void* buf, - size_t size, - size_t file_offset - ) except + - future[size_t] pread( - void* buf, - size_t size, - size_t file_offset - ) except + - cdef string _to_string(str s): """Convert Python object to a C++ string (if None, return the empty string)""" @@ -50,42 +28,35 @@ cdef string _to_string(str s): cdef class RemoteFile: - cdef unique_ptr[cpp_RemoteHandle] _handle - @classmethod - def open_http( - cls, - url: str, - nbytes: Optional[int], - ): + cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=-1): cdef RemoteFile ret = RemoteFile() cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint]( _to_string(url) ) - if nbytes is None: + if nbytes >= 0: ret._handle = make_unique[cpp_RemoteHandle](move(ep)) return ret - cdef size_t n = nbytes - ret._handle = make_unique[cpp_RemoteHandle](move(ep), n) + ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes) return ret - def nbytes(self) -> int: + cpdef Py_ssize_t nbytes(self): return deref(self._handle).nbytes() - def read(self, buf, size: Optional[int], file_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return deref(self._handle).read( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, ) - def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( deref(self._handle).pread( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, ) ) diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py index 52bbe8010f..b107876124 100644 --- a/python/kvikio/kvikio/remote_file.py +++ b/python/kvikio/kvikio/remote_file.py @@ -4,7 +4,6 @@ from __future__ import annotations import functools -from typing import Optional from kvikio.cufile import IOFuture @@ -54,7 +53,7 @@ def __init__(self, handle): def open_http( cls, url: str, - nbytes: Optional[int] = None, + nbytes: int = -1, ) -> RemoteFile: """Open a http file. @@ -89,7 +88,7 @@ def nbytes(self) -> int: """ return self._handle.nbytes() - def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: + def read(self, buf, size: int = -1, file_offset: int = 0) -> int: """Read from remote source into buffer (host or device memory) in parallel. Parameters @@ -107,7 +106,7 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: """ return self.pread(buf, size, file_offset).get() - def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture: + def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture: """Read from remote source into buffer (host or device memory) in parallel. Parameters