JNI: Pass names of children struct columns to native Arrow IPC writer [skip ci] #7598

Merged (16 commits) on Mar 20, 2021
Changes from 2 commits
34 changes: 33 additions & 1 deletion java/src/main/java/ai/rapids/cudf/ArrowIPCWriterOptions.java
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,10 @@

package ai.rapids.cudf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Settings for writing Arrow IPC data.
*/
@@ -34,11 +38,13 @@ public interface DoneOnGpu {

private final long size;
private final DoneOnGpu callback;
private final ColumnMetadata[] columnMeta;

private ArrowIPCWriterOptions(Builder builder) {
super(builder);
this.size = builder.size;
this.callback = builder.callback;
this.columnMeta = builder.columnMeta.toArray(new ColumnMetadata[builder.columnMeta.size()]);
}

public long getMaxChunkSize() {
@@ -49,9 +55,23 @@ public DoneOnGpu getCallback() {
return callback;
}

public ColumnMetadata[] getColumnMetadata() {
if (columnMeta == null || columnMeta.length == 0) {
// This is for compatibility: build from the column names when the column metadata is empty.
// Remove this once all callers have been updated to use only column metadata.
return Arrays
.stream(getColumnNames())
.map(ColumnMetadata::new)
.toArray(ColumnMetadata[]::new);
} else {
return columnMeta;
}
}

public static class Builder extends WriterBuilder<Builder> {
private long size = -1;
private DoneOnGpu callback = (ignored) -> {};
private List<ColumnMetadata> columnMeta = new ArrayList<>();

public Builder withMaxChunkSize(long size) {
this.size = size;
@@ -67,6 +87,18 @@ public Builder withCallback(DoneOnGpu callback) {
return this;
}

/**
* This should be used instead of `withColumnNames` when there are child
* columns of struct type.
*
* (`columnNullability` is not used by the Arrow IPC writer, so it is safe to ignore
* it here. It can be added to `ColumnMetadata` if it is needed in the future.)
*/
public Builder withColumnMetadata(ColumnMetadata... columnMeta) {
this.columnMeta.addAll(Arrays.asList(columnMeta));
return this;
}

public ArrowIPCWriterOptions build() {
return new ArrowIPCWriterOptions(this);
}
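A brief usage sketch (hypothetical column names, not part of this diff): `withColumnMetadata` takes the place of `withColumnNames` whenever a column has children, while flat columns can still be described with one `ColumnMetadata` per column.

    ArrowIPCWriterOptions options = ArrowIPCWriterOptions.builder()
        .withMaxChunkSize(1_000_000)
        .withColumnMetadata(
            new ColumnMetadata("a"),                 // flat column
            new ColumnMetadata("b").addChildren(     // struct column with two children
                new ColumnMetadata("x"), new ColumnMetadata("y")))
        .build();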
47 changes: 47 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ColumnMetadata.java
@@ -0,0 +1,47 @@
/*
*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Detailed metadata for an Arrow array.
*
* (This is analogous to the native `column_metadata`.)
*/
public class ColumnMetadata {
// No getter for name, since it is accessed from native code.
private String name;
private List<ColumnMetadata> children = new ArrayList<>();

public ColumnMetadata(final String colName) {
this.name = colName;
}

public ColumnMetadata addChildren(ColumnMetadata... childrenMeta) {
children.addAll(Arrays.asList(childrenMeta));
return this;
}

public ColumnMetadata[] getChildren() {
return children.toArray(new ColumnMetadata[children.size()]);
}
}
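A minimal sketch of describing a nested schema with this class (the column names are illustrative; the null-named stub for a list's offsets child follows the pattern used in the updated TableTest below):

    // struct<id: int32, name: string>
    ColumnMetadata person = new ColumnMetadata("person")
        .addChildren(new ColumnMetadata("id"), new ColumnMetadata("name"));
    // list<struct<id, name>>: the first child is a stub entry for the offsets column
    ColumnMetadata people = new ColumnMetadata("people")
        .addChildren(new ColumnMetadata(null), person);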
12 changes: 6 additions & 6 deletions java/src/main/java/ai/rapids/cudf/Table.java
@@ -365,19 +365,19 @@ private static native long writeORCBufferBegin(String[] columnNames,

/**
* Setup everything to write Arrow IPC formatted data to a file.
* @param columnNames names that correspond to the table columns
* @param columnMeta column metadata that correspond to the table columns
* @param filename local output path
* @return a handle that is used in later calls to writeArrowIPCChunk and writeArrowIPCEnd.
*/
private static native long writeArrowIPCFileBegin(String[] columnNames, String filename);
private static native long writeArrowIPCFileBegin(ColumnMetadata[] columnMeta, String filename);

/**
* Setup everything to write Arrow IPC formatted data to a buffer.
* @param columnNames names that correspond to the table columns
* @param columnMeta column metadata that correspond to the table columns
* @param consumer consumer of host buffers produced.
* @return a handle that is used in later calls to writeArrowIPCChunk and writeArrowIPCEnd.
*/
private static native long writeArrowIPCBufferBegin(String[] columnNames,
private static native long writeArrowIPCBufferBegin(ColumnMetadata[] columnMeta,
HostBufferConsumer consumer);

/**
@@ -988,7 +988,7 @@ private ArrowIPCTableWriter(ArrowIPCWriterOptions options,
this.consumer = null;
this.maxChunkSize = options.getMaxChunkSize();
this.handle = writeArrowIPCFileBegin(
options.getColumnNames(),
options.getColumnMetadata(),
outputFile.getAbsolutePath());
}

@@ -998,7 +998,7 @@ private ArrowIPCTableWriter(ArrowIPCWriterOptions options,
this.consumer = consumer;
this.maxChunkSize = options.getMaxChunkSize();
this.handle = writeArrowIPCBufferBegin(
options.getColumnNames(),
options.getColumnMetadata(),
consumer);
}

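Putting the pieces together, an end-to-end sketch of a chunked Arrow IPC write with the metadata-based options (assumes `myConsumer` is an existing HostBufferConsumer implementation and `table` is a Table whose columns match the declared metadata):

    ArrowIPCWriterOptions options = ArrowIPCWriterOptions.builder()
        .withColumnMetadata(new ColumnMetadata("id"), new ColumnMetadata("name"))
        .build();
    try (TableWriter writer = Table.writeArrowIPCChunked(options, myConsumer)) {
      writer.write(table);  // write the table as one chunk; additional chunks can follow
    }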
72 changes: 52 additions & 20 deletions java/src/main/native/src/TableJni.cpp
@@ -203,16 +203,16 @@ typedef jni_table_writer_handle<cudf::io::orc_chunked_writer> native_orc_writer_

class native_arrow_ipc_writer_handle final {
public:
explicit native_arrow_ipc_writer_handle(const std::vector<std::string> &col_names,
explicit native_arrow_ipc_writer_handle(const std::vector<cudf::column_metadata> &col_meta,
const std::string &file_name)
: initialized(false), column_names(col_names), file_name(file_name) {}
: initialized(false), column_metadata(col_meta), file_name(file_name) {}

explicit native_arrow_ipc_writer_handle(const std::vector<std::string> &col_names,
explicit native_arrow_ipc_writer_handle(const std::vector<cudf::column_metadata> &col_meta,
const std::shared_ptr<arrow::io::OutputStream> &sink)
: initialized(false), column_names(col_names), file_name(""), sink(sink) {}
: initialized(false), column_metadata(col_meta), file_name(""), sink(sink) {}

bool initialized;
std::vector<std::string> column_names;
std::vector<cudf::column_metadata> column_metadata;
std::string file_name;
std::shared_ptr<arrow::io::OutputStream> sink;
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
@@ -563,6 +563,42 @@ bool valid_window_parameters(native_jintArray const &values,
values.size() == preceding.size() && values.size() == following.size();
}

static void build_one_column_metadata(JNIEnv *env, jobject meta_obj,
jfieldID name_id,
jmethodID children_mid,
cudf::column_metadata& out) {
// get column name
cudf::jni::native_jstring col_name(env, (jstring)env->GetObjectField(meta_obj, name_id));
out.name = std::string(col_name.get() == NULL ? "" : col_name.get());
// children
jobjectArray j_children_meta = (jobjectArray)env->CallObjectMethod(meta_obj, children_mid);
cudf::jni::native_jobjectArray<jobject> children_meta(env, j_children_meta);
for (int i = 0; i < children_meta.size(); i++) {
cudf::column_metadata cudf_col_meta;
build_one_column_metadata(env, children_meta.get(i), name_id, children_mid, cudf_col_meta);
out.children_meta.push_back(std::move(cudf_col_meta));
}
}

static void build_column_metadata_from_handle(JNIEnv *env, jobjectArray meta_handle,
std::vector<cudf::column_metadata>& out_meta) {
native_jobjectArray<jobject> col_meta(env, meta_handle);
out_meta.reserve(col_meta.size());
// Look up the ColumnMetadata class
jclass col_meta_cls = env->FindClass("ai/rapids/cudf/ColumnMetadata");
JNI_NULL_CHECK(env, col_meta_cls, "Can not find class: ai/rapids/cudf/ColumnMetadata", );
jfieldID name_id = env->GetFieldID(col_meta_cls, "name", "Ljava/lang/String;");
jmethodID children_mid =
env->GetMethodID(col_meta_cls, "getChildren", "()[Lai/rapids/cudf/ColumnMetadata;");

for (int x = 0; x < col_meta.size(); x++) {
cudf::column_metadata cudf_col_meta;
build_one_column_metadata(env, col_meta.get(x), name_id, children_mid, cudf_col_meta);
out_meta.push_back(std::move(cudf_col_meta));
}
cudf::jni::check_java_exception(env);
}

} // namespace

} // namespace jni
@@ -1196,36 +1232,37 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_Table_writeORCEnd(JNIEnv *env, jclass
}

JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeArrowIPCBufferBegin(JNIEnv *env, jclass,
jobjectArray j_col_names,
jobjectArray j_col_meta,
jobject consumer) {
JNI_NULL_CHECK(env, j_col_names, "null columns", 0);
JNI_NULL_CHECK(env, j_col_meta, "null columns", 0);
JNI_NULL_CHECK(env, consumer, "null consumer", 0);
try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstringArray col_names(env, j_col_names);

std::shared_ptr<cudf::jni::jni_arrow_output_stream> data_sink(
new cudf::jni::jni_arrow_output_stream(env, consumer));
std::vector<cudf::column_metadata> col_meta;
cudf::jni::build_column_metadata_from_handle(env, j_col_meta, col_meta);

cudf::jni::native_arrow_ipc_writer_handle *ret =
new cudf::jni::native_arrow_ipc_writer_handle(col_names.as_cpp_vector(), data_sink);
new cudf::jni::native_arrow_ipc_writer_handle(col_meta, data_sink);
return reinterpret_cast<jlong>(ret);
}
CATCH_STD(env, 0)
}

JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeArrowIPCFileBegin(JNIEnv *env, jclass,
jobjectArray j_col_names,
jobjectArray j_col_meta,
jstring j_output_path) {
JNI_NULL_CHECK(env, j_col_names, "null columns", 0);
JNI_NULL_CHECK(env, j_col_meta, "null columns", 0);
JNI_NULL_CHECK(env, j_output_path, "null output path", 0);
try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstringArray col_names(env, j_col_names);
cudf::jni::native_jstring output_path(env, j_output_path);
std::vector<cudf::column_metadata> col_meta;
cudf::jni::build_column_metadata_from_handle(env, j_col_meta, col_meta);

cudf::jni::native_arrow_ipc_writer_handle *ret =
new cudf::jni::native_arrow_ipc_writer_handle(col_names.as_cpp_vector(), output_path.get());
new cudf::jni::native_arrow_ipc_writer_handle(col_meta, output_path.get());
return reinterpret_cast<jlong>(ret);
}
CATCH_STD(env, 0)
@@ -1245,12 +1282,7 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_convertCudfToArrowTable(JNIEnv
cudf::jni::auto_set_device(env);
std::unique_ptr<std::shared_ptr<arrow::Table>> result(
new std::shared_ptr<arrow::Table>(nullptr));
auto column_metadata = std::vector<cudf::column_metadata>{};
column_metadata.reserve(state->column_names.size());
std::transform(std::begin(state->column_names), std::end(state->column_names),
std::back_inserter(column_metadata),
[](auto const &column_name) { return cudf::column_metadata{column_name}; });
*result = cudf::to_arrow(*tview, column_metadata);
*result = cudf::to_arrow(*tview, state->column_metadata);
if (!result->get()) {
return 0;
}
56 changes: 45 additions & 11 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -4056,15 +4056,38 @@ void testTableBasedFilter() {
}

private Table getExpectedFileTable() {
return new TestBuilder()
.column(true, false, false, true, false)
.column(5, 1, 0, 2, 7)
.column(new Byte[]{2, 3, 4, 5, 9})
.column(3l, 9l, 4l, 2l, 20l)
.column("this", "is", "a", "test", "string")
.column(1.0f, 3.5f, 5.9f, 7.1f, 9.8f)
.column(5.0d, 9.5d, 0.9d, 7.23d, 2.8d)
.build();
return getExpectedFileTable(false);
}

private Table getExpectedFileTable(boolean withNestedColumns) {
TestBuilder tb = new TestBuilder()
.column(true, false, false, true, false)
.column(5, 1, 0, 2, 7)
.column(new Byte[]{2, 3, 4, 5, 9})
.column(3l, 9l, 4l, 2l, 20l)
.column("this", "is", "a", "test", "string")
.column(1.0f, 3.5f, 5.9f, 7.1f, 9.8f)
.column(5.0d, 9.5d, 0.9d, 7.23d, 2.8d);
if (withNestedColumns) {
StructType nestedType = new StructType(true,
new BasicType(false, DType.INT32), new BasicType(false, DType.STRING));
tb.column(nestedType,
struct(1, "k1"), struct(2, "k2"), struct(3, "k3"),
struct(4, "k4"), new HostColumnVector.StructData((List) null))
.column(new ListType(false, new BasicType(false, DType.INT32)),
Arrays.asList(1, 2),
Arrays.asList(3, 4),
Arrays.asList(5),
Arrays.asList(6, 7),
Arrays.asList(8, 9, 10))
.column(new ListType(false, nestedType),
Arrays.asList(struct(1, "k1"), struct(2, "k2"), struct(3, "k3")),
Arrays.asList(struct(4, "k4"), struct(5, "k5")),
Arrays.asList(struct(6, "k6")),
Arrays.asList(new HostColumnVector.StructData((List) null)),
Arrays.asList());
}
return tb.build();
}

private Table getExpectedFileTableWithDecimals() {
@@ -4272,10 +4295,21 @@ void testArrowIPCWriteToFileWithNamesAndMetadata() throws IOException {

@Test
void testArrowIPCWriteToBufferChunked() {
try (Table table0 = getExpectedFileTable();
try (Table table0 = getExpectedFileTable(true);
MyBufferConsumer consumer = new MyBufferConsumer()) {
ColumnMetadata[] colMeta = Arrays.asList("first", "second", "third", "fourth", "fifth",
"sixth", "seventh").stream().map(ColumnMetadata::new)
.toArray(ColumnMetadata[]::new);
ColumnMetadata structCM = new ColumnMetadata("eighth");
structCM.addChildren(new ColumnMetadata("id"), new ColumnMetadata("name"));
ColumnMetadata arrayPriCM = new ColumnMetadata("ninth");
// List/array types need a stub metadata entry for the offsets column
arrayPriCM.addChildren(new ColumnMetadata(null), new ColumnMetadata("aid"));
ColumnMetadata arrayStructCM = new ColumnMetadata("tenth");
arrayStructCM.addChildren(new ColumnMetadata(null), structCM);
ArrowIPCWriterOptions options = ArrowIPCWriterOptions.builder()
.withColumnNames("first", "second", "third", "fourth", "fifth", "sixth", "seventh")
.withColumnMetadata(colMeta)
.withColumnMetadata(structCM, arrayPriCM, arrayStructCM)
.build();
try (TableWriter writer = Table.writeArrowIPCChunked(options, consumer)) {
writer.write(table0);