Flink: Pre-create fieldGetters to avoid constructing them for each row #10565

Merged (4 commits), Jul 10, 2024
@@ -79,22 +79,45 @@ public static Object convertConstant(Type type, Object value) {
    * the arity check.
    */
   public static RowData clone(
-      RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
+      RowData from,
Contributor:

Please create a deprecated version of the original public method to keep compatibility.

Contributor Author:

Thanks for the reminder! I've added the original method below. I didn't delegate directly to the new method because it requires a FieldGetter for every field, whereas the old method only constructs a FieldGetter for non-null fields. Constructing a FieldGetter for every field on each call might hurt performance.

Contributor:

I think some performance degradation is acceptable in a deprecated method when there is another justification for it. Here the number of duplicated lines seems high enough that I would try to avoid them.

Users can always cache the getters themselves, or find another solution soon, since we remove deprecated methods regularly.

WDYT?

Also CC'ing @stevenzwu, who might also be interested in this PR.

Contributor Author:

OK

+      RowData reuse,
+      RowType rowType,
+      TypeSerializer[] fieldSerializers,
+      RowData.FieldGetter[] fieldGetters) {
     GenericRowData ret;
     if (reuse instanceof GenericRowData) {
       ret = (GenericRowData) reuse;
     } else {
       ret = new GenericRowData(from.getArity());
     }

     ret.setRowKind(from.getRowKind());
     for (int i = 0; i < rowType.getFieldCount(); i++) {
       if (!from.isNullAt(i)) {
-        RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
-        ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
+        ret.setField(i, fieldSerializers[i].copy(fieldGetters[i].getFieldOrNull(from)));
       } else {
         ret.setField(i, null);
       }
     }

     return ret;
   }
+
+  /**
+   * @deprecated will be removed in 1.7.0; Not reusing FieldGetter in this method could lead to
+   *     performance degradation, use {@link #clone(RowData, RowData, RowType, TypeSerializer[],
+   *     RowData.FieldGetter[])} instead.
+   */
+  @Deprecated
+  public static RowData clone(
+      RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
+    RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
+    for (int i = 0; i < rowType.getFieldCount(); ++i) {
+      if (!from.isNullAt(i)) {
+        fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+      }
+    }
+
+    return clone(from, reuse, rowType, fieldSerializers, fieldGetters);
+  }
 }
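
The compatibility request in the review thread follows a common pattern: keep the old signature as a `@Deprecated` overload that builds the missing argument and delegates to the new method. Below is a minimal, self-contained sketch of that pattern in plain Java. `OverloadSketch`, `Getter`, `createGetter`, and `copyRow` are hypothetical stand-ins invented for illustration; they are not Iceberg or Flink APIs, and rows are modeled as plain `Object[]`.

```java
import java.util.Arrays;
import java.util.function.Function;

/** Hypothetical stand-in for the pattern; none of these names come from Iceberg or Flink. */
final class OverloadSketch {
  /** Stand-in for RowData.FieldGetter: reads one position out of a row. */
  interface Getter extends Function<Object[], Object> {}

  static Getter createGetter(int pos) {
    return row -> row[pos];
  }

  /** New signature: the caller supplies getters it created once. */
  static Object[] copyRow(Object[] from, Getter[] getters) {
    Object[] ret = new Object[from.length];
    for (int i = 0; i < from.length; i++) {
      // As in the diff, a getter is only dereferenced for non-null fields.
      ret[i] = from[i] == null ? null : getters[i].apply(from);
    }
    return ret;
  }

  /** @deprecated rebuilds getters on every call; use {@link #copyRow(Object[], Getter[])}. */
  @Deprecated
  static Object[] copyRow(Object[] from) {
    Getter[] getters = new Getter[from.length];
    for (int i = 0; i < from.length; i++) {
      if (from[i] != null) {
        getters[i] = createGetter(i);
      }
    }
    // Delegating keeps the copy loop in one place instead of duplicating it.
    return copyRow(from, getters);
  }

  public static void main(String[] args) {
    Object[] row = {"a", null, 42};
    System.out.println(Arrays.toString(copyRow(row)));
  }
}
```

The old overload stays source-compatible for existing callers, while the per-call getter construction is confined to the deprecated path.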
@@ -28,10 +28,12 @@
 class RowDataRecordFactory implements RecordFactory<RowData> {
   private final RowType rowType;
   private final TypeSerializer[] fieldSerializers;
+  private final RowData.FieldGetter[] fieldGetters;

   RowDataRecordFactory(RowType rowType) {
     this.rowType = rowType;
     this.fieldSerializers = createFieldSerializers(rowType);
+    this.fieldGetters = createFieldGetters(rowType);
   }

   static TypeSerializer[] createFieldSerializers(RowType rowType) {
@@ -40,6 +42,15 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
         .toArray(TypeSerializer[]::new);
   }

+  static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
+    RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
+    for (int i = 0; i < rowType.getFieldCount(); ++i) {
+      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+    }
+
+    return fieldGetters;
+  }
+
   @Override
   public RowData[] createBatch(int batchSize) {
     RowData[] arr = new RowData[batchSize];
@@ -56,6 +67,7 @@ public void clone(RowData from, RowData[] batch, int position) {
     // Clone method will allocate a new GenericRowData object
     // if the target object is NOT a GenericRowData.
     // So we should always set the clone return value back to the array.
-    batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
+    batch[position] =
+        RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
   }
 }
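
To make the effect of this change concrete, the following sketch counts how many getters are constructed when they are built per row versus once per schema. It is a plain-Java illustration under stated assumptions: `GetterReuseSketch`, `Getter`, `Factory`, and the `CREATED` counter are hypothetical names that only imitate the shape of the classes in the diff, and rows are modeled as `Object[]` rather than `RowData`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; these types only imitate the shape of the Flink classes in the diff. */
final class GetterReuseSketch {
  /** Counts how many getters were constructed, to make the saving visible. */
  static final AtomicInteger CREATED = new AtomicInteger();

  /** Stand-in for RowData.FieldGetter. */
  interface Getter {
    Object get(Object[] row);
  }

  static Getter createGetter(int pos) {
    CREATED.incrementAndGet();
    return row -> row[pos];
  }

  /** Before: a getter is built for every non-null field of every row. */
  static Object[] cloneCreatingGetters(Object[] from) {
    Object[] ret = new Object[from.length];
    for (int i = 0; i < from.length; i++) {
      ret[i] = from[i] == null ? null : createGetter(i).get(from);
    }
    return ret;
  }

  /** After: getters are built once per schema and reused for every row. */
  static final class Factory {
    private final Getter[] getters;

    Factory(int fieldCount) {
      getters = new Getter[fieldCount];
      for (int i = 0; i < fieldCount; i++) {
        getters[i] = createGetter(i);
      }
    }

    Object[] cloneRow(Object[] from) {
      Object[] ret = new Object[from.length];
      for (int i = 0; i < from.length; i++) {
        ret[i] = from[i] == null ? null : getters[i].get(from);
      }
      return ret;
    }
  }

  public static void main(String[] args) {
    Object[] row = {1, "x", 2.5};
    int rows = 1000;

    CREATED.set(0);
    for (int r = 0; r < rows; r++) {
      cloneCreatingGetters(row);
    }
    System.out.println("per-row creation: " + CREATED.get());

    CREATED.set(0);
    Factory factory = new Factory(row.length);
    for (int r = 0; r < rows; r++) {
      factory.cloneRow(row);
    }
    System.out.println("pre-created: " + CREATED.get());
  }
}
```

For a three-field row cloned 1000 times, the counter reads 3000 after the first loop and 3 after the second. The sketch says nothing about absolute timings in Flink; it only shows that the number of getter constructions no longer grows with the number of rows.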
@@ -87,7 +87,12 @@ public static RowData copyRowData(RowData from, RowType rowType) {
         rowType.getChildren().stream()
             .map((LogicalType type) -> InternalSerializers.create(type))
             .toArray(TypeSerializer[]::new);
-    return RowDataUtil.clone(from, null, rowType, fieldSerializers);
+    RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
+    for (int i = 0; i < rowType.getFieldCount(); ++i) {
+      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+    }
+
+    return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
   }

   public static void readRowData(FlinkInputFormat input, Consumer<RowData> visitor)