Skip to content

Commit

Permalink
More thoroughly type handle objects
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Oct 16, 2024
1 parent a34d6bf commit f6c45d8
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 172 deletions.
11 changes: 7 additions & 4 deletions python/kvikio/kvikio/_lib/arr.pxd
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3


from libc.stdint cimport uintptr_t
from libcpp.utility cimport pair


cdef class Array:
Expand All @@ -31,6 +29,11 @@ cdef class Array:
cpdef Array asarray(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef struct mem_ptr_nbytes:
uintptr_t ptr
Py_ssize_t nbytes


cdef mem_ptr_nbytes parse_buffer_argument(
buf, Py_ssize_t nbytes, bint accept_host_buffer
) except *
23 changes: 11 additions & 12 deletions python/kvikio/kvikio/_lib/arr.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,21 @@ cpdef Array asarray(obj):
return Array(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef mem_ptr_nbytes parse_buffer_argument(
buf, Py_ssize_t nbytes, bint accept_host_buffer
) except *:
"""Parse `buf` and `size` argument and return a pointer and nbytes"""
if not isinstance(buf, Array):
buf = Array(buf)
cdef Array arr = buf
cdef Array arr = asarray(buf)

if not arr._contiguous():
raise ValueError("Array must be contiguous")
if not accept_host_buffer and not arr.cuda:
raise ValueError("Non-CUDA buffers not supported")
cdef size_t nbytes
if size is None:
nbytes = arr.nbytes
elif size > arr.nbytes:

cdef Py_ssize_t arr_nbytes = arr._nbytes()
if nbytes < 0:
nbytes = arr_nbytes
elif nbytes > arr_nbytes:
raise ValueError("Size is greater than the size of the buffer")
else:
nbytes = size
return pair[uintptr_t, size_t](arr.ptr, nbytes)

return mem_ptr_nbytes(ptr=arr.ptr, nbytes=nbytes)
101 changes: 101 additions & 0 deletions python/kvikio/kvikio/_lib/file_handle.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3

from posix cimport fcntl

from libc.stdint cimport uintptr_t
from libcpp cimport bool as cpp_bool
from libcpp.string cimport string

from kvikio._lib import defaults
from kvikio._lib.future cimport IOFuture, IOFutureStream, cpp_StreamFuture, future

ctypedef int c_int


cdef extern from "cuda.h":
ctypedef void* CUstream


cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
cdef cppclass FileHandle:
FileHandle() except +
FileHandle(c_int fd) except +
FileHandle(
string file_path,
string flags,
) except +
FileHandle(
string file_path,
string flags,
fcntl.mode_t mode
) except +
void close()
cpp_bool closed()
c_int fd()
c_int fd_open_flags() except +
future[size_t] pread(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
future[size_t] pwrite(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
size_t read(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
size_t write(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
cpp_StreamFuture read_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +
cpp_StreamFuture write_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

cpdef close(self)
cpdef cpp_bool closed(self)
cpdef c_int fileno(self)
cpdef c_int open_flags(self)
cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef IOFuture pwrite(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef Py_ssize_t write(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
149 changes: 42 additions & 107 deletions python/kvikio/kvikio/_lib/file_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,93 +5,23 @@
# cython: language_level=3

import pathlib
from typing import Optional

from posix cimport fcntl

from libc.stdint cimport uintptr_t
from libcpp cimport bool
from libcpp.string cimport string
from libcpp.utility cimport move, pair
from libcpp cimport bool as cpp_bool
from libcpp.utility cimport move

from kvikio._lib.arr cimport parse_buffer_argument
from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument
from kvikio._lib.future cimport (
IOFuture,
IOFutureStream,
_wrap_io_future,
_wrap_stream_future,
cpp_StreamFuture,
future,
)

from kvikio._lib import defaults


cdef extern from "cuda.h":
ctypedef void* CUstream


cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
cdef cppclass FileHandle:
FileHandle() except +
FileHandle(int fd) except +
FileHandle(
string file_path,
string flags,
) except +
FileHandle(
string file_path,
string flags,
fcntl.mode_t mode
) except +
void close()
bool closed()
int fd()
int fd_open_flags() except +
future[size_t] pread(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
future[size_t] pwrite(
void* devPtr,
size_t size,
size_t file_offset,
size_t task_size
) except +
size_t read(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
size_t write(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset
) except +
cpp_StreamFuture read_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +
cpp_StreamFuture write_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

def __init__(self, file_path, flags="r"):
self._handle = move(
FileHandle(
Expand All @@ -100,78 +30,83 @@ cdef class CuFile:
)
)

def close(self) -> None:
cpdef close(self):
self._handle.close()

def closed(self) -> bool:
cpdef cpp_bool closed(self):
return self._handle.closed()

def fileno(self) -> int:
cpdef c_int fileno(self):
return self._handle.fd()

def open_flags(self) -> int:
cpdef c_int open_flags(self):
return self._handle.fd_open_flags()

def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):

cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pread(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
task_size if task_size else defaults.task_size()
)
)

def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pwrite(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pwrite(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
task_size if task_size else defaults.task_size()
)
)

def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return self._handle.read(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
)

def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t write(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return self._handle.write(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
)

def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.read_async(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))

def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.write_async(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))
Loading

0 comments on commit f6c45d8

Please sign in to comment.