Skip to content

Commit

Permalink
[FLINK-23025][upsert-kafka] Fix upsert-kafka produce duplicates when …
Browse files Browse the repository at this point in the history
…enable object reuse

This closes apache#16257
  • Loading branch information
fsk119 committed Jun 24, 2021
1 parent 2da9760 commit 6defc99
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public void open(Configuration parameters) throws Exception {
consumedRowDataTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
this.valueCopier =
getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
? Function.identity()
: typeSerializer::copy;
? typeSerializer::copy
: Function.identity();

// register timer
this.scheduler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -54,8 +59,14 @@
import static org.junit.Assert.assertTrue;

/** Test for {@link BufferedUpsertSinkFunction}. */
@RunWith(Parameterized.class)
public class BufferedUpsertSinkFunctionTest {

@Parameterized.Parameters(name = "object reuse = {0}")
public static Object[] enableObjectReuse() {
return new Boolean[] {true, false};
}

private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT().notNull()),
Expand Down Expand Up @@ -129,17 +140,23 @@ public class BufferedUpsertSinkFunctionTest {
TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
};

private final boolean enableObjectReuse;

public BufferedUpsertSinkFunctionTest(boolean enableObjectReuse) {
this.enableObjectReuse = enableObjectReuse;
}

@Test
public void testWriteData() throws Exception {
MockedSinkFunction sinkFunction = new MockedSinkFunction();
BufferedUpsertSinkFunction bufferedSink = createBufferedSink(sinkFunction);

// write 3 records which doesn't trigger batch size
writeData(bufferedSink, TEST_DATA, 0, 3);
writeData(bufferedSink, new ReusableIterator(0, 3));
assertTrue(sinkFunction.rowDataCollectors.isEmpty());

// write one more record, and should flush the buffer
writeData(bufferedSink, TEST_DATA, 3, 1);
writeData(bufferedSink, new ReusableIterator(3, 1));

HashMap<Integer, List<RowData>> expected = new HashMap<>();
expected.put(
Expand Down Expand Up @@ -180,7 +197,7 @@ public void testWriteData() throws Exception {

sinkFunction.rowDataCollectors.clear();
// write remaining data, and they are still buffered
writeData(bufferedSink, TEST_DATA, 4, 3);
writeData(bufferedSink, new ReusableIterator(4, 3));
assertTrue(sinkFunction.rowDataCollectors.isEmpty());
}

Expand All @@ -189,7 +206,7 @@ public void testFlushDataWhenCheckpointing() throws Exception {
MockedSinkFunction sinkFunction = new MockedSinkFunction();
BufferedUpsertSinkFunction bufferedFunction = createBufferedSink(sinkFunction);
// write all data, there should be 3 records are still buffered
writeData(bufferedFunction, TEST_DATA, 0, TEST_DATA.length);
writeData(bufferedFunction, new ReusableIterator(0, TEST_DATA.length));
// snapshot should flush the buffer
bufferedFunction.snapshotState(null);

Expand Down Expand Up @@ -257,16 +274,15 @@ private BufferedUpsertSinkFunction createBufferedSink(MockedSinkFunction sinkFun
typeInformation,
BUFFER_FLUSH_MODE);
bufferedSinkFunction.open(new Configuration());

return bufferedSinkFunction;
}

private void writeData(BufferedUpsertSinkFunction sink, RowData[] data, int startPos, int size)
private void writeData(BufferedUpsertSinkFunction sink, Iterator<RowData> iterator)
throws Exception {
for (int i = startPos; i < startPos + size; i++) {
RowData row = data[i];
long rowtime = row.getTimestamp(TIMESTAMP_INDICES, 3).getMillisecond();
sink.invoke(row, SinkContextUtil.forTimestamp(rowtime));
while (iterator.hasNext()) {
RowData next = iterator.next();
long rowtime = next.getTimestamp(TIMESTAMP_INDICES, 3).getMillisecond();
sink.invoke(next, SinkContextUtil.forTimestamp(rowtime));
}
}

Expand All @@ -287,15 +303,22 @@ private void compareCompactedResult(

// --------------------------------------------------------------------------------------------

private static class MockedSinkFunction extends RichSinkFunction<RowData>
private class MockedSinkFunction extends RichSinkFunction<RowData>
implements CheckpointedFunction, CheckpointListener {

private static final long serialVersionUID = 1L;
private final RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1);
transient List<RowData> rowDataCollectors;

MockedSinkFunction() {
if (enableObjectReuse) {
context.getExecutionConfig().enableObjectReuse();
}
}

@Override
public RuntimeContext getRuntimeContext() {
return new MockStreamingRuntimeContext(true, 1, 1);
return context;
}

@Override
Expand Down Expand Up @@ -327,4 +350,33 @@ public void invoke(RowData value, Context context) {
rowDataCollectors.add(value);
}
}

private class ReusableIterator implements Iterator<RowData> {

private final RowDataSerializer serializer =
InternalTypeInfo.of(SCHEMA.toSinkRowDataType().getLogicalType()).toRowSerializer();
private final RowData reusedRow = new GenericRowData(SCHEMA.getColumnCount());

private int begin;
private final int end;

ReusableIterator(int begin, int size) {
this.begin = begin;
this.end = begin + size;
}

@Override
public boolean hasNext() {
return begin < end;
}

@Override
public RowData next() {
if (enableObjectReuse) {
return serializer.copy(TEST_DATA[begin++], reusedRow);
} else {
return TEST_DATA[begin++];
}
}
}
}

0 comments on commit 6defc99

Please sign in to comment.