Skip to content

Commit

Permalink
spring-projectsGH-2357: Switch to CompletableFuture
Browse files Browse the repository at this point in the history
Resolves spring-projects#2357

Spring Framework is planning to deprecate `ListenableFuture` in 6.0.

Add methods to the `KafkaOperations` (`KafkaTemplate`) that return
`CompletableFuture` instead; the `ListenableFuture` methods have now
been removed in 3.0.
  • Loading branch information
garyrussell committed Jul 20, 2022
1 parent a9ec919 commit 933999c
Show file tree
Hide file tree
Showing 18 changed files with 525 additions and 158 deletions.
25 changes: 25 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ interface ProducerCallback<K, V, T> {

See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail.

IMPORTANT: In version 3.0, the methods that returned `ListenableFuture` have been changed to return `CompletableFuture`.
To facilitate the migration, the 2.9 version added a method `.usingCompletableFuture()` which provided the same methods with `CompletableFuture` return types.

====
[source, java]
----
KafkaOperations2<String, String> template = new KafkaTemplate<>().usingCompletableFuture();
CompletableFuture<SendResult<String, String>> future = template.send(topic1, 0, 0, "buz")
.whenComplete((sr, thrown) -> {
...
});
)
----
====

`KafkaOperations2` and `.usingCompletableFuture()` have now been deprecated, and should be removed, since the method simply returns `this`.

The `sendDefault` API requires that a default topic has been provided to the template.

The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
Expand Down Expand Up @@ -553,6 +570,10 @@ The result is a `ListenableFuture` that is asynchronously populated with the res
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
You can use this future to determine the result of the send operation.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s.
To assit in the transition, using this version 2.9, you could convert these types to a `CompleteableFuture` by calling `asCompletable()` on the returned `Future`.
This method is no longer available and the future itself is a `CompletableFuture`.

If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).

Starting with version 2.8.8, the template has a new method `waitForAssignment`.
Expand Down Expand Up @@ -791,6 +812,10 @@ RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s.
To assit in the transition, using this version 2.9, you could convert these types to a `CompleteableFuture` by calling `asCompletable()` on the returned `Future`.
This method is no longer available and the future itself is a `CompletableFuture`.

Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.

Use the second method if you need to provide type information for the return type, to assist the message converter.
Expand Down
12 changes: 12 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ See <<retry-config>> for more information.

Events related to consumer authentication and authorization failures are now published by the container.
See <<events>> for more information.

[[x30-template-changes]]
==== `KafkaTemplate` Changes

The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
See <<kafka-template>>.

[[x30-rkt-changes]]
==== `ReplyingKafkaTemplate` Changes

The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
See <<replying-template>> and <<exchanging-messages>>.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -39,7 +40,7 @@
import org.springframework.util.concurrent.ListenableFuture;

