From a02ee1e265532651ccce309f1691756e6fb776ec Mon Sep 17 00:00:00 2001 From: jakirkham Date: Wed, 16 Oct 2024 15:54:19 -0700 Subject: [PATCH] More thoroughly type `RemoteHandle` * Convert sizes to `Py_ssize_t`, which Python uses for this info * Type file offsets with `off_t` * Update default values to work with typing * Simplify and optimize `parse_buffer_argument` * Add `.pxd` file for `RemoteHandle` to allow leveraging it in Cython --- python/kvikio/kvikio/_lib/arr.pxd | 4 +- python/kvikio/kvikio/_lib/arr.pyx | 25 +++++----- python/kvikio/kvikio/_lib/file_handle.pxd | 53 ++++++++++++++++++++ python/kvikio/kvikio/_lib/file_handle.pyx | 54 +++++++++++---------- python/kvikio/kvikio/_lib/remote_handle.pxd | 27 +++++++++++ python/kvikio/kvikio/_lib/remote_handle.pyx | 28 ++++------- python/kvikio/kvikio/remote_file.py | 7 ++- 7 files changed, 136 insertions(+), 62 deletions(-) create mode 100644 python/kvikio/kvikio/_lib/file_handle.pxd create mode 100644 python/kvikio/kvikio/_lib/remote_handle.pxd diff --git a/python/kvikio/kvikio/_lib/arr.pxd b/python/kvikio/kvikio/_lib/arr.pxd index 47bad21a3b..52ca5d20f6 100644 --- a/python/kvikio/kvikio/_lib/arr.pxd +++ b/python/kvikio/kvikio/_lib/arr.pxd @@ -31,6 +31,6 @@ cdef class Array: cpdef Array asarray(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef pair[uintptr_t, Py_ssize_t] parse_buffer_argument( + buf, Py_ssize_t size, bint accept_host_buffer ) except * diff --git a/python/kvikio/kvikio/_lib/arr.pyx b/python/kvikio/kvikio/_lib/arr.pyx index 45c7430313..e4b147bdae 100644 --- a/python/kvikio/kvikio/_lib/arr.pyx +++ b/python/kvikio/kvikio/_lib/arr.pyx @@ -1,6 +1,7 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. +# distutils: language = c++ # cython: language_level=3 @@ -13,6 +14,7 @@ from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM from cython cimport auto_pickle, boundscheck, initializedcheck, nonecheck, wraparound from libc.stdint cimport uintptr_t from libc.string cimport memcpy +from libcpp.utility cimport pair try: from numpy import dtype as numpy_dtype @@ -305,22 +307,21 @@ cpdef Array asarray(obj): return Array(obj) -cdef pair[uintptr_t, size_t] parse_buffer_argument( - buf, size, bint accept_host_buffer +cdef pair[uintptr_t, Py_ssize_t] parse_buffer_argument( + buf, Py_ssize_t nbytes, bint accept_host_buffer ) except *: """Parse `buf` and `size` argument and return a pointer and nbytes""" - if not isinstance(buf, Array): - buf = Array(buf) - cdef Array arr = buf + cdef Array arr = asarray(buf) + if not arr._contiguous(): raise ValueError("Array must be contiguous") if not accept_host_buffer and not arr.cuda: raise ValueError("Non-CUDA buffers not supported") - cdef size_t nbytes - if size is None: - nbytes = arr.nbytes - elif size > arr.nbytes: + + cdef Py_ssize_t arr_nbytes = arr._nbytes() + if nbytes < 0: + nbytes = arr_nbytes + elif nbytes > arr_nbytes: raise ValueError("Size is greater than the size of the buffer") - else: - nbytes = size - return pair[uintptr_t, size_t](arr.ptr, nbytes) + + return pair(arr.ptr, nbytes) diff --git a/python/kvikio/kvikio/_lib/file_handle.pxd b/python/kvikio/kvikio/_lib/file_handle.pxd new file mode 100644 index 0000000000..331c54182c --- /dev/null +++ b/python/kvikio/kvikio/_lib/file_handle.pxd @@ -0,0 +1,53 @@ +# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +import pathlib +from typing import Optional + +from posix cimport fcntl + +from libc.stdint cimport uintptr_t +from libcpp cimport bool +from libcpp.string cimport string +from libcpp.utility cimport move, pair + +from kvikio._lib.arr cimport parse_buffer_argument +from kvikio._lib.future cimport ( + IOFuture, + IOFutureStream, + _wrap_io_future, + _wrap_stream_future, + cpp_StreamFuture, + future, +) + +from kvikio._lib import defaults + +ctypedef int c_int + + +cdef class CuFile: + """File handle for GPUDirect Storage (GDS)""" + cdef FileHandle _handle + + cpdef close(self) + cpdef bint closed(self) + cpdef c_int fileno(self) + cpdef c_int open_flags(self) + cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef IOFuture pwrite(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t task_size=*) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef Py_ssize_t write(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*, + Py_ssize_t dev_offset=*) + cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) + cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=*, + Py_ssize_t file_offset=*, Py_ssize_t dev_offset=*, + uintptr_t stream=*) diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx index 7a8de368ef..cf36d07e18 100644 --- a/python/kvikio/kvikio/_lib/file_handle.pyx +++ b/python/kvikio/kvikio/_lib/file_handle.pyx @@ -5,7 +5,6 @@ # cython: language_level=3 import pathlib -from typing import Optional from posix cimport fcntl @@ -87,11 +86,10 @@ cdef extern from "" namespace "kvikio" nogil: CUstream stream ) except + +ctypedef int c_int -cdef class CuFile: - """File handle for GPUDirect Storage (GDS)""" - cdef FileHandle _handle +cdef class CuFile: def __init__(self, file_path, flags="r"): self._handle = move( FileHandle( @@ -100,20 +98,21 @@ cdef class CuFile: ) ) - def close(self) -> None: + cpdef close(self): self._handle.close() - def closed(self) -> bool: + cpdef bint closed(self): return self._handle.closed() - def fileno(self) -> int: + cpdef c_int fileno(self): return self._handle.fd() - def open_flags(self) -> int: + cpdef c_int open_flags(self): return self._handle.fd_open_flags() - def pread(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pread( info.first, @@ -123,8 +122,9 @@ cdef class CuFile: ) ) - def pwrite(self, buf, size: Optional[int], file_offset: int, task_size) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pwrite(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t task_size=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True) return _wrap_io_future( self._handle.pwrite( info.first, @@ -134,8 +134,9 @@ cdef class CuFile: ) ) - def read(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False) return self._handle.read( info.first, info.second, @@ -143,8 +144,9 @@ cdef class CuFile: dev_offset, ) - def write(self, buf, size: Optional[int], file_offset: int, dev_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef Py_ssize_t write(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0, + Py_ssize_t dev_offset=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False) return self._handle.write( info.first, info.second, @@ -152,26 +154,26 @@ cdef class CuFile: dev_offset, ) - def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream read_async(self, buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.read_async( info.first, info.second, file_offset, dev_offset, - stream, + stream, )) - def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int, - st: uintptr_t) -> IOFutureStream: - stream = st - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, False) + cpdef IOFutureStream write_async(self, buf, Py_ssize_t size=-1, + Py_ssize_t file_offset=0, Py_ssize_t dev_offset=0, + uintptr_t stream=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, False) return _wrap_stream_future(self._handle.write_async( info.first, info.second, file_offset, dev_offset, - stream, + stream, )) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pxd b/python/kvikio/kvikio/_lib/remote_handle.pxd new file mode 100644 index 0000000000..4b8924a781 --- /dev/null +++ b/python/kvikio/kvikio/_lib/remote_handle.pxd @@ -0,0 +1,27 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from cython cimport Py_ssize_t +from libcpp.memory cimport unique_ptr + +from kvikio._lib.future cimport IOFuture + + +cdef extern from "" nogil: + cdef cppclass cpp_RemoteEndpoint "kvikio::RemoteEndpoint": + pass + + +cdef class RemoteFile: + cdef unique_ptr[cpp_RemoteHandle] _handle + + @classmethod + cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=*) + + cpdef Py_ssize_t nbytes(self) + + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) + cpdef IOFuture pread(self, buf, Py_ssize_t size=*, Py_ssize_t file_offset=*) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx index 93c6ac398a..95c496471b 100644 --- a/python/kvikio/kvikio/_lib/remote_handle.pyx +++ b/python/kvikio/kvikio/_lib/remote_handle.pyx @@ -4,8 +4,7 @@ # distutils: language = c++ # cython: language_level=3 -from typing import Optional - +from cython cimport Py_ssize_t from cython.operator cimport dereference as deref from libc.stdint cimport uintptr_t from libcpp.memory cimport make_unique, unique_ptr @@ -28,7 +27,7 @@ cdef extern from "" nogil: unique_ptr[cpp_RemoteEndpoint] endpoint, size_t nbytes ) except + cpp_RemoteHandle(unique_ptr[cpp_RemoteEndpoint] endpoint) except + - int nbytes() except + + size_t nbytes() except + size_t read( void* buf, size_t size, @@ -50,38 +49,31 @@ cdef string _to_string(str s): cdef class RemoteFile: - cdef unique_ptr[cpp_RemoteHandle] _handle - @classmethod - def open_http( - cls, - url: str, - nbytes: Optional[int], - ): + cpdef RemoteFile open_http(cls, str url, Py_ssize_t nbytes=-1): cdef RemoteFile ret = RemoteFile() cdef unique_ptr[cpp_HttpEndpoint] ep = make_unique[cpp_HttpEndpoint]( _to_string(url) ) - if nbytes is None: + if nbytes >= 0: ret._handle = make_unique[cpp_RemoteHandle](move(ep)) return ret - cdef size_t n = nbytes - ret._handle = make_unique[cpp_RemoteHandle](move(ep), n) + ret._handle = make_unique[cpp_RemoteHandle](move(ep), nbytes) return ret - def nbytes(self) -> int: + cpdef Py_ssize_t nbytes(self): return deref(self._handle).nbytes() - def read(self, buf, size: Optional[int], file_offset: int) -> int: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef Py_ssize_t read(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True) return deref(self._handle).read( info.first, info.second, file_offset, ) - def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture: - cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + cpdef IOFuture pread(self, buf, Py_ssize_t size=-1, Py_ssize_t file_offset=0): + cdef pair[uintptr_t, Py_ssize_t] info = parse_buffer_argument(buf, size, True) return _wrap_io_future( deref(self._handle).pread( info.first, diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py index 52bbe8010f..b107876124 100644 --- a/python/kvikio/kvikio/remote_file.py +++ b/python/kvikio/kvikio/remote_file.py @@ -4,7 +4,6 @@ from __future__ import annotations import functools -from typing import Optional from kvikio.cufile import IOFuture @@ -54,7 +53,7 @@ def __init__(self, handle): def open_http( cls, url: str, - nbytes: Optional[int] = None, + nbytes: int = -1, ) -> RemoteFile: """Open a http file. @@ -89,7 +88,7 @@ def nbytes(self) -> int: """ return self._handle.nbytes() - def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: + def read(self, buf, size: int = -1, file_offset: int = 0) -> int: """Read from remote source into buffer (host or device memory) in parallel. Parameters @@ -107,7 +106,7 @@ def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: """ return self.pread(buf, size, file_offset).get() - def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture: + def pread(self, buf, size: int = -1, file_offset: int = 0) -> IOFuture: """Read from remote source into buffer (host or device memory) in parallel. Parameters