Skip to content

Commit

Permalink
More thoroughly type RemoteHandle
Browse files Browse the repository at this point in the history
* Convert sizes to `Py_ssize_t`, which Python uses for this info
* Type file offsets with `off_t`
* Update default values to work with typing
* Simplify and optimize `parse_buffer_argument`
* Add `.pxd` file for `RemoteHandle` to allow leveraging it in Cython
  • Loading branch information
jakirkham committed Oct 16, 2024
1 parent a34d6bf commit 0393684
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 85 deletions.
11 changes: 7 additions & 4 deletions python/kvikio/kvikio/_lib/arr.pxd
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3


from libc.stdint cimport uintptr_t
from libcpp.utility cimport pair


cdef class Array:
Expand All @@ -31,6 +29,11 @@ cdef class Array:
cpdef Array asarray(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef struct mem_ptr_nbytes:
uintptr_t ptr
Py_ssize_t nbytes


cdef mem_ptr_nbytes parse_buffer_argument(
buf, Py_ssize_t nbytes, bint accept_host_buffer
) except *
23 changes: 11 additions & 12 deletions python/kvikio/kvikio/_lib/arr.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,21 @@ cpdef Array asarray(obj):
return Array(obj)


cdef pair[uintptr_t, size_t] parse_buffer_argument(
buf, size, bint accept_host_buffer
cdef mem_ptr_nbytes parse_buffer_argument(
buf, Py_ssize_t nbytes, bint accept_host_buffer
) except *:
"""Parse `buf` and `size` argument and return a pointer and nbytes"""
if not isinstance(buf, Array):
buf = Array(buf)
cdef Array arr = buf
cdef Array arr = asarray(buf)

if not arr._contiguous():
raise ValueError("Array must be contiguous")
if not accept_host_buffer and not arr.cuda:
raise ValueError("Non-CUDA buffers not supported")
cdef size_t nbytes
if size is None:
nbytes = arr.nbytes
elif size > arr.nbytes:

cdef Py_ssize_t arr_nbytes = arr._nbytes()
if nbytes < 0:
nbytes = arr_nbytes
elif nbytes > arr_nbytes:
raise ValueError("Size is greater than the size of the buffer")
else:
nbytes = size
return pair[uintptr_t, size_t](arr.ptr, nbytes)

return mem_ptr_nbytes(ptr=arr.ptr, nbytes=nbytes)
44 changes: 44 additions & 0 deletions python/kvikio/kvikio/_lib/file_handle.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3

from posix cimport fcntl

from libc.stdint cimport uintptr_t
from libcpp cimport bool

from kvikio._lib import defaults
from kvikio._lib.future cimport IOFuture, IOFutureStream

ctypedef int c_int


cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
cdef cppclass FileHandle:
pass


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

cpdef close(self)
cpdef bint closed(self)
cpdef c_int fileno(self)
cpdef c_int open_flags(self)
cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef IOFuture pwrite(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t task_size=*)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef Py_ssize_t write(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*,
Py_ssize_t dev_offset=*)
cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=*,
Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*,
uintptr_t stream=*)
81 changes: 41 additions & 40 deletions python/kvikio/kvikio/_lib/file_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
# cython: language_level=3

import pathlib
from typing import Optional

from posix cimport fcntl

from libc.stdint cimport uintptr_t
from libcpp cimport bool
from libcpp.string cimport string
from libcpp.utility cimport move, pair
from libcpp.utility cimport move

from kvikio._lib.arr cimport parse_buffer_argument
from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument
from kvikio._lib.future cimport (
IOFuture,
IOFutureStream,
Expand Down Expand Up @@ -89,9 +88,6 @@ cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:


cdef class CuFile:
"""File handle for GPUDirect Storage (GDS)"""
cdef FileHandle _handle

def __init__(self, file_path, flags="r"):
self._handle = move(
FileHandle(
Expand All @@ -100,78 +96,83 @@ cdef class CuFile:
)
)

def close(self) -> None:
cpdef close(self):
self._handle.close()

def closed(self) -> bool:
cpdef bint closed(self):
return self._handle.closed()

def fileno(self) -> int:
cpdef c_int fileno(self):
return self._handle.fd()

def open_flags(self) -> int:
cpdef c_int open_flags(self):
return self._handle.fd_open_flags()

def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):

cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pread(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
task_size if task_size else defaults.task_size()
)
)

