Skip to content

Commit

Permalink
Add error handling for IllegalStateExceptions from Kafka Headers (o…
Browse files Browse the repository at this point in the history
  • Loading branch information
m50d committed Nov 2, 2020
1 parent 1705218 commit d78503a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import static brave.kafka.clients.KafkaTracing.log;
import static java.nio.charset.StandardCharsets.UTF_8;

final class KafkaHeaders {
static void replaceHeader(Headers headers, String key, String value) {
  try {
    headers.remove(key);
    byte[] encodedValue = value.getBytes(UTF_8);
    headers.add(key, encodedValue);
  } catch (IllegalStateException e) {
    // Headers can refuse mutation (e.g. Kafka's read-only headers throw
    // IllegalStateException); propagation is best-effort, so log at FINE and move on.
    log(e, "error setting header {0} in headers {1}", key, headers);
  }
}

@Nullable static String lastStringHeader(Headers headers, String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import brave.SpanCustomizer;
import brave.Tracer;
import brave.Tracing;
import brave.internal.Nullable;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.propagation.B3Propagation;
Expand All @@ -29,6 +30,10 @@
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -49,6 +54,10 @@ public final class KafkaTracing {
return "Headers::lastHeader";
}
};
// Initialization-on-demand holder idiom: the JVM only initializes this nested class (and
// therefore only constructs the Logger) the first time LOG is actually referenced, so users
// who never hit a logging path pay no logger-setup cost.
private static final class LoggerHolder {
  static final Logger LOG = Logger.getLogger(KafkaTracing.class.getName());
}

public static KafkaTracing create(Tracing tracing) {
return newBuilder(tracing).build();
Expand Down Expand Up @@ -243,4 +252,35 @@ static void addTags(ConsumerRecord<?, ?> record, SpanCustomizer result) {
}
result.tag(KafkaTags.KAFKA_TOPIC_TAG, record.topic());
}


/**
 * Logs a parameterized message at {@code FINE} without allocating the parameter array unless
 * that level is actually enabled. The fourth argument is optional.
 *
 * <p>Ex.
 * <pre>{@code
 * try {
 *   tracePropagationThatMayThrow(record);
 * } catch (Throwable e) {
 *   Call.propagateIfFatal(e);
 *   log(e, "error adding propagation information to {0}", record, null);
 *   return null;
 * }
 * }</pre>
 *
 * @param thrown the exception that was caught
 * @param msg the format string
 * @param zero will end up as {@code {0}} in the format string
 * @param one if present, will end up as {@code {1}} in the format string
 */
static void log(Throwable thrown, String msg, Object zero, @Nullable Object one) {
  Logger log = LoggerHolder.LOG;
  // Bail out before any allocation when FINE is disabled, so hot paths stay cheap.
  if (!log.isLoggable(Level.FINE)) return; // fine level to not fill logs
  LogRecord record = new LogRecord(Level.FINE, msg);
  if (one == null) {
    record.setParameters(new Object[] {zero});
  } else {
    record.setParameters(new Object[] {zero, one});
  }
  record.setThrown(thrown);
  log.log(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package brave.kafka.clients;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -46,4 +47,9 @@ public class KafkaHeadersTest {
assertThat(record.headers().lastHeader("b3").value())
.containsExactly('1');
}

// Read-only headers throw IllegalStateException on mutation; replaceHeader must swallow
// that (logging it) rather than propagate. This test passes simply by not throwing.
@Test public void replaceHeader_readonly() {
((RecordHeaders) record.headers()).setReadOnly();
KafkaHeaders.replaceHeader(record.headers(), "b3", "1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Test;

import static brave.Span.Kind.PRODUCER;
Expand Down Expand Up @@ -126,4 +127,10 @@ public class TracingProducerTest extends KafkaTest {
assertThat(producerSpan.tags())
.containsOnly(entry("kafka.topic", TEST_TOPIC));
}

/**
 * Sending a record whose headers were made read-only must not surface an exception from
 * trace-propagation header writes; the send itself should proceed normally.
 */
@Test public void should_not_error_if_headers_are_read_only() {
  ProducerRecord<Object, String> readOnlyRecord =
      new ProducerRecord<>(TEST_TOPIC, TEST_KEY, TEST_VALUE);
  ((RecordHeaders) readOnlyRecord.headers()).setReadOnly();
  tracingProducer.send(readOnlyRecord);
}
}

0 comments on commit d78503a

Please sign in to comment.