diff --git a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
new file mode 100644
index 00000000000..c34336ac73f
--- /dev/null
+++ b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package ai.rapids.cudf;
+
+import java.io.File;
+
+/**
+ * Provide an interface for reading a Parquet file in an iterative manner.
+ */
+public class ParquetChunkedReader implements AutoCloseable {
+  static {
+    NativeDepsLoader.loadNativeDeps();
+  }
+
+  /**
+   * Construct the reader instance from a read limit and a file path.
+   *
+   * @param chunkSizeByteLimit Limit on the total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param filePath Full path of the input Parquet file to read.
+   */
+  public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
+    this(chunkSizeByteLimit, ParquetOptions.DEFAULT, filePath);
+  }
+
+  /**
+   * Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
+   *
+   * @param chunkSizeByteLimit Limit on the total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param opts The options for Parquet reading.
+   * @param filePath Full path of the input Parquet file to read.
+   */
+  public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
+    handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
+        filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());
+
+    if (handle == 0) {
+      throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
+    }
+  }
+
+  /**
+   * Construct the reader instance from a read limit and a file already loaded into a memory buffer.
+   *
+   * @param chunkSizeByteLimit Limit on the total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param opts The options for Parquet reading.
+   * @param buffer Raw Parquet file content.
+   * @param offset The starting offset into the buffer.
+   * @param len The number of bytes to parse from the given buffer.
+   */
+  public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
+      long offset, long len) {
+    handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
+        buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());
+
+    if (handle == 0) {
+      throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
+    }
+  }
+
+  /**
+   * Check if the given file has anything left to read.
+   *
+   * @return A boolean value indicating whether there is more data to read from the file.
+   */
+  public boolean hasNext() {
+    if (handle == 0) {
+      throw new IllegalStateException("Native chunked Parquet reader object may have been closed.");
+    }
+
+    if (firstCall) {
+      // This function needs to return true at least once, so that an empty table (having
+      // empty columns instead of no columns) can be returned by readChunk() if the input
+      // file has no rows.
+      firstCall = false;
+      return true;
+    }
+    return hasNext(handle);
+  }
+
+  /**
+   * Read a chunk of rows from the given Parquet file such that the total size of the returned
+   * data does not exceed the given read limit. If the file has no data, or all of its data has
+   * already been read by previous calls to this function, a null Table is returned.
+   *
+   * @return A table of new rows read from the given file.
+   */
+  public Table readChunk() {
+    if (handle == 0) {
+      throw new IllegalStateException("Native chunked Parquet reader object may have been closed.");
+    }
+
+    long[] columnPtrs = readChunk(handle);
+    return columnPtrs != null ? new Table(columnPtrs) : null;
+  }
+
+  @Override
+  public void close() {
+    if (handle != 0) {
+      close(handle);
+      handle = 0;
+    }
+  }
+
+
+  /**
+   * Auxiliary variable to help {@link #hasNext()} return true at least once.
+   */
+  private boolean firstCall = true;
+
+  /**
+   * Handle for the memory address of the native Parquet chunked reader class.
+   */
+  private long handle;
+
+
+  /**
+   * Create a native chunked Parquet reader object on the heap and return its memory address.
+   *
+   * @param chunkSizeByteLimit Limit on the total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param filterColumnNames Names of the columns to read, or an empty array to read all columns.
+   * @param binaryToString Whether to convert the corresponding column to String if it is binary.
+   * @param filePath Full path of the file to read, or null if reading from a buffer.
+   * @param bufferAddrs The address of a buffer to read from, or 0 if we are not reading from a buffer.
+   * @param length The length of the buffer to read from.
+   * @param timeUnit Return type of time unit for timestamps.
+   */
+  private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
+      boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);
+
+  private static native boolean hasNext(long handle);
+
+  private static native long[] readChunk(long handle);
+
+  private static native void close(long handle);
+}
diff --git a/java/src/main/native/CMakeLists.txt b/java/src/main/native/CMakeLists.txt
index 339f0f439a0..ac05b16b39a 100755
--- a/java/src/main/native/CMakeLists.txt
+++ b/java/src/main/native/CMakeLists.txt
@@ -130,6 +130,7 @@ add_library(
   cudfjni
   src/Aggregation128UtilsJni.cpp
   src/AggregationJni.cpp
+  src/ChunkedReaderJni.cpp
   src/CudfJni.cpp
   src/CudaJni.cpp
   src/ColumnVectorJni.cpp
diff --git a/java/src/main/native/src/ChunkedReaderJni.cpp b/java/src/main/native/src/ChunkedReaderJni.cpp
new file mode 100644
index 00000000000..553ec46d569
--- /dev/null
+++ b/java/src/main/native/src/ChunkedReaderJni.cpp
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <vector>
+
+#include <cudf/column/column.hpp>
+#include <cudf/io/parquet.hpp>
+#include <cudf/table/table.hpp>
+
+#include "cudf_jni_apis.hpp"
+#include "jni_utils.hpp"
+
+// This function is defined in `TableJni.cpp`.
+jlongArray
+cudf::jni::convert_table_for_return(JNIEnv *env, std::unique_ptr<cudf::table> &&table_result,
+                                    std::vector<std::unique_ptr<cudf::column>> &&extra_columns);
+
+// This file is for code related to the chunked readers (Parquet, ORC, etc.).
+
+extern "C" {
+
+// This function should take all the parameters that `Table.readParquet` takes,
+// plus one more parameter `long chunkSizeByteLimit`.
+JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
+    JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
+    jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
+    jint unit) {
+  JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
+  bool read_buffer = true;
+  if (buffer == 0) {
+    JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
+    read_buffer = false;
+  } else if (inp_file_path != nullptr) {
+    JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
+                  "Cannot pass in both a buffer and an inp_file_path", 0);
+  } else if (buffer_length <= 0) {
+    JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
+  }
+
+  try {
+    cudf::jni::auto_set_device(env);
+    cudf::jni::native_jstring filename(env, inp_file_path);
+    if (!read_buffer && filename.is_empty()) {
+      JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inp_file_path cannot be empty", 0);
+    }
+
+    cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
+
+    // TODO: This variable is unused for now, but we do not yet know what to do with it.
+    // As such, it needs to stay here a little longer, until we decide whether to use it
+    // again or remove it completely.
+    cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
+    (void)n_col_binary_read;
+
+    auto const source = read_buffer ?
+        cudf::io::source_info(reinterpret_cast<char const *>(buffer),
+                              static_cast<std::size_t>(buffer_length)) :
+        cudf::io::source_info(filename.get());
+
+    auto opts_builder = cudf::io::parquet_reader_options::builder(source);
+    if (n_filter_col_names.size() > 0) {
+      opts_builder = opts_builder.columns(n_filter_col_names.as_cpp_vector());
+    }
+    auto const read_opts = opts_builder.convert_strings_to_categories(false)
+                               .timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
+                               .build();
+
+    return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
+        static_cast<std::size_t>(chunk_read_limit), read_opts));
+  }
+  CATCH_STD(env, 0);
+}
+
+JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_hasNext(JNIEnv *env, jclass,
+                                                                            jlong handle) {
+  JNI_NULL_CHECK(env, handle, "handle is null", false);
+
+  try {
+    cudf::jni::auto_set_device(env);
+    auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *>(handle);
+    return reader_ptr->has_next();
+  }
+  CATCH_STD(env, false);
+}
+
+JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(JNIEnv *env, jclass,
+                                                                                jlong handle) {
+  JNI_NULL_CHECK(env, handle, "handle is null", 0);
+
+  try {
+    cudf::jni::auto_set_device(env);
+    auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *>(handle);
+    auto chunk = reader_ptr->read_chunk();
+    return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
+  }
+  CATCH_STD(env, 0);
+}
+
+JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv *env, jclass,
+                                                                      jlong handle) {
+  JNI_NULL_CHECK(env, handle, "handle is null", );
+
+  try {
+    cudf::jni::auto_set_device(env);
+    delete reinterpret_cast<cudf::io::chunked_parquet_reader *>(handle);
+  }
+  CATCH_STD(env, );
+}
+
+} // extern "C"
diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java
index 2a33c37a8d6..bf951a871e7 100644
--- a/java/src/test/java/ai/rapids/cudf/TableTest.java
+++ b/java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -75,6 +75,7 @@ public class TableTest extends CudfTestBase {
   private static final File TEST_PARQUET_FILE = TestUtils.getResourceAsFile("acq.parquet");
+  private static final File TEST_PARQUET_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.parquet");
   private static final File TEST_PARQUET_FILE_BINARY = TestUtils.getResourceAsFile("binary.parquet");
   private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc");
   private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc");
@@ -725,6 +726,23 @@ void testReadParquetContainsDecimalData() {
     }
   }
 
+  @Test
+  void testChunkedReadParquet() {
+    try (ParquetChunkedReader reader = new ParquetChunkedReader(240000,
+        TEST_PARQUET_FILE_CHUNKED_READ)) {
+      int numChunks = 0;
+      long totalRows = 0;
+      while (reader.hasNext()) {
+        ++numChunks;
+        try (Table chunk = reader.readChunk()) {
+          totalRows += chunk.getRowCount();
+        }
+      }
+      assertEquals(2, numChunks);
+      assertEquals(40000, totalRows);
+    }
+  }
+
   @Test
   void testReadAvro() {
     AvroOptions opts = AvroOptions.builder()
diff --git a/java/src/test/resources/splittable.parquet b/java/src/test/resources/splittable.parquet
new file mode 100644
index 00000000000..0f110ee1000
Binary files /dev/null and b/java/src/test/resources/splittable.parquet differ
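
For reviewers, a minimal usage sketch of the new Java API, mirroring testChunkedReadParquet above. The 240000-byte limit and the "splittable.parquet" path are taken from the test and stand in for any input; the class name ChunkedReadExample is purely illustrative and assumes the cudf jar is on the classpath.

import java.io.File;

import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.Table;

public class ChunkedReadExample {
  public static void main(String[] args) {
    // Read the file in chunks whose returned data stays under the given byte limit
    // (0 would mean no limit); the file path here is a placeholder.
    try (ParquetChunkedReader reader =
             new ParquetChunkedReader(240000, new File("splittable.parquet"))) {
      long totalRows = 0;
      while (reader.hasNext()) {
        // Each chunk is a regular Table owned by the caller and must be closed.
        try (Table chunk = reader.readChunk()) {
          totalRows += chunk.getRowCount();
        }
      }
      System.out.println("Total rows read: " + totalRows);
    }
  }
}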