Implement JNI for chunked Parquet reader (#11961)
This adds JNI bindings for the chunked Parquet reader. It depends on the chunked Parquet reader implementation PR (#11867).

Authors:
   - https://github.com/nvdbaranec
   - Nghia Truong (https://github.com/ttnghia)

Approvers:
   - MithunR (https://github.com/mythrocks)
   - Robert (Bobby) Evans (https://github.com/revans2)
ttnghia authored Nov 18, 2022
1 parent a2f69e4 commit 782fba3
Showing 5 changed files with 298 additions and 0 deletions.
155 changes: 155 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -0,0 +1,155 @@
/*
*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

import java.io.File;

/**
* Provides an interface for reading a Parquet file in an iterative manner.
*/
public class ParquetChunkedReader implements AutoCloseable {
static {
NativeDepsLoader.loadNativeDeps();
}

/**
* Construct the reader instance from a read limit and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
this(chunkSizeByteLimit, ParquetOptions.DEFAULT, filePath);
}

/**
* Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());

if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
}

/**
* Construct the reader instance from a read limit and a file already loaded into a memory buffer.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param buffer Raw Parquet file content.
* @param offset The starting offset into buffer.
* @param len The number of bytes to parse from the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());

if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
}

/**
* Check if the given file has anything left to read.
*
* @return A boolean value indicating if there is more data to read from the file.
*/
public boolean hasNext() {
if (handle == 0) {
throw new IllegalStateException("Native chunked Parquet reader object may have been closed.");
}

if (firstCall) {
// This function needs to return true at least once, so that readChunk() can return an empty
// table (one with empty columns rather than no columns) if the input file has no rows.
firstCall = false;
return true;
}
return hasNext(handle);
}

/**
* Read a chunk of rows from the given Parquet file such that the total size of the returned
* data does not exceed the given read limit. If the file has no data, or all of its data has
* already been read by previous calls to this function, a null Table will be returned.
*
* @return A table of new rows read from the given file.
*/
public Table readChunk() {
if (handle == 0) {
throw new IllegalStateException("Native chunked Parquet reader object may have been closed.");
}

long[] columnPtrs = readChunk(handle);
return columnPtrs != null ? new Table(columnPtrs) : null;
}

@Override
public void close() {
if (handle != 0) {
close(handle);
handle = 0;
}
}


/**
* Auxiliary variable to help {@link #hasNext()} return true at least once.
*/
private boolean firstCall = true;

/**
* Handle (memory address) of the native Parquet chunked reader instance.
*/
private long handle;


/**
* Create a native chunked Parquet reader object on the heap and return its memory address.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param filterColumnNames Names of the columns to read, or an empty array to read all columns.
* @param binaryToString Whether to convert the corresponding column to String if it is binary.
* @param filePath Full path of the file to read, or null if reading from a buffer.
* @param bufferAddrs The address of the buffer to read from, or 0 if not reading from a buffer.
* @param length The length of the buffer to read from.
* @param timeUnit Return type of time unit for timestamps.
*/
private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);

private static native boolean hasNext(long handle);

private static native long[] readChunk(long handle);

private static native void close(long handle);
}
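For context beyond the diff itself, here is a minimal usage sketch of the new buffer-based constructor. It is not part of the commit: the file name `example.parquet` and the 1 MB read limit are illustrative assumptions, and the whole file is first staged in a `HostMemoryBuffer`, which is what the constructor expects.

import java.io.File;
import java.nio.file.Files;

import ai.rapids.cudf.HostMemoryBuffer;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;

public class ChunkedReadFromBufferExample {
  public static void main(String[] args) throws Exception {
    // Load the whole Parquet file into host memory ("example.parquet" is a placeholder).
    byte[] raw = Files.readAllBytes(new File("example.parquet").toPath());
    try (HostMemoryBuffer buffer = HostMemoryBuffer.allocate(raw.length)) {
      buffer.setBytes(0, raw, 0, raw.length);
      // Cap each chunk at roughly 1 MB of returned data; 0 would mean "no limit".
      try (ParquetChunkedReader reader = new ParquetChunkedReader(
          1024L * 1024, ParquetOptions.DEFAULT, buffer, 0, raw.length)) {
        while (reader.hasNext()) {
          try (Table chunk = reader.readChunk()) {
            System.out.println("Read a chunk with " + chunk.getRowCount() + " rows");
          }
        }
      }
    }
  }
}

Reading from a host buffer this way avoids a second pass over the file when the Parquet bytes have already been fetched, for example from a remote filesystem.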
1 change: 1 addition & 0 deletions java/src/main/native/CMakeLists.txt
@@ -130,6 +130,7 @@ add_library(
cudfjni
src/Aggregation128UtilsJni.cpp
src/AggregationJni.cpp
src/ChunkedReaderJni.cpp
src/CudfJni.cpp
src/CudaJni.cpp
src/ColumnVectorJni.cpp
124 changes: 124 additions & 0 deletions java/src/main/native/src/ChunkedReaderJni.cpp
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <memory>
#include <vector>

#include <cudf/column/column.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include "cudf_jni_apis.hpp"
#include "jni_utils.hpp"

// This function is defined in `TableJni.cpp`.
jlongArray
cudf::jni::convert_table_for_return(JNIEnv *env, std::unique_ptr<cudf::table> &&table_result,
std::vector<std::unique_ptr<cudf::column>> &&extra_columns);

// This file is for code related to the chunked reader (Parquet, ORC, etc.).

extern "C" {

// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
jint unit) {
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
bool read_buffer = true;
if (buffer == 0) {
JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
read_buffer = false;
} else if (inp_file_path != nullptr) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"Cannot pass in both a buffer and an inp_file_path", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstring filename(env, inp_file_path);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inp_file_path cannot be empty", 0);
}

cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);

// TODO: This variable is currently unused, but we have not yet decided whether to use it
// again or to remove it completely, so it stays here for now.
cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
(void)n_col_binary_read;

auto const source = read_buffer ?
cudf::io::source_info(reinterpret_cast<char *>(buffer),
static_cast<std::size_t>(buffer_length)) :
cudf::io::source_info(filename.get());

auto opts_builder = cudf::io::parquet_reader_options::builder(source);
if (n_filter_col_names.size() > 0) {
opts_builder = opts_builder.columns(n_filter_col_names.as_cpp_vector());
}
auto const read_opts = opts_builder.convert_strings_to_categories(false)
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();

return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
static_cast<std::size_t>(chunk_read_limit), read_opts));
}
CATCH_STD(env, 0);
}

JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_hasNext(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", false);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *const>(handle);
return reader_ptr->has_next();
}
CATCH_STD(env, false);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *const>(handle);
auto chunk = reader_ptr->read_chunk();
return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
}
CATCH_STD(env, 0);
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", );

try {
cudf::jni::auto_set_device(env);
delete reinterpret_cast<cudf::io::chunked_parquet_reader *>(handle);
}
CATCH_STD(env, );
}

} // extern "C"
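As the options-builder code above shows, the Java `ParquetOptions` fields map directly onto the parameters of the native `create()`: `includeColumn` feeds `filter_col_names` and `withTimeUnit` feeds `timeUnit`. The sketch below illustrates that mapping from the Java side; the column names `id` and `payload` and the file name `example.parquet` are hypothetical, and a read limit of 0 disables the per-chunk size cap.

import java.io.File;

import ai.rapids.cudf.DType;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;

public class ChunkedReadSelectedColumnsExample {
  public static void main(String[] args) {
    // Select two (hypothetical) columns and request microsecond timestamps; these
    // choices end up in the native parquet_reader_options built by create().
    ParquetOptions opts = ParquetOptions.builder()
        .includeColumn("id")
        .includeColumn("payload")
        .withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
        .build();
    try (ParquetChunkedReader reader =
             new ParquetChunkedReader(0, opts, new File("example.parquet"))) {
      while (reader.hasNext()) {
        try (Table chunk = reader.readChunk()) {
          // Each chunk contains only the selected columns.
        }
      }
    }
  }
}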
18 changes: 18 additions & 0 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -75,6 +75,7 @@

public class TableTest extends CudfTestBase {
private static final File TEST_PARQUET_FILE = TestUtils.getResourceAsFile("acq.parquet");
private static final File TEST_PARQUET_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.parquet");
private static final File TEST_PARQUET_FILE_BINARY = TestUtils.getResourceAsFile("binary.parquet");
private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc");
private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc");
@@ -725,6 +726,23 @@ void testReadParquetContainsDecimalData() {
}
}

@Test
void testChunkedReadParquet() {
try (ParquetChunkedReader reader = new ParquetChunkedReader(240000,
TEST_PARQUET_FILE_CHUNKED_READ)) {
int numChunks = 0;
long totalRows = 0;
while (reader.hasNext()) {
++numChunks;
try (Table chunk = reader.readChunk()) {
totalRows += chunk.getRowCount();
}
}
assertEquals(2, numChunks);
assertEquals(40000, totalRows);
}
}

@Test
void testReadAvro() {
AvroOptions opts = AvroOptions.builder()
Binary file added java/src/test/resources/splittable.parquet
