-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13522: add position tracking and bounding to IQv2 #11581
Conversation
final PositionBound positionBound, | ||
final int partition | ||
) { | ||
if (positionBound.isUnbounded()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think this entire check should be method on Position, as in, Position::dominates(PositionBound)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we've discussed this, but just for the record :-), I believe the type PositionBound could be saved, unbounded is basically an empty Position.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I think I finally see what you are talking about. Let me give it a shot...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so the situation is a little different now than when I originally added a separate PositionBound class. At that point, we also needed a bound representing "latest" (i.e., that the query is executed on an active running task), for which there's no sensible Position representation.
My mind was still half in that world last time we spoke about this point. Now, I can see that the only special position bound is the "unbounded" one, which is the same thing semantically as an empty position.
I just tried out a change to completely get rid of the PositionBound class, but I think it makes the API more confusing. As a user, it seems more clear to create an "unbounded" PositionBound than an "empty" Position. I think it makes sense if you think about in terms of comparing vectors (an empty vector is "less than" all other vectors, so when it's used as a lower bound, it permits everything). But I don't want people to have to think that hard about it.
Another option I considered is to add a Position.unbounded()
factory, but that doesn't completely make sense either, since a Position is just a point in vector space. It doesn't bound anything by itself, though it can be used as a bound.
Plus, I think query handling implementation, both for Streams and for custom user stores, is easier to keep track of if you have two types. You simply can't mix up which Position was supposed to be the bound.
On balance, it still seems better to keep the separate PositionBound class. However, I did realize that the logic of PositionBound and the internal logic in Streams can be simplified with this observation that an unbounded position is equivalent to an empty position. I just added that commit to this PR.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, thanks for thinking about it.
Looks good, thanks John! |
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) { | |||
); | |||
} | |||
final StateQueryResult<R> result = new StateQueryResult<>(); | |||
final Set<Integer> handledPartitions = new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why moving it out of the else block scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, sorry about that. That was an artifact from a recent refactor. But now that I'm looking at it, I realize we don't need a separate set for tracking this, since we can use the result's partition set itself.
Position position = Position.emptyPosition(); | ||
for (final QueryResult<R> r : partitionResults.values()) { | ||
position = position.merge(r.getPosition()); | ||
if (globalResult != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand this: in this PR we've effectively turned off the only caller of "result.setGlobalResult(r);" so this should never hit any more right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct, but since the old version of this method was incorrect, I figured I'd go ahead and fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, but still just to clarify, at least for now we do not expect the if
block to ever trigger right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct.
final ProcessorRecordContext current = context.recordContext(); | ||
context.setRecordContext(entry.entry().context()); | ||
wrapped().put(entry.key(), entry.newValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why reorder the steps here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually important. We were not previously setting the record context before passing the cache-evicted record down to the lower store layers. Previously, the context was incorrectly not set during that operation, and if stores relied on the record context (via the old ProcessorContext), they were getting the wrong metadata.
Apparently, this work is the first time we added a feature in Streams that actually relied on that metadata. What is happening now is that we use that metadata to set the Position in the lower store, and if it's not set, then we get an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's some rationales to not set the processor context for the evicted record, e.g. let's say we first put record A offset 0 into the cache, and then record B offset 1, and then record C offset 2, the third insert caused the cache to evict A, which will then be written to the underlying store and also be forwarded downstreams, if we set the metadata as offset 2
for record A
that may not be correct --- but I admit that if we do not set it, then it would be either some order context like offset 1
or even just a null, but it seems forwarding A
with offset 2
is not appropriate still since we would prefer to forward A
with offset 0
ideally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I follow; we always did set the context, we just did it (incorrectly) between the wrapped().put
and forwarding downstream. Why should we do the wrapped().put
with the wrong context and then forward with the right context?
Taking your example sequence, the correct offset for record A
is 0.
The old behavior was that we would do wrapped().put(A)
with offset 2 and then forward A
with offset 0.
The new behavior is that we do wrapped().put(A)
with offset 0 and then forward A
with offset 0.
There's no scenario in which we would forward A with offset 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right! This is a good fix.
final Position bound = positionBound.position(); | ||
for (final String topic : bound.getTopics()) { | ||
final Map<Integer, Long> partitionBounds = bound.getBound(topic); | ||
final Map<Integer, Long> seenPartitionBounds = position.getBound(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
qq: why we name this function of position getBound
while it seems just retrieving the current positions of the topic, not really a bound?
Also for this local variables, similarly, why name it seenPartitionBounds
if the returned value semantics are not really "bounds"? Should that just be currentOffsets
or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woah, you're absolutely right. I'll fix it.
result = kafkaStreams.query(request); | ||
final LinkedList<QueryResult<R>> allResults = getAllResults(result); | ||
|
||
if (allResults.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious when would the result be empty actually? I thought even if the tasks were not initialized, we would still return the NOT_PRESENT
error instead of returning empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll only get a NOT_PRESENT
response if you specifically request a partition. The default is to just get all locally present partitions. This check is actually just an assumption that in the context of an integration test, if you call this method, you're probably expecting at least one result.
It is good to note, though, that if a test is looking for results for a specific set of partitions, it should include that in the query.
@@ -62,4 +71,28 @@ public static void updatePosition( | |||
position.withComponent(meta.topic(), meta.partition(), meta.offset()); | |||
} | |||
} | |||
|
|||
public static boolean isPermitted( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add this as a method in Position
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hesitant to add anything extra to the Position class, but there's no particular reason for it.
* Once position bounding is generally supported, we should migrate tests to wait on the | ||
* expected response position. | ||
*/ | ||
public static <R> StateQueryResult<R> iqv2WaitForResult( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thank you @vvcephei !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
The test failures are unrelated:
|
Thanks for the reviews, all! |
* Fill in the Position response in the IQv2 result. * Enforce PositionBound in IQv2 queries. * Update integration testing approach to leverage consistent queries. Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Guozhang Wang <guozhang@apache.org>
Committer Checklist (excluded from commit message)