Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Pre-create fieldGetters to avoid constructing them for each row #10565

Merged
merged 4 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Flink: Pre-create fieldGetters to avoid constructing them for each row
  • Loading branch information
fengjiajie committed Jun 25, 2024
commit da0213cb8bed3855d894db9985200f5fa5115cbb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ public static Object convertConstant(Type type, Object value) {
* the arity check.
*/
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData from,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a deprecated version of the original public method to keep compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder! I've added the original function below. The reason I didn't directly use the new function is that it requires FieldGetter for all fields, while the old function only needs to construct FieldGetter for fields with values. Constructing FieldGetter for all fields might negatively impact performance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that performance degradation is acceptable for the deprecated methods if it is justified otherwise. The number of the duplicated lines seems high enough for me to try to avoid them.

The users could always do the caching of the getters themselves, or find another solution soon, since we remove the deprecated methods regularly.

WDYT?

Also CC-ed, @stevenzwu, who might be also interested in this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

RowData reuse,
RowType rowType,
TypeSerializer[] fieldSerializers,
RowData.FieldGetter[] fieldGetters) {
GenericRowData ret;
if (reuse instanceof GenericRowData) {
ret = (GenericRowData) reuse;
Expand All @@ -89,8 +93,7 @@ public static RowData clone(
ret.setRowKind(from.getRowKind());
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
ret.setField(i, fieldSerializers[i].copy(fieldGetters[i].getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
class RowDataRecordFactory implements RecordFactory<RowData> {
private final RowType rowType;
private final TypeSerializer[] fieldSerializers;
private final RowData.FieldGetter[] fieldGetters;

RowDataRecordFactory(RowType rowType) {
this.rowType = rowType;
this.fieldSerializers = createFieldSerializers(rowType);
this.fieldGetters = createFieldGetters(rowType);
}

static TypeSerializer[] createFieldSerializers(RowType rowType) {
Expand All @@ -40,6 +42,14 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
.toArray(TypeSerializer[]::new);
}

static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
return fieldGetters;
fengjiajie marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public RowData[] createBatch(int batchSize) {
RowData[] arr = new RowData[batchSize];
Expand All @@ -56,6 +66,7 @@ public void clone(RowData from, RowData[] batch, int position) {
// Clone method will allocate a new GenericRowData object
// if the target object is NOT a GenericRowData.
// So we should always set the clone return value back to the array.
batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
batch[position] =
RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public static RowData copyRowData(RowData from, RowType rowType) {
rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
fengjiajie marked this conversation as resolved.
Show resolved Hide resolved
}

public static void readRowData(FlinkInputFormat input, Consumer<RowData> visitor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ public static Object convertConstant(Type type, Object value) {
* the arity check.
*/
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData from,
RowData reuse,
RowType rowType,
TypeSerializer[] fieldSerializers,
RowData.FieldGetter[] fieldGetters) {
GenericRowData ret;
if (reuse instanceof GenericRowData) {
ret = (GenericRowData) reuse;
Expand All @@ -89,8 +93,7 @@ public static RowData clone(
ret.setRowKind(from.getRowKind());
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
ret.setField(i, fieldSerializers[i].copy(fieldGetters[i].getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
class RowDataRecordFactory implements RecordFactory<RowData> {
private final RowType rowType;
private final TypeSerializer[] fieldSerializers;
private final RowData.FieldGetter[] fieldGetters;

RowDataRecordFactory(RowType rowType) {
this.rowType = rowType;
this.fieldSerializers = createFieldSerializers(rowType);
this.fieldGetters = createFieldGetters(rowType);
}

static TypeSerializer[] createFieldSerializers(RowType rowType) {
Expand All @@ -40,6 +42,14 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
.toArray(TypeSerializer[]::new);
}

static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
return fieldGetters;
}

@Override
public RowData[] createBatch(int batchSize) {
RowData[] arr = new RowData[batchSize];
Expand All @@ -56,6 +66,7 @@ public void clone(RowData from, RowData[] batch, int position) {
// Clone method will allocate a new GenericRowData object
// if the target object is NOT a GenericRowData.
// So we should always set the clone return value back to the array.
batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
batch[position] =
RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public static RowData copyRowData(RowData from, RowType rowType) {
rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
}

public static void readRowData(FlinkInputFormat input, Consumer<RowData> visitor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ public static Object convertConstant(Type type, Object value) {
* the arity check.
*/
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData from,
RowData reuse,
RowType rowType,
TypeSerializer[] fieldSerializers,
RowData.FieldGetter[] fieldGetters) {
GenericRowData ret;
if (reuse instanceof GenericRowData) {
ret = (GenericRowData) reuse;
Expand All @@ -89,8 +93,7 @@ public static RowData clone(
ret.setRowKind(from.getRowKind());
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
ret.setField(i, fieldSerializers[i].copy(fieldGetters[i].getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
class RowDataRecordFactory implements RecordFactory<RowData> {
private final RowType rowType;
private final TypeSerializer[] fieldSerializers;
private final RowData.FieldGetter[] fieldGetters;

RowDataRecordFactory(RowType rowType) {
this.rowType = rowType;
this.fieldSerializers = createFieldSerializers(rowType);
this.fieldGetters = createFieldGetters(rowType);
}

static TypeSerializer[] createFieldSerializers(RowType rowType) {
Expand All @@ -40,6 +42,14 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
.toArray(TypeSerializer[]::new);
}

static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
return fieldGetters;
}

@Override
public RowData[] createBatch(int batchSize) {
RowData[] arr = new RowData[batchSize];
Expand All @@ -56,6 +66,7 @@ public void clone(RowData from, RowData[] batch, int position) {
// Clone method will allocate a new GenericRowData object
// if the target object is NOT a GenericRowData.
// So we should always set the clone return value back to the array.
batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
batch[position] =
RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public static RowData copyRowData(RowData from, RowType rowType) {
rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
}

public static void readRowData(FlinkInputFormat input, Consumer<RowData> visitor)
Expand Down