From 562abd5fd13ad07f446d2cc4a646cf1641fc5253 Mon Sep 17 00:00:00 2001 From: jakirkham Date: Wed, 16 Oct 2024 16:03:53 -0700 Subject: [PATCH] More thoroughly type `RemoteHandle` * Convert sizes to `Py_ssize_t`, which Python uses for this info * Type file offsets with `off_t` * Update default values to work with typing * Simplify and optimize `parse_buffer_argument` * Add `.pxd` file for `RemoteHandle` to allow leveraging it in Cython --- python/kvikio/kvikio/_lib/arr.pxd | 11 ++- python/kvikio/kvikio/_lib/arr.pyx | 23 +++--- python/kvikio/kvikio/_lib/file_handle.pxd | 44 +++++++++++ python/kvikio/kvikio/_lib/file_handle.pyx | 81 +++++++++++---------- python/kvikio/kvikio/_lib/remote_handle.pxd | 26 +++++++ python/kvikio/kvikio/_lib/remote_handle.pyx | 40 ++++------ python/kvikio/kvikio/remote_file.py | 7 +- 7 files changed, 148 insertions(+), 84 deletions(-) create mode 100644 python/kvikio/kvikio/_lib/file_handle.pxd create mode 100644 python/kvikio/kvikio/_lib/remote_handle.pxd diff --git a/python/kvikio/kvikio/_lib/arr.pxd b/python/kvikio/kvikio/_lib/arr.pxd index 47bad21a3b..88b711dac7 100644 --- a/python/kvikio/kvikio/_lib/arr.pxd +++ b/python/kvikio/kvikio/_lib/arr.pxd @@ -1,12 +1,10 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. -# distutils: language = c++ # cython: language_level=3 from libc.stdint cimport uintptr_t -from libcpp.utility cimport pair cdef class Array: @@ -31,6 +29,11 @@ cdef class Array: cpdef Array asarray(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef struct mem_ptr_nbytes: + uintptr_t ptr + Py_ssize_t nbytes + + +cdef mem_ptr_nbytes parse_buffer_argument( + buf, Py_ssize_t nbytes, bint accept_host_buffer ) except * diff --git a/python/kvikio/kvikio/_lib/arr.pyx b/python/kvikio/kvikio/_lib/arr.pyx index 45c7430313..10a177eebf 100644 --- a/python/kvikio/kvikio/_lib/arr.pyx +++ b/python/kvikio/kvikio/_lib/arr.pyx @@ -305,22 +305,21 @@ cpdef Array asarray(obj): return Array(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef mem_ptr_nbytes parse_buffer_argument( + buf, Py_ssize_t nbytes, bint accept_host_buffer ) except *: """Parse `buf` and `size` argument and return a pointer and nbytes""" - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf + cdef Array arr = asarray(buf) + if not arr._contiguous(): raise ValueError("Array must be contiguous") if not accept_host_buffer and not arr.cuda: raise ValueError("Non-CUDA buffers not supported") - cdef size_t nbytes - if size is None: - nbytes = arr.nbytes - elif size > arr.nbytes: + + cdef Py_ssize_t arr_nbytes = arr._nbytes() + if nbytes < 0: + nbytes = arr_nbytes + elif nbytes > arr_nbytes: raise ValueError("Size is greater than the size of the buffer") - else: - nbytes = size - return pair[uintptr_t, size_t](arr.ptr, nbytes) + + return mem_ptr_nbytes(ptr=arr.ptr, nbytes=nbytes) diff --git a/python/kvikio/kvikio/_lib/file_handle.pxd b/python/kvikio/kvikio/_lib/file_handle.pxd new file mode 100644 index 0000000000..d35aa67c41 --- /dev/null +++ b/python/kvikio/kvikio/_lib/file_handle.pxd @@ -0,0 +1,44 @@ +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from posix cimport fcntl + +from libc.stdint cimport uintptr_t +from libcpp cimport bool + +from kvikio._lib import defaults +from kvikio._lib.future cimport IOFuture, IOFutureStream + +ctypedef int c_int + + +cdef extern from "" namespace "kvikio" nogil: + cdef cppclass FileHandle: + pass + + +cdef class CuFile: + """File handle for GPUDirect Storage (GDS)""" + cdef FileHandle _handle + + cpdef close(self) + cpdef bint closed(self) + cpdef c_int fileno(self) + cpdef c_int open_flags(self) + cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef IOFuture pwrite(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef Py_ssize_t write(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) + cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx index 7a8de368ef..796f54b7a6 100644 --- a/python/kvikio/kvikio/_lib/file_handle.pyx +++ b/python/kvikio/kvikio/_lib/file_handle.pyx @@ -5,16 +5,15 @@ # cython: language_level=3 import pathlib -from typing import Optional from posix cimport fcntl from libc.stdint cimport uintptr_t from libcpp cimport bool from libcpp.string cimport string -from libcpp.utility cimport move, pair +from libcpp.utility cimport move -from kvikio._lib.arr cimport parse_buffer_argument +from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument from kvikio._lib.future cimport ( IOFuture, IOFutureStream, @@ -89,9 +88,6 @@ cdef extern from "" namespace "kvikio" nogil: cdef class CuFile: - """File handle for GPUDirect Storage (GDS)""" - cdef FileHandle _handle - def __init__(self, file_path, flags="r"): self._handle = move( FileHandle( @@ -100,78 +96,83 @@ cdef class CuFile: ) ) - def close(self) -> None: + cpdef close(self): self._handle.close() - def closed(self) -> bool: + cpdef bint closed(self): return self._handle.closed() - def fileno(self) -> int: + cpdef c_int fileno(self): return self._handle.fd() - def open_flags(self) -> int: + cpdef c_int open_flags(self): return self._handle.fd_open_flags() - def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pread( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, task_size if task_size else defaults.task_size() ) ) - def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pwrite(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pwrite( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, task_size if task_size else defaults.task_size() ) ) - def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return self._handle.read( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, ) - def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t write(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return self._handle.write( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, ) - def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.read_async( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, - stream, + stream, )) - def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.write_async( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, dev_offset, - stream, + stream, )) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pxd b/python/kvikio/kvikio/_lib/remote_handle.pxd new file mode 100644 index 0000000000..59cde1f0e2 --- /dev/null +++ b/python/kvikio/kvikio/_lib/remote_handle.pxd @@ -0,0 +1,26 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from libcpp.memory cimport unique_ptr + +from kvikio._lib.future cimport IOFuture + + +cdef extern from "" nogil: + cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint": + pass + + +cdef class RemoteFile: + cdef unique_ptr[cpp_RemoteHandle] _handle + + @classmethod + cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=*) + + cpdef Py_ssize_t nbytes(self) + + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) + cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx index 93c6ac398a..49aa189000 100644 --- a/python/kvikio/kvikio/_lib/remote_handle.pyx +++ b/python/kvikio/kvikio/_lib/remote_handle.pyx @@ -4,15 +4,14 @@ # distutils: language = c++ # cython: language_level=3 -from typing import Optional - +from cython cimport Py_ssize_t from cython.operator cimport dereference as deref from libc.stdint cimport uintptr_t from libcpp.memory cimport make_unique, unique_ptr from libcpp.string cimport string -from libcpp.utility cimport move, pair +from libcpp.utility cimport move -from kvikio._lib.arr cimport parse_buffer_argument +from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument from kvikio._lib.future cimport IOFuture, _wrap_io_future, future @@ -28,7 +27,7 @@ cdef extern from "" nogil: unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes ) except + cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except + - int nbytes() except + + size_t nbytes() except + size_t read( void* buf, size_t size, @@ -50,42 +49,35 @@ cdef string _to_string(str s): cdef class RemoteFile: - cdef unique_ptr[cpp_RemoteHandle] _handle - @classmethod - def open_http( - cls, - url: str, - nbytes: Optional[int], - ): + cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=-1): cdef RemoteFile ret = RemoteFile() cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint]( _to_string(url) ) - if nbytes is None: + if nbytes >= 0: ret._handle = make_unique[cpp_RemoteHandle](move(ep)) return ret - cdef size_t n = nbytes - ret._handle = make_unique[cpp_RemoteHandle](move(ep), n) + ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes) return ret - def nbytes(self) -> int: + cpdef Py_ssize_t nbytes(self): return deref(self._handle).nbytes() - def read(self, buf, size: Optional[int], file_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return deref(self._handle).read( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, ) - def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True) return _wrap_io_future( deref(self._handle).pread( - info.first, - info.second, + info.ptr, + info.nbytes, file_offset, ) ) diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py index 52bbe8010f..b107876124 100644 --- a/python/kvikio/kvikio/remote_file.py +++ b/python/kvikio/kvikio/remote_file.py @@ -4,7 +4,6 @@ from __future__ import annotations import functools -from typing import Optional from kvikio.cufile import IOFuture @@ -54,7 +53,7 @@ def __init__(self, handle): def open_http( cls, url: str, - nbytes: Optional[int] = None, + nbytes: int = -1, ) -> RemoteFile: """Open a http file. @@ -89,7 +88,7 @@ def nbytes(self) -> int: """ return self._handle.nbytes() - def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: + def read(self, buf, size: int = -1, file_offset: int = 0) -> int: """Read from remote source into buffer (host or device memory) in parallel. Parameters @@ -107,7 +106,7 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: """ return self.pread(buf, size, file_offset).get() - def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture: + def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture: """Read from remote source into buffer (host or device memory) in parallel. Parameters