Skip to content

Commit

Permalink
KAFKA-13522: add position tracking and bounding to IQv2 (apache#11581)
Browse files Browse the repository at this point in the history
* Fill in the Position response in the IQv2 result.
* Enforce PositionBound in IQv2 queries.
* Update integration testing approach to leverage consistent queries.

Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Guozhang Wang <guozhang@apache.org>
  • Loading branch information
vvcephei committed Dec 11, 2021
1 parent 2cd96f0 commit acd1f9c
Show file tree
Hide file tree
Showing 24 changed files with 327 additions and 153 deletions.
42 changes: 25 additions & 17 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -1771,17 +1771,14 @@ public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {

final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
if (globalStateStores.containsKey(storeName)) {
final StateStore store = globalStateStores.get(storeName);
final QueryResult<R> r =
store.query(
request.getQuery(),
request.getPositionBound(),
request.executionInfoEnabled()
);
result.setGlobalResult(r);
// See KAFKA-13523
result.setGlobalResult(
QueryResult.forFailure(
FailureReason.UNKNOWN_QUERY_TYPE,
"Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."
)
);
} else {
final Set<Integer> handledPartitions = new HashSet<>();

for (final StreamThread thread : threads) {
final Map<TaskId, Task> tasks = thread.allTasks();
for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
Expand Down Expand Up @@ -1818,20 +1815,31 @@ public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
);
result.addResult(partition, r);
}
}

// optimization: if we have handled all the requested partitions,
// we can return right away.
handledPartitions.add(partition);
if (!request.isAllPartitions()
&& handledPartitions.containsAll(request.getPartitions())) {
return result;

// optimization: if we have handled all the requested partitions,
// we can return right away.
if (!request.isAllPartitions()
&& result.getPartitionResults().keySet().containsAll(request.getPartitions())) {
return result;
}
}
}
}
}
}

if (!request.isAllPartitions()) {
for (final Integer partition : request.getPartitions()) {
if (!result.getPartitionResults().containsKey(partition)) {
result.addResult(partition, QueryResult.forFailure(
FailureReason.NOT_PRESENT,
"The requested partition was not present at the time of the query."
));
}
}
}

return result;
}

Expand Down
18 changes: 10 additions & 8 deletions streams/src/main/java/org/apache/kafka/streams/query/Position.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public Position copy() {
}

