Fix broken spanmetrics counters after span-producing service restart (#29711)

My spanmetrics counters (e.g. `calls_total`) break after restarting the span-producing service.

For example:

![Screenshot from 2023-12-06 11-39-57](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/149630/abea1b72-392b-4f1f-a403-644c4e356f3d)

I discovered that the resource key used for the calculated metrics was a
map hash of all resource attributes. This worked fine for some
instrumented services; however, other services include ephemeral
attributes such as their process ID. Restarting one of these services
produced a new hash, and therefore a new set of calculated resource
metrics alongside the existing ones.
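To make the failure mode concrete, here is a minimal standalone sketch of the hashing behaviour (the attribute names and values are illustrative; `pcommon` and `pdatautil` are the packages the connector already uses):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr("service.name", "checkout")
	attrs.PutInt("process.pid", 12345) // ephemeral: changes on every restart

	before := pdatautil.MapHash(attrs)

	// Simulate a restart of the same service: only the pid changes.
	attrs.PutInt("process.pid", 67890)
	after := pdatautil.MapHash(attrs)

	// A different hash means a different resource metrics key, so the
	// connector starts new counters instead of continuing the old ones.
	fmt.Println(before == after) // false
}
```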

This pull request filters the resource attributes used to produce the
resource metrics key map hash. I am now able to restart services without
breaking my counters.
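The stable attributes to keep are configurable. A minimal collector config sketch (the option name and attribute list mirror the testdata config added in this commit; the rest of the collector config is elided):

```yaml
connectors:
  spanmetrics:
    resource_metrics_key_attributes:
      - service.name
      - telemetry.sdk.language
      - telemetry.sdk.name
```

Attributes not on the list are dropped before the key hash is computed, so ephemeral values no longer split the metric streams.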

---------

Signed-off-by: Sean Porter <portertech@gmail.com>
Co-authored-by: Albert <26584478+albertteoh@users.noreply.github.com>
portertech and albertteoh committed Dec 21, 2023
1 parent 03e4f9d commit 4acf9f8
Showing 6 changed files with 146 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/spanmetrics-fix-resource-metrics-key.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: connector/spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Configurable resource metrics key attributes; filters the resource attributes used to create the resource metrics key.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29711]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This enhancement can be used to fix broken spanmetrics counters after a span-producing service restart, when resource attributes contain dynamic/ephemeral values (e.g. process ID).

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 9 additions & 0 deletions connector/spanmetricsconnector/config.go
@@ -51,6 +51,15 @@ type Config struct {
// Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`

// ResourceMetricsKeyAttributes filters the resource attributes used to create the resource metrics key hash.
// This can be used to avoid situations where resource attributes may change across service restarts, causing
// metric counters to break (and duplicate). A resource does not need to have all of the attributes. The list
// must include enough attributes to properly identify unique resources or risk aggregating data from more
// than one service and span.
// e.g. ["service.name", "telemetry.sdk.language", "telemetry.sdk.name"]
// See https://opentelemetry.io/docs/specs/semconv/resource/ for possible attributes.
ResourceMetricsKeyAttributes []string `mapstructure:"resource_metrics_key_attributes"`

AggregationTemporality string `mapstructure:"aggregation_temporality"`

Histogram HistogramConfig `mapstructure:"histogram"`
11 changes: 11 additions & 0 deletions connector/spanmetricsconnector/config_test.go
@@ -110,6 +110,17 @@ func TestLoadConfig(t *testing.T) {
Exemplars: ExemplarsConfig{Enabled: true, MaxPerDataPoint: &defaultMaxPerDatapoint},
},
},
{
id: component.NewIDWithName(metadata.Type, "resource_metrics_key_attributes"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
ResourceMetricsKeyAttributes: []string{"service.name", "telemetry.sdk.language", "telemetry.sdk.name"},
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
},
}

for _, tt := range tests {
44 changes: 33 additions & 11 deletions connector/spanmetricsconnector/connector.go
@@ -54,6 +54,8 @@ type connectorImp struct {

resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]

+ resourceMetricsKeyAttributes map[string]struct{}

keyBuf *bytes.Buffer

// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
@@ -115,17 +117,24 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
return nil, err
}

+ resourceMetricsKeyAttributes := make(map[string]struct{}, len(cfg.ResourceMetricsKeyAttributes))
+ var s struct{}
+ for _, attr := range cfg.ResourceMetricsKeyAttributes {
+ resourceMetricsKeyAttributes[attr] = s
+ }
+
return &connectorImp{
- logger: logger,
- config: *cfg,
- resourceMetrics: resourceMetricsCache,
- dimensions: newDimensions(cfg.Dimensions),
- keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
- metricKeyToDimensions: metricKeyToDimensionsCache,
- ticker: ticker,
- done: make(chan struct{}),
- eDimensions: newDimensions(cfg.Events.Dimensions),
- events: cfg.Events,
+ logger: logger,
+ config: *cfg,
+ resourceMetrics: resourceMetricsCache,
+ resourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
+ dimensions: newDimensions(cfg.Dimensions),
+ keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
+ metricKeyToDimensions: metricKeyToDimensionsCache,
+ ticker: ticker,
+ done: make(chan struct{}),
+ eDimensions: newDimensions(cfg.Events.Dimensions),
+ events: cfg.Events,
}, nil
}

@@ -390,8 +399,21 @@ func (p *connectorImp) addExemplar(span ptrace.Span, duration float64, h metrics

type resourceKey [16]byte

+ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
+ if len(p.resourceMetricsKeyAttributes) == 0 {
+ return pdatautil.MapHash(attr)
+ }
+ m := pcommon.NewMap()
+ attr.CopyTo(m)
+ m.RemoveIf(func(k string, _ pcommon.Value) bool {
+ _, ok := p.resourceMetricsKeyAttributes[k]
+ return !ok
+ })
+ return pdatautil.MapHash(m)
+ }
+
func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
- key := resourceKey(pdatautil.MapHash(attr))
+ key := p.createResourceKey(attr)
v, ok := p.resourceMetrics.Get(key)
if !ok {
v = &resourceMetrics{
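To see the effect of the filtering above end to end, a standalone sketch (the `keyFor` helper is hypothetical and only mirrors `createResourceKey`; attribute names and values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

// keyFor mirrors createResourceKey: drop attributes not in keep, then hash.
func keyFor(attrs pcommon.Map, keep map[string]struct{}) [16]byte {
	m := pcommon.NewMap()
	attrs.CopyTo(m)
	m.RemoveIf(func(k string, _ pcommon.Value) bool {
		_, ok := keep[k]
		return !ok
	})
	return pdatautil.MapHash(m)
}

func main() {
	keep := map[string]struct{}{"service.name": {}}

	before := pcommon.NewMap()
	before.PutStr("service.name", "checkout")
	before.PutInt("process.pid", 12345)

	after := pcommon.NewMap()
	after.PutStr("service.name", "checkout")
	after.PutInt("process.pid", 67890) // new pid after a restart

	// Identical keys across the restart, so existing counters keep accumulating.
	fmt.Println(keyFor(before, keep) == keyFor(after, keep)) // true
}
```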
75 changes: 59 additions & 16 deletions connector/spanmetricsconnector/connector_test.go
@@ -600,7 +600,7 @@ func TestConcurrentShutdown(t *testing.T) {
ticker := mockClock.NewTicker(time.Nanosecond)

// Test
- p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
+ p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)
err := p.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)

Expand Down Expand Up @@ -680,7 +680,7 @@ func TestConsumeMetricsErrors(t *testing.T) {
}
mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)
- p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
+ p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
@@ -842,7 +842,7 @@ func TestConsumeTraces(t *testing.T) {
mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)

- p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, []string{}, zaptest.NewLogger(t), ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
@@ -868,7 +868,7 @@ func TestMetricKeyCache(t *testing.T) {
func TestMetricKeyCache(t *testing.T) {
mcon := consumertest.NewNop()

- p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
traces := buildSampleTrace()

// Test
@@ -898,7 +898,7 @@ func TestResourceMetricsCache(t *testing.T) {
func TestResourceMetricsCache(t *testing.T) {
mcon := consumertest.NewNop()

- p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
@@ -933,11 +933,53 @@ func TestResourceMetricsCache(t *testing.T) {
assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len())
}

+ func TestResourceMetricsKeyAttributes(t *testing.T) {
+ mcon := consumertest.NewNop()
+
+ resourceMetricsKeyAttributes := []string{
+ "service.name",
+ }
+
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, resourceMetricsKeyAttributes, zaptest.NewLogger(t), nil)
+
+ // Test
+ ctx := metadata.NewIncomingContext(context.Background(), nil)
+
+ // 0 resources in the beginning
+ assert.Zero(t, p.resourceMetrics.Len())
+
+ err := p.ConsumeTraces(ctx, buildSampleTrace())
+ // Validate
+ require.NoError(t, err)
+ assert.Equal(t, 2, p.resourceMetrics.Len())
+
+ // consume another batch of traces for the same resources
+ err = p.ConsumeTraces(ctx, buildSampleTrace())
+ require.NoError(t, err)
+ assert.Equal(t, 2, p.resourceMetrics.Len())
+
+ // consume more batches with new values for a resource attribute that is not part of the key
+ for i := 0; i < resourceMetricsCacheSize; i++ {
+ traces := buildSampleTrace()
+
+ // add resource attributes to simulate additional resources providing data
+ for j := 0; j < traces.ResourceSpans().Len(); j++ {
+ traces.ResourceSpans().At(j).Resource().Attributes().PutStr("not included in resource key attributes", fmt.Sprintf("%d", i))
+ }
+
+ err = p.ConsumeTraces(ctx, traces)
+ require.NoError(t, err)
+ }
+
+ // validate that the additional resources providing data did not result in additional resource metrics
+ assert.Equal(t, 2, p.resourceMetrics.Len())
+ }

func BenchmarkConnectorConsumeTraces(b *testing.B) {
// Prepare
mcon := consumertest.NewNop()

- conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(b), nil)
+ conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(b), nil)

traces := buildSampleTrace()

@@ -951,7 +993,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {
func TestExcludeDimensionsConsumeTraces(t *testing.T) {
mcon := consumertest.NewNop()
excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
- p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...)
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil, excludeDimensions...)
traces := buildSampleTrace()

// Test
@@ -1000,15 +1042,16 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {

}

- func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
+ func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, resourceMetricsKeyAttributes []string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {

cfg := &Config{
- AggregationTemporality: temporality,
- Histogram: histogramConfig(),
- Exemplars: exemplarsConfig(),
- ExcludeDimensions: excludedDimensions,
- DimensionsCacheSize: dimensionsCacheSize,
- ResourceMetricsCacheSize: resourceMetricsCacheSize,
+ AggregationTemporality: temporality,
+ Histogram: histogramConfig(),
+ Exemplars: exemplarsConfig(),
+ ExcludeDimensions: excludedDimensions,
+ DimensionsCacheSize: dimensionsCacheSize,
+ ResourceMetricsCacheSize: resourceMetricsCacheSize,
+ ResourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
Dimensions: []Dimension{
// Set nil defaults to force a lookup for the attribute in the span.
{stringAttrName, nil},
@@ -1120,7 +1163,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
ticker := mockClock.NewTicker(time.Nanosecond)

// Note: default dimension key cache size is 2.
- p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), ticker)
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
@@ -1374,7 +1417,7 @@ func TestSpanMetrics_Events(t *testing.T) {
}
func TestExemplarsForSumMetrics(t *testing.T) {
mcon := consumertest.NewNop()
- p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+ p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
traces := buildSampleTrace()

// Test
7 changes: 7 additions & 0 deletions connector/spanmetricsconnector/testdata/config.yaml
@@ -68,3 +68,10 @@ spanmetrics/exemplars_enabled_with_max_per_datapoint:
exemplars:
enabled: true
max_per_data_point: 5

# resource metrics key attributes filter
spanmetrics/resource_metrics_key_attributes:
resource_metrics_key_attributes:
- service.name
- telemetry.sdk.language
- telemetry.sdk.name
