From b134c9274dee07a453ab9bdc1b08976dc48da39a Mon Sep 17 00:00:00 2001 From: Nathan Piper Date: Sat, 6 May 2023 13:13:31 +1000 Subject: [PATCH] fix: PubSubPublishCallback handling of success and failure callbacks (#1800) Fixes #1799. --- .../outbound/PubSubMessageHandler.java | 14 ++++++++++++-- .../outbound/PubSubMessageHandlerTests.java | 15 +++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandler.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandler.java index dcab0f43c6..a285058fc6 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandler.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandler.java @@ -294,15 +294,25 @@ private class PubSubPublishCallback implements BiConsumer { this.message = message; } - @Override - public void accept(String messageId, Throwable throwable) { + private void handleSuccess(String messageId) { if (PubSubMessageHandler.this.successCallback != null) { PubSubMessageHandler.this.successCallback.onSuccess(messageId, message); } + } + private void handleFailure(Throwable throwable) { if (PubSubMessageHandler.this.failureCallback != null) { PubSubMessageHandler.this.failureCallback.onFailure(throwable, message); } } + + @Override + public void accept(String messageId, Throwable throwable) { + if (throwable == null) { + handleSuccess(messageId); + } else { + handleFailure(throwable); + } + } } } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java index 5bad9f52d9..8418c7e3e6 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/integration/outbound/PubSubMessageHandlerTests.java @@ -224,6 +224,7 @@ void publishWithSuccessCallback() { AtomicReference messageIdRef = new AtomicReference<>(); AtomicReference ackIdRef = new AtomicReference<>(); + AtomicReference failureCauseRef = new AtomicReference<>(); this.adapter.setSuccessCallback( (ackId, message) -> { @@ -231,11 +232,17 @@ void publishWithSuccessCallback() { ackIdRef.set(ackId); }); + this.adapter.setFailureCallback( + (exception, message) -> { + failureCauseRef.set(exception); + }); + this.adapter.handleMessage(testMessage); Awaitility.await().atMost(Duration.ofSeconds(1)).untilAtomic(messageIdRef, notNullValue()); assertThat(messageIdRef).hasValue("123"); assertThat(ackIdRef).hasValue("published12345"); + assertThat(failureCauseRef).hasValue(null); } @Test @@ -249,9 +256,15 @@ void publishWithFailureCallback() { Message testMessage = new GenericMessage<>("testPayload", Collections.singletonMap("message_id", "123")); + AtomicReference ackIdRef = new AtomicReference<>(); AtomicReference failureCauseRef = new AtomicReference<>(); AtomicReference messageIdRef = new AtomicReference<>(); + this.adapter.setSuccessCallback( + (ackId, message) -> { + ackIdRef.set(ackId); + }); + this.adapter.setFailureCallback( (exception, message) -> { failureCauseRef.set(exception); @@ -264,5 +277,7 @@ void publishWithFailureCallback() { assertThat(messageIdRef).hasValue("123"); Throwable cause = failureCauseRef.get(); assertThat(cause).isInstanceOf(RuntimeException.class).hasMessage("boom!"); + + assertThat(ackIdRef).hasValue(null); } }