/**
* Create a new, structurally independent Position that is the result of merging two other
* Positions.
* Merges the provided Position into the current instance.
* <p>
* If both Positions contain the same topic -> partition -> offset mapping, the resulting
* Position will contain a mapping with the larger of the two offsets.
Expand All @@ -103,12 +102,10 @@ public Position merge(final Position other) {
if (other == null) {
return this;
} else {
final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
deepCopy(position);
for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
final String topic = entry.getKey();
final Map<Integer, Long> partitionMap =
copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
final Integer partition = partitionOffset.getKey();
final Long offset = partitionOffset.getValue();
Expand All @@ -118,7 +115,7 @@ public Position merge(final Position other) {
}
}
}
return new Position(copy);
return this;
}
}

Expand All @@ -132,8 +129,9 @@ public Set<String> getTopics() {
/**
* Return the partition -> offset mapping for a specific topic.
*/
public Map<Integer, Long> getBound(final String topic) {
return Collections.unmodifiableMap(position.get(topic));
public Map<Integer, Long> getPartitionPositions(final String topic) {
final ConcurrentHashMap<Integer, Long> bound = position.get(topic);
return bound == null ? Collections.emptyMap() : Collections.unmodifiableMap(bound);
}

private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
Expand Down Expand Up @@ -174,4 +172,8 @@ public int hashCode() {
throw new UnsupportedOperationException(
"This mutable object is not suitable as a hash key");
}

public boolean isEmpty() {
return position.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,63 +30,42 @@
public class PositionBound {

private final Position position;
private final boolean unbounded;

private PositionBound(final Position position, final boolean unbounded) {
if (unbounded && position != null) {
throw new IllegalArgumentException();
} else if (position != null) {
this.position = position.copy();
this.unbounded = false;
} else {
this.position = null;
this.unbounded = unbounded;
}
private PositionBound(final Position position) {
this.position = position.copy();
}

/**
* Creates a new PositionBound representing "no bound"
*/
public static PositionBound unbounded() {
return new PositionBound(null, true);
return new PositionBound(Position.emptyPosition());
}

/**
* Creates a new PositionBound representing a specific position.
*/
public static PositionBound at(final Position position) {
return new PositionBound(position, false);
return new PositionBound(position);
}

/**
* Returns true iff this object specifies that there is no position bound.
*/
public boolean isUnbounded() {
return unbounded;
return position.isEmpty();
}

/**
* Returns the specific position of this bound.
*
* @throws IllegalArgumentException if this is an "unbounded" position.
*/
public Position position() {
if (unbounded) {
throw new IllegalArgumentException(
"Cannot get the position of an unbounded PositionBound."
);
} else {
return position;
}
return position;
}

@Override
public String toString() {
if (isUnbounded()) {
return "PositionBound{unbounded}";
} else {
return "PositionBound{position=" + position + '}';
}
return "PositionBound{position=" + position + '}';
}

@Override
Expand All @@ -98,7 +77,7 @@ public boolean equals(final Object o) {
return false;
}
final PositionBound that = (PositionBound) o;
return unbounded == that.unbounded && Objects.equals(position, that.position);
return Objects.equals(position, that.position);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@ public QueryResult<R> getGlobalResult() {
* prior observations.
*/
public Position getPosition() {
Position position = Position.emptyPosition();
for (final QueryResult<R> r : partitionResults.values()) {
position = position.merge(r.getPosition());
if (globalResult != null) {
return globalResult.getPosition();
} else {
final Position position = Position.emptyPosition();
for (final QueryResult<R> r : partitionResults.values()) {
position.merge(r.getPosition());
}
return position;
}
return position;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
// we can skip flushing to downstream as well as writing to underlying store
if (rawNewValue != null || rawOldValue != null) {
// we need to get the old values if needed, and then put to store, and then flush
wrapped().put(entry.key(), entry.newValue());

final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
wrapped().put(entry.key(), entry.newValue());

try {
flushListener.apply(
new Record<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {

private final String name;
private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
private final Position position = Position.emptyPosition();
private volatile boolean open = false;
private StateStoreContext context;
private Position position;

public InMemoryKeyValueStore(final String name) {
this.name = name;
this.position = Position.emptyPosition();
}

@Override
Expand Down Expand Up @@ -102,7 +101,9 @@ public <R> QueryResult<R> query(
query,
positionBound,
collectExecutionInfo,
this
this,
position,
context.taskId().partition()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,14 @@ public boolean isOpen() {
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
collectExecutionInfo,
this,
position,
context.taskId().partition()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ public <R> QueryResult<R> query(final Query<R> query, final PositionBound positi
query,
positionBound,
collectExecutionInfo,
this
this,
position,
context.taskId().partition()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

Expand All @@ -35,6 +36,9 @@
*/
public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {

protected StateStoreContext context;
protected Position position = Position.emptyPosition();

public interface EldestEntryRemovalListener {
void apply(Bytes key, byte[] value);
}
Expand Down Expand Up @@ -95,6 +99,7 @@ public void init(final StateStoreContext context, final StateStore root) {
put(Bytes.wrap(key), value);
restoring = false;
});
this.context = context;
}

@Override
Expand Down Expand Up @@ -122,6 +127,7 @@ public synchronized void put(final Bytes key, final byte[] value) {
} else {
this.map.put(key, value);
}
StoreQueryUtils.updatePosition(position, context);
}

@Override
Expand All @@ -144,6 +150,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
@Override
public synchronized byte[] delete(final Bytes key) {
Objects.requireNonNull(key);
StoreQueryUtils.updatePosition(position, context);
return this.map.remove(key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ public <R> QueryResult<R> query(
query,
positionBound,
collectExecutionInfo,
this
this,
position,
context.taskId().partition()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static ByteBuffer serialize(final Position position) {
arraySize += Integer.SIZE; // topic name length
arraySize += topicBytes.length; // topic name itself

final Map<Integer, Long> partitionOffsets = position.getBound(topic);
final Map<Integer, Long> partitionOffsets = position.getPartitionPositions(topic);
arraySize += Integer.SIZE; // Number of PartitionOffset pairs
arraySize += (Integer.SIZE + Long.SIZE)
* partitionOffsets.size(); // partitionOffsets themselves
Expand All @@ -93,7 +93,7 @@ public static ByteBuffer serialize(final Position position) {
buffer.put(topics[i]);

final String topic = entries.get(i);
final Map<Integer, Long> partitionOffsets = position.getBound(topic);
final Map<Integer, Long> partitionOffsets = position.getPartitionPositions(topic);
buffer.putInt(partitionOffsets.size());
for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
buffer.putInt(partitionOffset.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ Position getPosition() {
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
collectExecutionInfo,
this,
position,
stateStoreContext.taskId().partition()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,18 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
}

@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
public <R> QueryResult<R> query(
final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo) {

return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
collectExecutionInfo,
this
query,
positionBound,
collectExecutionInfo,
this,
position,
context.taskId().partition()
);
}

Expand Down
Loading

0 comments on commit acd1f9c

Please sign in to comment.