/**
* The basic Kafka operations contract returning {@link ListenableFuture}s.
* The basic Kafka operations contract returning {@link CompletableFuture}s.
*
* @param <K> the key type.
* @param <V> the value type.
Expand All @@ -56,7 +57,8 @@
* @author Gary Russell
* @author Biju Kunjummen
*/
public interface KafkaOperations<K, V> {
@SuppressWarnings("deprecation")
public interface KafkaOperations<K, V> extends KafkaOperations2<K, V> {

/**
* Default timeout for {@link #receive(String, int, long)}.
Expand All @@ -68,15 +70,17 @@ public interface KafkaOperations<K, V> {
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(V data);
@Override
CompletableFuture<SendResult<K, V>> sendDefault(V data);

/**
* Send the data to the default topic with the provided key and no partition.
* @param key the key.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
@Override
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

/**
* Send the data to the default topic with the provided key and partition.
Expand All @@ -85,7 +89,8 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
@Override
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

/**
* Send the data to the default topic with the provided key and partition.
Expand All @@ -96,15 +101,17 @@ public interface KafkaOperations<K, V> {
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
@Override
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

/**
* Send the data to the provided topic with no key or partition.
* @param topic the topic.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, V data);
@Override
CompletableFuture<SendResult<K, V>> send(String topic, V data);

/**
* Send the data to the provided topic with the provided key and no partition.
Expand All @@ -113,7 +120,8 @@ public interface KafkaOperations<K, V> {
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
@Override
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

/**
* Send the data to the provided topic with the provided key and partition.
Expand All @@ -123,7 +131,8 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
@Override
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

/**
* Send the data to the provided topic with the provided key and partition.
Expand All @@ -135,15 +144,17 @@ public interface KafkaOperations<K, V> {
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
@Override
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

/**
* Send the provided {@link ProducerRecord}.
* @param record the record.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
@Override
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

/**
* Send a message with routing information in message headers. The message payload
Expand All @@ -154,21 +165,24 @@ public interface KafkaOperations<K, V> {
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
@Override
CompletableFuture<SendResult<K, V>> send(Message<?> message);

/**
* See {@link Producer#partitionsFor(String)}.
* @param topic the topic.
* @return the partition info.
* @since 1.1
*/
@Override
List<PartitionInfo> partitionsFor(String topic);

/**
* See {@link Producer#metrics()}.
* @return the metrics.
* @since 1.1
*/
@Override
Map<MetricName, ? extends Metric> metrics();

/**
Expand All @@ -178,6 +192,7 @@ public interface KafkaOperations<K, V> {
* @return the result.
* @since 1.1
*/
@Override
@Nullable
<T> T execute(ProducerCallback<K, V, T> callback);

Expand All @@ -190,12 +205,14 @@ public interface KafkaOperations<K, V> {
* @return the result.
* @since 1.1
*/
@Override
@Nullable
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

/**
* Flush the producer.
*/
@Override
void flush();

/**
Expand All @@ -210,6 +227,7 @@ public interface KafkaOperations<K, V> {
* @since 2.5
* @see Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
*/
@Override
default void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) {

Expand All @@ -222,13 +240,15 @@ default void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> off
* @return true or false.
* @since 2.3
*/
@Override
boolean isTransactional();

/**
* Return true if this template, when transactional, allows non-transactional operations.
* @return true to allow.
* @since 2.4.3
*/
@Override
default boolean isAllowNonTransactional() {
return false;
}
Expand All @@ -239,6 +259,7 @@ default boolean isAllowNonTransactional() {
* @return true if a transaction is running.
* @since 2.5
*/
@Override
default boolean inTransaction() {
return false;
}
Expand All @@ -248,6 +269,7 @@ default boolean inTransaction() {
* @return the factory.
* @since 2.5
*/
@Override
default ProducerFactory<K, V> getProducerFactory() {
throw new UnsupportedOperationException("This implementation does not support this operation");
}
Expand All @@ -261,6 +283,7 @@ default ProducerFactory<K, V> getProducerFactory() {
* @since 2.8
* @see #DEFAULT_POLL_TIMEOUT
*/
@Override
@Nullable
default ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
return receive(topic, partition, offset, DEFAULT_POLL_TIMEOUT);
Expand All @@ -275,6 +298,7 @@ default ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
* @return the record or null.
* @since 2.8
*/
@Override
@Nullable
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);

Expand All @@ -286,6 +310,7 @@ default ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
* @since 2.8
* @see #DEFAULT_POLL_TIMEOUT
*/
@Override
default ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested) {
return receive(requested, DEFAULT_POLL_TIMEOUT);
}
Expand All @@ -297,8 +322,22 @@ default ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested
* @return the record or null.
* @since 2.8
*/
@Override
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);

/**
* Return an implementation that returns {@link CompletableFuture} instead of
* {@link CompletableFuture}. The methods returning {@link ListenableFuture} will be
* removed in 3.0
* @return the implementation.
* @since 2.9.
* @deprecated no longer needed; {@link KafkaOperations} now returns {@link CompletableFuture}.
*/
@Deprecated
default KafkaOperations2<K, V> usingCompletableFuture() {
return this;
}

/**
* A callback for executing arbitrary operations on the {@link Producer}.
* @param <K> the key type.
Expand Down
Loading

0 comments on commit 933999c

Please sign in to comment.