From 9d918e270aba512c9374be0924caefd54ad41daa Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 11 Nov 2020 21:05:45 -0500 Subject: [PATCH 1/3] [coordinator] Set default namespace tag to avoid colliding with common "namespace" default value --- src/cmd/services/m3coordinator/downsample/options.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index b1b61df731..7f259f5572 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -83,6 +83,7 @@ const ( defaultBufferFutureTimedMetric = time.Minute defaultVerboseErrors = true defaultMatcherCacheCapacity = 100000 + defaultNamespaceTag = "__m3_namespace__" ) var ( @@ -267,6 +268,9 @@ type Configuration struct { type MatcherConfiguration struct { // Cache if non-zero will set the capacity of the rules matching cache. Cache MatcherCacheConfiguration `yaml:"cache"` + // NamespaceTag defines the namespace tag to use to select rules + // namespace to evaluate against. Default is "__m3_namespace__". + NamespaceTag string `yaml:"namespaceTag"` } // MatcherCacheConfiguration is the configuration for the rule matcher cache. @@ -647,6 +651,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { logger = instrumentOpts.Logger() openTimeout = defaultOpenTimeout m3PrefixFilter = false + namespaceTag = defaultNamespaceTag ) if o.StorageFlushConcurrency > 0 { storageFlushConcurrency = o.StorageFlushConcurrency @@ -654,6 +659,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { if o.OpenTimeout > 0 { openTimeout = o.OpenTimeout } + if cfg.Matcher.NamespaceTag != "" { + namespaceTag = cfg.Matcher.NamespaceTag + } pools := o.newAggregatorPools() ruleSetOpts := o.newAggregatorRulesOptions(pools) @@ -662,7 +670,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetClockOptions(clockOpts). SetInstrumentOptions(instrumentOpts). SetRuleSetOptions(ruleSetOpts). - SetKVStore(o.RulesKVStore) + SetKVStore(o.RulesKVStore). + SetNamespaceTag([]byte(namespaceTag)) // NB(r): If rules are being explicitly set in config then we are // going to use an in memory KV store for rules and explicitly set them up. From 9db9202cf19fcdc998038c3ec82c7ae88cc9db84 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Wed, 11 Nov 2020 21:07:53 -0500 Subject: [PATCH 2/3] Use defined constant --- src/cmd/services/m3coordinator/downsample/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 7f259f5572..da966cfe00 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -83,11 +83,11 @@ const ( defaultBufferFutureTimedMetric = time.Minute defaultVerboseErrors = true defaultMatcherCacheCapacity = 100000 - defaultNamespaceTag = "__m3_namespace__" ) var ( - numShards = runtime.NumCPU() + numShards = runtime.NumCPU() + defaultNamespaceTag = metric.M3MetricsPrefixString + "_namespace__" errNoStorage = errors.New("downsampling enabled with storage not set") errNoClusterClient = errors.New("downsampling enabled with cluster client not set") From 9c3cf01bd6cb72c669118999bf1788f0cb8d17ea Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Wed, 11 Nov 2020 22:28:34 -0500 Subject: [PATCH 3/3] Add downsampler test case to demonstrate override namespace tag --- .../downsample/downsampler_test.go | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index bc850de531..4b9f4aac64 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -1274,6 +1274,87 @@ func TestDownsamplerAggregationWithRemoteAggregatorClient(t *testing.T) { testDownsamplerRemoteAggregation(t, testDownsampler) } +func TestDownsamplerWithOverrideNamespace(t *testing.T) { + overrideNamespaceTag := "override_namespace_tag" + + gaugeMetric := testGaugeMetric{ + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value", + // Set namespace tags on ingested metrics. + // The test demonstrates that overrideNamespaceTag is respected, meaning setting + // values on defaultNamespaceTag won't affect aggregation. + defaultNamespaceTag: "namespace_ignored", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 42}, + {value: 64, offset: 5 * time.Second}, + }, + } + res := 5 * time.Second + ret := 30 * 24 * time.Hour + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + rulesConfig: &RulesConfiguration{ + RollupRules: []RollupRuleConfiguration{ + { + Filter: fmt.Sprintf( + "%s:http_requests app:* status_code:* endpoint:*", + nameTag), + Transforms: []TransformConfiguration{ + { + Transform: &TransformOperationConfiguration{ + Type: transformation.PerSecond, + }, + }, + { + Rollup: &RollupOperationConfiguration{ + MetricName: "http_requests_by_status_code", + GroupBy: []string{"app", "status_code", "endpoint"}, + Aggregations: []aggregation.Type{aggregation.Sum}, + }, + }, + }, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }, + matcherConfig: MatcherConfiguration{NamespaceTag: overrideNamespaceTag}, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: []testGaugeMetric{gaugeMetric}, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + { + tags: map[string]string{ + nameTag: "http_requests_by_status_code", + string(rollupTagName): string(rollupTagValue), + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + }, + values: []expectedValue{{value: 4.4}}, + attributes: &storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas { ds, ok := testDownsampler.downsampler.(*downsampler) require.True(t, ok) @@ -1751,6 +1832,7 @@ type testDownsamplerOptions struct { sampleAppenderOpts *SampleAppenderOptions remoteClientMock *client.MockClient rulesConfig *RulesConfiguration + matcherConfig MatcherConfiguration // Test ingest and expectations overrides ingest *testDownsamplerOptionsIngest @@ -1821,6 +1903,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl if opts.rulesConfig != nil { cfg.Rules = opts.rulesConfig } + cfg.Matcher = opts.matcherConfig instance, err := cfg.NewDownsampler(DownsamplerOptions{ Storage: storage,