[coordinator] Apply auto-mapping rules if-and-only-if no drop policies are in effect #3339

Merged
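Summary of the change: when a matched mapping rule carries a drop policy, the coordinator should no longer also write the metric via its auto-mapping (default namespace) rules; the defaults are applied if and only if no drop policy is in effect. The following is a minimal, self-contained Go sketch of that gate. The storage-policy strings mirror the namespaces used in the new test below, but buildPipelines and its signature are purely illustrative — the real check in the diff is a.curr.Pipelines.IsDropPolicySet() inside metrics_appender.go.

package main

import "fmt"

// buildPipelines sketches the new behaviour: pipelines produced by matched
// mapping rules are always kept, while the auto-mapping (default namespace)
// pipelines are appended only when rule matching did not set a drop policy.
func buildPipelines(rulePipelines, defaultPipelines []string, dropPolicySet bool) []string {
	out := append([]string{}, rulePipelines...)
	if dropPolicySet {
		// A drop policy is in effect: skip the auto-mapping defaults entirely.
		return out
	}
	return append(out, defaultPipelines...)
}

func main() {
	// Mirrors the new test: an explicit 10s:30d mapping rule plus a drop rule,
	// with 1s:2h and 5s:12h auto-mapping namespaces configured.
	fmt.Println(buildPipelines([]string{"10s:30d"}, []string{"1s:2h", "5s:12h"}, true))  // [10s:30d]
	fmt.Println(buildPipelines([]string{"10s:30d"}, []string{"1s:2h", "5s:12h"}, false)) // [10s:30d 1s:2h 5s:12h]
}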
128 changes: 126 additions & 2 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -447,6 +447,88 @@ func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) {
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithAutoMappingRulesAndRulesConfigMappingRulesAndDropRule(t *testing.T) {
t.Parallel()

gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
"app": "nginx_edge",
"env": "staging",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
},
expectDropPolicyApplied: true,
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []m3.ClusterNamespaceOptions{
m3.NewClusterNamespaceOptions(
storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 2 * time.Hour,
Resolution: 1 * time.Second,
},
nil,
),
m3.NewClusterNamespaceOptions(
storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Retention: 12 * time.Hour,
Resolution: 5 * time.Second,
},
nil,
),
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
Filter: "env:staging",
Drop: true,
},
{
Filter: "app:nginx*",
Aggregations: []aggregation.Type{aggregation.Max},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 10 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
allowFilter: &testDownsamplerOptionsExpectAllowFilter{
attributes: []storagemetadata.Attributes{
{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: 10 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
writes: []testExpectedWrite{
{
tags: gaugeMetric.tags,
values: []expectedValue{{value: 30}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: 10 * time.Second,
Retention: 30 * 24 * time.Hour,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRuleFromNamespacesWatcher(t *testing.T) {
t.Parallel()

@@ -2270,12 +2352,14 @@ func testDownsamplerAggregation(
expectedWrites := append(counterMetricsExpect, gaugeMetricsExpect...)

// Allow overrides
var allowFilter *testDownsamplerOptionsExpectAllowFilter
if ingest := testOpts.ingest; ingest != nil {
counterMetrics = ingest.counterMetrics
gaugeMetrics = ingest.gaugeMetrics
}
if expect := testOpts.expect; expect != nil {
expectedWrites = expect.writes
allowFilter = expect.allowFilter
}

// Ingest points
@@ -2393,6 +2477,24 @@ CheckAllWritesArrivedLoop:
}
}
}

if allowFilter == nil {
return // No allow filter checking required.
}

for _, write := range testDownsampler.storage.Writes() {
attrs := write.Attributes()
foundMatchingAttribute := false
for _, allowed := range allowFilter.attributes {
if allowed == attrs {
foundMatchingAttribute = true
break
}
}
assert.True(t, foundMatchingAttribute,
fmt.Sprintf("attribute not allowed: allowed=%v, actual=%v",
allowFilter.attributes, attrs))
}
}

func testDownsamplerRemoteAggregation(
@@ -2618,7 +2720,7 @@ type testDownsamplerOptions struct {
identTag string

// Options for the test
autoMappingRules []AutoMappingRule
autoMappingRules []m3.ClusterNamespaceOptions
sampleAppenderOpts *SampleAppenderOptions
remoteClientMock *client.MockClient
rulesConfig *RulesConfiguration
@@ -2635,7 +2737,12 @@ type testDownsamplerOptionsIngest struct {
}

type testDownsamplerOptionsExpect struct {
writes []testExpectedWrite
writes []testExpectedWrite
allowFilter *testDownsamplerOptionsExpectAllowFilter
}

type testDownsamplerOptionsExpectAllowFilter struct {
attributes []storagemetadata.Attributes
}

func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampler {
@@ -2719,6 +2826,23 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
})
require.NoError(t, err)

if len(opts.autoMappingRules) > 0 {
// Simulate the automapping rules being injected into the downsampler.
ctrl := gomock.NewController(t)

var mockNamespaces m3.ClusterNamespaces
for _, r := range opts.autoMappingRules {
n := m3.NewMockClusterNamespace(ctrl)
n.EXPECT().
Options().
Return(r).
AnyTimes()
mockNamespaces = append(mockNamespaces, n)
}

instance.(*downsampler).OnUpdate(mockNamespaces)
}

downcast, ok := instance.(*downsampler)
require.True(t, ok)

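For reference, the expected outcome of the new test above: the env:staging drop rule suppresses the two auto-mapping namespaces (1s:2h and 5s:12h), so the gauge is aggregated only by the explicit app:nginx* rule — aggregation.Max over the samples {15, 10, 30, 5, 0} yields a single value of 30, written with 10s resolution and 30-day retention — and the new allowFilter assertion fails the test if any write reaches storage with attributes outside that one allowed set.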
166 changes: 87 additions & 79 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
@@ -209,14 +209,11 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
tags.filterPrefix(filter)
}

var (
dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
dropTimestamp bool
)
if opts.Override {
// Reuse a slice to keep the current staged metadatas we will apply.
a.curr.Pipelines = a.curr.Pipelines[:0]
// Reuse a slice to keep the current staged metadatas we will apply.
a.curr.Pipelines = a.curr.Pipelines[:0]

if opts.Override {
// Process an override explicitly provided as part of request.
for _, rule := range opts.OverrideRules.MappingRules {
stagedMetadatas, err := rule.StagedMetadatas()
if err != nil {
@@ -234,56 +231,68 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
} else {
// Reuse a slice to keep the current staged metadatas we will apply.
a.curr.Pipelines = a.curr.Pipelines[:0]

// NB(r): First apply mapping rules to see which storage policies
// have been applied, any that have been applied as part of
// mapping rules that exact match a default storage policy will be
// skipped when applying default rules, so as to avoid storing
// the same metrics in the same namespace with the same metric
// name and tags (i.e. overwriting each other).
a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0]

ruleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched rule",
debugLogMatchOptions{Meta: ruleStagedMetadatas})

// Collect storage policies for all the current active mapping rules.
// TODO: we should convert this to iterate over pointers
// nolint:gocritic
for _, stagedMetadata := range ruleStagedMetadatas {
for _, pipe := range stagedMetadata.Pipelines {
// Skip rollup rules unless configured otherwise.
// We only want to consider mapping rules here,
// as we still want to apply default mapping rules to
// metrics that are rolled up to ensure the underlying metric
// gets written to aggregated namespaces.
if pipe.IsMappingRule() {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
}
} else {
a.debugLogMatch(
"skipping rollup rule in populating active mapping rule policies",
debugLogMatchOptions{},
)

return SamplesAppenderResult{
SamplesAppender: a.multiSamplesAppender,
IsDropPolicyApplied: false,
ShouldDropTimestamp: false,
}, nil
}

// NB(r): First apply mapping rules to see which storage policies
// have been applied, any that have been applied as part of
// mapping rules that exact match a default storage policy will be
// skipped when applying default rules, so as to avoid storing
// the same metrics in the same namespace with the same metric
// name and tags (i.e. overwriting each other).
var (
ruleStagedMetadatas = matchResult.ForExistingIDAt(nowNanos)
dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
dropTimestamp bool
)
a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0]
if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched rule",
debugLogMatchOptions{Meta: ruleStagedMetadatas})

// Collect storage policies for all the current active mapping rules.
// TODO: we should convert this to iterate over pointers
// nolint:gocritic
for _, stagedMetadata := range ruleStagedMetadatas {
for _, pipe := range stagedMetadata.Pipelines {
// Skip rollup rules unless configured otherwise.
// We only want to consider mapping rules here,
// as we still want to apply default mapping rules to
// metrics that are rolled up to ensure the underlying metric
// gets written to aggregated namespaces.
if pipe.IsMappingRule() {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
}
} else {
a.debugLogMatch(
"skipping rollup rule in populating active mapping rule policies",
debugLogMatchOptions{},
)
}
}

// Only sample if going to actually aggregate
pipelines := ruleStagedMetadatas[len(ruleStagedMetadatas)-1]
a.curr.Pipelines =
append(a.curr.Pipelines, pipelines.Pipelines...)
}

// Always aggregate any default staged metadatas (unless
// mapping rule has provided an override for a storage policy,
// if so then skip aggregating for that storage policy).
// Only sample if going to actually aggregate
pipelines := ruleStagedMetadatas[len(ruleStagedMetadatas)-1]
a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...)
}

// Always aggregate any default staged metadatas with a few exceptions.
// Exceptions are:
// 1. A mapping rule has provided an override for a storage policy,
// if so then skip aggregating for that storage policy).
// 2. Any type of drop rule has been set, since they should only
// impact mapping rules, not default staged metadatas provided from
// auto-mapping rules (i.e. default namespace aggregation).
if !a.curr.Pipelines.IsDropPolicySet() {
// No drop rule has been set as part of rule matching.
for idx, stagedMetadatasProto := range a.defaultStagedMetadatasProtos {
// NB(r): Need to take copy of default staged metadatas as we
// sometimes mutate it.
@@ -362,40 +371,39 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
debugLogMatchOptions{Meta: stagedMetadatas})

pipelines := stagedMetadatas[len(stagedMetadatas)-1]
a.curr.Pipelines =
append(a.curr.Pipelines, pipelines.Pipelines...)
a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...)
}
}

// Apply the custom tags first so that they apply even if mapping
// rules drop the metric.
dropTimestamp = a.curr.Pipelines.ApplyCustomTags()
// Apply the custom tags first so that they apply even if mapping
// rules drop the metric.
dropTimestamp = a.curr.Pipelines.ApplyCustomTags()

// Apply drop policies results
a.curr.Pipelines, dropApplyResult = a.curr.Pipelines.ApplyOrRemoveDropPolicies()
// Apply drop policies results
a.curr.Pipelines, dropApplyResult = a.curr.Pipelines.ApplyOrRemoveDropPolicies()

if len(a.curr.Pipelines) > 0 && !a.curr.IsDropPolicyApplied() {
// Send to downsampler if we have something in the pipeline.
a.debugLogMatch("downsampler using built mapping staged metadatas",
debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}})
if len(a.curr.Pipelines) > 0 && !a.curr.IsDropPolicyApplied() {
// Send to downsampler if we have something in the pipeline.
a.debugLogMatch("downsampler using built mapping staged metadatas",
debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}})

if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
}

numRollups := matchResult.NumNewRollupIDs()
for i := 0; i < numRollups; i++ {
rollup := matchResult.ForNewRollupIDsAt(i, nowNanos)

a.debugLogMatch("downsampler applying matched rollup rule",
debugLogMatchOptions{Meta: rollup.Metadatas, RollupID: rollup.ID})
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
unownedID: rollup.ID,
stagedMetadatas: rollup.Metadatas,
})
}
numRollups := matchResult.NumNewRollupIDs()
for i := 0; i < numRollups; i++ {
rollup := matchResult.ForNewRollupIDsAt(i, nowNanos)

a.debugLogMatch("downsampler applying matched rollup rule",
debugLogMatchOptions{Meta: rollup.Metadatas, RollupID: rollup.ID})
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
unownedID: rollup.ID,
stagedMetadatas: rollup.Metadatas,
})
}

dropPolicyApplied := dropApplyResult != metadata.NoDropPolicyPresentResult
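Taken together, the metrics_appender.go changes restructure SamplesAppender so that the override path returns early and the a.curr.Pipelines reset happens once up front rather than separately in each branch. The behavioral change is the guard around the default staged metadatas: the loop over defaultStagedMetadatasProtos (the auto-mapping rules derived from aggregated cluster namespaces) now runs only when a.curr.Pipelines.IsDropPolicySet() is false, so a drop rule set during mapping-rule matching no longer lets the metric fall through to the default aggregated namespaces. Custom tags and drop policies are still applied to the combined pipelines afterwards, sample appenders are only added when pipelines survive and no drop policy was applied, and rollup rules are handled as before.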