Skip to content

Commit

Permalink
GH-2357: Switch to CompletableFuture
Browse files Browse the repository at this point in the history
Resolves #2357

Spring Framework is planning to deprecate `ListenableFuture` in 6.0.

Add methods to the `KafkaOperations` (`KafkaTemplate`) that return
`CompletableFuture` instead; the `ListenableFuture` methods will be
removed in 3.0.

Provide mechanisms to ease the migration; allowing users to use
`CompletableFuture`s in this release, which will significantly reduce
the effort to switch in 3.0.

**2.9 Only; I will issue a separate PR for main**
  • Loading branch information
garyrussell authored and artembilan committed Jul 20, 2022
1 parent a4ee066 commit 128372f
Show file tree
Hide file tree
Showing 14 changed files with 679 additions and 29 deletions.
21 changes: 21 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,21 @@ interface ProducerCallback<K, V, T> {

See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail.

IMPORTANT: In version 3.0, the methods that return `ListenableFuture` will be changed to return `CompletableFuture`.
To facilitate the migration, the 2.9 version has a method `.usingCompletableFuture()` which will provide the same methods with `CompletableFuture` return types.

====
[source, java]
----
KafkaOperations2<String, String> template = new KafkaTemplate<>().usingCompletableFuture();
CompletableFuture<SendResult<String, String>> future = template.send(topic1, 0, 0, "buz")
.whenComplete((sr, thrown) -> {
...
});
)
----
====

The `sendDefault` API requires that a default topic has been provided to the template.

The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
Expand Down Expand Up @@ -553,6 +568,9 @@ The result is a `ListenableFuture` that is asynchronously populated with the res
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
You can use this future to determine the result of the send operation.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) will be `CompletableFuture` s instead of `ListenableFuture` s.
To assit in the transition, using this release, you can convert these types to a `CompleteableFuture` by calling `asCompletable()` on the returned `Future`.

If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).

Starting with version 2.8.8, the template has a new method `waitForAssignment`.
Expand Down Expand Up @@ -791,6 +809,9 @@ RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) will be `CompletableFuture` s instead of `ListenableFuture` s.
To assit in the transition, using this release, you can convert these types to a `CompleteableFuture` by calling `asCompletable()` on the returned `Future`.

Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.

Use the second method if you need to provide type information for the return type, to assist the message converter.
Expand Down
9 changes: 9 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@ You can now configure which inbound headers should be mapped.
Also available in version 2.8.8 or later.
See <<headers>> for more information.

[[x29-template-changes]]
==== `KafkaTemplate` Changes

In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
See <<kafka-template>> for assistance in transitioning when using this release.

[[x29-rkt-changes]]
==== `ReplyingKafkaTemplate` Changes

The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized.
Also available in version 2.8.8 or later.
See <<replying-template>>.

In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
See <<replying-template>> and <<exchanging-messages>> for assistance in transitioning when using this release.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -67,15 +68,21 @@ public interface KafkaOperations<K, V> {
* Send the data to the default topic with no key or partition.
* @param data The data.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> sendDefault(V data);

/**
* Send the data to the default topic with the provided key and no partition.
* @param key the key.
* @param data The data.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

/**
Expand All @@ -84,7 +91,10 @@ public interface KafkaOperations<K, V> {
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

/**
Expand All @@ -95,15 +105,21 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
* @since 1.3
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

/**
* Send the data to the provided topic with no key or partition.
* @param topic the topic.
* @param data The data.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> send(String topic, V data);

/**
Expand All @@ -112,7 +128,10 @@ public interface KafkaOperations<K, V> {
* @param key the key.
* @param data The data.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

/**
Expand All @@ -122,7 +141,10 @@ public interface KafkaOperations<K, V> {
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

/**
Expand All @@ -134,26 +156,35 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
* @since 1.3
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

/**
* Send the provided {@link ProducerRecord}.
* @param record the record.
* @return a Future for the {@link SendResult}.
* @since 1.3
* @deprecated see {@link #usingCompletableFuture()}
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

/**
* Send a message with routing information in message headers. The message payload
* may be converted before sending.
* @param message the message to send.
* @return a Future for the {@link SendResult}.
* @deprecated see {@link #usingCompletableFuture()}
* @see org.springframework.kafka.support.KafkaHeaders#TOPIC
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
* @see #usingCompletableFuture()
*/
@Deprecated
ListenableFuture<SendResult<K, V>> send(Message<?> message);

/**
Expand Down Expand Up @@ -328,6 +359,148 @@ default ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested
*/
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);

/**
* Return an implementation that returns {@link CompletableFuture} instead of
* {@link ListenableFuture}. The methods returning {@link ListenableFuture} will be
* removed in 3.0
* @return the implementation.
* @since 2.9.
*/
default KafkaOperations2<K, V> usingCompletableFuture() {
return new KafkaOperations2<K, V>() {

KafkaOperations<K, V> ops;

@Override
public CompletableFuture<SendResult<K, V>> sendDefault(V data) {
return KafkaOperations.this.sendDefault(data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> sendDefault(K key, V data) {
return KafkaOperations.this.sendDefault(key, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
return KafkaOperations.this.sendDefault(partition, key, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
return KafkaOperations.this.sendDefault(partition, timestamp, key, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, V data) {
return KafkaOperations.this.send(topic, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, K key, V data) {
return KafkaOperations.this.send(topic, key, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
return KafkaOperations.this.send(topic, partition, key, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
V data) {
return KafkaOperations.this.send(topic, partition, timestamp, key, data).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
return KafkaOperations.this.send(record).completable();
}

@Override
public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
return KafkaOperations.this.send(message).completable();
}

@Override
public List<PartitionInfo> partitionsFor(String topic) {
return KafkaOperations.this.partitionsFor(topic);
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return KafkaOperations.this.metrics();
}

@Override
@Nullable
public <T> T execute(ProducerCallback<K, V, T> callback) {
return KafkaOperations.this.execute(callback);
}

@Override
@Nullable
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
return KafkaOperations.this.executeInTransaction(callback);
}

@Override
public void flush() {
KafkaOperations.this.flush();
}

@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) {
KafkaOperations.this.sendOffsetsToTransaction(offsets, groupMetadata);
}

@Override
public boolean isTransactional() {
return KafkaOperations.this.isTransactional();
}

@Override
public boolean isAllowNonTransactional() {
return KafkaOperations.this.isAllowNonTransactional();
}

@Override
public boolean inTransaction() {
return KafkaOperations.this.inTransaction();
}

@Override
public ProducerFactory<K, V> getProducerFactory() {
return KafkaOperations.this.getProducerFactory();
}

@Override
@Nullable
public ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
return KafkaOperations.this.receive(topic, partition, offset);
}

@Override
@Nullable
public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
return KafkaOperations.this.receive(topic, partition, offset, pollTimeout);
}

@Override
public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested) {
return KafkaOperations.this.receive(requested);
}

@Override
public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
return KafkaOperations.this.receive(requested, pollTimeout);
}

};

}

/**
* A callback for executing arbitrary operations on the {@link Producer}.
* @param <K> the key type.
Expand Down
Loading

0 comments on commit 128372f

Please sign in to comment.