Skip to content

Commit

Permalink
Adds decorate option to wrap existing spring rabbit components (openz…
Browse files Browse the repository at this point in the history
  • Loading branch information
marcingrzejszczak authored and adriancole committed Mar 6, 2018
1 parent 539422e commit dcdfa1c
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 33 deletions.
6 changes: 5 additions & 1 deletion instrumentation/spring-rabbit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
}
```

You can also use `SpringRabbitTracing.decorateRabbitTemplate()` to add
tracing to an existing template.

### Message Consumer
Tracing is supported for spring-rabbit `@RabbitListener` based services.
Expand All @@ -52,8 +54,10 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SpringRabbitTracing springRabbitTracing
) {
return springRabbitTracing.newSimpleMessageListenerContainerFactory(connectionFactory);
return springRabbitTracing.newSimpleRabbitListenerContainerFactory(connectionFactory);
}
```

You can also use `SpringRabbitTracing.decorateSimpleRabbitListenerContainerFactory()`
to add tracing to an existing factory.

Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package brave.spring.rabbit;

import brave.Tracing;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand Down Expand Up @@ -45,30 +51,98 @@ public SpringRabbitTracing build() {

final TracingMessagePostProcessor tracingMessagePostProcessor;
final TracingRabbitListenerAdvice tracingRabbitListenerAdvice;
final Field beforePublishPostProcessorsField;

SpringRabbitTracing(Builder builder) {
Tracing tracing = builder.tracing;
String remoteServiceName = builder.remoteServiceName;
this.tracingMessagePostProcessor = new TracingMessagePostProcessor(tracing, remoteServiceName);
this.tracingRabbitListenerAdvice = new TracingRabbitListenerAdvice(tracing, remoteServiceName);
Field beforePublishPostProcessorsField = null;
try {
beforePublishPostProcessorsField =
RabbitTemplate.class.getDeclaredField("beforePublishPostProcessors");
beforePublishPostProcessorsField.setAccessible(true);
} catch (NoSuchFieldException e) {
}
this.beforePublishPostProcessorsField = beforePublishPostProcessorsField;
}

/** Creates an instrumented rabbit template. */
/** Creates an instrumented {@linkplain RabbitTemplate} */
public RabbitTemplate newRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setBeforePublishPostProcessors(tracingMessagePostProcessor);
return rabbitTemplate;
}