def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pwrite(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t task_size=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
self._handle.pwrite(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
task_size if task_size else defaults.task_size()
)
)

def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return self._handle.read(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
)

def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef Py_ssize_t write(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0,
Py_ssize_t dev_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return self._handle.write(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
)

def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.read_async(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))

def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
st: uintptr_t) -> IOFutureStream:
stream = <CUstream>st
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False)
cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=-1,
Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0,
uintptr_t stream=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, False)
return _wrap_stream_future(self._handle.write_async(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
dev_offset,
stream,
<CUstream>stream,
))
26 changes: 26 additions & 0 deletions python/kvikio/kvikio/_lib/remote_handle.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
# cython: language_level=3

from libcpp.memory cimport unique_ptr

from kvikio._lib.future cimport IOFuture


cdef extern from "<kvikio/remote_handle.hpp>" nogil:
cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint":
pass


cdef class RemoteFile:
cdef unique_ptr[cpp_RemoteHandle] _handle

@classmethod
cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=*)

cpdef Py_ssize_t nbytes(self)

cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*)
cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*)
41 changes: 16 additions & 25 deletions python/kvikio/kvikio/_lib/remote_handle.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
# distutils: language = c++
# cython: language_level=3

from typing import Optional

from cython cimport Py_ssize_t
from cython.operator cimport dereference as deref
from libc.stdint cimport uintptr_t
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move, pair
from libcpp.utility cimport move

from kvikio._lib.arr cimport parse_buffer_argument
from kvikio._lib.arr cimport mem_ptr_nbytes, parse_buffer_argument
from kvikio._lib.future cimport IOFuture, _wrap_io_future, future


Expand All @@ -28,7 +26,7 @@ cdef extern from "<kvikio/remote_handle.hpp>" nogil:
unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes
) except +
cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except +
int nbytes() except +
size_t nbytes() except +
size_t read(
void* buf,
size_t size,
Expand All @@ -50,42 +48,35 @@ cdef string _to_string(str s):


cdef class RemoteFile:
cdef unique_ptr[cpp_RemoteHandle] _handle

@classmethod
def open_http(
cls,
url: str,
nbytes: Optional[int],
):
cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=-1):
cdef RemoteFile ret = RemoteFile()
cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint](
_to_string(url)
)
if nbytes is None:
if nbytes >= 0:
ret._handle = make_unique[cpp_RemoteHandle](move(ep))
return ret
cdef size_t n = nbytes
ret._handle = make_unique[cpp_RemoteHandle](move(ep), n)
ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes)
return ret

def nbytes(self) -> int:
cpdef Py_ssize_t nbytes(self):
return deref(self._handle).nbytes()

def read(self, buf, size: Optional[int], file_offset: int) -> int:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return deref(self._handle).read(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
)

def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture:
cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True)
cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0):
cdef mem_ptr_nbytes info = parse_buffer_argument(buf, size, True)
return _wrap_io_future(
deref(self._handle).pread(
<void*>info.first,
info.second,
<void*>info.ptr,
info.nbytes,
file_offset,
)
)
7 changes: 3 additions & 4 deletions python/kvikio/kvikio/remote_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import annotations

import functools
from typing import Optional

from kvikio.cufile import IOFuture

Expand Down Expand Up @@ -54,7 +53,7 @@ def __init__(self, handle):
def open_http(
cls,
url: str,
nbytes: Optional[int] = None,
nbytes: int = -1,
) -> RemoteFile:
"""Open a http file.
Expand Down Expand Up @@ -89,7 +88,7 @@ def nbytes(self) -> int:
"""
return self._handle.nbytes()

def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
def read(self, buf, size: int = -1, file_offset: int = 0) -> int:
"""Read from remote source into buffer (host or device memory) in parallel.
Parameters
Expand All @@ -107,7 +106,7 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int:
"""
return self.pread(buf, size, file_offset).get()

def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture:
def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture:
"""Read from remote source into buffer (host or device memory) in parallel.
Parameters
Expand Down

0 comments on commit 0393684

Please sign in to comment.