Collect metric of message size distribution in KafkaIO read. (apache#29443)

* Collect metric of message size distribution in KafkaIO read.

Tests on the metric are also added for the legacy code path
(invoked by `ReadFromKafkaViaUnbounded`) and the SDF code path
(invoked by `ReadFromKafkaViaSDF`) of KafkaIO read.

* Consolidate KafkaIO read metric name constants in one place.
shunping committed Nov 21, 2023
1 parent bb4b633 commit a349e76
Showing 4 changed files with 113 additions and 0 deletions.
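
As a usage sketch (not part of this commit), the new per-partition distributions can be read back from the `PipelineResult` after a run, much like the `KafkaIOTest` assertions further down do. The `KafkaIOReader` namespace and `rawSize/` prefix are the constants added in this change; the class and method names here are purely illustrative.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class RawSizeMetricExample {
  // "pipeline" is assumed to contain a KafkaIO.read() transform.
  static void printRawSizes(Pipeline pipeline) {
    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    // Query every metric reported under the KafkaIO reader namespace.
    MetricQueryResults metrics =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.inNamespace("KafkaIOReader"))
                    .build());

    // Each distribution is named "rawSize/<topic>-<partition>", e.g. "rawSize/test-5",
    // and reports the sum, count, min and max of raw record sizes in bytes.
    for (MetricResult<DistributionResult> rawSize : metrics.getDistributions()) {
      System.out.println(rawSize.getName() + ": " + rawSize.getAttempted());
    }
  }
}
```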
KafkaUnboundedReader.java
@@ -44,6 +44,7 @@
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
@@ -217,6 +218,12 @@ public boolean advance() throws IOException {
pState.recordConsumed(offset, recordSize, offsetGap);
bytesRead.inc(recordSize);
bytesReadBySplit.inc(recordSize);

Distribution rawSizes =
Metrics.distribution(
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
rawSizes.update(recordSize);

return true;

} else { // -- (b)
@@ -309,6 +316,8 @@ public long getSplitBacklogBytes() {

@VisibleForTesting static final String METRIC_NAMESPACE = "KafkaIOReader";

@VisibleForTesting static final String RAW_SIZE_METRIC_PREFIX = "rawSize/";

@VisibleForTesting
static final String CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC = "checkpointMarkCommitsEnqueued";

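The metric name is the prefix plus `TopicPartition.toString()`; a small illustration (assuming the constants defined above):

```java
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.kafka.common.TopicPartition;

// "KafkaIOReader" and "rawSize/" are METRIC_NAMESPACE and RAW_SIZE_METRIC_PREFIX above.
MetricName name =
    MetricName.named("KafkaIOReader", "rawSize/" + new TopicPartition("test", 5));
// name.getNamespace() -> "KafkaIOReader", name.getName() -> "rawSize/test-5"
```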
ReadFromKafkaDoFn.java
@@ -31,6 +31,8 @@
import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
@@ -203,6 +205,10 @@ private ReadFromKafkaDoFn(ReadSourceDescriptors<K, V> transform) {
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@VisibleForTesting final Map<String, Object> consumerConfig;
@VisibleForTesting static final String METRIC_NAMESPACE = KafkaUnboundedReader.METRIC_NAMESPACE;

@VisibleForTesting
static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;

/**
* A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
@@ -362,6 +368,10 @@ public ProcessContinuation processElement(
Preconditions.checkStateNotNull(this.keyDeserializerInstance);
final Deserializer<V> valueDeserializerInstance =
Preconditions.checkStateNotNull(this.valueDeserializerInstance);
final Distribution rawSizes =
Metrics.distribution(
METRIC_NAMESPACE,
RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
// Stop processing current TopicPartition when it's time to stop.
if (checkStopReadingFn != null
&& checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())) {
@@ -437,6 +447,7 @@ public ProcessContinuation processElement(
avgRecordSize
.getUnchecked(kafkaSourceDescriptor.getTopicPartition())
.update(recordSize, rawRecord.offset() - expectedOffset);
rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
Instant outputTimestamp;
// The outputTimestamp and watermark will be computed by timestampPolicy, where the
KafkaIOTest.java
@@ -75,6 +75,7 @@
import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions;
import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory;
import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -1874,6 +1875,67 @@ public void testUnboundedSourceStartReadTimeException() {
p.run();
}

@Test
public void testUnboundedSourceRawSizeMetric() {
final String readStep = "readFromKafka";
final int numElements = 1000;
final int numPartitionsPerTopic = 10;
final int recordSize = 12; // The size of key and value is defined in ConsumerFactoryFn.

List<String> topics = ImmutableList.of("test");

KafkaIO.Read<byte[], Long> reader =
KafkaIO.<byte[], Long>read()
.withBootstrapServers("none")
.withTopicPartitions(
ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 8)))
.withConsumerFactoryFn(
new ConsumerFactoryFn(
topics, numPartitionsPerTopic, numElements, OffsetResetStrategy.EARLIEST))
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
.withMaxNumRecords(numElements / numPartitionsPerTopic * 2); // 2 is the # of partitions

p.apply(readStep, reader.withoutMetadata()).apply(Values.create());

PipelineResult result = p.run();

MetricQueryResults metrics =
result
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.inNamespace(KafkaUnboundedReader.METRIC_NAMESPACE))
.build());

assertThat(
metrics.getDistributions(),
hasItem(
attemptedMetricsResult(
KafkaUnboundedReader.METRIC_NAMESPACE,
KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX + "test-5",
readStep,
DistributionResult.create(
recordSize * numElements / numPartitionsPerTopic,
numElements / numPartitionsPerTopic,
recordSize,
recordSize))));

assertThat(
metrics.getDistributions(),
hasItem(
attemptedMetricsResult(
KafkaUnboundedReader.METRIC_NAMESPACE,
KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX + "test-8",
readStep,
DistributionResult.create(
recordSize * numElements / numPartitionsPerTopic,
numElements / numPartitionsPerTopic,
recordSize,
recordSize))));
}

@Test
public void testSourceDisplayData() {
KafkaIO.Read<Integer, Long> read = mkKafkaReadTransform(10, null);
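For reference on the assertions above, the four arguments to `DistributionResult.create` are sum, count, min and max. With 1000 elements spread evenly over 10 partitions and a fixed 12-byte record, each asserted per-partition distribution works out to:

```java
// Worked check for testUnboundedSourceRawSizeMetric (not part of the change):
//   count = numElements / numPartitionsPerTopic = 1000 / 10 = 100 records
//   sum   = recordSize * count                  = 12 * 100  = 1200 bytes
//   min = max = recordSize = 12 (every mock record has the same size)
DistributionResult expected = DistributionResult.create(1200, 100, 12, 12);
```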
ReadFromKafkaDoFnTest.java
@@ -27,12 +27,17 @@
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.metrics.DistributionCell;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -394,6 +399,32 @@ public void testProcessElement() throws Exception {
createExpectedRecords(descriptor, startOffset, 3, "key", "value"), receiver.getOutputs());
}

@Test
public void testRawSizeMetric() throws Exception {
final int numElements = 1000;
final int recordSize = 8; // The size of key and value is defined in SimpleMockKafkaConsumer.
MetricsContainerImpl container = new MetricsContainerImpl("any");
MetricsEnvironment.setCurrentContainer(container);

MockOutputReceiver receiver = new MockOutputReceiver();
consumer.setNumOfRecordsPerPoll(numElements);
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, numElements));
KafkaSourceDescriptor descriptor =
KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
ProcessContinuation result = dofnInstance.processElement(descriptor, tracker, null, receiver);
assertEquals(ProcessContinuation.stop(), result);

DistributionCell d =
container.getDistribution(
MetricName.named(
ReadFromKafkaDoFn.METRIC_NAMESPACE,
ReadFromKafkaDoFn.RAW_SIZE_METRIC_PREFIX + topicPartition));

assertEquals(
d.getCumulative(),
DistributionData.create(recordSize * numElements, numElements, recordSize, recordSize));
}

@Test
public void testProcessElementWithEmptyPoll() throws Exception {
MockOutputReceiver receiver = new MockOutputReceiver();
