Skip to content

Commit

Permalink
Enable custom KafkaTracing to be passed to KafkaStreamsTracing.kafkaC…
Browse files Browse the repository at this point in the history
…lientSupplier (#1024)
  • Loading branch information
simondean authored and adriancole committed Oct 30, 2019
1 parent 564bd57 commit cddd22c
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ public KafkaTracing build() {
this.remoteServiceName = builder.remoteServiceName;
}

public Tracing tracing() {
return tracing;
}

/**
* Extracts or creates a {@link Span.Kind#CONSUMER} span for each message received. This span is
* injected onto each message so it becomes the parent when a processor later calls {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,26 @@
/** Use this class to decorate Kafka Stream Topologies and enable Tracing. */
public final class KafkaStreamsTracing {

final Tracing tracing;
final KafkaTracing kafkaTracing;
final TraceContext.Extractor<Headers> extractor;
final TraceContext.Injector<Headers> injector;

KafkaStreamsTracing(Builder builder) { // intentionally hidden constructor
this.tracing = builder.tracing;
this.extractor = tracing.propagation().extractor(KafkaStreamsPropagation.GETTER);
this.injector = tracing.propagation().injector(KafkaStreamsPropagation.SETTER);
this.kafkaTracing = builder.kafkaTracing;
this.extractor = kafkaTracing.tracing().propagation().extractor(KafkaStreamsPropagation.GETTER);
this.injector = kafkaTracing.tracing().propagation().injector(KafkaStreamsPropagation.SETTER);
}

public static KafkaStreamsTracing create(Tracing tracing) {
return new KafkaStreamsTracing.Builder(tracing).build();
return new KafkaStreamsTracing.Builder(KafkaTracing.create(tracing)).build();
}

public static KafkaStreamsTracing create(KafkaTracing kafkaTracing) {
return new KafkaStreamsTracing.Builder(kafkaTracing).build();
}

public KafkaTracing kafkaTracing() {
return kafkaTracing;
}

/**
Expand All @@ -65,7 +73,6 @@ public static KafkaStreamsTracing create(Tracing tracing) {
* accepted.
*/
public KafkaClientSupplier kafkaClientSupplier() {
final KafkaTracing kafkaTracing = KafkaTracing.create(tracing);
return new TracingKafkaClientSupplier(kafkaTracing);
}

Expand Down Expand Up @@ -391,19 +398,19 @@ static void addTags(ProcessorContext processorContext, SpanCustomizer result) {

Span nextSpan(ProcessorContext context) {
TraceContextOrSamplingFlags extracted = extractor.extract(context.headers());
Span result = tracing.tracer().nextSpan(extracted);
Span result = kafkaTracing.tracing().tracer().nextSpan(extracted);
if (!result.isNoop()) {
addTags(context, result);
}
return result;
}

public static final class Builder {
final Tracing tracing;
final KafkaTracing kafkaTracing;

Builder(Tracing tracing) {
if (tracing == null) throw new NullPointerException("tracing == null");
this.tracing = tracing;
Builder(KafkaTracing kafkaTracing) {
if (kafkaTracing == null) throw new NullPointerException("kafkaTracing == null");
this.kafkaTracing = kafkaTracing;
}

public KafkaStreamsTracing build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ abstract class TracingFilter<K, V, R> {
TracingFilter(KafkaStreamsTracing tracing, String spanName,
Predicate<K, V> delegatePredicate, boolean filterNot) {
this.kafkaStreamsTracing = tracing;
this.tracer = kafkaStreamsTracing.tracing.tracer();
this.tracer = kafkaStreamsTracing.kafkaTracing().tracing().tracer();
this.spanName = spanName;
this.delegatePredicate = delegatePredicate;
this.filterNot = filterNot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TracingProcessor<K, V> implements Processor<K, V> {
TracingProcessor(KafkaStreamsTracing kafkaStreamsTracing,
String spanName, Processor<K, V> delegateProcessor) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.tracer = kafkaStreamsTracing.tracing.tracer();
this.tracer = kafkaStreamsTracing.kafkaTracing().tracing().tracer();
this.spanName = spanName;
this.delegateProcessor = delegateProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TracingTransformer<K, V, R> implements Transformer<K, V, R> {
TracingTransformer(KafkaStreamsTracing kafkaStreamsTracing, String spanName,
Transformer<K, V, R> delegateTransformer) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.tracer = kafkaStreamsTracing.tracing.tracer();
this.tracer = kafkaStreamsTracing.kafkaTracing().tracing().tracer();
this.spanName = spanName;
this.delegateTransformer = delegateTransformer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TracingValueTransformer<V, VR> implements ValueTransformer<V, VR> {
TracingValueTransformer(KafkaStreamsTracing kafkaStreamsTracing, String spanName,
ValueTransformer<V, VR> delegateTransformer) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.tracer = kafkaStreamsTracing.tracing.tracer();
this.tracer = kafkaStreamsTracing.kafkaTracing().tracing().tracer();
this.spanName = spanName;
this.delegateTransformer = delegateTransformer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TracingValueTransformerWithKey<K, V, VR> implements ValueTransformerWithKe
TracingValueTransformerWithKey(KafkaStreamsTracing kafkaStreamsTracing, String spanName,
ValueTransformerWithKey<K, V, VR> delegateTransformer) {
this.kafkaStreamsTracing = kafkaStreamsTracing;
this.tracer = kafkaStreamsTracing.tracing.tracer();
this.tracer = kafkaStreamsTracing.kafkaTracing().tracing().tracer();
this.spanName = spanName;
this.delegateTransformer = delegateTransformer;
}
Expand Down

0 comments on commit cddd22c

Please sign in to comment.