-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13525: Implement KeyQuery in Streams IQv2 #11582
Conversation
bae3a36
to
6e686a6
Compare
6e686a6
to
f4d43c4
Compare
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); | ||
final Serde<V> vSerde = serdes.valueSerde(); | ||
final Deserializer<V> deserializer; | ||
if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { | ||
final ValueAndTimestampDeserializer valueAndTimestampDeserializer = | ||
(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer(); | ||
deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer; | ||
} else { | ||
deserializer = vSerde.deserializer(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit of a hack. I think we can possibly do a better job of this in the Timestamped facade that we insert in front of non-timestamped stores, but I don't want to over-engineer it until we have a few different query types in the code base to make sure that what we do works in general.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for the reviewers in case this is mysterious: We support one store, the non-timestamped RocksDB store (Stores.persistentKeyValueStore
), which does not return a ValueAndTimestamp tuple, but just a raw value. But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types. It's counter to the goals of IQv2 to spend time and memory copying the result arrays to add the timestamp only to strip it off again later (as in IQv1), so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types.
We only do this for the DSL. For the PAPI, the "plain KeyValueStore" can still be used as-is, and we don't modify anything about it. Thus IQ must be able to handle both cases.
Also, if we add this face, we would use a MeteredTimestampedKeyValueStore
not this class. I think we need to move the logic for the timestamped case into the other class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.
Not sure if I understand how we would do this? We don't control the inner stores?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I missed this thread before. I think these points are discussed on other threads in this PR, though. Tl;dr: I think we should aim to clean this up in https://issues.apache.org/jira/browse/KAFKA-13526
For now, I believe this logic is correct. However, it's good that you pointed out we're only testing all dsl store combinations. I filed https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test all papi store combinations.
/** | ||
* The store that handled the query got an exception during query execution. The message | ||
* will contain the exception details. Depending on the nature of the exception, the caller | ||
* may be able to retry this instance or may need to try a different instance. | ||
*/ | ||
STORE_EXCEPTION; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized in the implementation for RocksDB that we will need to account for runtime exceptions from the stores. I'll update the KIP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not follow the details of the KIP discussion, but wondering to what extend this new FailureReason
class voids https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! @guozhangwang reminded me during the discussion to make sure that all the cases in that KIP were accounted for. Some are still exceptions, and some are now FailureReasons: https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr
private Map<Class, QueryHandler> queryHandlers = | ||
mkMap( | ||
mkEntry( | ||
KeyQuery.class, | ||
(query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo) | ||
) | ||
); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just trying to establish some pattern here that can let us dispatch these queries efficiently. This O(1) lookup should be faster than an O(n) if/else check or an O(log n) string switch statement, but we won't know for sure without benchmarking.
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); | ||
final Serde<V> vSerde = serdes.valueSerde(); | ||
final Deserializer<V> deserializer; | ||
if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { | ||
final ValueAndTimestampDeserializer valueAndTimestampDeserializer = | ||
(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer(); | ||
deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer; | ||
} else { | ||
deserializer = vSerde.deserializer(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for the reviewers in case this is mysterious: We support one store, the non-timestamped RocksDB store (Stores.persistentKeyValueStore
), which does not return a ValueAndTimestamp tuple, but just a raw value. But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types. It's counter to the goals of IQv2 to spend time and memory copying the result arrays to add the timestamp only to strip it off again later (as in IQv1), so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.
public boolean timestamped() { | ||
return true; // most stores are timestamped | ||
}; | ||
|
||
public boolean global() { | ||
return false; | ||
} | ||
|
||
public boolean keyValue() { | ||
return false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These help us adjust our expectations in the validations below, so that we can cover all store types in the same test.
if (storeToTest.keyValue()) { | ||
if (storeToTest.timestamped()) { | ||
shouldHandleKeyQuery( | ||
2, | ||
(Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value, | ||
2 | ||
); | ||
} else { | ||
shouldHandleKeyQuery( | ||
2, | ||
Function.identity(), | ||
2 | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's where we use those properties. KeyQueries are only implemented for KeyValue stores. For Timestamped stores, we get back a ValueAndTimestamp, which we extract the value from before making the assertion. Otherwise, we just get back the value and can assert directly on it.
final V result1 = queryResult.getResult(); | ||
final Integer integer = valueExtactor.apply(result1); | ||
assertThat(integer, is(expectedValue)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's where we run that function to either get the value out of the ValueAndTimestamp or just give back the value with the identity function.
Note to reviewers, upon reflection, and based on the discussions on the other ongoing KIPs to add IQv2 queries, I've decided to drop the "RawKeyQuery" that I had originally proposed. It really just saved us from one extra cast in an execution path that already has a ton of other casts. It was an attempt to be a little elegant, but I don't think it was successful. I'm hoping that once we have several queries in place, we'll be able to golf it a bit and come up with an actually more elegant approach to the internal code. |
@@ -197,6 +197,18 @@ public R getResult() { | |||
return result; | |||
} | |||
|
|||
@SuppressWarnings("unchecked") | |||
public <V> QueryResult<V> swapResult(final V value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't really need this. You can just do QueryResult.forResult
in the MeteredKeyValue
store for example to get the typed result
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of this method is to allow MeteredKeyValue
store to deserialize the result without wiping out the execution info or position that it got back from the bytes store. I missed that while reviewing your PR, so I went ahead and added a fix for it to this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @vvcephei ! Just a small comment but other than that, LGTM! +1
*/ | ||
package org.apache.kafka.streams.query; | ||
|
||
import org.apache.kafka.common.annotation.InterfaceStability.Evolving; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadocs :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to apply to FailureReason
, too? There are not class JavaDocs.
/** | ||
* The store that handled the query got an exception during query execution. The message | ||
* will contain the exception details. Depending on the nature of the exception, the caller | ||
* may be able to retry this instance or may need to try a different instance. | ||
*/ | ||
STORE_EXCEPTION; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not follow the details of the KIP discussion, but wondering to what extend this new FailureReason
class voids https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
*/ | ||
package org.apache.kafka.streams.query; | ||
|
||
import org.apache.kafka.common.annotation.InterfaceStability.Evolving; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to apply to FailureReason
, too? There are not class JavaDocs.
private final FailureReason failureReason; | ||
private final String failure; | ||
private final R result; | ||
private List<String> executionInfo = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not keep it final
and use clear()
and addAll
instead?
For my own education: what is executionInfo
(and why is it a String
list)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks; that would be another way to do it. I'm not sure if that would be clearly better or not, though.
It's for getting more details about how the query was actually executed inside of Streams. Right now, if you request it as part of the query, each store layer will report what it did and how long it took. For runtime queries, you wouldn't want to use it, but I wanted to enable debugging if the cases where query execution seems like it's taking longer than expected. Also, it could be used for tracing, in which every Nth query is run with the execution info on.
It's a list of Strings so that each store layer / operation can just add one "line" of info (like a stack trace), but we don't waste time and memory actually concatenating them with newlines. We considered adding more structure (such as having a field for execution time), but kept it as a string so as not to restrict the kind of "execution information" we might find useful to add in the future.
@SuppressWarnings("unchecked") | ||
public <V> QueryResult<V> swapResult(final V value) { | ||
if (isFailure()) { | ||
return (QueryResult<V>) this; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we not allow to swap the result if the current result has a failure? And if we don't want to allow swapping, why just return this
but not throw an exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of a failure, there is no result, just the failure message. I wanted to maintain an invariant that there is always either a failure or a result, but not both or neither. I also didn't think it would be right to allow accidentally converting a failure to a successful result via this method.
I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.
If there is no strict reason not to throw an IllegalStateException
, I would strongly advocate to throw. It not only guards against potential bugs, but also expresses the semantics for developers (ie, us) much cleaner and makes the code easier to read/reason about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good!
if (isFailure()) { | ||
return (QueryResult<V>) this; | ||
} else { | ||
final QueryResult<V> result = new QueryResult<>(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The generic type of QueryResult
is R
-- it does not seem safe to allow swapping in a V
result?
If we need to allow for different return types, why does QueryType
has a generic parameter to begin with? Seems useless if we cannot rely on it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's happening here is that we're turning a QueryResult<R>
into a QueryResult<V>
. A concrete example (in fact the only use case) of this is in the MeteredStore, we get back a raw result from the BytesStore and need to deserialize it, so we need to convert the QueryResult<byte[]>
into a QueryResult<Integer>
or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vvcephei WDYT about having an extended QueryResultInBytes
(just a placeholder for name) on QueryResult<byte[]>
and move this function to that extended class? This way we can avoid mistakenly using the swap functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use two objects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @guozhangwang , I think something like that will be the outcome of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526
We'll tackle that question before the first release of this new API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @mjsax , I'm not sure precisely what you mean. This does create a new object. If you think it would be clearer to add a constructor allowing people to set the result along with a pre-populated executionInfo and position instead, we could, but this is the API we agreed on in the KIP.
I want this new API to have good ergonomics, so I do want to consider these, but I don't think we need to hold up the KeyQuery PR on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, if the upper layers need QueryResult<R>
with proper type, and the inner layers need QueryResult<bytes[]>
, we should just have two properly types object, and instead of "swapping", just take the byte[]
from QueryResult<bytes[]>
, deserialize them, and stuff the result into the QueryResult<R>
object.
but this is the API we agreed on in the KIP.
I did not read the KIP :D (maybe I should have). And we can always adjust it. So me it seems useless to have a generic type parameter if we don't obey it anyway, and use casts. It's the purpose of generics to avoid casts, and if it does not avoid casts, it seems pointless to have).
* allowing them to generically store query execution logic as the values | ||
* in a map. | ||
*/ | ||
@FunctionalInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a lot of functional interfaces but I don't think we annotated it anywhere. Also wondering what we gain by doing it? The Java compiler should be smart enough and not need this annotation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The compiler is smart enough. It's just an informative annotation. Its only practical purpose is to raise compilation error if you try to declare more than one method in it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So we should add it elsewhere, too (of course not as part of the IQ work).
|
||
|
||
@SuppressWarnings("unchecked") | ||
private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference between the map within MeteredKeyValueStore
and this one? Why do we need both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They both exist to dispatch query execution logic. The MeteredStores' logic is to translate results from the inner stores, and the inner stores' logic is to execute the query. Since we have a lot of functionally identical stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their execution logic here instead of duplicating it in every store class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I fully understand, but might be less important.
private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP = | ||
mkMap( | ||
mkEntry( | ||
PingQuery.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I should read the KIP, but was it a PingQuery
(well, I can guess, but what's its purpose (why is it useful)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the PingQuery isn't in the KIP at all. I added it (as an internal API) so that I could verify the stores work properly in the absence of any actual queries (because I implemented the framework before any real queries, to control the scope of the PR).
Now that we have real queries, I don't think we need to keep Ping around. I'll remove it.
try { | ||
final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); | ||
return QueryResult.forResult(bytes); | ||
} catch (final Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: t
-> throwable
(of just exception
?)
btw: is it really best practice to catch Throwable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. It's fine to catch Throwables, but it's not fine to swallow Errors, as I'm doing here.
} else { | ||
final QueryResult<R> result; | ||
|
||
final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems I don't understand the control flow. I thought we handle queries inside state store. What is handleBasicQuery
? And why does the "query map" know about KeyValueStore
(should it not be store type agnostic)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that's accurate, but many of the stores will have the exact same logic as each other, so it made sense to consolidate it, which is what this util class is for.
The function in the query map just checks the type of the store so that it can either cast it to execute the query or return "unknown query". That way, we can use the same dispatch map for all queries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔
e7d8b33
to
9f8d21b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -34,7 +34,7 @@ | |||
* <p> | |||
*/ | |||
@Evolving | |||
public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { | |||
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a review comment to add this to KeyQuery, so I added it to RangeQuery for exactly the same reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized: we call the range query of kv-store RangeQuery
, and the range query of window store WindowRangeQuery
, and similarly for key queries. Is that intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll file a Jira.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -52,7 +52,6 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper) { | |||
* @param upper The key that specifies the upper bound of the range | |||
* @param <K> The key type | |||
* @param <V> The value type | |||
* @return An iterator of records |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noticed these, which are not correct. This method returns a query, not an iterator of records.
query, | ||
positionBound, | ||
collectExecutionInfo, | ||
this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about this. The last PR contained some bad formatting, so I'm just biting the bullet and fixing it here.
* <p> | ||
* This is not a public API and may change without notice. | ||
*/ | ||
public class PingQuery implements Query<Boolean> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed, since it was only for validating the framework in the absence of any query implementations, and now we have query implementations.
mkEntry( | ||
PingQuery.class, | ||
(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true) | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed see the comment on the PingQuery class.
@@ -162,15 +163,39 @@ public static boolean isPermitted( | |||
} | |||
final R result = (R) iterator; | |||
return QueryResult.forResult(result); | |||
} catch (final Throwable t) { | |||
final String message = parseStoreException(t, store, query); | |||
} catch (final Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed from Throwable to Exception to avoid swallowing Errors
@@ -178,26 +180,26 @@ public static void after() { | |||
|
|||
@Test | |||
public void shouldFailUnknownStore() { | |||
final PingQuery query = new PingQuery(); | |||
final StateQueryRequest<Boolean> request = | |||
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query itself doesn't matter for these evaluations, so I just arbitrarily swapped KeyQuery in for PingQuery
@@ -527,10 +541,11 @@ public void verifyStore() { | |||
private void globalShouldRejectAllQueries() { | |||
// See KAFKA-13523 | |||
|
|||
final PingQuery query = new PingQuery(); | |||
final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query); | |||
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also replaced the PingQuery here. It also doesn't affect the evaluation.
} | ||
assertThat(actualValue, is(expectedValue)); | ||
assertThat(queryResult.getExecutionInfo(), is(empty())); | ||
fail("global tables aren't implemented"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was covered in a prior PR. It will be fixed in https://issues.apache.org/jira/browse/KAFKA-13523
if (isFailure()) { | ||
return (QueryResult<V>) this; | ||
} else { | ||
final QueryResult<V> result = new QueryResult<>(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vvcephei WDYT about having an extended QueryResultInBytes
(just a placeholder for name) on QueryResult<byte[]>
and move this function to that extended class? This way we can avoid mistakenly using the swap functions.
@@ -34,7 +34,7 @@ | |||
* <p> | |||
*/ | |||
@Evolving | |||
public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { | |||
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized: we call the range query of kv-store RangeQuery
, and the range query of window store WindowRangeQuery
, and similarly for key queries. Is that intentional?
getValueDeserializer() | ||
); | ||
final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult( | ||
resultIterator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why newline with just one parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably autoformatted because the line was too long.
* @param <V> The type of the value that will be retrieved | ||
*/ | ||
public static <K, V> KeyQuery<K, V> withKey(final K key) { | ||
return new KeyQuery<>(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check the key
is not null here? Since in later callers e.g. final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
we do not check if getKey()
is null or not, and keyBytes
function could throw if it is.
mkEntry( | ||
RangeQuery.class, | ||
StoreQueryUtils::runRangeQuery | ||
), | ||
mkEntry(KeyQuery.class, | ||
StoreQueryUtils::runKeyQuery |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems misaligned.
(KeyValueStore<Bytes, byte[]>) store; | ||
try { | ||
final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); | ||
return (QueryResult<R>) QueryResult.forResult(bytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use swap
here as well?
Also, I'm feeling maybe we can introduce an internal class extending on KeyQuery<?, byte[]>
and only define the swap
in that class (see my other comment above).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks; let's keep that in mind as we tackle some of the API refactor tasks we've queued up. We started with the RawXQuery approach, then dropped it. Before we add it back, I think we'd better have a representative set of queries and also bear in mind all the other sharp edges we'd like to smooth over before release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vvcephei but in this PR at least, Should we use swap here as well?
?
final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey()); | ||
return (QueryResult<R>) QueryResult.forResult(bytes); | ||
} catch (final Exception e) { | ||
final String message = parseStoreException(e, store, query); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should parseStoreException
's first parameter be Exception
then?
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); | ||
|
||
final QueryResult<V> queryResult = | ||
result.getGlobalResult() != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm still a bit confused here about global result, thinking we should have not supported this, and hence here it should always be null
, is that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line was written before I scratched global store support from the current scope. I'll drop the check from this test for now.
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped()); | ||
final Deserializer<V> deserializer; | ||
if (!timestamped && vSerde instanceof ValueAndTimestampSerde) { | ||
if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the part that I'm not completely sure about either... maybe some quick sync on this would be more effective?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it's weird, but it is correct. I would like to revisit it, but I think we really need to do that after the current round of queries are implemented.
LGTM, I believe @vvcephei will address the remaining comments right after it's merged. |
Thanks, all! This is all really good feedback, and we clearly have some opportunities to tighten up both the API and internals. I will go through the follow-on tickets that are filed in Jira and mark them as 3.2 blockers. That way, we can be sure that we will either address the API issues before the release or pull IQv2 from the release. Immediately after merging, I will follow up with a PR to remove |
Follow-on from #11582 . Removes a public API method in favor of an internal utility method. Reviewer: Matthias J. Sax <mjsax@apache.org>
Implement the KeyQuery as proposed in KIP-796 Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>
Follow-on from apache#11582 . Removes a public API method in favor of an internal utility method. Reviewer: Matthias J. Sax <mjsax@apache.org>
Implement the KeyQuery as proposed in KIP-796
Committer Checklist (excluded from commit message)