Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-23025][upsert-kafka] Fix upsert-kafka producing duplicate records when … #16242

Merged
merged 2 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public void open(Configuration parameters) throws Exception {
consumedRowDataTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
this.valueCopier =
getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
? Function.identity()
: typeSerializer::copy;
? typeSerializer::copy
: Function.identity();

// register timer
this.scheduler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

Expand All @@ -54,8 +59,14 @@
import static org.junit.Assert.assertTrue;

/** Test for {@link BufferedUpsertSinkFunction}. */
@RunWith(Parameterized.class)
public class BufferedUpsertSinkFunctionTest {

@Parameterized.Parameters(name = "object reuse = {0}")
public static Object[] enableObjectReuse() {
return new Boolean[] {true, false};
}

private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT().notNull()),
Expand Down Expand Up @@ -129,17 +140,23 @@ public class BufferedUpsertSinkFunctionTest {
TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
};

private final boolean enableObjectReuse;

public BufferedUpsertSinkFunctionTest(boolean enableObjectReuse) {
this.enableObjectReuse = enableObjectReuse;
}

@Test
public void testWriteData() throws Exception {
MockedSinkFunction sinkFunction = new MockedSinkFunction();
MockedSinkFunction sinkFunction = new MockedSinkFunction(enableObjectReuse);
fsk119 marked this conversation as resolved.
Show resolved Hide resolved
BufferedUpsertSinkFunction bufferedSink = createBufferedSink(sinkFunction);

// write 3 records which doesn't trigger batch size
writeData(bufferedSink, TEST_DATA, 0, 3);
writeData(bufferedSink, new ReusableIterator(0, 3, enableObjectReuse));
assertTrue(sinkFunction.rowDataCollectors.isEmpty());

// write one more record, and should flush the buffer
writeData(bufferedSink, TEST_DATA, 3, 1);
writeData(bufferedSink, new ReusableIterator(3, 1, enableObjectReuse));

HashMap<Integer, List<RowData>> expected = new HashMap<>();
expected.put(
Expand Down Expand Up @@ -180,16 +197,16 @@ public void testWriteData() throws Exception {

sinkFunction.rowDataCollectors.clear();
// write remaining data, and they are still buffered
writeData(bufferedSink, TEST_DATA, 4, 3);
writeData(bufferedSink, new ReusableIterator(4, 3, enableObjectReuse));
assertTrue(sinkFunction.rowDataCollectors.isEmpty());
}

@Test
public void testFlushDataWhenCheckpointing() throws Exception {
MockedSinkFunction sinkFunction = new MockedSinkFunction();
MockedSinkFunction sinkFunction = new MockedSinkFunction(enableObjectReuse);
BufferedUpsertSinkFunction bufferedFunction = createBufferedSink(sinkFunction);
// write all data, there should be 3 records are still buffered
writeData(bufferedFunction, TEST_DATA, 0, TEST_DATA.length);
writeData(bufferedFunction, new ReusableIterator(0, TEST_DATA.length, enableObjectReuse));
// snapshot should flush the buffer
bufferedFunction.snapshotState(null);

Expand Down Expand Up @@ -256,17 +273,17 @@ private BufferedUpsertSinkFunction createBufferedSink(MockedSinkFunction sinkFun
new int[] {keyIndices},
typeInformation,
BUFFER_FLUSH_MODE);
bufferedSinkFunction.getRuntimeContext().getExecutionConfig().enableObjectReuse();
fsk119 marked this conversation as resolved.
Show resolved Hide resolved
bufferedSinkFunction.open(new Configuration());

return bufferedSinkFunction;
}

private void writeData(BufferedUpsertSinkFunction sink, RowData[] data, int startPos, int size)
private void writeData(BufferedUpsertSinkFunction sink, Iterator<RowData> iterator)
throws Exception {
for (int i = startPos; i < startPos + size; i++) {
RowData row = data[i];
long rowtime = row.getTimestamp(TIMESTAMP_INDICES, 3).getMillisecond();
sink.invoke(row, SinkContextUtil.forTimestamp(rowtime));
while (iterator.hasNext()) {
RowData next = iterator.next();
long rowtime = next.getTimestamp(TIMESTAMP_INDICES, 3).getMillisecond();
sink.invoke(next, SinkContextUtil.forTimestamp(rowtime));
}
}

Expand All @@ -291,11 +308,18 @@ private static class MockedSinkFunction extends RichSinkFunction<RowData>
implements CheckpointedFunction, CheckpointListener {

private static final long serialVersionUID = 1L;
private final RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 1);
transient List<RowData> rowDataCollectors;

MockedSinkFunction(boolean enableObjectReuse) {
if (enableObjectReuse) {
context.getExecutionConfig().enableObjectReuse();
}
}

@Override
public RuntimeContext getRuntimeContext() {
return new MockStreamingRuntimeContext(true, 1, 1);
return context;
}

@Override
Expand Down Expand Up @@ -327,4 +351,35 @@ public void invoke(RowData value, Context context) {
rowDataCollectors.add(value);
}
}

private static class ReusableIterator implements Iterator<RowData> {

private static final RowDataSerializer SERIALIZER =
InternalTypeInfo.of(SCHEMA.toSinkRowDataType().getLogicalType()).toRowSerializer();
private static final RowData REUSED_ROW = new GenericRowData(SCHEMA.getColumnCount());

private int begin;
private final int end;
private final boolean enableObjectReuse;

ReusableIterator(int begin, int size, boolean enableObjectReuse) {
this.begin = begin;
this.end = begin + size;
this.enableObjectReuse = enableObjectReuse;
}

@Override
public boolean hasNext() {
return begin < end;
}

@Override
public RowData next() {
if (enableObjectReuse) {
return SERIALIZER.copy(TEST_DATA[begin++], REUSED_ROW);
} else {
return TEST_DATA[begin++];
}
}
}
}