From cdf5f1196534bb8891de58bace8848ba2e6036bf Mon Sep 17 00:00:00 2001 From: Gediminas Guoba Date: Tue, 19 Jan 2021 13:12:13 +0200 Subject: [PATCH] Revert "[aggregator] keep metric type during the aggregation" (#3099) --- .../m3_stack/m3coordinator-aggregator.yml | 2 - .../m3_stack/m3coordinator-standard.yml | 2 - .../aggregator/m3coordinator.yml | 2 - .../aggregator/test.sh | 52 -------- src/aggregator/aggregation/counter_test.go | 2 - src/aggregator/aggregator/aggregator.go | 1 - src/aggregator/aggregator/counter_elem_gen.go | 7 +- src/aggregator/aggregator/elem_base_test.go | 4 +- src/aggregator/aggregator/elem_test.go | 11 +- src/aggregator/aggregator/flush.go | 2 - src/aggregator/aggregator/gauge_elem_gen.go | 7 +- src/aggregator/aggregator/generic_elem.go | 6 +- .../aggregator/handler/writer/protobuf.go | 1 - src/aggregator/aggregator/list.go | 4 - src/aggregator/aggregator/list_test.go | 2 - src/aggregator/aggregator/timer_elem_gen.go | 7 +- src/aggregator/generated-source-files.mk | 1 - .../m3coordinator/ingest/m3msg/config.go | 5 +- .../m3coordinator/ingest/m3msg/ingest.go | 116 ++++++------------ .../m3coordinator/ingest/m3msg/ingest_test.go | 10 +- .../services/m3coordinator/ingest/write.go | 27 ++-- .../server/m3msg/protobuf_handler.go | 2 +- .../server/m3msg/protobuf_handler_test.go | 2 - .../m3coordinator/server/m3msg/types.go | 2 - src/metrics/aggregation/type.go | 2 +- .../encoding/protobuf/aggregated_decoder.go | 11 -- src/metrics/metric/aggregated/types.go | 1 - .../api/v1/handler/prometheus/remote/write.go | 24 +--- .../handler/prometheus/remote/write_test.go | 83 ------------- src/query/server/query.go | 7 +- src/query/server/query_test.go | 4 +- src/query/storage/converter.go | 13 +- src/query/storage/converter_test.go | 8 +- src/x/headers/headers.go | 5 - 34 files changed, 85 insertions(+), 350 deletions(-) diff --git a/scripts/development/m3_stack/m3coordinator-aggregator.yml b/scripts/development/m3_stack/m3coordinator-aggregator.yml index 2591de57d2..b622668a4a 100644 --- a/scripts/development/m3_stack/m3coordinator-aggregator.yml +++ b/scripts/development/m3_stack/m3coordinator-aggregator.yml @@ -80,5 +80,3 @@ carbon: tagOptions: idScheme: quoted - -storeMetricsType: true diff --git a/scripts/development/m3_stack/m3coordinator-standard.yml b/scripts/development/m3_stack/m3coordinator-standard.yml index 5da1d01b5b..16137c65b4 100644 --- a/scripts/development/m3_stack/m3coordinator-standard.yml +++ b/scripts/development/m3_stack/m3coordinator-standard.yml @@ -36,5 +36,3 @@ carbon: tagOptions: idScheme: quoted - -storeMetricsType: true diff --git a/scripts/docker-integration-tests/aggregator/m3coordinator.yml b/scripts/docker-integration-tests/aggregator/m3coordinator.yml index 59319cfde7..10ba278c91 100644 --- a/scripts/docker-integration-tests/aggregator/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator/m3coordinator.yml @@ -77,5 +77,3 @@ ingest: retry: maxBackoff: 10s jitter: true - -storeMetricsType: true diff --git a/scripts/docker-integration-tests/aggregator/test.sh b/scripts/docker-integration-tests/aggregator/test.sh index a0453ef12a..8d287df7e0 100755 --- a/scripts/docker-integration-tests/aggregator/test.sh +++ b/scripts/docker-integration-tests/aggregator/test.sh @@ -170,14 +170,12 @@ function prometheus_remote_write { local label1_value=${label1_value:-label1} local label2_name=${label2_name:-label2} local label2_value=${label2_value:-label2} - local metric_type=${metric_type:counter} network_name="aggregator" network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1) out=$((docker run -it --rm --network $network \ $PROMREMOTECLI_IMAGE \ -u http://m3coordinator01:7202/api/v1/prom/remote/write \ - -h M3-Prom-Type:${metric_type} \ -t __name__:${metric_name} \ -t ${label0_name}:${label0_value} \ -t ${label1_name}:${label1_value} \ @@ -219,22 +217,6 @@ function prometheus_query_native { return $? } -function dbnode_fetch { - local namespace=${namespace} - local id=${id} - local rangeStart=${rangeStart} - local rangeEnd=${rangeEnd} - local jq_path=${jq_path:-} - local expected_value=${expected_value:-} - - result=$(curl -s \ - "0.0.0.0:9002/fetch" \ - "-d" \ - "{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}") - test "$result" = "$expected_value" - return $? -} - function test_aggregated_rollup_rule { resolution_seconds="10" now=$(date +"%s") @@ -252,7 +234,6 @@ function test_aggregated_rollup_rule { label0_name="app" label0_value="nginx_edge" \ label1_name="status_code" label1_value="500" \ label2_name="endpoint" label2_value="/foo/bar" \ - metric_type="counter" \ prometheus_remote_write \ http_requests $write_at $value \ true "Expected request to succeed" \ @@ -270,7 +251,6 @@ function test_aggregated_rollup_rule { label0_name="app" label0_value="nginx_edge" \ label1_name="status_code" label1_value="500" \ label2_name="endpoint" label2_value="/foo/baz" \ - metric_type="gauge" \ prometheus_remote_write \ http_requests $write_at $value \ true "Expected request to succeed" \ @@ -304,38 +284,6 @@ function test_aggregated_rollup_rule { retry_with_backoff prometheus_query_native } -function test_metric_type_survives_aggregation { - now=$(date +"%s") - - echo "Test metric type should be kept after aggregation" - - # Emit values for endpoint /foo/bar (to ensure right values aggregated) - write_at="$now_truncated" - value="42" - - metric_type="counter" \ - prometheus_remote_write \ - metric_type_test $now $value \ - true "Expected request to succeed" \ - 200 "Expected request to return status code 200" - - start=$(( $now - 3600 )) - end=$(( $now + 3600 )) - jq_path=".datapoints[0].annotation" - - echo "Test query metric type" - - # Test by metric types are stored in aggregated namespace - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \ - namespace="agg" \ - id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \ - rangeStart=${start} \ - rangeEnd=${end} \ - jq_path="$jq_path" expected_value="CAEQAQ==" \ - retry_with_backoff dbnode_fetch -} - echo "Run tests" test_aggregated_graphite_metric test_aggregated_rollup_rule -test_metric_type_survives_aggregation diff --git a/src/aggregator/aggregation/counter_test.go b/src/aggregator/aggregation/counter_test.go index 57610479aa..47492459fe 100644 --- a/src/aggregator/aggregation/counter_test.go +++ b/src/aggregator/aggregation/counter_test.go @@ -70,8 +70,6 @@ func TestCounterCustomAggregationType(t *testing.T) { require.Equal(t, float64(338350), v) case aggregation.Stdev: require.InDelta(t, 29.01149, v, 0.001) - case aggregation.Last: - require.Equal(t, 0.0, v) default: require.Equal(t, float64(0), v) require.False(t, aggType.IsValidForCounter()) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 2e30c8b4e5..5e0df44e9c 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -287,7 +287,6 @@ func (agg *aggregator) AddPassthrough( ChunkedID: id.ChunkedID{ Data: []byte(metric.ID), }, - Type: metric.Type, TimeNanos: metric.TimeNanos, Value: metric.Value, }, diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index 040a988321..625ce7de2d 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -31,7 +31,6 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -481,10 +480,10 @@ func (e *CounterElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.CounterType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.CounterType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index a13ab372bc..a277432a51 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -203,9 +203,9 @@ func TestCounterElemBaseResetSetData(t *testing.T) { func TestCounterElemBaseResetSetDataInvalidTypes(t *testing.T) { e := counterElemBase{} - err := e.ResetSetData(nil, maggregation.Types{maggregation.P10}, false) + err := e.ResetSetData(nil, maggregation.Types{maggregation.Last}, false) require.Error(t, err) - require.True(t, strings.Contains(err.Error(), "invalid aggregation types P10 for counter")) + require.True(t, strings.Contains(err.Error(), "invalid aggregation types Last for counter")) } func TestTimerElemBase(t *testing.T) { diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index a87130d10c..845c47c13a 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -158,10 +158,12 @@ func TestCounterResetSetData(t *testing.T) { func TestCounterResetSetDataInvalidAggregationType(t *testing.T) { opts := NewOptions() - ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes, - applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts) - err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.P10}, - applied.DefaultPipeline, 0, NoPrefixNoSuffix) + ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, + maggregation.DefaultTypes, applied.DefaultPipeline, + testNumForwardedTimes, NoPrefixNoSuffix, opts) + err := ce.ResetSetData(testCounterID, testStoragePolicy, + maggregation.Types{maggregation.Last}, applied.DefaultPipeline, + 0, NoPrefixNoSuffix) require.Error(t, err) } @@ -1812,7 +1814,6 @@ func testFlushLocalMetricFn() ( return func( idPrefix []byte, id id.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/flush.go b/src/aggregator/aggregator/flush.go index 1f93fa1621..e5d41e037e 100644 --- a/src/aggregator/aggregator/flush.go +++ b/src/aggregator/aggregator/flush.go @@ -23,7 +23,6 @@ package aggregator import ( "time" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/policy" ) @@ -84,7 +83,6 @@ const ( type flushLocalMetricFn func( idPrefix []byte, id id.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index a845a296a2..20efbc03fa 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -31,7 +31,6 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -481,10 +480,10 @@ func (e *GaugeElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index 4a1370c83c..eab77d6b34 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -537,10 +537,10 @@ func (e *GenericElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/handler/writer/protobuf.go b/src/aggregator/aggregator/handler/writer/protobuf.go index abe84608f3..f00f8d3564 100644 --- a/src/aggregator/aggregator/handler/writer/protobuf.go +++ b/src/aggregator/aggregator/handler/writer/protobuf.go @@ -133,7 +133,6 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) ( w.m.ID = append(w.m.ID, mp.Suffix...) w.m.Metric.TimeNanos = mp.TimeNanos w.m.Metric.Value = mp.Value - w.m.Metric.Type = mp.Type w.m.StoragePolicy = mp.StoragePolicy shard := w.shardFn(w.m.ID, w.numShards) return w.m, shard diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go index 20f51b6d2c..ea38acfcc3 100644 --- a/src/aggregator/aggregator/list.go +++ b/src/aggregator/aggregator/list.go @@ -30,7 +30,6 @@ import ( "github.com/m3db/m3/src/aggregator/aggregator/handler" "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/aggregated" metricid "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/policy" @@ -435,7 +434,6 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) { func (l *baseMetricList) consumeLocalMetric( idPrefix []byte, id metricid.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, @@ -449,7 +447,6 @@ func (l *baseMetricList) consumeLocalMetric( chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{ ChunkedMetric: aggregated.ChunkedMetric{ ChunkedID: chunkedID, - Type: metricType, TimeNanos: timeNanos, Value: value, }, @@ -466,7 +463,6 @@ func (l *baseMetricList) consumeLocalMetric( func (l *baseMetricList) discardLocalMetric( idPrefix []byte, id metricid.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/list_test.go b/src/aggregator/aggregator/list_test.go index 9a523521d3..67e61f1c4b 100644 --- a/src/aggregator/aggregator/list_test.go +++ b/src/aggregator/aggregator/list_test.go @@ -604,7 +604,6 @@ func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) { ChunkedID: id.ChunkedID{ Data: ep.metric.ID, }, - Type: ep.metric.Type, TimeNanos: alignedStart, Value: ep.metric.Value, }, @@ -1057,7 +1056,6 @@ func TestForwardedMetricListLastStepLocalFlush(t *testing.T) { Prefix: ep.expectedPrefix, Data: ep.metric.ID, }, - Type: ep.metric.Type, TimeNanos: alignedStart, Value: ep.metric.Values[0], }, diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index cf46acc790..52e0d2be88 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -31,7 +31,6 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -481,10 +480,10 @@ func (e *TimerElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/generated-source-files.mk b/src/aggregator/generated-source-files.mk index d95dbd85d3..df9293c791 100644 --- a/src/aggregator/generated-source-files.mk +++ b/src/aggregator/generated-source-files.mk @@ -13,7 +13,6 @@ genny-all: genny-aggregator-counter-elem genny-aggregator-timer-elem genny-aggre genny-aggregator-counter-elem: cat $(m3db_package_path)/src/aggregator/aggregator/generic_elem.go \ | awk '/^package/{i++}i' \ - | sed 's/metric.GaugeType/metric.CounterType/' \ | genny -out=$(m3db_package_path)/src/aggregator/aggregator/counter_elem_gen.go -pkg=aggregator gen \ "timedAggregation=timedCounter lockedAggregation=lockedCounterAggregation typeSpecificAggregation=counterAggregation typeSpecificElemBase=counterElemBase genericElemPool=CounterElemPool GenericElem=CounterElem" diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/config.go b/src/cmd/services/m3coordinator/ingest/m3msg/config.go index 7ca43135d1..9d7c17cce2 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/config.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/config.go @@ -46,9 +46,8 @@ func (cfg Configuration) NewIngester( appender storage.Appender, tagOptions models.TagOptions, instrumentOptions instrument.Options, - storeMetricsType bool, ) (*Ingester, error) { - opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions, storeMetricsType) + opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions) if err != nil { return nil, err } @@ -59,7 +58,6 @@ func (cfg Configuration) newOptions( appender storage.Appender, tagOptions models.TagOptions, instrumentOptions instrument.Options, - storeMetricsType bool, ) (Options, error) { scope := instrumentOptions.MetricsScope().Tagged( map[string]string{"component": "ingester"}, @@ -100,6 +98,5 @@ func (cfg Configuration) newOptions( RetryOptions: cfg.Retry.NewOptions(scope), Sampler: sampler, InstrumentOptions: instrumentOptions, - StoreMetricsType: storeMetricsType, }, nil } diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index ca2920197f..8105bd9b3c 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -56,7 +56,6 @@ type Options struct { Sampler *sampler.Sampler InstrumentOptions instrument.Options TagOptions models.TagOptions - StoreMetricsType bool } type ingestMetrics struct { @@ -100,15 +99,14 @@ func NewIngester( // pooled, but currently this is the only way to get tag decoder. tagDecoder := opts.TagDecoderPool.Get() op := ingestOp{ - storageAppender: opts.Appender, - retrier: retrier, - iter: serialize.NewMetricTagsIterator(tagDecoder, nil), - tagOpts: tagOpts, - pool: p, - metrics: m, - logger: opts.InstrumentOptions.Logger(), - sampler: opts.Sampler, - storeMetricsType: opts.StoreMetricsType, + s: opts.Appender, + r: retrier, + it: serialize.NewMetricTagsIterator(tagDecoder, nil), + tagOpts: tagOpts, + p: p, + m: m, + logger: opts.InstrumentOptions.Logger(), + sampler: opts.Sampler, } op.attemptFn = op.attempt op.ingestFn = op.ingest @@ -125,16 +123,14 @@ func NewIngester( func (i *Ingester) Ingest( ctx context.Context, id []byte, - metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, callback m3msg.Callbackable, ) { op := i.p.Get().(*ingestOp) - op.ctx = ctx + op.c = ctx op.id = id - op.metricType = metricType op.metricNanos = metricNanos op.value = value op.sp = sp @@ -143,28 +139,26 @@ func (i *Ingester) Ingest( } type ingestOp struct { - storageAppender storage.Appender - retrier retry.Retrier - iter id.SortedTagIterator - tagOpts models.TagOptions - pool pool.ObjectPool - metrics ingestMetrics - logger *zap.Logger - sampler *sampler.Sampler - attemptFn retry.Fn - ingestFn func() - storeMetricsType bool + s storage.Appender + r retry.Retrier + it id.SortedTagIterator + tagOpts models.TagOptions + p pool.ObjectPool + m ingestMetrics + logger *zap.Logger + sampler *sampler.Sampler + attemptFn retry.Fn + ingestFn func() - ctx context.Context + c context.Context id []byte - metricType ts.PromMetricType metricNanos int64 value float64 sp policy.StoragePolicy callback m3msg.Callbackable tags models.Tags datapoints ts.Datapoints - writeQuery storage.WriteQuery + q storage.WriteQuery } func (op *ingestOp) sample() bool { @@ -176,22 +170,22 @@ func (op *ingestOp) sample() bool { func (op *ingestOp) ingest() { if err := op.resetWriteQuery(); err != nil { - op.metrics.ingestInternalError.Inc(1) + op.m.ingestInternalError.Inc(1) op.callback.Callback(m3msg.OnRetriableError) - op.pool.Put(op) + op.p.Put(op) if op.sample() { op.logger.Error("could not reset ingest op", zap.Error(err)) } return } - if err := op.retrier.Attempt(op.attemptFn); err != nil { + if err := op.r.Attempt(op.attemptFn); err != nil { nonRetryableErr := xerrors.IsNonRetryableError(err) if nonRetryableErr { op.callback.Callback(m3msg.OnNonRetriableError) - op.metrics.ingestNonRetryableError.Inc(1) + op.m.ingestNonRetryableError.Inc(1) } else { op.callback.Callback(m3msg.OnRetriableError) - op.metrics.ingestInternalError.Inc(1) + op.m.ingestInternalError.Inc(1) } // NB(r): Always log non-retriable errors since they are usually @@ -203,16 +197,16 @@ func (op *ingestOp) ingest() { zap.Bool("retryableError", !nonRetryableErr)) } - op.pool.Put(op) + op.p.Put(op) return } - op.metrics.ingestSuccess.Inc(1) + op.m.ingestSuccess.Inc(1) op.callback.Callback(m3msg.OnSuccess) - op.pool.Put(op) + op.p.Put(op) } func (op *ingestOp) attempt() error { - return op.storageAppender.Write(op.ctx, &op.writeQuery) + return op.s.Write(op.c, &op.q) } func (op *ingestOp) resetWriteQuery() error { @@ -220,8 +214,7 @@ func (op *ingestOp) resetWriteQuery() error { return err } op.resetDataPoints() - - wq := storage.WriteQueryOptions{ + return op.q.Reset(storage.WriteQueryOptions{ Tags: op.tags, Datapoints: op.datapoints, Unit: convert.UnitForM3DB(op.sp.Resolution().Precision), @@ -230,50 +223,15 @@ func (op *ingestOp) resetWriteQuery() error { Resolution: op.sp.Resolution().Window, Retention: op.sp.Retention().Duration(), }, - } - - if op.storeMetricsType { - var err error - wq.Annotation, err = op.convertTypeToAnnotation(op.metricType) - if err != nil { - return err - } - } - - return op.writeQuery.Reset(wq) -} - -func (op *ingestOp) convertTypeToAnnotation(tp ts.PromMetricType) ([]byte, error) { - if tp == ts.PromMetricTypeUnknown { - return nil, nil - } - - handleValueResets := false - if tp == ts.PromMetricTypeCounter { - handleValueResets = true - } - - annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(tp, handleValueResets) - if err != nil { - return nil, err - } - annot, err := annotationPayload.Marshal() - if err != nil { - return nil, err - } - - if len(annot) == 0 { - annot = nil - } - return annot, nil + }) } func (op *ingestOp) resetTags() error { - op.iter.Reset(op.id) + op.it.Reset(op.id) op.tags.Tags = op.tags.Tags[:0] op.tags.Opts = op.tagOpts - for op.iter.Next() { - name, value := op.iter.Current() + for op.it.Next() { + name, value := op.it.Current() // TODO_FIX_GRAPHITE_TAGGING: Using this string constant to track // all places worth fixing this hack. There is at least one @@ -283,7 +241,7 @@ func (op *ingestOp) resetTags() error { if bytes.Equal(value, downsample.GraphiteIDSchemeTagValue) && op.tags.Opts.IDSchemeType() != models.TypeGraphite { // Restart iteration with graphite tag options parsing - op.iter.Reset(op.id) + op.it.Reset(op.id) op.tags.Tags = op.tags.Tags[:0] op.tags.Opts = op.tags.Opts.SetIDSchemeType(models.TypeGraphite) } @@ -298,7 +256,7 @@ func (op *ingestOp) resetTags() error { }.Clone()) } op.tags.Normalize() - return op.iter.Err() + return op.it.Err() } func (op *ingestOp) resetDataPoints() { diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go index 2e9336065f..5e9053231f 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go @@ -59,7 +59,7 @@ func TestIngest(t *testing.T) { } appender := &mockAppender{} ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), - instrument.NewOptions(), true) + instrument.NewOptions()) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") @@ -72,14 +72,14 @@ func TestIngest(t *testing.T) { callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg) m.EXPECT().Ack() - ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback) + ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback) for appender.cnt() != 1 { time.Sleep(100 * time.Millisecond) } expected, err := storage.NewWriteQuery(storage.WriteQueryOptions{ - Annotation: []byte{8, 2}, + Annotation: nil, Attributes: storagemetadata.Attributes{ MetricsType: storagemetadata.AggregatedMetricsType, Resolution: time.Minute, @@ -131,7 +131,7 @@ func TestIngestNonRetryableError(t *testing.T) { nonRetryableError := xerrors.NewNonRetryableError(errors.New("bad request error")) appender := &mockAppender{expectErr: nonRetryableError} ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), - instrumentOpts, true) + instrumentOpts) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") @@ -144,7 +144,7 @@ func TestIngestNonRetryableError(t *testing.T) { callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg) m.EXPECT().Ack() - ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback) + ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback) for appender.cntErr() != 1 { time.Sleep(100 * time.Millisecond) diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 6d131f8e15..27cd4b477a 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -22,7 +22,6 @@ package ingest import ( "context" - "fmt" "sync" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" @@ -496,26 +495,14 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } for _, dp := range value.Datapoints { - if value.Attributes.PromType != ts.PromMetricTypeUnknown { - switch value.Attributes.PromType { - case ts.PromMetricTypeCounter: - err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) - default: - err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) - } - } else { - switch value.Attributes.M3Type { - case ts.M3MetricTypeGauge: - err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) - case ts.M3MetricTypeCounter: - err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) - case ts.M3MetricTypeTimer: - err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) - default: - err = fmt.Errorf("unknown m3type '%v'", value.Attributes.M3Type) - } + switch value.Attributes.M3Type { + case ts.M3MetricTypeGauge: + err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) + case ts.M3MetricTypeCounter: + err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) + case ts.M3MetricTypeTimer: + err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) } - if err != nil { // If we see an error break out so we can try processing the // next datapoint. diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 65f2d6b6c5..ffca0e0c1d 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -122,7 +122,7 @@ func (h *pbHandler) Process(msg consumer.Message) { } } - h.writeFn(h.ctx, dec.ID(), dec.Type(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) + h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) } func (h *pbHandler) Close() { h.wg.Wait() } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index 1770a2b757..02e411749a 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -35,7 +35,6 @@ import ( "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/msg/generated/proto/msgpb" "github.com/m3db/m3/src/msg/protocol/proto" - "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/server" xtime "github.com/m3db/m3/src/x/time" @@ -234,7 +233,6 @@ type mockWriter struct { func (m *mockWriter) write( ctx context.Context, name []byte, - metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index 20c20c37fe..8d3a5f96be 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -24,14 +24,12 @@ import ( "context" "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/query/ts" ) // WriteFn is the function that writes a metric. type WriteFn func( ctx context.Context, id []byte, - metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, diff --git a/src/metrics/aggregation/type.go b/src/metrics/aggregation/type.go index 9716dd560d..db10e274ae 100644 --- a/src/metrics/aggregation/type.go +++ b/src/metrics/aggregation/type.go @@ -164,7 +164,7 @@ func (a Type) IsValidForGauge() bool { // IsValidForCounter if an Type is valid for Counter. func (a Type) IsValidForCounter() bool { switch a { - case Min, Max, Mean, Count, Sum, SumSq, Stdev, Last: + case Min, Max, Mean, Count, Sum, SumSq, Stdev: return true default: return false diff --git a/src/metrics/encoding/protobuf/aggregated_decoder.go b/src/metrics/encoding/protobuf/aggregated_decoder.go index a9ee58e9b8..ca9052c44c 100644 --- a/src/metrics/encoding/protobuf/aggregated_decoder.go +++ b/src/metrics/encoding/protobuf/aggregated_decoder.go @@ -23,7 +23,6 @@ package protobuf import ( "github.com/m3db/m3/src/metrics/generated/proto/metricpb" "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/query/ts" ) // AggregatedDecoder is a decoder for decoding aggregated metrics. @@ -53,16 +52,6 @@ func (d AggregatedDecoder) ID() []byte { return d.pb.Metric.TimedMetric.Id } -// Type returns the type of the metric. -func (d *AggregatedDecoder) Type() ts.PromMetricType { - switch d.pb.Metric.TimedMetric.Type { - case metricpb.MetricType_COUNTER: - return ts.PromMetricTypeCounter - default: - return ts.PromMetricTypeGauge - } -} - // TimeNanos returns the decoded timestamp. func (d AggregatedDecoder) TimeNanos() int64 { return d.pb.Metric.TimedMetric.TimeNanos diff --git a/src/metrics/metric/aggregated/types.go b/src/metrics/metric/aggregated/types.go index e4503a0c60..4415dfa039 100644 --- a/src/metrics/metric/aggregated/types.go +++ b/src/metrics/metric/aggregated/types.go @@ -81,7 +81,6 @@ func (m Metric) String() string { // ChunkedMetric is a metric with a chunked ID. type ChunkedMetric struct { id.ChunkedID - Type metric.Type TimeNanos int64 Value float64 } diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 7e11b60130..61ec7918de 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -93,16 +93,6 @@ var ( Attributes: ts.DefaultSeriesAttributes(), Metadata: ts.Metadata{}, } - - headerToMetricType = map[string]prompb.MetricType{ - "counter": prompb.MetricType_COUNTER, - "gauge": prompb.MetricType_GAUGE, - "gauge-histogram": prompb.MetricType_GAUGE_HISTOGRAM, - "histogram": prompb.MetricType_HISTOGRAM, - "info": prompb.MetricType_INFO, - "stateset": prompb.MetricType_STATESET, - "summary": prompb.MetricType_SUMMARY, - } ) // PromWriteHandler represents a handler for prometheus write endpoint. @@ -471,16 +461,6 @@ func (h *PromWriteHandler) parseRequest( } } - if promType := r.Header.Get(headers.PromTypeHeader); promType != "" { - tp, ok := headerToMetricType[strings.ToLower(promType)] - if !ok { - return parseRequestResult{}, fmt.Errorf("unknown prom metric type %s", promType) - } - for i := range req.Timeseries { - req.Timeseries[i].Type = tp - } - } - return parseRequestResult{ Request: &req, Options: opts, @@ -621,9 +601,7 @@ func (i *promTSIter) Next() bool { return true } - annotationPayload, err := storage.SeriesAttributesToAnnotationPayload( - i.attributes[i.idx].PromType, - i.attributes[i.idx].HandleValueResets) + annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(i.attributes[i.idx]) if err != nil { i.err = err return false diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index 58b2a8d419..3f71305f35 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -86,89 +86,6 @@ func TestPromWriteParsing(t *testing.T) { require.Equal(t, ingest.WriteOptions{}, r.Options) } -func TestMetricTypeHeader(t *testing.T) { - tests := []struct { - headerValue string - expectedType prompb.MetricType - }{ - { - expectedType: prompb.MetricType_UNKNOWN, - }, - { - headerValue: "counter", - expectedType: prompb.MetricType_COUNTER, - }, - { - headerValue: "Counter", - expectedType: prompb.MetricType_COUNTER, - }, - { - headerValue: "gauge", - expectedType: prompb.MetricType_GAUGE, - }, - { - headerValue: "histogram", - expectedType: prompb.MetricType_HISTOGRAM, - }, - { - headerValue: "gauge-histogram", - expectedType: prompb.MetricType_GAUGE_HISTOGRAM, - }, - { - headerValue: "summary", - expectedType: prompb.MetricType_SUMMARY, - }, - { - headerValue: "info", - expectedType: prompb.MetricType_INFO, - }, - { - headerValue: "stateset", - expectedType: prompb.MetricType_STATESET, - }, - } - - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) - handlerOpts := makeOptions(mockDownsamplerAndWriter) - handler, err := NewPromWriteHandler(handlerOpts) - require.NoError(t, err) - - for _, testCase := range tests { - t.Run(testCase.headerValue, func(tt *testing.T) { - tc := testCase // nolint - promReq := test.GeneratePromWriteRequest() - promReqBody := test.GeneratePromWriteRequestBody(tt, promReq) - req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) - if tc.headerValue > "" { - req.Header.Add(headers.PromTypeHeader, tc.headerValue) - } - r, err := handler.(*PromWriteHandler).parseRequest(req) - require.NoError(tt, err) - require.Equal(tt, tc.expectedType, r.Request.Timeseries[0].Type) - }) - } -} - -func TestInvalidMetricTypeHeader(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) - handlerOpts := makeOptions(mockDownsamplerAndWriter) - handler, err := NewPromWriteHandler(handlerOpts) - require.NoError(t, err) - - promReq := test.GeneratePromWriteRequest() - promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) - req.Header.Add(headers.PromTypeHeader, "random") - _, err = handler.(*PromWriteHandler).parseRequest(req) - require.Error(t, err) -} - func TestPromWrite(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() diff --git a/src/query/server/query.go b/src/query/server/query.go index 21cd32561b..5b3aae5121 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -550,15 +550,10 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { - storeMetricsType := false - if cfg.StoreMetricsType != nil { - storeMetricsType = *cfg.StoreMetricsType - } - logger.Info("starting m3msg server", zap.String("address", cfg.Ingest.M3Msg.Server.ListenAddress)) ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, - tagOptions, instrumentOptions, storeMetricsType) + tagOptions, instrumentOptions) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index f79ccf05cb..d1f95e0171 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -116,8 +116,6 @@ writeWorkerPoolPolicy: size: 100 shards: 100 killProbability: 0.3 - -storeMetricsType: true ` func TestWrite(t *testing.T) { @@ -255,7 +253,7 @@ func TestIngest(t *testing.T) { gomock.Any(), 42.0, gomock.Any(), - []byte{8, 2}). + nil). Do(func(_, _, _, _, _, _, _ interface{}) { numWrites.Add(1) }) diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 9302266006..c39d69a4c7 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -149,12 +149,11 @@ func PromTimeSeriesToSeriesAttributes(series prompb.TimeSeries) (ts.SeriesAttrib }, nil } -// SeriesAttributesToAnnotationPayload converts passed arguments into an annotation.Payload. -func SeriesAttributesToAnnotationPayload( - promType ts.PromMetricType, - handleValueResets bool) (annotation.Payload, error) { +// SeriesAttributesToAnnotationPayload converts ts.SeriesAttributes into an annotation.Payload. +func SeriesAttributesToAnnotationPayload(seriesAttributes ts.SeriesAttributes) (annotation.Payload, error) { var metricType annotation.MetricType - switch promType { + + switch seriesAttributes.PromType { case ts.PromMetricTypeUnknown: metricType = annotation.MetricType_UNKNOWN @@ -180,12 +179,12 @@ func SeriesAttributesToAnnotationPayload( metricType = annotation.MetricType_STATESET default: - return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", promType) + return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", seriesAttributes.PromType) } return annotation.Payload{ MetricType: metricType, - HandleValueResets: handleValueResets, + HandleValueResets: seriesAttributes.HandleValueResets, }, nil } diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 88a68ef72e..ca808ab4f5 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -385,19 +385,19 @@ func TestSeriesAttributesToAnnotationPayload(t *testing.T) { } for promType, expected := range mapping { - payload, err := SeriesAttributesToAnnotationPayload(promType, false) + payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: promType}) require.NoError(t, err) assert.Equal(t, expected, payload.MetricType) } - _, err := SeriesAttributesToAnnotationPayload(math.MaxUint8, false) + _, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: math.MaxUint8}) require.Error(t, err) - payload, err := SeriesAttributesToAnnotationPayload(0, true) + payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: true}) require.NoError(t, err) assert.True(t, payload.HandleValueResets) - payload, err = SeriesAttributesToAnnotationPayload(0, false) + payload, err = SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: false}) require.NoError(t, err) assert.False(t, payload.HandleValueResets) } diff --git a/src/x/headers/headers.go b/src/x/headers/headers.go index 579ac13080..65a4a74fb8 100644 --- a/src/x/headers/headers.go +++ b/src/x/headers/headers.go @@ -43,11 +43,6 @@ const ( // Valid values are "unaggregated" or "aggregated". MetricsTypeHeader = M3HeaderPrefix + "Metrics-Type" - // PromTypeHeader sets the prometheus metric type. Valid values are - // "counter", "gauge", etc. (see src/query/api/v1/handler/prometheus/remote/write.go - // field `headerToMetricType`) - PromTypeHeader = M3HeaderPrefix + "Prom-Type" - // WriteTypeHeader is a header that controls if default // writes should be written to both unaggregated and aggregated // namespaces, or if unaggregated values are skipped and