Skip to content

Commit

Permalink
Flink: Backport apache#10565 to v1.18 and v1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiajie committed Jul 10, 2024
1 parent 4714bbf commit e19d698
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,45 @@ public static Object convertConstant(Type type, Object value) {
* the arity check.
*/
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData from,
RowData reuse,
RowType rowType,
TypeSerializer[] fieldSerializers,
RowData.FieldGetter[] fieldGetters) {
GenericRowData ret;
if (reuse instanceof GenericRowData) {
ret = (GenericRowData) reuse;
} else {
ret = new GenericRowData(from.getArity());
}

ret.setRowKind(from.getRowKind());
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
ret.setField(i, fieldSerializers[i].copy(fieldGetters[i].getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
}

return ret;
}

/**
* @deprecated will be removed in 1.7.0; Not reusing FieldGetter in this method could lead to
* performance degradation, use {@link #clone(RowData, RowData, RowType, TypeSerializer[],
* RowData.FieldGetter[])} instead.
*/
@Deprecated
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
if (!from.isNullAt(i)) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
}

return clone(from, reuse, rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
class RowDataRecordFactory implements RecordFactory<RowData> {
private final RowType rowType;
private final TypeSerializer[] fieldSerializers;
private final RowData.FieldGetter[] fieldGetters;

RowDataRecordFactory(RowType rowType) {
this.rowType = rowType;
this.fieldSerializers = createFieldSerializers(rowType);
this.fieldGetters = createFieldGetters(rowType);
}

static TypeSerializer[] createFieldSerializers(RowType rowType) {
Expand All @@ -40,6 +42,15 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
.toArray(TypeSerializer[]::new);
}

static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}

return fieldGetters;
}

@Override
public RowData[] createBatch(int batchSize) {
RowData[] arr = new RowData[batchSize];
Expand All @@ -56,6 +67,7 @@ public void clone(RowData from, RowData[] batch, int position) {
// Clone method will allocate a new GenericRowData object
// if the target object is NOT a GenericRowData.
// So we should always set the clone return value back to the array.
batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
batch[position] =
RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ public static RowData copyRowData(RowData from, RowType rowType) {
rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}

return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
}

public static void readRowData(FlinkInputFormat input, Consumer<RowData> visitor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,45 @@ public static Object convertConstant(Type type, Object value) {
* the arity check.
*/
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData from,
RowData reuse,
RowType rowType,
TypeSerializer[] fieldSerializers,
RowData.FieldGetter[] fieldGetters) {
GenericRowData ret;
if (reuse instanceof GenericRowData) {
ret = (GenericRowData) reuse;
} else {
ret = new GenericRowData(from.getArity());
}

ret.setRowKind(from.getRowKind());
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (!from.isNullAt(i)) {
RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
ret.setField(i, fieldSerializers[i].copy(fieldGetters[i].getFieldOrNull(from)));
} else {
ret.setField(i, null);
}
}

return ret;
}

/**
* @deprecated will be removed in 1.7.0; Not reusing FieldGetter in this method could lead to
* performance degradation, use {@link #clone(RowData, RowData, RowType, TypeSerializer[],
* RowData.FieldGetter[])} instead.
*/
@Deprecated
public static RowData clone(
RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
if (!from.isNullAt(i)) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}
}

return clone(from, reuse, rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
class RowDataRecordFactory implements RecordFactory<RowData> {
private final RowType rowType;
private final TypeSerializer[] fieldSerializers;
private final RowData.FieldGetter[] fieldGetters;

RowDataRecordFactory(RowType rowType) {
this.rowType = rowType;
this.fieldSerializers = createFieldSerializers(rowType);
this.fieldGetters = createFieldGetters(rowType);
}

static TypeSerializer[] createFieldSerializers(RowType rowType) {
Expand All @@ -40,6 +42,15 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) {
.toArray(TypeSerializer[]::new);
}

static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}

return fieldGetters;
}

@Override
public RowData[] createBatch(int batchSize) {
RowData[] arr = new RowData[batchSize];
Expand All @@ -56,6 +67,7 @@ public void clone(RowData from, RowData[] batch, int position) {
// Clone method will allocate a new GenericRowData object
// if the target object is NOT a GenericRowData.
// So we should always set the clone return value back to the array.
batch[position] = RowDataUtil.clone(from, batch[position], rowType, fieldSerializers);
batch[position] =
RowDataUtil.clone(from, batch[position], rowType, fieldSerializers, fieldGetters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ public static RowData copyRowData(RowData from, RowType rowType) {
rowType.getChildren().stream()
.map((LogicalType type) -> InternalSerializers.create(type))
.toArray(TypeSerializer[]::new);
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); ++i) {
fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
}

return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
}

public static void readRowData(FlinkInputFormat input, Consumer<RowData> visitor)
Expand Down

0 comments on commit e19d698

Please sign in to comment.