Skip to content

Commit

Permalink
Replace PublisherInterface with Publisher (#900)
Browse files Browse the repository at this point in the history
After the discussion in googleapis/java-pubsub#949, reverting back to using `Publisher` for all API return types. I've ported the trace header injection functionality from `TracingPublisher` into `TracingPublisherFactory`.

The Trace information will now get into a standard `Publisher` through a builder callback `PubsubMessage.Builder.setTransform()`.
  • Loading branch information
elefeint authored Jan 26, 2022
1 parent 6e8a98d commit 07d7500
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 163 deletions.
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/pubsub.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ MaxRpcTimeout puts a limit on the value of the RPC timeout, so that the RpcTimeo
can't increase the RPC timeout higher than this amount. | No | 0
|===

==== Programmatic Configuration
To apply publishing customizations not covered by the properties above, you may provide custom beans of type `PublisherCustomizer` to post-process the `Publisher.Builder` object right before it is built into a `Publisher`.
The `PublisherCustomizer` beans may be annotated with Spring Framework's `@Order` annotation to ensure they are applied in a particular sequence.

=== Spring Boot Actuator Support

==== Cloud Pub/Sub Health Indicator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.core.health.HealthTrackerRegistry;
import com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate;
import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer;
import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate;
import com.google.cloud.spring.pubsub.support.CachingPublisherFactory;
import com.google.cloud.spring.pubsub.support.DefaultPublisherFactory;
Expand All @@ -53,11 +54,14 @@
import com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter;
import com.google.pubsub.v1.ProjectSubscriptionName;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -347,7 +351,8 @@ public PublisherFactory defaultPublisherFactory(
@Qualifier("publisherBatchSettings") ObjectProvider<BatchingSettings> batchingSettings,
@Qualifier("publisherRetrySettings") ObjectProvider<RetrySettings> retrySettings,
@Qualifier("publisherTransportChannelProvider")
TransportChannelProvider publisherTransportChannelProvider) {
TransportChannelProvider publisherTransportChannelProvider,
ObjectProvider<PublisherCustomizer> customizersProvider) {
DefaultPublisherFactory factory = new DefaultPublisherFactory(this.finalProjectIdProvider);
factory.setExecutorProvider(executorProvider);
factory.setCredentialsProvider(this.finalCredentialsProvider);
Expand All @@ -357,6 +362,12 @@ public PublisherFactory defaultPublisherFactory(
batchingSettings.ifAvailable(factory::setBatchingSettings);
factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering());
factory.setEndpoint(gcpPubSubProperties.getPublisher().getEndpoint());

List<PublisherCustomizer> customizers = customizersProvider.orderedStream()
.collect(Collectors.toList());
Collections.reverse(customizers); // highest priority customizer needs to be last
factory.setCustomizers(customizers);

return new CachingPublisherFactory(factory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.PublisherInterface;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullResponse;
Expand Down Expand Up @@ -116,11 +115,6 @@ public static Builder newBuilder(MessagingTracing messagingTracing) {
return new Builder(messagingTracing);
}

/** Creates an instrumented {@linkplain PublisherInterface}. */
public TracingPublisher publisher(PublisherInterface publisher, String topic) {
return new TracingPublisher(publisher, this, topic);
}

/** Creates an instrumented {@linkplain SubscriberStub} for use in message pulling scenario. */
public TracingSubscriberStub subscriberStub(SubscriberStub subscriberStub) {
return new TracingSubscriberStub(subscriberStub, this);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2022-2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -21,40 +21,30 @@
import brave.Span;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.PublisherInterface;
import com.google.pubsub.v1.PubsubMessage;

final class TracingPublisher implements PublisherInterface {
private final PublisherInterface delegate;
class TraceHelper {

private final PubSubTracing pubSubTracing;

private final String topic;

TracingPublisher(PublisherInterface delegate, PubSubTracing pubSubTracing, String topic) {
this.delegate = delegate;
TraceHelper(PubSubTracing pubSubTracing) {
this.pubSubTracing = pubSubTracing;
this.topic = topic;
}

@Override
public ApiFuture<String> publish(PubsubMessage message) {
PubsubMessage.Builder builder = message.toBuilder();
postProcessMessageForPublishing(builder);
PubsubMessage tracedMessage = builder.build();
return delegate.publish(tracedMessage);
}

private void postProcessMessageForPublishing(PubsubMessage.Builder messageBuilder) {
/**
* Adds tracing headers to an outgoing Pub/Sub message.
* Uses the current application trace context; falls back to original message header context
* if not available.
*
* @param originalMessage message to instrument
* @param topic destination topic, used as channel name and {@link PubSubTags#PUBSUB_TOPIC_TAG}.
*/
PubsubMessage instrumentMessage(PubsubMessage originalMessage, String topic) {
PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder(originalMessage);
PubSubProducerRequest request = new PubSubProducerRequest(messageBuilder, topic);

TraceContext maybeParent = pubSubTracing.tracing.currentTraceContext().get();
// Unlike message consumers, we try current span before trying extraction. This is the proper
// order because the span in scope should take precedence over a potentially stale header entry.
//
// NOTE: Brave instrumentation used properly does not result in stale header entries, as we
// always clear message headers after reading.

Span span;
if (maybeParent == null) {
TraceContextOrSamplingFlags extracted =
Expand Down Expand Up @@ -83,5 +73,8 @@ private void postProcessMessageForPublishing(PubsubMessage.Builder messageBuilde

// inject span context into the messageBuilder
pubSubTracing.producerInjector.inject(span.context(), request);

return messageBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import brave.Tracing;
import brave.messaging.MessagingTracing;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.spring.autoconfigure.pubsub.GcpPubSubAutoConfiguration;
import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer;
import com.google.cloud.spring.pubsub.support.PublisherFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -30,13 +34,16 @@
import org.springframework.cloud.sleuth.brave.instrument.messaging.ConditionalOnMessagingEnabled;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;

@Configuration(proxyBeanMethods = false)
@ConditionalOnMessagingEnabled
@ConditionalOnBean(Tracing.class)
@ConditionalOnProperty(value = "spring.cloud.gcp.trace.pubsub.enabled", matchIfMissing = false)
@ConditionalOnClass({PublisherFactory.class, MessagingTracing.class})
@AutoConfigureAfter({BraveAutoConfiguration.class, BraveMessagingAutoConfiguration.class})
@AutoConfigureBefore(GcpPubSubAutoConfiguration.class)
class TracePubSubAutoConfiguration {

@Bean
Expand All @@ -50,4 +57,14 @@ static TracePubSubBeanPostProcessor tracePubSubBeanPostProcessor(BeanFactory bea
PubSubTracing pubSubTracing(MessagingTracing messagingTracing) {
return PubSubTracing.newBuilder(messagingTracing).build();
}

@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
PublisherCustomizer tracePublisherCustomizer(PubSubTracing pubSubTracing) {
TraceHelper helper = new TraceHelper(pubSubTracing);

return (Publisher.Builder publisherBuilder, String topic) -> {
publisherBuilder.setTransform(msg -> helper.instrumentMessage(msg, topic));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.google.cloud.spring.autoconfigure.trace.pubsub;

import com.google.cloud.spring.pubsub.support.CachingPublisherFactory;
import com.google.cloud.spring.pubsub.support.PublisherFactory;
import com.google.cloud.spring.pubsub.support.SubscriberFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand All @@ -35,10 +33,7 @@ class TracePubSubBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
if (bean instanceof PublisherFactory) {
return new CachingPublisherFactory(
new TracingPublisherFactory(pubSubTracing(), (PublisherFactory) bean));
} else if (bean instanceof SubscriberFactory) {
if (bean instanceof SubscriberFactory) {
return new TracingSubscriberFactory(pubSubTracing(), (SubscriberFactory) bean);
}
return bean;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
Expand All @@ -28,21 +29,38 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.spring.core.GcpProjectIdProvider;
import com.google.cloud.spring.pubsub.core.PubSubConfiguration;
import com.google.cloud.spring.pubsub.core.publisher.PublisherCustomizer;
import com.google.cloud.spring.pubsub.support.CachingPublisherFactory;
import com.google.cloud.spring.pubsub.support.DefaultPublisherFactory;
import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory;
import com.google.cloud.spring.pubsub.support.PublisherFactory;
import java.util.List;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.threeten.bp.Duration;

/** Tests for Pub/Sub autoconfiguration. */
class GcpPubSubAutoConfigurationTests {

ApplicationContextRunner baseContextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(GcpPubSubAutoConfiguration.class))
.withUserConfiguration(TestConfig.class);

static final BatchingSettings TEST_BATCHING_SETTINGS = BatchingSettings.newBuilder()
.setDelayThreshold(Duration.ofSeconds(11))
.build();

@Test
void keepAliveValue_default() {
ApplicationContextRunner contextRunner =
Expand Down Expand Up @@ -1285,6 +1303,66 @@ void createSubscriberStub_flowControlSettings_noPropertiesSet() {
});
}

@Test
void createPublisherWithCustomizer() {

baseContextRunner
.withUserConfiguration(CustomizerConfig.class)
.run(ctx -> {
PublisherFactory factory =
ctx.getBean("defaultPublisherFactory", PublisherFactory.class);

DefaultPublisherFactory defaultFactory =
(DefaultPublisherFactory) ((CachingPublisherFactory) factory).getDelegate();
List<PublisherCustomizer> customizers =
(List<PublisherCustomizer>) FieldUtils.readField(defaultFactory, "customizers", true);
assertThat(customizers)
.hasSize(3);
assertThat(customizers.get(0)).isInstanceOf(NoopCustomizer.class);
assertThat(customizers.get(1)).isInstanceOf(NoopCustomizer.class);
// DefaultPublisherFactory applies highest priority last
assertThat(customizers.get(2)).isInstanceOf(BatchingSettingsCustomizer.class);

Publisher testPublisher = factory.createPublisher("unused");
assertThat(testPublisher.getBatchingSettings()).isSameAs(TEST_BATCHING_SETTINGS);
});
}

@Configuration
static class CustomizerConfig {
@Bean
NoopCustomizer noop1() {
return new NoopCustomizer();
}

@Order(Ordered.HIGHEST_PRECEDENCE)
@Bean
BatchingSettingsCustomizer batchingSettingsCustomizer() {
return new BatchingSettingsCustomizer();
}

@Bean
NoopCustomizer noop2() {
return new NoopCustomizer();
}

}

static class NoopCustomizer implements PublisherCustomizer {

@Override
public void apply(Publisher.Builder publisherBuilder, String topic) {
// do nothing
}
}

static class BatchingSettingsCustomizer implements PublisherCustomizer {
@Override
public void apply(Publisher.Builder publisherBuilder, String topic) {
publisherBuilder.setBatchingSettings(TEST_BATCHING_SETTINGS);
}
}

static class TestConfig {

@Bean
Expand Down
Loading

0 comments on commit 07d7500

Please sign in to comment.