/**
* Creates an instrumented SimpleRabbitListenerContainerFactory to be used to consume rabbit
* messages.
*/
public SimpleRabbitListenerContainerFactory newSimpleMessageListenerContainerFactory(
ConnectionFactory connectionFactory) {
/** Instruments an existing {@linkplain RabbitTemplate} */
public RabbitTemplate decorateRabbitTemplate(RabbitTemplate rabbitTemplate) {
// Skip out if we can't read the field for the existing post processors
if (beforePublishPostProcessorsField == null) return rabbitTemplate;
Collection<MessagePostProcessor> processors;
try {
processors = (Collection) beforePublishPostProcessorsField.get(rabbitTemplate);
} catch (IllegalAccessException e) {
return rabbitTemplate;
}

// If there are no existing post processors, return only the tracing one
if (processors == null) {
rabbitTemplate.setBeforePublishPostProcessors(tracingMessagePostProcessor);
return rabbitTemplate;
}

// If there is an existing tracing post processor return
for (MessagePostProcessor processor : processors) {
if (processor instanceof TracingMessagePostProcessor) {
return rabbitTemplate;
}
}

// Otherwise, add ours and return
List<MessagePostProcessor> newProcessors = new ArrayList<>(processors.size() + 1);
newProcessors.addAll(processors);
newProcessors.add(tracingMessagePostProcessor);
rabbitTemplate.setBeforePublishPostProcessors(
newProcessors.toArray(new MessagePostProcessor[0])
);
return rabbitTemplate;
}

/** Creates an instrumented {@linkplain SimpleRabbitListenerContainerFactory} */
public SimpleRabbitListenerContainerFactory newSimpleRabbitListenerContainerFactory(
ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAdviceChain(tracingRabbitListenerAdvice);
return factory;
}

/** Instruments an existing {@linkplain SimpleRabbitListenerContainerFactory} */
public SimpleRabbitListenerContainerFactory decorateSimpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactory factory
) {
Advice[] chain = factory.getAdviceChain();

// If there are no existing advice, return only the tracing one
if (chain == null) {
factory.setAdviceChain(tracingRabbitListenerAdvice);
return factory;
}

// If there is an existing tracing advice return
for (Advice advice : chain) {
if (advice instanceof TracingRabbitListenerAdvice) {
return factory;
}
}

// Otherwise, add ours and return
Advice[] newChain = new Advice[chain.length + 1];
System.arraycopy(chain, 0, newChain, 0, chain.length);
newChain[chain.length] = tracingRabbitListenerAdvice;
factory.setAdviceChain(newChain);
return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -105,6 +106,27 @@ public class ITSpringRabbitTracing {
.isEmpty();
}

@Test public void tags_spans_with_exchange_and_routing_key_from_default() throws Exception {
testFixture.produceMessageFromDefault();
testFixture.awaitMessageConsumed();

assertThat(testFixture.consumerSpans).hasSize(2);

assertThat(testFixture.consumerSpans)
.filteredOn(s -> s.kind() == CONSUMER)
.flatExtracting(s -> s.tags().entrySet())
.containsOnly(
entry("rabbit.exchange", "test-exchange"),
entry("rabbit.routing_key", "test.binding"),
entry("rabbit.queue", "test-queue")
);

assertThat(testFixture.consumerSpans)
.filteredOn(s -> s.kind() != CONSUMER)
.flatExtracting(s -> s.tags().entrySet())
.isEmpty();
}

// We will revisit this eventually, but these names mostly match the method names
@Test public void method_names_as_span_names() throws Exception {
testFixture.produceMessage();
Expand Down Expand Up @@ -172,16 +194,37 @@ public List<Span> producerSpans() {
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
SpringRabbitTracing springRabbitTracing) {
RabbitTemplate rabbitTemplate = springRabbitTracing.newRabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("test-exchange");
return rabbitTemplate;
public RabbitTemplate newRabbitTemplate(
ConnectionFactory connectionFactory,
SpringRabbitTracing springRabbitTracing
) {
RabbitTemplate newRabbitTemplate = springRabbitTracing.newRabbitTemplate(connectionFactory);
newRabbitTemplate.setExchange("test-exchange");
return newRabbitTemplate;
}

@Bean
public HelloWorldRabbitProducer tracingRabbitProducer(RabbitTemplate rabbitTemplate) {
return new HelloWorldRabbitProducer(rabbitTemplate);
public RabbitTemplate decorateRabbitTemplate(
ConnectionFactory connectionFactory,
SpringRabbitTracing springRabbitTracing
) {
RabbitTemplate newRabbitTemplate = new RabbitTemplate(connectionFactory);
newRabbitTemplate.setExchange("test-exchange");
return springRabbitTracing.decorateRabbitTemplate(newRabbitTemplate);
}

@Bean
public HelloWorldProducer tracingRabbitProducer_new(
@Qualifier("newRabbitTemplate") RabbitTemplate newRabbitTemplate
) {
return new HelloWorldProducer(newRabbitTemplate);
}

@Bean
public HelloWorldProducer tracingRabbitProducer_decorate(
@Qualifier("decorateRabbitTemplate") RabbitTemplate newRabbitTemplate
) {
return new HelloWorldProducer(newRabbitTemplate);
}
}

Expand Down Expand Up @@ -210,37 +253,43 @@ public List<Span> consumerSpans() {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SpringRabbitTracing springRabbitTracing) {
return springRabbitTracing.newSimpleMessageListenerContainerFactory(connectionFactory);
SpringRabbitTracing springRabbitTracing
) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
return springRabbitTracing.decorateSimpleRabbitListenerContainerFactory(
simpleRabbitListenerContainerFactory
);
}

@Bean
public HelloWorldRabbitConsumer helloWorldRabbitConsumer() {
return new HelloWorldRabbitConsumer();
public HelloWorldConsumer helloWorldRabbitConsumer() {
return new HelloWorldConsumer();
}
}

private static class HelloWorldRabbitProducer {
private final RabbitTemplate rabbitTemplate;
private static class HelloWorldProducer {
private final RabbitTemplate newRabbitTemplate;

HelloWorldRabbitProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
HelloWorldProducer(RabbitTemplate newRabbitTemplate) {
this.newRabbitTemplate = newRabbitTemplate;
}

void send() {
byte[] messageBody = "hello world".getBytes();
MessageProperties properties = new MessageProperties();
properties.setHeader("not-zipkin-header", "fakeValue");
Message message = MessageBuilder.withBody(messageBody).andProperties(properties).build();
rabbitTemplate.send("test.binding", message);
newRabbitTemplate.send("test.binding", message);
}
}

private static class HelloWorldRabbitConsumer {
private static class HelloWorldConsumer {
private CountDownLatch countDownLatch;
private Message capturedMessage;

HelloWorldRabbitConsumer() {
HelloWorldConsumer() {
this.countDownLatch = new CountDownLatch(1);
}

Expand Down Expand Up @@ -275,7 +324,7 @@ private static class ITSpringAmqpTracingTestFixture {
}

private void reset() {
HelloWorldRabbitConsumer consumer = consumerContext.getBean(HelloWorldRabbitConsumer.class);
HelloWorldConsumer consumer = consumerContext.getBean(HelloWorldConsumer.class);
consumer.reset();
producerSpans.clear();
consumerSpans.clear();
Expand All @@ -297,19 +346,24 @@ private ApplicationContext consumerSpringContext() {
}

private void produceMessage() {
HelloWorldRabbitProducer rabbitProducer =
producerContext.getBean(HelloWorldRabbitProducer.class);
HelloWorldProducer rabbitProducer =
producerContext.getBean("tracingRabbitProducer_new", HelloWorldProducer.class);
rabbitProducer.send();
}

private void produceMessageFromDefault() {
HelloWorldProducer rabbitProducer =
producerContext.getBean("tracingRabbitProducer_decorate", HelloWorldProducer.class);
rabbitProducer.send();
}

private void awaitMessageConsumed()
throws InterruptedException {
HelloWorldRabbitConsumer consumer = consumerContext.getBean(HelloWorldRabbitConsumer.class);
private void awaitMessageConsumed() throws InterruptedException {
HelloWorldConsumer consumer = consumerContext.getBean(HelloWorldConsumer.class);
consumer.getCountDownLatch().await();
}

private Message capturedMessage() {
HelloWorldRabbitConsumer consumer = consumerContext.getBean(HelloWorldRabbitConsumer.class);
HelloWorldConsumer consumer = consumerContext.getBean(HelloWorldConsumer.class);
return consumer.capturedMessage;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package brave.spring.rabbit;

import brave.Tracing;
import brave.sampler.Sampler;
import org.junit.After;
import org.junit.Test;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.postprocessor.UnzipPostProcessor;
import org.springframework.cache.interceptor.CacheInterceptor;
import zipkin2.reporter.Reporter;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Java6Assertions.assertThat;

public class SpringRabbitTracingTest {
SpringRabbitTracing tracing = SpringRabbitTracing.create(Tracing.newBuilder()
.sampler(Sampler.ALWAYS_SAMPLE)
.spanReporter(Reporter.NOOP)
.build());

@After public void close() {
Tracing.current().close();
}

@Test public void decorateRabbitTemplate_adds_by_default() {
RabbitTemplate template = new RabbitTemplate();
assertThat(tracing.decorateRabbitTemplate(template))
.extracting("beforePublishPostProcessors")
.containsExactly(asList(tracing.tracingMessagePostProcessor));
}

@Test public void decorateRabbitTemplate_skips_when_present() {
RabbitTemplate template = new RabbitTemplate();
template.setBeforePublishPostProcessors(tracing.tracingMessagePostProcessor);

assertThat(tracing.decorateRabbitTemplate(template))
.extracting("beforePublishPostProcessors")
.containsExactly(asList(tracing.tracingMessagePostProcessor));
}

@Test public void decorateRabbitTemplate_appends_when_absent() {
RabbitTemplate template = new RabbitTemplate();
UnzipPostProcessor postProcessor = new UnzipPostProcessor();
template.setBeforePublishPostProcessors(postProcessor);

assertThat(tracing.decorateRabbitTemplate(template))
.extracting("beforePublishPostProcessors")
.containsExactly(asList(postProcessor, tracing.tracingMessagePostProcessor));
}

@Test public void decorateSimpleRabbitListenerContainerFactory_adds_by_default() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

assertThat(tracing.decorateSimpleRabbitListenerContainerFactory(factory).getAdviceChain())
.containsExactly(tracing.tracingRabbitListenerAdvice);
}

@Test public void decorateSimpleRabbitListenerContainerFactory_skips_when_present() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAdviceChain(tracing.tracingRabbitListenerAdvice);

assertThat(tracing.decorateSimpleRabbitListenerContainerFactory(factory).getAdviceChain())
.containsExactly(tracing.tracingRabbitListenerAdvice);
}

@Test public void decorateSimpleRabbitListenerContainerFactory_appends_when_absent() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
CacheInterceptor advice = new CacheInterceptor();
factory.setAdviceChain(advice);

assertThat(tracing.decorateSimpleRabbitListenerContainerFactory(factory).getAdviceChain())
.containsExactly(advice, tracing.tracingRabbitListenerAdvice);
}
}

0 comments on commit dcdfa1c

Please sign in to comment.