Skip to content

Commit

Permalink
More thoroughly type RemoteHandle
Browse files Browse the repository at this point in the history
* Convert sizes to `Py_ssize_t`, which Python uses for this info
* Type file offsets with `off_t`
* Update default values to work with typing
* Simplify and optimize `parse_buffer_argument`
* Add `.pxd` file for `RemoteHandle` to allow leveraging it in Cython
  • Loading branch information
jakirkham committed Oct 16, 2024
1 parent a34d6bf commit 73438a7
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 62 deletions.
4 changes: 2 additions & 2 deletions python/kvikio/kvikio/_lib/arr.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ cdef class Array:
cpdef Array asarray(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef pair[uintptr_t, Py_ssize_t] parse_buffer_argument(
buf, Py_ssize_t size, bint accept_host_buffer
) except *
25 changes: 13 additions & 12 deletions python/kvikio/kvikio/_lib/arr.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3


Expand All @@ -13,6 +14,7 @@ from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
from cython cimport auto_pickle, boundscheck, initializedcheck, nonecheck, wraparound
from libc.stdint cimport uintptr_t
from libc.string cimport memcpy
from libcpp.utility cimport pair

try:
from numpy import dtype as numpy_dtype
Expand Down Expand Up @@ -305,22 +307,21 @@ cpdef Array asarray(obj):
return Array(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef pair[uintptr_t, Py_ssize_t] parse_buffer_argument(
buf, Py_ssize_t nbytes, bint accept_host_buffer
) except *:
"""Parse `buf` and `size` argument and return a pointer and nbytes"""
if not isinstance(buf, Array):
buf = Array(buf)
cdef Array arr = buf
cdef Array arr = asarray(buf)

if not arr._contiguous():
raise ValueError("Array must be contiguous")
if not accept_host_buffer and not arr.cuda:
raise ValueError("Non-CUDA buffers not supported")
cdef size_t nbytes
if size is None:
nbytes = arr.nbytes
elif size > arr.nbytes:

cdef Py_ssize_t arr_nbytes = arr._nbytes()
if nbytes < 0:
nbytes = arr_nbytes
elif nbytes > arr_nbytes:
raise ValueError("Size is greater than the size of the buffer")
else:
nbytes = size
return pair[uintptr_t, size_t](arr.ptr, nbytes)

return pair(arr.ptr, nbytes)
53 changes: 53 additions & 0 deletions python/kvikio/kvikio/_lib/file_handle.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3

import pathlib
from typing import Optional

from posix cimport fcntl

from libc.stdint cimport uintptr_t
from libcpp cimport bool
from libcpp.string cimport string
from libcpp.utility cimport move, pair

from kvikio._lib.arr cimport parse_buffer_argument
from kvikio._lib.future cimport (
IOFuture,
IOFutureStream,
_wrap_io_future,
_wrap_stream_future,
cpp_StreamFuture,
future,
)

from kvikio._lib import defaults

ctypedef int c_int


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

cpdef close(self)
cpdef bint closed(self)
cpdef c_int fileno(self)
cpdef c_int open_flags(self)
cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef IOFuture pwrite(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef Py_ssize_t write(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
54 changes: 28 additions & 26 deletions python/kvikio/kvikio/_lib/file_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# cython: language_level=3

import pathlib
from typing import Optional

from posix cimport fcntl

Expand Down Expand Up @@ -87,11 +86,10 @@ cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
CUstream stream
) except +

ctypedef int c_int

cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

cdef class CuFile:
def __init__(self, file_path, flags="r"):
self._handle = move(
FileHandle(
Expand All @@ -100,20 +98,21 @@ cdef class CuFile:
)
)

def close(self) -> None:
cpdef close(self):
self._handle.close()

def closed(self) -> bool:
cpdef bint closed(self):
return self._handle.closed()

def fileno(self) -> int:
cpdef c_int fileno(self):
return self._handle.fd()

def open_flags(self) -> int:
cpdef c_int open_flags(self):
return self._handle.fd_open_flags()

def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pread(
<void*>info.first,
Expand All @@ -123,8 +122,9 @@ cdef class CuFile:
)
)

def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pwrite(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pwrite(
<void*>info.first,
Expand All @@ -134,44 +134,46 @@ cdef class CuFile:
)
)

def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
return self._handle.read(
<void*>info.first,
info.second,
file_offset,
dev_offset,
)

def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t write(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
return self._handle.write(
<void*>info.first,
info.second,
file_offset,
dev_offset,
)

def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.read_async(
<void*>info.first,
info.second,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))

def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.write_async(
<void*>info.first,
info.second,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))
27 changes: 27 additions & 0 deletions python/kvikio/kvikio/_lib/remote_handle.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3

from cython cimport Py_ssize_t
from libcpp.memory cimport unique_ptr

from kvikio._lib.future cimport IOFuture


cdef extern from "<kvikio/remote_handle.hpp>" nogil:
cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint":
pass


cdef class RemoteFile:
cdef unique_ptr[cpp_RemoteHandle] _handle

@classmethod
cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=*)

cpdef Py_ssize_t nbytes(self)

cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*)
cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*)
28 changes: 10 additions & 18 deletions python/kvikio/kvikio/_lib/remote_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
# distutils: language = c++
# cython: language_level=3

from typing import Optional

from cython cimport Py_ssize_t
from cython.operator cimport dereference as deref
from libc.stdint cimport uintptr_t
from libcpp.memory cimport make_unique, unique_ptr
Expand All @@ -28,7 +27,7 @@ cdef extern from "<kvikio/remote_handle.hpp>" nogil:
unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes
) except +
cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except +
int nbytes() except +
size_t nbytes() except +
size_t read(
void* buf,
size_t size,
Expand All @@ -50,38 +49,31 @@ cdef string _to_string(str s):


cdef class RemoteFile:
cdef unique_ptr[cpp_RemoteHandle] _handle

@classmethod
def open_http(
cls,
url: str,
nbytes: Optional[int],
):
cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=-1):
cdef RemoteFile ret = RemoteFile()
cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint](
_to_string(url)
)
if nbytes is None:
if nbytes >= 0:
ret._handle = make_unique[cpp_RemoteHandle](move(ep))
return ret
cdef size_t n = nbytes
ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes)
return ret

def nbytes(self) -> int:
cpdef Py_ssize_t nbytes(self):
return deref(self._handle).nbytes()

def read(self, buf, size: Optional[int], file_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
return deref(self._handle).read(
<void*>info.first,
info.second,
file_offset,
)

def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0):
cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
deref(self._handle).pread(
<void*>info.first,
Expand Down
7 changes: 3 additions & 4 deletions python/kvikio/kvikio/remote_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import annotations

import functools
from typing import Optional

from kvikio.cufile import IOFuture

Expand Down Expand Up @@ -54,7 +53,7 @@ def __init__(self, handle):
def open_http(
cls,
url: str,
nbytes: Optional[int] = None,
nbytes: int = -1,
) -> RemoteFile:
"""Open a http file.
Expand Down Expand Up @@ -89,7 +88,7 @@ def nbytes(self) -> int:
"""
return self._handle.nbytes()

def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
def read(self, buf, size: int = -1, file_offset: int = 0) -> int:
"""Read from remote source into buffer (host or device memory) in parallel.
Parameters
Expand All @@ -107,7 +106,7 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
"""
return self.pread(buf, size, file_offset).get()

def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture:
def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture:
"""Read from remote source into buffer (host or device memory) in parallel.
Parameters
Expand Down

0 comments on commit 73438a7

Please sign in to comment.