diff --git a/docs/src/main/asciidoc/pubsub.adoc b/docs/src/main/asciidoc/pubsub.adoc index 6bfc0afaac..23ab7181aa 100644 --- a/docs/src/main/asciidoc/pubsub.adoc +++ b/docs/src/main/asciidoc/pubsub.adoc @@ -177,6 +177,10 @@ MaxRpcTimeout puts a limit on the value of the RPC timeout, so that the RpcTimeo can't increase the RPC timeout higher than this amount. | No | 0 |=== +==== Programmatic Configuration +To apply publishing customizations not covered by the properties above, you may provide custom beans of type `PublisherCustomizer` to post-process the `Publisher.Builder` object right before it is built into a `Publisher`. +The `PublisherCustomizer` beans may be annotated with Spring Framework's `@Order` annotation to ensure they are applied in a particular sequence. + === Spring Boot Actuator Support ==== Cloud Pub/Sub Health Indicator diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index 41893d9d63..6e8923f89f 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -43,6 +43,7 @@ import com.google.cloud.spring.pubsub.core.PubSubTemplate; import com.google.cloud.spring.pubsub.core.health.HealthTrackerRegistry; import com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate; +import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer; import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate; import com.google.cloud.spring.pubsub.support.CachingPublisherFactory; import com.google.cloud.spring.pubsub.support.DefaultPublisherFactory; @@ -53,11 +54,14 @@ import com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter; import com.google.pubsub.v1.ProjectSubscriptionName; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.function.Consumer; +import java.util.stream.Collectors; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -347,7 +351,8 @@ public PublisherFactory defaultPublisherFactory( @Qualifier("publisherBatchSettings") ObjectProvider batchingSettings, @Qualifier("publisherRetrySettings") ObjectProvider retrySettings, @Qualifier("publisherTransportChannelProvider") - TransportChannelProvider publisherTransportChannelProvider) { + TransportChannelProvider publisherTransportChannelProvider, + ObjectProvider customizersProvider) { DefaultPublisherFactory factory = new DefaultPublisherFactory(this.finalProjectIdProvider); factory.setExecutorProvider(executorProvider); factory.setCredentialsProvider(this.finalCredentialsProvider); @@ -357,6 +362,12 @@ public PublisherFactory defaultPublisherFactory( batchingSettings.ifAvailable(factory::setBatchingSettings); factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering()); factory.setEndpoint(gcpPubSubProperties.getPublisher().getEndpoint()); + + List customizers = customizersProvider.orderedStream() + .collect(Collectors.toList()); + Collections.reverse(customizers); // highest priority customizer needs to be last + factory.setCustomizers(customizers); + return new CachingPublisherFactory(factory); } diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/PubSubTracing.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/PubSubTracing.java index d5e2530000..864b53cade 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/PubSubTracing.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/PubSubTracing.java @@ -29,7 +29,6 @@ import brave.propagation.TraceContextOrSamplingFlags; import brave.sampler.SamplerFunction; import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.cloud.pubsub.v1.PublisherInterface; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullResponse; @@ -116,11 +115,6 @@ public static Builder newBuilder(MessagingTracing messagingTracing) { return new Builder(messagingTracing); } - /** Creates an instrumented {@linkplain PublisherInterface}. */ - public TracingPublisher publisher(PublisherInterface publisher, String topic) { - return new TracingPublisher(publisher, this, topic); - } - /** Creates an instrumented {@linkplain SubscriberStub} for use in message pulling scenario. */ public TracingSubscriberStub subscriberStub(SubscriberStub subscriberStub) { return new TracingSubscriberStub(subscriberStub, this); diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisher.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TraceHelper.java similarity index 65% rename from spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisher.java rename to spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TraceHelper.java index a6e0085fea..67373c16c2 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisher.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TraceHelper.java @@ -1,11 +1,11 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2022-2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,40 +21,30 @@ import brave.Span; import brave.propagation.TraceContext; import brave.propagation.TraceContextOrSamplingFlags; -import com.google.api.core.ApiFuture; -import com.google.cloud.pubsub.v1.PublisherInterface; import com.google.pubsub.v1.PubsubMessage; -final class TracingPublisher implements PublisherInterface { - private final PublisherInterface delegate; +class TraceHelper { private final PubSubTracing pubSubTracing; - private final String topic; - - TracingPublisher(PublisherInterface delegate, PubSubTracing pubSubTracing, String topic) { - this.delegate = delegate; + TraceHelper(PubSubTracing pubSubTracing) { this.pubSubTracing = pubSubTracing; - this.topic = topic; - } - - @Override - public ApiFuture publish(PubsubMessage message) { - PubsubMessage.Builder builder = message.toBuilder(); - postProcessMessageForPublishing(builder); - PubsubMessage tracedMessage = builder.build(); - return delegate.publish(tracedMessage); } - private void postProcessMessageForPublishing(PubsubMessage.Builder messageBuilder) { + /** + * Adds tracing headers to an outgoing Pub/Sub message. + * Uses the current application trace context; falls back to original message header context + * if not available. + * + * @param originalMessage message to instrument + * @param topic destination topic, used as channel name and {@link PubSubTags#PUBSUB_TOPIC_TAG}. + */ + PubsubMessage instrumentMessage(PubsubMessage originalMessage, String topic) { + PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder(originalMessage); PubSubProducerRequest request = new PubSubProducerRequest(messageBuilder, topic); TraceContext maybeParent = pubSubTracing.tracing.currentTraceContext().get(); - // Unlike message consumers, we try current span before trying extraction. This is the proper - // order because the span in scope should take precedence over a potentially stale header entry. - // - // NOTE: Brave instrumentation used properly does not result in stale header entries, as we - // always clear message headers after reading. + Span span; if (maybeParent == null) { TraceContextOrSamplingFlags extracted = @@ -83,5 +73,8 @@ private void postProcessMessageForPublishing(PubsubMessage.Builder messageBuilde // inject span context into the messageBuilder pubSubTracing.producerInjector.inject(span.context(), request); + + return messageBuilder.build(); } + } diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfiguration.java index 8fcdd7f10f..81b676ce53 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfiguration.java @@ -18,9 +18,13 @@ import brave.Tracing; import brave.messaging.MessagingTracing; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration; +import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer; import com.google.cloud.spring.pubsub.support.PublisherFactory; import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -30,6 +34,8 @@ import org.springframework.cloud.sleuth.brave.instrument.messaging.ConditionalOnMessagingEnabled; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; @Configuration(proxyBeanMethods = false) @ConditionalOnMessagingEnabled @@ -37,6 +43,7 @@ @ConditionalOnProperty(value = "spring.cloud.gcp.trace.pubsub.enabled", matchIfMissing = false) @ConditionalOnClass({PublisherFactory.class, MessagingTracing.class}) @AutoConfigureAfter({BraveAutoConfiguration.class, BraveMessagingAutoConfiguration.class}) +@AutoConfigureBefore(GcpPubSubAutoConfiguration.class) class TracePubSubAutoConfiguration { @Bean @@ -50,4 +57,14 @@ static TracePubSubBeanPostProcessor tracePubSubBeanPostProcessor(BeanFactory bea PubSubTracing pubSubTracing(MessagingTracing messagingTracing) { return PubSubTracing.newBuilder(messagingTracing).build(); } + + @Bean + @Order(Ordered.HIGHEST_PRECEDENCE) + PublisherCustomizer tracePublisherCustomizer(PubSubTracing pubSubTracing) { + TraceHelper helper = new TraceHelper(pubSubTracing); + + return (Publisher.Builder publisherBuilder, String topic) -> { + publisherBuilder.setTransform(msg -> helper.instrumentMessage(msg, topic)); + }; + } } diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessor.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessor.java index b5e7d740ba..1e8e9966d6 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessor.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessor.java @@ -16,8 +16,6 @@ package com.google.cloud.spring.autoconfigure.trace.pubsub; -import com.google.cloud.spring.pubsub.support.CachingPublisherFactory; -import com.google.cloud.spring.pubsub.support.PublisherFactory; import com.google.cloud.spring.pubsub.support.SubscriberFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -35,10 +33,7 @@ class TracePubSubBeanPostProcessor implements BeanPostProcessor { @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof PublisherFactory) { - return new CachingPublisherFactory( - new TracingPublisherFactory(pubSubTracing(), (PublisherFactory) bean)); - } else if (bean instanceof SubscriberFactory) { + if (bean instanceof SubscriberFactory) { return new TracingSubscriberFactory(pubSubTracing(), (SubscriberFactory) bean); } return bean; diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisherFactory.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisherFactory.java deleted file mode 100644 index d1ff7cb29f..0000000000 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisherFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2017-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spring.autoconfigure.trace.pubsub; - -import com.google.cloud.pubsub.v1.PublisherInterface; -import com.google.cloud.spring.pubsub.support.PublisherFactory; - -final class TracingPublisherFactory implements PublisherFactory { - private final PubSubTracing pubSubTracing; - - private final PublisherFactory publisherFactory; - - TracingPublisherFactory(PubSubTracing pubSubTracing, PublisherFactory publisherFactory) { - this.pubSubTracing = pubSubTracing; - this.publisherFactory = publisherFactory; - } - - @Override - public PublisherInterface createPublisher(String topic) { - return pubSubTracing.publisher(publisherFactory.createPublisher(topic), topic); - } -} diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java index 937c6469d6..ed41c612a7 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; @@ -28,21 +29,38 @@ import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.spring.core.GcpProjectIdProvider; import com.google.cloud.spring.pubsub.core.PubSubConfiguration; +import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer; +import com.google.cloud.spring.pubsub.support.CachingPublisherFactory; +import com.google.cloud.spring.pubsub.support.DefaultPublisherFactory; import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory; +import com.google.cloud.spring.pubsub.support.PublisherFactory; +import java.util.List; import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.threeten.bp.Duration; /** Tests for Pub/Sub autoconfiguration. */ class GcpPubSubAutoConfigurationTests { + ApplicationContextRunner baseContextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class)) + .withUserConfiguration(TestConfig.class); + + static final BatchingSettings TEST_BATCHING_SETTINGS = BatchingSettings.newBuilder() + .setDelayThreshold(Duration.ofSeconds(11)) + .build(); + @Test void keepAliveValue_default() { ApplicationContextRunner contextRunner = @@ -1285,6 +1303,66 @@ void createSubscriberStub_flowControlSettings_noPropertiesSet() { }); } + @Test + void createPublisherWithCustomizer() { + + baseContextRunner + .withUserConfiguration(CustomizerConfig.class) + .run(ctx -> { + PublisherFactory factory = + ctx.getBean("defaultPublisherFactory", PublisherFactory.class); + + DefaultPublisherFactory defaultFactory = + (DefaultPublisherFactory) ((CachingPublisherFactory) factory).getDelegate(); + List customizers = + (List) FieldUtils.readField(defaultFactory, "customizers", true); + assertThat(customizers) + .hasSize(3); + assertThat(customizers.get(0)).isInstanceOf(NoopCustomizer.class); + assertThat(customizers.get(1)).isInstanceOf(NoopCustomizer.class); + // DefaultPublisherFactory applies highest priority last + assertThat(customizers.get(2)).isInstanceOf(BatchingSettingsCustomizer.class); + + Publisher testPublisher = factory.createPublisher("unused"); + assertThat(testPublisher.getBatchingSettings()).isSameAs(TEST_BATCHING_SETTINGS); + }); + } + + @Configuration + static class CustomizerConfig { + @Bean + NoopCustomizer noop1() { + return new NoopCustomizer(); + } + + @Order(Ordered.HIGHEST_PRECEDENCE) + @Bean + BatchingSettingsCustomizer batchingSettingsCustomizer() { + return new BatchingSettingsCustomizer(); + } + + @Bean + NoopCustomizer noop2() { + return new NoopCustomizer(); + } + + } + + static class NoopCustomizer implements PublisherCustomizer { + + @Override + public void apply(Publisher.Builder publisherBuilder, String topic) { + // do nothing + } + } + + static class BatchingSettingsCustomizer implements PublisherCustomizer { + @Override + public void apply(Publisher.Builder publisherBuilder, String topic) { + publisherBuilder.setBatchingSettings(TEST_BATCHING_SETTINGS); + } + } + static class TestConfig { @Bean diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisherTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TraceHelperTests.java similarity index 57% rename from spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisherTest.java rename to spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TraceHelperTests.java index 9853b9d7c0..648d449d49 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracingPublisherTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TraceHelperTests.java @@ -1,11 +1,11 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2022-2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,96 +18,81 @@ import static brave.Span.Kind.PRODUCER; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.entry; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.assertj.core.data.MapEntry.entry; import brave.handler.MutableSpan; import brave.propagation.CurrentTraceContext.Scope; -import com.google.cloud.pubsub.v1.PublisherInterface; import com.google.pubsub.v1.PubsubMessage; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -class TracingPublisherTest extends PubSubTestBase { - - PublisherInterface mockPublisher = mock(PublisherInterface.class); - - TracingPublisher tracingPublisher = pubSubTracing.publisher(mockPublisher, TEST_TOPIC); - - ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(PubsubMessage.class); +class TraceHelperTests extends PubSubTestBase { @Test void should_add_b3_headers_to_messages() { - tracingPublisher.publish(producerMessage.build()); + TraceHelper traceHelper = new TraceHelper(pubSubTracing); - when(mockPublisher.publish(any())).thenReturn(null); + PubsubMessage instrumentedMessage = + traceHelper.instrumentMessage(producerMessage.build(), TEST_TOPIC); - verify(mockPublisher).publish(messageCaptor.capture()); - assertThat(messageCaptor.getValue().getAttributesOrThrow("b3")).isNotNull(); - assertThat(messageCaptor.getValue().getAttributesCount()).isEqualTo(1); + assertThat(instrumentedMessage.getAttributesOrThrow("b3")).isNotNull(); + assertThat(instrumentedMessage.getAttributesCount()).isEqualTo(1); } @Test void should_add_b3_headers_when_other_headers_exist() { PubsubMessage.Builder message = producerMessage.putAttributes("tx-id", "1"); - tracingPublisher.publish(message.build()); - - verify(mockPublisher).publish(messageCaptor.capture()); + TraceHelper traceHelper = new TraceHelper(pubSubTracing); + PubsubMessage instrumentedMessage = traceHelper.instrumentMessage(message.build(), TEST_TOPIC); MutableSpan producerSpan = spans.get(0); assertThat(producerSpan.kind()).isEqualTo(PRODUCER); - assertThat(messageCaptor.getValue().getAttributesMap()) + assertThat(instrumentedMessage.getAttributesMap()) .containsEntry("tx-id", "1") .containsEntry("b3", producerSpan.traceId() + "-" + producerSpan.id() + "-1"); } @Test void should_inject_child_context() { + + TraceHelper traceHelper = new TraceHelper(pubSubTracing); + PubsubMessage instrumentedMessage; try (Scope scope = currentTraceContext.newScope(parent)) { - tracingPublisher.publish(producerMessage.build()); + instrumentedMessage = + traceHelper.instrumentMessage(producerMessage.build(), TEST_TOPIC); } - verify(mockPublisher).publish(messageCaptor.capture()); - MutableSpan producerSpan = spans.get(0); assertThat(producerSpan.kind()).isEqualTo(PRODUCER); assertChildOf(producerSpan, parent); - assertThat(messageCaptor.getValue().getAttributesMap()) + assertThat(instrumentedMessage.getAttributesMap()) + .isNotNull() .containsEntry("b3", producerSpan.traceId() + "-" + producerSpan.id() + "-1"); } @Test void should_add_parent_trace_when_context_injected_on_headers() { PubsubMessage.Builder message = producerMessage.putAttributes("tx-id", "1"); + TraceHelper traceHelper = new TraceHelper(pubSubTracing); pubSubTracing.producerInjector.inject(parent, new PubSubProducerRequest(message, "myTopic")); - tracingPublisher.publish(message.build()); - - verify(mockPublisher).publish(messageCaptor.capture()); + PubsubMessage instrumentedMessage = + traceHelper.instrumentMessage(message.build(), TEST_TOPIC); MutableSpan producerSpan = spans.get(0); assertThat(producerSpan.kind()).isEqualTo(PRODUCER); assertChildOf(producerSpan, parent); - assertThat(messageCaptor.getValue().getAttributesMap()) + assertThat(instrumentedMessage.getAttributesMap()) .containsEntry("b3", producerSpan.traceId() + "-" + producerSpan.id() + "-1"); } - @Test - void should_call_wrapped_producer() { - PubsubMessage message = producerMessage.build(); - tracingPublisher.publish(message); - verify(mockPublisher, times(1)).publish(any()); - } - @Test void send_should_set_name() { - tracingPublisher.publish(producerMessage.build()); + TraceHelper traceHelper = new TraceHelper(pubSubTracing); + + PubsubMessage instrumentedMessage = + traceHelper.instrumentMessage(producerMessage.build(), TEST_TOPIC); MutableSpan producerSpan = spans.get(0); assertThat(producerSpan.kind()).isEqualTo(PRODUCER); @@ -116,7 +101,10 @@ void send_should_set_name() { @Test void send_should_tag_topic() { - tracingPublisher.publish(producerMessage.build()); + TraceHelper traceHelper = new TraceHelper(pubSubTracing); + + PubsubMessage instrumentedMessage = + traceHelper.instrumentMessage(producerMessage.build(), TEST_TOPIC); MutableSpan producerSpan = spans.get(0); assertThat(producerSpan.kind()).isEqualTo(PRODUCER); @@ -125,11 +113,15 @@ void send_should_tag_topic() { @Test void send_shouldnt_tag_null_topic() { - TracingPublisher tracingPublisher = pubSubTracing.publisher(mockPublisher, null); - tracingPublisher.publish(producerMessage.build()); + TraceHelper traceHelper = new TraceHelper(pubSubTracing); + + PubsubMessage instrumentedMessage = + traceHelper.instrumentMessage(producerMessage.build(), null); MutableSpan producerSpan = spans.get(0); assertThat(producerSpan.kind()).isEqualTo(PRODUCER); assertThat(producerSpan.tags()).isEmpty(); } + + } diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfigurationTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfigurationTest.java index a10836bb71..8955eac4e2 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfigurationTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubAutoConfigurationTest.java @@ -16,21 +16,33 @@ package com.google.cloud.spring.autoconfigure.trace.pubsub; +import static com.google.cloud.spring.autoconfigure.trace.StackdriverTraceAutoConfiguration.REPORTER_BEAN_NAME; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import brave.handler.SpanHandler; import brave.http.HttpRequestParser; import brave.http.HttpTracingCustomizer; +import com.google.api.core.ApiFunction; +import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.spring.autoconfigure.core.GcpContextAutoConfiguration; import com.google.cloud.spring.autoconfigure.trace.MockConfiguration; import com.google.cloud.spring.autoconfigure.trace.StackdriverTraceAutoConfiguration; +import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer; import io.grpc.ManagedChannel; +import java.util.List; +import java.util.stream.Collectors; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.cloud.autoconfigure.RefreshAutoConfiguration; import org.springframework.cloud.sleuth.autoconfig.brave.BraveAutoConfiguration; import org.springframework.cloud.sleuth.autoconfig.brave.instrument.messaging.BraveMessagingAutoConfiguration; +import zipkin2.reporter.Reporter; import zipkin2.reporter.Sender; /** Tests for Trace Pub/Sub auto-config. */ @@ -51,6 +63,8 @@ class TracePubSubAutoConfigurationTest { StackdriverTraceAutoConfiguration.SPAN_HANDLER_BEAN_NAME, SpanHandler.class, () -> SpanHandler.NOOP) + // Prevent healthcheck from triggering a real call to Trace. + .withBean(REPORTER_BEAN_NAME, Reporter.class, () -> mock(Reporter.class)) .withPropertyValues( "spring.cloud.gcp.project-id=proj", "spring.sleuth.sampler.probability=1.0"); @@ -86,4 +100,28 @@ void testPubSubTracingEnabled() { assertThat(context.getBean(PubSubTracing.class)).isNotNull(); }); } + + @Test + void tracePubSubCustomizerAppliedLast() { + PublisherCustomizer noopCustomizer = (pb, t) -> {}; + this.contextRunner + .withPropertyValues("spring.cloud.gcp.trace.pubsub.enabled=true") + .withBean(PublisherCustomizer.class, () -> noopCustomizer) + .run(context -> { + ObjectProvider customizersProvider = + context.getBeanProvider(PublisherCustomizer.class); + List customizers = + customizersProvider.orderedStream().collect(Collectors.toList()); + assertThat(customizers).hasSize(2); + + // Object provider lists highest priority first, so default priority `noopCustomizer` + // will be second + assertThat(customizers.get(1)).isSameAs(noopCustomizer); + + PublisherCustomizer traceCustomizer = customizers.get(0); + Publisher.Builder spyBuilder = spy(Publisher.newBuilder("test")); + traceCustomizer.apply(spyBuilder, "test"); + verify(spyBuilder).setTransform(any(ApiFunction.class)); + }); + } } diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessorTest.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessorTest.java index 7a98865c4a..c99bfeebfa 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessorTest.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/trace/pubsub/TracePubSubBeanPostProcessorTest.java @@ -23,8 +23,6 @@ import static org.mockito.Mockito.when; import com.google.cloud.spring.pubsub.core.PubSubTemplate; -import com.google.cloud.spring.pubsub.support.CachingPublisherFactory; -import com.google.cloud.spring.pubsub.support.PublisherFactory; import com.google.cloud.spring.pubsub.support.SubscriberFactory; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.BeanFactory; @@ -38,19 +36,6 @@ final class TracePubSubBeanPostProcessorTest { TracePubSubBeanPostProcessor tracePubSubBeanPostProcessor = new TracePubSubBeanPostProcessor(mockBeanFactory); - @Test - void test_postProcessBeforeInitialization_PublisherFactory() { - PublisherFactory mockPublisherFactory = mock(PublisherFactory.class); - - Object result = - tracePubSubBeanPostProcessor.postProcessBeforeInitialization( - mockPublisherFactory, "publisherFactory"); - - assertThat(result).isInstanceOf(CachingPublisherFactory.class); - assertThat(((CachingPublisherFactory) result).getDelegate()) - .isInstanceOf(TracingPublisherFactory.class); - } - @Test void test_postProcessBeforeInitialization_SubscriberFactory() { SubscriberFactory mockSubscriberFactory = mock(SubscriberFactory.class); diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/publisher/PublisherCustomizer.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/publisher/PublisherCustomizer.java new file mode 100644 index 0000000000..6cd2bc813c --- /dev/null +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/publisher/PublisherCustomizer.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022-2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spring.pubsub.core.publisher; + +import com.google.cloud.pubsub.v1.Publisher; + +/** + * A customizer of {@link Publisher.Builder} objects. + * Can be implemented as a lambda accepting a {@link Publisher.Builder} and a `String` topic. + */ +public interface PublisherCustomizer { + public void apply(Publisher.Builder publisherBuilder, String topic); +} diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/CachingPublisherFactory.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/CachingPublisherFactory.java index dce32ec557..42f267d1c3 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/CachingPublisherFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/CachingPublisherFactory.java @@ -17,7 +17,6 @@ package com.google.cloud.spring.pubsub.support; import com.google.cloud.pubsub.v1.Publisher; -import com.google.cloud.pubsub.v1.PublisherInterface; import java.util.concurrent.ConcurrentHashMap; /** @@ -27,7 +26,7 @@ */ public class CachingPublisherFactory implements PublisherFactory { /** {@link Publisher} cache, enforces only one {@link Publisher} per Pub/Sub topic exists. */ - private final ConcurrentHashMap publishers = + private final ConcurrentHashMap publishers = new ConcurrentHashMap<>(); private PublisherFactory delegate; @@ -42,14 +41,14 @@ public CachingPublisherFactory(PublisherFactory delegate) { } @Override - public PublisherInterface createPublisher(String topic) { + public Publisher createPublisher(String topic) { return this.publishers.computeIfAbsent(topic, delegate::createPublisher); } /** * Returns the delegate. * - * @return the delgate. + * @return the delegate. */ public PublisherFactory getDelegate() { return delegate; diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java index a7fafdf152..5b5c1d62fb 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java @@ -25,7 +25,10 @@ import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.spring.core.GcpProjectIdProvider; import com.google.cloud.spring.pubsub.core.PubSubException; +import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer; import java.io.IOException; +import java.util.Collections; +import java.util.List; import org.springframework.util.Assert; /** @@ -53,6 +56,8 @@ public class DefaultPublisherFactory implements PublisherFactory { private String endpoint; + private List customizers; + /** * Create {@link DefaultPublisherFactory} instance based on the provided {@link * GcpProjectIdProvider}. @@ -146,6 +151,29 @@ public void setEndpoint(String endpoint) { this.endpoint = endpoint; } + /** + * Accepts a list of {@link Publisher.Builder} customizers. + * The customizers are applied in the order provided, so the later customizers can override + * any settings provided by the earlier ones. + */ + public void setCustomizers(List customizers) { + Assert.notNull(customizers, "Non-null customizers expected"); + this.customizers = Collections.unmodifiableList(customizers); + } + + /** + * Creates a {@link Publisher} for a given topic. + * + *

Configuration precedence: + *
    + *
  1. modifications applied by the factory customizers + *
  2. {@code spring.cloud.gcp.pubsub.publisher} configuration options + *
  3. client library defaults + *
+ * + * @param topic destination topic + * @return fully configured publisher + */ @Override public Publisher createPublisher(String topic) { try { @@ -153,6 +181,7 @@ public Publisher createPublisher(String topic) { Publisher.newBuilder(PubSubTopicUtils.toTopicName(topic, this.projectId)); applyPublisherSettings(publisherBuilder); + applyCustomizers(publisherBuilder, topic); return publisherBuilder.build(); } catch (IOException ioe) { @@ -194,4 +223,14 @@ void applyPublisherSettings(Publisher.Builder publisherBuilder) { publisherBuilder.setEndpoint(this.endpoint); } } + + void applyCustomizers(Publisher.Builder publisherBuilder, String topic) { + if (this.customizers == null) { + return; + } + + for (PublisherCustomizer customizer : this.customizers) { + customizer.apply(publisherBuilder, topic); + } + } } diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/PublisherFactory.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/PublisherFactory.java index 5de1e64631..b5f623f04d 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/PublisherFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/PublisherFactory.java @@ -16,10 +16,17 @@ package com.google.cloud.spring.pubsub.support; -import com.google.cloud.pubsub.v1.PublisherInterface; +import com.google.cloud.pubsub.v1.Publisher; /** The publisher factory interface that can create publishers. */ public interface PublisherFactory { - PublisherInterface createPublisher(String topic); + /** + * Creates a {@link Publisher} for a given topic. + * + * @param topic destination topic + * @return fully configured publisher + */ + Publisher createPublisher(String topic); + } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactoryTests.java index 85197a1a43..bdbab069fa 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactoryTests.java @@ -17,30 +17,45 @@ package com.google.cloud.spring.pubsub.support; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.TransportChannel; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer; import com.google.pubsub.v1.ProjectTopicName; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; /** Tests for the publisher factory. */ -@RunWith(MockitoJUnitRunner.class) -public class DefaultPublisherFactoryTests { +class DefaultPublisherFactoryTests { - /** used to test exception messages and types. */ - @Rule public ExpectedException expectedException = ExpectedException.none(); + DefaultPublisherFactory factory; - @Mock private CredentialsProvider credentialsProvider; + @BeforeEach + public void setUp() throws IOException { + factory = new DefaultPublisherFactory(() -> "projectId"); + factory.setCredentialsProvider(NoCredentialsProvider.create()); + TransportChannelProvider mockChannelProvider = mock(TransportChannelProvider.class); + TransportChannel mockTransportChannel = mock(TransportChannel.class); + when(mockChannelProvider.getTransportChannel()).thenReturn(mockTransportChannel); + ApiCallContext mockContext = mock(ApiCallContext.class); + when(mockTransportChannel.getEmptyCallContext()).thenReturn(mockContext); + when(mockContext.withTransportChannel(any())).thenReturn(mockContext); + factory.setChannelProvider(mockChannelProvider); + } @Test - public void testGetPublisher() { - DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> "projectId"); - factory.setCredentialsProvider(this.credentialsProvider); + void testGetPublisher() { + Publisher publisher = factory.createPublisher("testTopic"); assertThat(((ProjectTopicName) publisher.getTopicName()).getTopic()).isEqualTo("testTopic"); @@ -48,16 +63,55 @@ public void testGetPublisher() { } @Test - public void testNewDefaultPublisherFactory_nullProjectIdProvider() { - this.expectedException.expect(IllegalArgumentException.class); - this.expectedException.expectMessage("The project ID provider can't be null."); - new DefaultPublisherFactory(null); + void testNewDefaultPublisherFactory_nullProjectIdProvider() { + assertThatThrownBy(() -> new DefaultPublisherFactory(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The project ID provider can't be null."); + } + + @Test + void testNewDefaultPublisherFactory_nullProjectId() { + + assertThatThrownBy(() -> new DefaultPublisherFactory(() -> null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The project ID can't be null or empty."); + } + + @Test + void createPublisherUsesCustomizersInOrder() { + final AtomicInteger counter = new AtomicInteger(1); + + PublisherCustomizer c1 = (pb, t) -> { + assertThat(counter.getAndIncrement()).isEqualTo(1); + }; + PublisherCustomizer c2 = (pb, t) -> { + assertThat(counter.getAndIncrement()).isEqualTo(2); + }; + PublisherCustomizer c3 = (pb, t) -> { + assertThat(counter.getAndIncrement()).isEqualTo(3); + }; + + factory.setCustomizers(Arrays.asList(c1, c2, c3)); + factory.createPublisher("testtopic"); + + assertThat(counter).hasValue(4); + } + + @Test + void createPublisherWithoutCustomizersWorksFine() throws Exception { + + Publisher publisher = factory.createPublisher("testtopic"); + + Publisher defaultPublisher = Publisher.newBuilder("testtopic") + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + assertThat(publisher.getBatchingSettings()).isSameAs(defaultPublisher.getBatchingSettings()); } @Test - public void testNewDefaultPublisherFactory_nullProjectId() { - this.expectedException.expect(IllegalArgumentException.class); - this.expectedException.expectMessage("The project ID can't be null or empty."); - new DefaultPublisherFactory(() -> null); + void createPublisherWithExplicitNullCustomizersFails() { + assertThatThrownBy(() -> factory.setCustomizers(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Non-null customizers expected"); } }