KAFKA-13525: Implement KeyQuery in Streams IQv2 #11582

Merged: 5 commits, Dec 20, 2021

Conversation

@vvcephei (Contributor) commented Dec 9, 2021

Implement the KeyQuery as proposed in KIP-796

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Base automatically changed from iqv2-position-api to trunk December 11, 2021 07:01
Comment on lines 247 to 256
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
final Serde<V> vSerde = serdes.valueSerde();
final Deserializer<V> deserializer;
if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
(ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) vSerde).deserializer();
deserializer = (Deserializer<V>) valueAndTimestampDeserializer.valueDeserializer;
} else {
deserializer = vSerde.deserializer();
}
Contributor Author

This is a bit of a hack. I think we can possibly do a better job of this in the Timestamped facade that we insert in front of non-timestamped stores, but I don't want to over-engineer it until we have a few different query types in the code base to make sure that what we do works in general.

Contributor Author

Note for the reviewers in case this is mysterious: We support one store, the non-timestamped RocksDB store (Stores.persistentKeyValueStore), which does not return a ValueAndTimestamp tuple, but just a raw value. But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types. It's counter to the goals of IQv2 to spend time and memory copying the result arrays to add the timestamp only to strip it off again later (as in IQv1), so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.

Member

> But Streams has a store layer that inserts dummy timestamps to make everything conform to the same types.

We only do this for the DSL. For the PAPI, the "plain KeyValueStore" can still be used as-is, and we don't modify anything about it. Thus IQ must be able to handle both cases.

Also, if we add this facade, we would use a MeteredTimestampedKeyValueStore, not this class. I think we need to move the logic for the timestamped case into the other class.

Member

> so instead we just pass through the non-timestamped value from the bytes store and strip off the ValueAndTimestamp serde so we can deserialize the raw value.

Not sure if I understand how we would do this? We don't control the inner stores?

Contributor Author

Sorry, I missed this thread before. I think these points are discussed on other threads in this PR, though. Tl;dr: I think we should aim to clean this up in https://issues.apache.org/jira/browse/KAFKA-13526

For now, I believe this logic is correct. However, it's good that you pointed out we're only testing all dsl store combinations. I filed https://issues.apache.org/jira/browse/KAFKA-13553 to extend the IT to also test all papi store combinations.
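
To make the serde handling in this thread concrete, here is a minimal sketch of the unwrap-or-pass-through decision. Every type below (`DemoDeserializer`, `DemoValueAndTimestamp`, `DemoValueAndTimestampDeserializer`, `DeserializerChoiceSketch`) is a hypothetical stand-in, and the "8-byte timestamp followed by the value" layout is an assumption of the sketch rather than a statement about the Streams internals.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-ins for the serde types discussed above.
interface DemoDeserializer<T> {
    T deserialize(byte[] data);
}

final class DemoValueAndTimestamp<V> {
    final V value;
    final long timestamp;

    DemoValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Assumed layout for this sketch: 8-byte timestamp, then the raw value bytes.
final class DemoValueAndTimestampDeserializer<V>
        implements DemoDeserializer<DemoValueAndTimestamp<V>> {

    final DemoDeserializer<V> valueDeserializer;

    DemoValueAndTimestampDeserializer(final DemoDeserializer<V> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    @Override
    public DemoValueAndTimestamp<V> deserialize(final byte[] data) {
        final ByteBuffer buffer = ByteBuffer.wrap(data);
        final long timestamp = buffer.getLong();
        final byte[] rawValue = new byte[buffer.remaining()];
        buffer.get(rawValue);
        return new DemoValueAndTimestamp<>(valueDeserializer.deserialize(rawValue), timestamp);
    }
}

final class DeserializerChoiceSketch {
    // If the inner store holds plain values but the configured deserializer
    // expects timestamps, unwrap it to the plain value deserializer.
    // Otherwise, use the configured deserializer unchanged.
    static DemoDeserializer<?> choose(final boolean storeIsTimestamped,
                                      final DemoDeserializer<?> configured) {
        if (!storeIsTimestamped && configured instanceof DemoValueAndTimestampDeserializer) {
            return ((DemoValueAndTimestampDeserializer<?>) configured).valueDeserializer;
        }
        return configured;
    }
}
```

With a non-timestamped store, `choose(false, wrapped)` hands back the inner value deserializer, so the raw bytes can be read without first padding them with a dummy timestamp.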

Comment on lines 57 to 62
/**
* The store that handled the query got an exception during query execution. The message
* will contain the exception details. Depending on the nature of the exception, the caller
* may be able to retry this instance or may need to try a different instance.
*/
STORE_EXCEPTION;
Contributor Author

I realized in the implementation for RocksDB that we will need to account for runtime exceptions from the stores. I'll update the KIP.

Member

I did not follow the details of the KIP discussion, but I'm wondering to what extent this new FailureReason class voids https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors

Contributor Author

Thanks! @guozhangwang reminded me during the discussion to make sure that all the cases in that KIP were accounted for. Some are still exceptions, and some are now FailureReasons: https://lists.apache.org/thread/brvwvpvsbsfvqpqg6jvry5hqny0vm2tr

Comment on lines 91 to 104
private Map<Class, QueryHandler> queryHandlers =
mkMap(
mkEntry(
KeyQuery.class,
(query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo)
)
);

Contributor Author

Just trying to establish some pattern here that can let us dispatch these queries efficiently. This O(1) lookup should be faster than an O(n) if/else check or an O(log n) string switch statement, but we won't know for sure without benchmarking.
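
A minimal sketch of the class-keyed dispatch pattern described in this comment. The names (`DemoQuery`, `DemoKeyQuery`, `DemoUnknownQuery`, `DemoHandler`, `DispatchSketch`) are hypothetical, and a plain `Map<String, String>` stands in for the store; none of this is the actual Streams code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the real query types; not the Kafka Streams API.
interface DemoQuery<R> { }

final class DemoKeyQuery implements DemoQuery<String> {
    final String key;

    DemoKeyQuery(final String key) {
        this.key = key;
    }
}

final class DemoUnknownQuery implements DemoQuery<String> { }

// One handler per query class; the map lookup replaces an if/else chain.
@FunctionalInterface
interface DemoHandler {
    String apply(DemoQuery<?> query, Map<String, String> store);
}

final class DispatchSketch {
    private static final Map<Class<?>, DemoHandler> HANDLERS = new HashMap<>();

    static {
        HANDLERS.put(
            DemoKeyQuery.class,
            (query, store) -> store.get(((DemoKeyQuery) query).key)
        );
    }

    // Finds the handler by the query's exact runtime class (a hash lookup).
    static String execute(final DemoQuery<?> query, final Map<String, String> store) {
        final DemoHandler handler = HANDLERS.get(query.getClass());
        if (handler == null) {
            return "UNKNOWN_QUERY_TYPE";
        }
        return handler.apply(query, store);
    }
}
```

One trade-off of keying on `getClass()` is that subclasses of a registered query type would not match, which is one reason declaring the query classes `final` fits this pattern.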

Comment on lines 272 to 288
public boolean timestamped() {
return true; // most stores are timestamped
};

public boolean global() {
return false;
}

public boolean keyValue() {
return false;
}
Contributor Author

These help us adjust our expectations in the validations below, so that we can cover all store types in the same test.

Comment on lines 491 to 503
if (storeToTest.keyValue()) {
if (storeToTest.timestamped()) {
shouldHandleKeyQuery(
2,
(Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
2
);
} else {
shouldHandleKeyQuery(
2,
Function.identity(),
2
);
}
}
Contributor Author

Here's where we use those properties. KeyQueries are only implemented for KeyValue stores. For Timestamped stores, we get back a ValueAndTimestamp, which we extract the value from before making the assertion. Otherwise, we just get back the value and can assert directly on it.

Comment on lines +622 to +713
final V result1 = queryResult.getResult();
final Integer integer = valueExtactor.apply(result1);
assertThat(integer, is(expectedValue));
Contributor Author

Here's where we run that function to either get the value out of the ValueAndTimestamp or just give back the value with the identity function.
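
The extractor idea can be sketched in isolation as follows. `DemoTimestamped` and `ExtractorSketch` are hypothetical names used only for illustration; the real test works against the Streams `ValueAndTimestamp` type.

```java
import java.util.function.Function;

// Hypothetical stand-in for a value/timestamp pair; not the Kafka Streams class.
final class DemoTimestamped<V> {
    private final V value;
    private final long timestamp;

    DemoTimestamped(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }
}

final class ExtractorSketch {
    // Applies the store-specific extractor, then compares with the expected value.
    // Timestamped stores pass an unwrapping function; plain stores pass identity.
    static <V> boolean matches(final V queryResult,
                               final Function<V, Integer> valueExtractor,
                               final Integer expected) {
        final Integer actual = valueExtractor.apply(queryResult);
        return expected.equals(actual);
    }
}
```

Passing the extractor as a parameter lets one assertion path serve both store flavors, at the cost of the explicit function types at the call sites.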

@vvcephei (Contributor Author)

Note to reviewers, upon reflection, and based on the discussions on the other ongoing KIPs to add IQv2 queries, I've decided to drop the "RawKeyQuery" that I had originally proposed. It really just saved us from one extra cast in an execution path that already has a ton of other casts. It was an attempt to be a little elegant, but I don't think it was successful.

I'm hoping that once we have several queries in place, we'll be able to golf it a bit and come up with an actually more elegant approach to the internal code.

@@ -197,6 +197,18 @@ public R getResult() {
return result;
}

@SuppressWarnings("unchecked")
public <V> QueryResult<V> swapResult(final V value) {
Contributor

We don't really need this. You can just do QueryResult.forResult in the MeteredKeyValue store for example to get the typed result

Contributor Author

The purpose of this method is to allow MeteredKeyValue store to deserialize the result without wiping out the execution info or position that it got back from the bytes store. I missed that while reviewing your PR, so I went ahead and added a fix for it to this one.

@vpapavas (Contributor) left a comment

Thank you @vvcephei ! Just a small comment but other than that, LGTM! +1

*/
package org.apache.kafka.streams.query;

import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
Contributor

Javadocs :)

Member

Seems to apply to FailureReason, too? There are no class JavaDocs.

private final FailureReason failureReason;
private final String failure;
private final R result;
private List<String> executionInfo = new LinkedList<>();
Member

Why not keep it final and use clear() and addAll instead?

For my own education: what is executionInfo (and why is it a String list)?

Contributor Author

Thanks; that would be another way to do it. I'm not sure if that would be clearly better or not, though.

It's for getting more details about how the query was actually executed inside of Streams. Right now, if you request it as part of the query, each store layer will report what it did and how long it took. For runtime queries, you wouldn't want to use it, but I wanted to enable debugging in cases where query execution seems to be taking longer than expected. Also, it could be used for tracing, in which every Nth query is run with the execution info on.

It's a list of Strings so that each store layer / operation can just add one "line" of info (like a stack trace), but we don't waste time and memory actually concatenating them with newlines. We considered adding more structure (such as having a field for execution time), but kept it as a string so as not to restrict the kind of "execution information" we might find useful to add in the future.

@SuppressWarnings("unchecked")
public <V> QueryResult<V> swapResult(final V value) {
if (isFailure()) {
return (QueryResult<V>) this;
@mjsax (Member), Dec 15, 2021

Why do we not allow swapping the result if the current result has a failure? And if we don't want to allow swapping, why just return this instead of throwing an exception?

Contributor Author

In the case of a failure, there is no result, just the failure message. I wanted to maintain an invariant that there is always either a failure or a result, but not both or neither. I also didn't think it would be right to allow accidentally converting a failure to a successful result via this method.

I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.

@mjsax (Member), Dec 17, 2021

> I suppose we could throw an IllegalStateException, since the caller probably shouldn't be even trying to "swap" the result on a failed result to begin with, but this seems fine, too.

If there is no strict reason not to throw an IllegalStateException, I would strongly advocate to throw. It not only guards against potential bugs, but also expresses the semantics for developers (ie, us) much cleaner and makes the code easier to read/reason about.

Contributor Author

sounds good!
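
A minimal sketch of where this thread landed: a result holder that is always either a failure or a result, whose swap keeps the collected execution info, and which throws when asked to swap a failed result. `DemoResult` and its methods are hypothetical and only loosely modeled on the class under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical result holder for illustration; not the real QueryResult class.
final class DemoResult<R> {
    private final R result;        // the payload; unused when failure is set
    private final String failure;  // non-null exactly when the query failed
    private final List<String> executionInfo = new ArrayList<>();

    private DemoResult(final R result, final String failure) {
        this.result = result;
        this.failure = failure;
    }

    static <R> DemoResult<R> forResult(final R result) {
        return new DemoResult<>(result, null);
    }

    static <R> DemoResult<R> forFailure(final String message) {
        return new DemoResult<>(null, Objects.requireNonNull(message, "message"));
    }

    boolean isFailure() {
        return failure != null;
    }

    R getResult() {
        return result;
    }

    // Each store layer can append one line describing what it did.
    void addExecutionInfo(final String line) {
        executionInfo.add(line);
    }

    List<String> getExecutionInfo() {
        return executionInfo;
    }

    // Replaces the payload (for example raw bytes with a typed value)
    // while carrying over the execution info gathered so far.
    <V> DemoResult<V> swapResult(final V value) {
        if (isFailure()) {
            throw new IllegalStateException(
                "Cannot swap the result of a failed query: " + failure);
        }
        final DemoResult<V> swapped = new DemoResult<>(value, null);
        swapped.executionInfo.addAll(executionInfo);
        return swapped;
    }
}
```

Throwing here surfaces a misuse immediately, whereas silently returning `this` would rely on an unchecked cast of the failed result to the new type.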

if (isFailure()) {
return (QueryResult<V>) this;
} else {
final QueryResult<V> result = new QueryResult<>(value);
Member

The generic type of QueryResult is R -- it does not seem safe to allow swapping in a V result?

If we need to allow for different return types, why does QueryType have a generic parameter to begin with? Seems useless if we cannot rely on it?

Contributor Author

What's happening here is that we're turning a QueryResult<R> into a QueryResult<V>. A concrete example (in fact the only use case) of this is in the MeteredStore, we get back a raw result from the BytesStore and need to deserialize it, so we need to convert the QueryResult<byte[]> into a QueryResult<Integer> or something.

Contributor

@vvcephei WDYT about having an extended QueryResultInBytes (just a placeholder for name) on QueryResult<byte[]> and move this function to that extended class? This way we can avoid mistakenly using the swap functions.

Member

Why not use two objects?

Contributor Author

Thanks, @guozhangwang , I think something like that will be the outcome of this follow-on work: https://issues.apache.org/jira/browse/KAFKA-13526

We'll tackle that question before the first release of this new API.

Contributor Author

Thanks, @mjsax , I'm not sure precisely what you mean. This does create a new object. If you think it would be clearer to add a constructor allowing people to set the result along with a pre-populated executionInfo and position instead, we could, but this is the API we agreed on in the KIP.

I want this new API to have good ergonomics, so I do want to consider these, but I don't think we need to hold up the KeyQuery PR on it.

Member

I mean, if the upper layers need a properly typed QueryResult<R>, and the inner layers need a QueryResult<byte[]>, we should just have two properly typed objects, and instead of "swapping", just take the byte[] from the QueryResult<byte[]>, deserialize it, and stuff the result into the QueryResult<R> object.

> but this is the API we agreed on in the KIP.

I did not read the KIP :D (maybe I should have). And we can always adjust it. To me, it seems useless to have a generic type parameter if we don't obey it anyway and use casts. The purpose of generics is to avoid casts; if they do not avoid casts, they seem pointless to have.

* allowing them to generically store query execution logic as the values
* in a map.
*/
@FunctionalInterface
Member

We have a lot of functional interfaces but I don't think we annotated it anywhere. Also wondering what we gain by doing it? The Java compiler should be smart enough and not need this annotation?

Contributor Author

The compiler is smart enough. It's just an informative annotation. Its only practical purpose is to raise a compilation error if you try to declare more than one abstract method in it.

Member

I see. So we should add it elsewhere, too (of course not as part of the IQ work).



@SuppressWarnings("unchecked")
private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
Member

What is the difference between the map within MeteredKeyValueStore and this one? Why do we need both?

Contributor Author

They both exist to dispatch query execution logic. The MeteredStores' logic is to translate results from the inner stores, and the inner stores' logic is to execute the query. Since we have a lot of functionally identical stores (i.e., many KeyValue stores, etc.), it made sense to consolidate their execution logic here instead of duplicating it in every store class.

Member

Not sure if I fully understand, but might be less important.

private static final Map<Class<?>, QueryHandler> QUERY_HANDLER_MAP =
mkMap(
mkEntry(
PingQuery.class,
Member

Maybe I should read the KIP, but what is a PingQuery? (Well, I can guess, but what's its purpose? Why is it useful?)

@vvcephei (Contributor Author), Dec 17, 2021

Actually, the PingQuery isn't in the KIP at all. I added it (as an internal API) so that I could verify the stores work properly in the absence of any actual queries (because I implemented the framework before any real queries, to control the scope of the PR).

Now that we have real queries, I don't think we need to keep Ping around. I'll remove it.

try {
final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
return QueryResult.forResult(bytes);
} catch (final Throwable t) {
Member

nit: t -> throwable (or just exception?)

btw: is it really best practice to catch Throwable?

Contributor Author

Good point. It's fine to catch Throwables, but it's not fine to swallow Errors, as I'm doing here.

} else {
final QueryResult<R> result;

final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
Member

Seems I don't understand the control flow. I thought we handle queries inside state store. What is handleBasicQuery? And why does the "query map" know about KeyValueStore (should it not be store type agnostic)?

Contributor Author

Yep, that's accurate, but many of the stores will have the exact same logic as each other, so it made sense to consolidate it, which is what this util class is for.

The function in the query map just checks the type of the store so that it can either cast it to execute the query or return "unknown query". That way, we can use the same dispatch map for all queries.

Member

🤔

@vvcephei (Contributor Author) left a comment

Thanks for the reviews, @mjsax and @vvcephei !

@@ -34,7 +34,7 @@
* <p>
*/
@Evolving
-public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
+public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
Contributor Author

I had a review comment to add this to KeyQuery, so I added it to RangeQuery for exactly the same reason.

Contributor

Just realized: we call the range query of kv-store RangeQuery, and the range query of window store WindowRangeQuery, and similarly for key queries. Is that intentional?

Contributor Author

Good point. We can rename it to KeyRangeQuery in a follow-on PR. I'll file a Jira.

@@ -52,7 +52,6 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
* @param upper The key that specifies the upper bound of the range
* @param <K> The key type
* @param <V> The value type
-* @return An iterator of records
Contributor Author

Noticed these, which are not correct. This method returns a query, not an iterator of records.

Comment on lines +230 to +233
query,
positionBound,
collectExecutionInfo,
this
Contributor Author

Sorry about this. The last PR contained some bad formatting, so I'm just biting the bullet and fixing it here.

* <p>
* This is not a public API and may change without notice.
*/
public class PingQuery implements Query<Boolean> {
Contributor Author

Removed, since it was only for validating the framework in the absence of any query implementations, and now we have query implementations.

Comment on lines -59 to -62
mkEntry(
PingQuery.class,
(query, positionBound, collectExecutionInfo, store) -> QueryResult.forResult(true)
),
Contributor Author

Removed; see the comment on the PingQuery class.

@@ -162,15 +163,39 @@ public static boolean isPermitted(
}
final R result = (R) iterator;
return QueryResult.forResult(result);
-} catch (final Throwable t) {
-final String message = parseStoreException(t, store, query);
+} catch (final Exception e) {
Contributor Author

Changed from Throwable to Exception to avoid swallowing Errors
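
The effect of that change can be shown with a small self-contained sketch. `StoreCallSketch` and the string-based result are hypothetical simplifications, not the code in this PR: ordinary exceptions become a failure message, while `Error`s are left to propagate.

```java
import java.util.function.Supplier;

final class StoreCallSketch {
    // Runs a store lookup and converts ordinary exceptions into a failure message.
    // Errors (for example OutOfMemoryError) are deliberately not caught here,
    // so they propagate to the caller instead of being swallowed.
    static String runQuery(final Supplier<String> storeLookup) {
        try {
            return "OK: " + storeLookup.get();
        } catch (final Exception e) {
            return "STORE_EXCEPTION: " + e.getMessage();
        }
    }
}
```

For example, a lookup that throws `IllegalStateException("boom")` yields `STORE_EXCEPTION: boom`, whereas a lookup that throws an `Error` exits `runQuery` with that `Error`.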

@@ -178,26 +180,26 @@ public static void after() {

@Test
public void shouldFailUnknownStore() {
-final PingQuery query = new PingQuery();
-final StateQueryRequest<Boolean> request =
+final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
Contributor Author

The query itself doesn't matter for these evaluations, so I just arbitrarily swapped KeyQuery in for PingQuery

@@ -527,10 +541,11 @@ public void verifyStore() {
private void globalShouldRejectAllQueries() {
// See KAFKA-13523

-final PingQuery query = new PingQuery();
-final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
Contributor Author

Also replaced the PingQuery here. It also doesn't affect the evaluation.

}
assertThat(actualValue, is(expectedValue));
assertThat(queryResult.getExecutionInfo(), is(empty()));
fail("global tables aren't implemented");
Contributor Author

This was covered in a prior PR. It will be fixed in https://issues.apache.org/jira/browse/KAFKA-13523

getValueDeserializer()
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult = rawResult.swapResult(
resultIterator
Contributor

nit: why newline with just one parameter?

Contributor Author

Probably autoformatted because the line was too long.

* @param <V> The type of the value that will be retrieved
*/
public static <K, V> KeyQuery<K, V> withKey(final K key) {
return new KeyQuery<>(key);
Contributor

Should we check the key is not null here? Since in later callers e.g. final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); we do not check if getKey() is null or not, and keyBytes function could throw if it is.
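
One way the suggested guard could look, as a sketch only: `DemoKeyLookup` is a hypothetical class mirroring the shape of the factory method above, and the error message text is made up for the example.

```java
import java.util.Objects;

// Hypothetical query-like class; it mirrors only the factory-method shape above.
final class DemoKeyLookup<K> {
    private final K key;

    private DemoKeyLookup(final K key) {
        this.key = key;
    }

    // Rejects a null key at construction time, so later callers of getKey()
    // never have to handle null.
    static <K> DemoKeyLookup<K> withKey(final K key) {
        return new DemoKeyLookup<>(Objects.requireNonNull(key, "the key should not be null"));
    }

    K getKey() {
        return key;
    }
}
```

Failing in the factory keeps the error close to the caller that supplied the null, rather than deeper in the serialization path.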

mkEntry(
RangeQuery.class,
StoreQueryUtils::runRangeQuery
),
mkEntry(KeyQuery.class,
StoreQueryUtils::runKeyQuery
Contributor

Seems misaligned.

(KeyValueStore<Bytes, byte[]>) store;
try {
final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
return (QueryResult<R>) QueryResult.forResult(bytes);
Contributor

Should we use swap here as well?

Also, I'm feeling maybe we can introduce an internal class extending on KeyQuery<?, byte[]> and only define the swap in that class (see my other comment above).

Contributor Author

Thanks; let's keep that in mind as we tackle some of the API refactor tasks we've queued up. We started with the RawXQuery approach, then dropped it. Before we add it back, I think we'd better have a representative set of queries and also bear in mind all the other sharp edges we'd like to smooth over before release.

Contributor

@vvcephei but in this PR at least, Should we use swap here as well??

final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
return (QueryResult<R>) QueryResult.forResult(bytes);
} catch (final Exception e) {
final String message = parseStoreException(e, store, query);
Contributor

Should parseStoreException's first parameter be Exception then?

IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);

final QueryResult<V> queryResult =
result.getGlobalResult() != null
Contributor

I think I'm still a bit confused about the global result here. I thought we had decided not to support this, and hence it should always be null here; is that right?

Contributor Author

This line was written before I scratched global store support from the current scope. I'll drop the check from this test for now.

final boolean timestamped = WrappedStateStore.isTimestamped(wrapped());
final Deserializer<V> deserializer;
-if (!timestamped && vSerde instanceof ValueAndTimestampSerde) {
+if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
Contributor

This is the part that I'm not completely sure about either... maybe some quick sync on this would be more effective?

Contributor Author

I know it's weird, but it is correct. I would like to revisit it, but I think we really need to do that after the current round of queries are implemented.

@guozhangwang (Contributor)

LGTM, I believe @vvcephei will address the remaining comments right after it's merged.

@vvcephei (Contributor Author)

Thanks, all! This is all really good feedback, and we clearly have some opportunities to tighten up both the API and internals.

I will go through the follow-on tickets that are filed in Jira and mark them as 3.2 blockers. That way, we can be sure that we will either address the API issues before the release or pull IQv2 from the release.

Immediately after merging, I will follow up with a PR to remove QueryResult#swapResult from the public API. I think both the fact that it's an instance method and its name were misleading, but after reflection it also doesn't really need to be part of the caller-facing result class at all.

@vvcephei vvcephei merged commit 5747788 into trunk Dec 20, 2021
@vvcephei vvcephei deleted the iqv2-key-query branch December 20, 2021 18:22
vvcephei added a commit that referenced this pull request Dec 21, 2021
Follow-on from #11582 .
Removes a public API method in favor of an internal utility method.

Reviewer: Matthias J. Sax <mjsax@apache.org>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
Implement the KeyQuery as proposed in KIP-796

Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
Follow-on from apache#11582 .
Removes a public API method in favor of an internal utility method.

Reviewer: Matthias J. Sax <mjsax@apache.org>