
KAFKA-13522: add position tracking and bounding to IQv2 #11581

Merged: 8 commits into trunk from iqv2-position-api on Dec 11, 2021

Conversation

@vvcephei (Contributor) commented on Dec 8, 2021:

  • Fill in the Position response in the IQv2 result.
  • Enforce PositionBound in IQv2 queries.
  • Update integration testing approach to leverage consistent queries.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

final PositionBound positionBound,
final int partition
) {
if (positionBound.isUnbounded()) {
Contributor:

I still think this entire check should be a method on Position, as in, Position::dominates(PositionBound)

Contributor:

I know we've discussed this, but just for the record :-), I believe we could do away with the PositionBound type entirely; "unbounded" is basically just an empty Position.

Contributor (Author):

Hmm, I think I finally see what you are talking about. Let me give it a shot...

Contributor (Author):

Ok, so the situation is a little different now than when I originally added a separate PositionBound class. At that point, we also needed a bound representing "latest" (i.e., that the query is executed on an active running task), for which there's no sensible Position representation.

My mind was still half in that world last time we spoke about this point. Now, I can see that the only special position bound is the "unbounded" one, which is the same thing semantically as an empty position.

I just tried out a change to completely get rid of the PositionBound class, but I think it makes the API more confusing. As a user, it seems clearer to create an "unbounded" PositionBound than an "empty" Position. I think it makes sense if you think about it in terms of comparing vectors (an empty vector is "less than" all other vectors, so when it's used as a lower bound, it permits everything), but I don't want people to have to think that hard about it.

Another option I considered is to add a Position.unbounded() factory, but that doesn't completely make sense either, since a Position is just a point in vector space. It doesn't bound anything by itself, though it can be used as a bound.

Plus, I think the query-handling implementation, both for Streams and for custom user stores, is easier to keep track of if you have two types: you simply can't mix up which Position was supposed to be the bound.

On balance, it still seems better to keep the separate PositionBound class. However, I did realize that the logic of PositionBound and the internal logic in Streams can be simplified with this observation that an unbounded position is equivalent to an empty position. I just added that commit to this PR.
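Roughly, the equivalence falls out of the bounding check itself; here is a minimal sketch (illustrative only, not the exact committed code):

// An "unbounded" PositionBound is now represented internally as an empty Position.
// Since the bounding check only iterates over the topics of the bound, an empty
// bound has nothing to compare against and therefore permits every store position:
final Position unbounded = Position.emptyPosition();
for (final String topic : unbounded.getTopics()) {
    // never reached for an empty bound; for a non-empty bound this is where the
    // store's per-partition offsets would be compared against the bound's offsets
}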

Thanks!

Contributor:

Sounds good, thanks for thinking about it.

@patrickstuedi (Contributor):

Looks good, thanks John!

@@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
);
}
final StateQueryResult<R> result = new StateQueryResult<>();
final Set<Integer> handledPartitions = new HashSet<>();
Contributor:

Why move it out of the else block's scope?

Contributor (Author):

Oh, sorry about that. That was an artifact from a recent refactor. But now that I'm looking at it, I realize we don't need a separate set for tracking this, since we can use the result's partition set itself.
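Concretely, something along these lines (a sketch; assuming the StateQueryResult#getPartitionResults accessor):

// Rather than tracking handled partitions in a separate HashSet, rely on the
// partitions already recorded in the result as each per-partition QueryResult is added:
final Set<Integer> handledPartitions = result.getPartitionResults().keySet();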

Position position = Position.emptyPosition();
for (final QueryResult<R> r : partitionResults.values()) {
position = position.merge(r.getPosition());
if (globalResult != null) {
Contributor:

Not sure I understand this: in this PR we've effectively turned off the only caller of result.setGlobalResult(r), so this should never be hit anymore, right?

Contributor (Author):

That's correct, but since the old version of this method was incorrect, I figured I'd go ahead and fix it.

Contributor:

Okay, but still, just to clarify: at least for now, we do not expect the if block to ever trigger, right?

Contributor (Author):

That's correct.

final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
wrapped().put(entry.key(), entry.newValue());
Contributor:

Why reorder the steps here?

Contributor (Author):

This is actually important. Previously, we did not set the record context before passing the cache-evicted record down to the lower store layers, so if a lower store relied on the record context (via the old ProcessorContext), it got the wrong metadata.

Apparently, this work is the first time we've added a feature in Streams that actually relies on that metadata: we now use it to set the Position in the lower store, and if it's not set, we get an error.

Contributor:

I think there is some rationale for not setting the processor context for the evicted record. For example, say we first put record A (offset 0) into the cache, then record B (offset 1), and then record C (offset 2). The third insert causes the cache to evict A, which is then written to the underlying store and also forwarded downstream. If we set the metadata to offset 2 for record A, that may not be correct. I admit that if we do not set it, it would be either some older context, like offset 1, or even just null, but forwarding A with offset 2 still seems inappropriate, since ideally we would prefer to forward A with offset 0.

Contributor (Author):

I'm not sure I follow; we always did set the context, we just did it (incorrectly) between the wrapped().put and forwarding downstream. Why should we do the wrapped().put with the wrong context and then forward with the right context?

Taking your example sequence, the correct offset for record A is 0.
The old behavior was that we would do wrapped().put(A) with offset 2 and then forward A with offset 0.
The new behavior is that we do wrapped().put(A) with offset 0 and then forward A with offset 0.

There's no scenario in which we would forward A with offset 2.
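So the eviction path now looks roughly like this (a sketch of the ordering only, using the identifiers from the diff above; the restore step at the end is implied):

// Make the evicted record's own context (A at offset 0) visible to the lower
// store layers before writing through, then forward with that same context.
final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context()); // A's original context (offset 0)
wrapped().put(entry.key(), entry.newValue());      // lower store records offset 0
// ... forward A downstream, still with offset 0 ...
context.setRecordContext(current);                 // restore the caller's context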

Contributor:

You are right! This is a good fix.

final Position bound = positionBound.position();
for (final String topic : bound.getTopics()) {
final Map<Integer, Long> partitionBounds = bound.getBound(topic);
final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);
Contributor:

qq: why do we name this function on Position getBound, when it seems to just retrieve the current positions for the topic, not really a bound?

Also, similarly for this local variable: why name it seenPartitionBounds if the returned value's semantics are not really "bounds"? Should it just be currentOffsets or something?

Contributor (Author):

Woah, you're absolutely right. I'll fix it.

result = kafkaStreams.query(request);
final LinkedList<QueryResult<R>> allResults = getAllResults(result);

if (allResults.isEmpty()) {
Contributor:

Just curious: when would the result actually be empty? I thought that even if the tasks were not initialized, we would still return the NOT_PRESENT error instead of returning empty.

Contributor (Author):

You'll only get a NOT_PRESENT response if you specifically request a partition. The default is to just get all locally present partitions. This check is actually just an assumption that in the context of an integration test, if you call this method, you're probably expecting at least one result.

It is good to note, though, that if a test is looking for results for a specific set of partitions, it should include that in the query.
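For reference, pinning the request to specific partitions would look roughly like this (a sketch; the store name and query variable are placeholders, and I'm assuming the withPartitions builder method and the mkSet test utility here):

// Request specific partitions so that a missing partition surfaces as a
// NOT_PRESENT failure instead of simply being absent from the result.
final StateQueryRequest<R> request =
    StateQueryRequest
        .inStore("my-store")          // placeholder store name
        .withQuery(query)             // whichever Query<R> the test runs
        .withPartitions(mkSet(0, 1)); // the partitions the test expects

final StateQueryResult<R> result = kafkaStreams.query(request);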

@@ -62,4 +71,28 @@ public static void updatePosition(
position.withComponent(meta.topic(), meta.partition(), meta.offset());
}
}

public static boolean isPermitted(
Contributor:

Should we add this as a method in Position?

Contributor (Author):

I was hesitant to add anything extra to the Position class, but there's no particular reason for that hesitation.
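For the record, the check itself is roughly the following (a sketch based on the loop in the diff above; it keeps the getBound accessor names shown there, which the earlier thread suggests renaming):

public static boolean isPermitted(
    final Position position,
    final PositionBound positionBound,
    final int partition
) {
    if (positionBound.isUnbounded()) {
        return true;
    }
    final Position bound = positionBound.position();
    for (final String topic : bound.getTopics()) {
        final Map<Integer, Long> partitionBounds = bound.getBound(topic);
        final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);
        if (!partitionBounds.containsKey(partition)) {
            // this topic does not bound our partition, so skip it
            continue;
        }
        final Long seen =
            seenPartitionBounds == null ? null : seenPartitionBounds.get(partition);
        if (seen == null || seen < partitionBounds.get(partition)) {
            // the store has not yet caught up to the bound for this topic
            return false;
        }
    }
    return true;
}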

* Once position bounding is generally supported, we should migrate tests to wait on the
* expected response position.
*/
public static <R> StateQueryResult<R> iqv2WaitForResult(
Contributor:

Nice!

Contributor (Author):

Thanks!
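As a sketch of that follow-up (a hypothetical helper; the names, retry cadence, and the inline dominance check are all illustrative only):

// Hypothetical helper: repeat the query until the merged response position has
// caught up to an expected position, then return that result.
public static <R> StateQueryResult<R> iqv2WaitForPosition(
    final KafkaStreams kafkaStreams,
    final StateQueryRequest<R> request,
    final Position expected,
    final Duration timeout
) throws InterruptedException {
    final long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
    while (true) {
        final StateQueryResult<R> result = kafkaStreams.query(request);
        Position seen = Position.emptyPosition();
        for (final QueryResult<R> r : result.getPartitionResults().values()) {
            seen = seen.merge(r.getPosition());
        }
        if (dominates(seen, expected)) {
            return result;
        }
        if (System.currentTimeMillis() > deadlineMs) {
            throw new AssertionError("Timed out waiting to reach position " + expected);
        }
        Thread.sleep(100L);
    }
}

// Component-wise "has caught up to" check over the topics/partitions in expected.
private static boolean dominates(final Position seen, final Position expected) {
    for (final String topic : expected.getTopics()) {
        final Map<Integer, Long> expectedOffsets = expected.getBound(topic);
        final Map<Integer, Long> seenOffsets = seen.getBound(topic);
        for (final Map.Entry<Integer, Long> offset : expectedOffsets.entrySet()) {
            final Long seenOffset =
                seenOffsets == null ? null : seenOffsets.get(offset.getKey());
            if (seenOffset == null || seenOffset < offset.getValue()) {
                return false;
            }
        }
    }
    return true;
}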

@vpapavas (Contributor) left a comment:

LGTM! Thank you @vvcephei !

@guozhangwang (Contributor) left a comment:

LGTM!

@vvcephei (Contributor, Author):

The test failures are unrelated:


  • Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AclAuthorizerWithZkSaslTest.testAclUpdateWithSessionExpiration()
  • Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets

@vvcephei merged commit acd1f9c into trunk on Dec 11, 2021
@vvcephei deleted the iqv2-position-api branch on December 11, 2021, 07:01
@vvcephei (Contributor, Author):

Thanks for the reviews, all!

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
* Fill in the Position response in the IQv2 result.
* Enforce PositionBound in IQv2 queries.
* Update integration testing approach to leverage consistent queries.

Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Guozhang Wang <guozhang@apache.org>