From 146cad60aac8f40a424117f87b7c8afea0df4b92 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Fri, 15 Jan 2021 12:56:27 -0500 Subject: [PATCH 1/6] [cluster] Fix flaky watchmanager tests (#3091) --- src/cluster/etcd/watchmanager/manager_test.go | 16 +++++++++----- src/cluster/kv/etcd/store_test.go | 22 ------------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index 3cd9e1646c..0a28d232df 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -38,11 +38,11 @@ import ( ) func TestWatchChan(t *testing.T) { - t.Parallel() wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled defer closer() ec := ecluster.RandClient() + integration.WaitClientV3(t, ec) wc, _, err := wh.watchChanWithTimeout("foo", 0) require.NoError(t, err) @@ -67,9 +67,9 @@ func TestWatchChan(t *testing.T) { } func TestWatchSimple(t *testing.T) { - t.Parallel() wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) defer closer() + integration.WaitClientV3(t, ec) require.Equal(t, int32(0), atomic.LoadInt32(updateCalled)) go wh.Watch("foo") @@ -115,11 +115,11 @@ func TestWatchSimple(t *testing.T) { } func TestWatchRecreate(t *testing.T) { - t.Parallel() wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t) defer closer() ec := ecluster.RandClient() + integration.WaitClientV3(t, ec) failTotal := 1 wh.opts = wh.opts. @@ -165,7 +165,6 @@ func TestWatchRecreate(t *testing.T) { } func TestWatchNoLeader(t *testing.T) { - t.Parallel() const ( watchInitAndRetryDelay = 200 * time.Millisecond watchCheckInterval = 50 * time.Millisecond @@ -210,6 +209,8 @@ func TestWatchNoLeader(t *testing.T) { SetWatchChanResetInterval(watchInitAndRetryDelay). 
SetWatchChanCheckInterval(watchCheckInterval) + integration.WaitClientV3(t, ec) + wh, err := NewWatchManager(opts) require.NoError(t, err) @@ -234,11 +235,13 @@ func TestWatchNoLeader(t *testing.T) { require.NoError(t, ecluster.Members[1].Restart(t)) require.NoError(t, ecluster.Members[2].Restart(t)) + // wait for leader + election delay just in case time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) leaderIdx = ecluster.WaitLeader(t) require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") + integration.WaitClientV3(t, ec) // wait for client to be ready again _, err = ec.Put(context.Background(), "foo", "baz") require.NoError(t, err) @@ -246,7 +249,7 @@ func TestWatchNoLeader(t *testing.T) { // give some time for watch to be updated require.True(t, clock.WaitUntil(func() bool { return atomic.LoadInt32(&updateCalled) >= 2 - }, 30*time.Second)) + }, 10*time.Second)) updates := atomic.LoadInt32(&updateCalled) if updates < 2 { @@ -269,10 +272,11 @@ func TestWatchNoLeader(t *testing.T) { } func TestWatchCompactedRevision(t *testing.T) { - t.Parallel() wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) defer closer() + integration.WaitClientV3(t, ec) + ts := tally.NewTestScope("", nil) errC := ts.Counter("errors") wh.m.etcdWatchError = errC diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index 70fb5e10a9..de5b36bef7 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -84,8 +84,6 @@ func TestGetAndSet(t *testing.T) { } func TestNoCache(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) store, err := NewStore(ec, opts) @@ -152,8 +150,6 @@ func TestCacheDirCreation(t *testing.T) { } func TestCache(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) f, err := ioutil.TempFile("", "") @@ -206,8 +202,6 @@ func TestCache(t *testing.T) { } func TestSetIfNotExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -227,8 +221,6 @@ func TestSetIfNotExist(t *testing.T) { } func TestCheckAndSet(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -255,8 +247,6 @@ func TestCheckAndSet(t *testing.T) { } func TestWatchClose(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -306,8 +296,6 @@ func TestWatchClose(t *testing.T) { } func TestWatchLastVersion(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -350,8 +338,6 @@ func TestWatchLastVersion(t *testing.T) { } func TestWatchFromExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -389,8 +375,6 @@ func TestWatchFromExist(t *testing.T) { } func TestWatchFromNotExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -434,8 +418,6 @@ func TestGetFromKvNotFound(t *testing.T) { } func TestMultipleWatchesFromExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -486,8 +468,6 @@ func TestMultipleWatchesFromExist(t *testing.T) { } func TestMultipleWatchesFromNotExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -530,8 +510,6 @@ func TestMultipleWatchesFromNotExist(t *testing.T) { } func TestWatchNonBlocking(t *testing.T) { - t.Parallel() - ecluster, opts, closeFn := testCluster(t) defer closeFn() From 5b4f793d4e3feb80b427266da55c87cb710a649e Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 15 Jan 
2021 16:29:39 -0500 Subject: [PATCH 2/6] [dbnode] Remove allocation per series ID when streaming block from disk (#3093) --- .../network/server/tchannelthrift/node/service.go | 6 ++++-- src/dbnode/persist/fs/retriever.go | 14 +++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 8a7d7a0f6a..5b60d27804 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -808,12 +808,14 @@ func (s *service) fetchReadEncoded(ctx context.Context, // Re-use reader and id for more memory-efficient processing of // tags from doc.Metadata reader := docs.NewEncodedDocumentReader() - id := ident.NewReusableBytesID() for _, entry := range results.Map().Iter() { idx := i i++ - id.Reset(entry.Key()) + // NB(r): Use a bytes ID here so that this ID doesn't need to be + // copied by the blockRetriever in the streamRequest method when + // it checks if the ID is finalizable or not with IsNoFinalize. + id := ident.BytesID(entry.Key()) d := entry.Value() metadata, err := docs.MetadataFromDocument(d, reader) diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index a64b76da83..a5bfab3511 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -565,9 +565,17 @@ func (r *blockRetriever) streamRequest( nsCtx namespace.Context, ) (bool, error) { req.shard = shard - // NB(r): Clone the ID as we're not positive it will stay valid throughout - // the lifecycle of the async request. - req.id = r.idPool.Clone(id) + + // NB(r): If the ID is an ident.BytesID then we can just hold + // onto this ID. + seriesID := id + if !seriesID.IsNoFinalize() { + // NB(r): Clone the ID as we're not positive it will stay valid throughout + // the lifecycle of the async request. + seriesID = r.idPool.Clone(id) + } + + req.id = seriesID req.start = startTime req.blockSize = r.blockSize From acbe5331a670d492bf2aefd318607eac11babd70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linas=20Med=C5=BEi=C5=ABnas?= Date: Mon, 18 Jan 2021 11:27:51 +0200 Subject: [PATCH 3/6] [dbnode] Remove reverse index sharing leftovers (#3095) --- src/dbnode/storage/index.go | 37 +---- src/dbnode/storage/index_test.go | 18 --- src/dbnode/storage/readonly_index_proxy.go | 134 ------------------ .../storage/readonly_index_proxy_test.go | 128 ----------------- src/dbnode/storage/storage_mock.go | 14 +- src/dbnode/storage/types.go | 3 - 6 files changed, 8 insertions(+), 326 deletions(-) delete mode 100644 src/dbnode/storage/readonly_index_proxy.go delete mode 100644 src/dbnode/storage/readonly_index_proxy_test.go diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index b1481c1c34..df35e4dd80 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -95,8 +95,6 @@ var ( type nsIndex struct { state nsIndexState - extendedRetentionPeriod time.Duration - // all the vars below this line are not modified past the ctor // and don't require a lock when being accessed.
nowFn clock.NowFn @@ -658,7 +656,7 @@ func (i *nsIndex) writeBatches( blockSize = i.blockSize futureLimit = now.Add(1 * i.bufferFuture) pastLimit = now.Add(-1 * i.bufferPast) - earliestBlockStartToRetain = i.earliestBlockStartToRetainWithLock(now) + earliestBlockStartToRetain = retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, now) batchOptions = batch.Options() forwardIndexDice = i.forwardIndexDice forwardIndexEnabled = forwardIndexDice.enabled @@ -866,7 +864,10 @@ func (i *nsIndex) Bootstrapped() bool { } func (i *nsIndex) Tick(c context.Cancellable, startTime time.Time) (namespaceIndexTickResult, error) { - var result namespaceIndexTickResult + var ( + result = namespaceIndexTickResult{} + earliestBlockStartToRetain = retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, startTime) + ) i.state.Lock() defer func() { @@ -874,8 +875,6 @@ func (i *nsIndex) Tick(c context.Cancellable, startTime time.Time) (namespaceInd i.state.Unlock() }() - earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(startTime) - result.NumBlocks = int64(len(i.state.blocksByTime)) var multiErr xerrors.MultiError @@ -1033,7 +1032,7 @@ func (i *nsIndex) flushableBlocks( flushable := make([]index.Block, 0, len(i.state.blocksByTime)) now := i.nowFn() - earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(now) + earliestBlockStartToRetain := retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, now) currentBlockStart := now.Truncate(i.blockSize) // Check for flushable blocks by iterating through all block starts w/in retention. for blockStart := earliestBlockStartToRetain; blockStart.Before(currentBlockStart); blockStart = blockStart.Add(i.blockSize) { @@ -1973,7 +1972,7 @@ func (i *nsIndex) CleanupExpiredFileSets(t time.Time) error { } // earliest block to retain based on retention period - earliestBlockStartToRetain := i.earliestBlockStartToRetainWithLock(t) + earliestBlockStartToRetain := retention.FlushTimeStartForRetentionPeriod(i.retentionPeriod, i.blockSize, t) // now we loop through the blocks we hold, to ensure we don't delete any data for them. 
for t := range i.state.blocksByTime { @@ -2182,28 +2181,6 @@ func (i *nsIndex) unableToAllocBlockInvariantError(err error) error { return ierr } -func (i *nsIndex) SetExtendedRetentionPeriod(period time.Duration) { - i.state.Lock() - defer i.state.Unlock() - - if period > i.extendedRetentionPeriod { - i.extendedRetentionPeriod = period - } -} - -func (i *nsIndex) effectiveRetentionPeriodWithLock() time.Duration { - period := i.retentionPeriod - if i.extendedRetentionPeriod > period { - period = i.extendedRetentionPeriod - } - - return period -} - -func (i *nsIndex) earliestBlockStartToRetainWithLock(t time.Time) time.Time { - return retention.FlushTimeStartForRetentionPeriod(i.effectiveRetentionPeriodWithLock(), i.blockSize, t) -} - type nsIndexMetrics struct { asyncInsertAttemptTotal tally.Counter asyncInsertAttemptSkip tally.Counter diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go index b04bb5b82c..8ba96d87bf 100644 --- a/src/dbnode/storage/index_test.go +++ b/src/dbnode/storage/index_test.go @@ -378,24 +378,6 @@ func TestNamespaceIndexQueryNoMatchingBlocks(t *testing.T) { require.NoError(t, err) } -func TestNamespaceIndexSetExtendedRetentionPeriod(t *testing.T) { - ctrl := gomock.NewController(xtest.Reporter{T: t}) - defer ctrl.Finish() - - idx := newTestIndex(t, ctrl).index.(*nsIndex) - originalRetention := idx.retentionPeriod - - assert.Equal(t, originalRetention, idx.effectiveRetentionPeriodWithLock()) - - longerRetention := originalRetention + time.Minute - idx.SetExtendedRetentionPeriod(longerRetention) - assert.Equal(t, longerRetention, idx.effectiveRetentionPeriodWithLock()) - - shorterRetention := longerRetention - time.Second - idx.SetExtendedRetentionPeriod(shorterRetention) - assert.Equal(t, longerRetention, idx.effectiveRetentionPeriodWithLock()) -} - func verifyFlushForShards( t *testing.T, ctrl *gomock.Controller, diff --git a/src/dbnode/storage/readonly_index_proxy.go b/src/dbnode/storage/readonly_index_proxy.go deleted file mode 100644 index fec2bc8eea..0000000000 --- a/src/dbnode/storage/readonly_index_proxy.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
- -package storage - -import ( - "errors" - "time" - - "github.com/m3db/m3/src/dbnode/persist" - "github.com/m3db/m3/src/dbnode/sharding" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" - "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/dbnode/ts/writes" - "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/ident" - xtime "github.com/m3db/m3/src/x/time" -) - -var errNamespaceIndexReadOnly = errors.New("write operation on read only namespace index") - -type readOnlyIndexProxy struct { - underlying NamespaceIndex -} - -func (r readOnlyIndexProxy) AssignShardSet(shardSet sharding.ShardSet) {} - -func (r readOnlyIndexProxy) BlockStartForWriteTime(writeTime time.Time) xtime.UnixNano { - return r.underlying.BlockStartForWriteTime(writeTime) -} - -func (r readOnlyIndexProxy) BlockForBlockStart(blockStart time.Time) (index.Block, error) { - return r.underlying.BlockForBlockStart(blockStart) -} - -func (r readOnlyIndexProxy) WriteBatch(batch *index.WriteBatch) error { - return errNamespaceIndexReadOnly -} - -func (r readOnlyIndexProxy) WritePending(pending []writes.PendingIndexInsert) error { - return errNamespaceIndexReadOnly -} - -func (r readOnlyIndexProxy) Query( - ctx context.Context, - query index.Query, - opts index.QueryOptions, -) (index.QueryResult, error) { - return r.underlying.Query(ctx, query, opts) -} - -func (r readOnlyIndexProxy) AggregateQuery( - ctx context.Context, - query index.Query, - opts index.AggregationOptions, -) (index.AggregateQueryResult, error) { - return r.underlying.AggregateQuery(ctx, query, opts) -} - -func (r readOnlyIndexProxy) WideQuery( - ctx context.Context, - query index.Query, - collector chan *ident.IDBatch, - opts index.WideQueryOptions, -) error { - return r.underlying.WideQuery(ctx, query, collector, opts) -} - -func (r readOnlyIndexProxy) Bootstrap(bootstrapResults result.IndexResults) error { - return nil -} - -func (r readOnlyIndexProxy) Bootstrapped() bool { - return r.underlying.Bootstrapped() -} - -func (r readOnlyIndexProxy) CleanupExpiredFileSets(t time.Time) error { - return nil -} - -func (r readOnlyIndexProxy) CleanupDuplicateFileSets() error { - return nil -} - -func (r readOnlyIndexProxy) Tick(c context.Cancellable, startTime time.Time) (namespaceIndexTickResult, error) { - return namespaceIndexTickResult{}, nil -} - -func (r readOnlyIndexProxy) WarmFlush(flush persist.IndexFlush, shards []databaseShard) error { - return nil -} - -func (r readOnlyIndexProxy) ColdFlush(shards []databaseShard) (OnColdFlushDone, error) { - return noopOnColdFlushDone, nil -} - -func (r readOnlyIndexProxy) SetExtendedRetentionPeriod(period time.Duration) { - r.underlying.SetExtendedRetentionPeriod(period) -} - -func (r readOnlyIndexProxy) DebugMemorySegments(opts DebugMemorySegmentsOptions) error { - return r.underlying.DebugMemorySegments(opts) -} - -func (r readOnlyIndexProxy) Close() error { - return nil -} - -// NewReadOnlyIndexProxy builds a new NamespaceIndex that proxies only read -// operations, and no-ops on write operations. -func NewReadOnlyIndexProxy(underlying NamespaceIndex) NamespaceIndex { - return readOnlyIndexProxy{underlying: underlying} -} - -func noopOnColdFlushDone() error { - return nil -} diff --git a/src/dbnode/storage/readonly_index_proxy_test.go b/src/dbnode/storage/readonly_index_proxy_test.go deleted file mode 100644 index ea3518370d..0000000000 --- a/src/dbnode/storage/readonly_index_proxy_test.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storage - -import ( - "errors" - "testing" - "time" - - "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/ident" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" -) - -func TestReadOnlyIndexProxyReject(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - idx := NewMockNamespaceIndex(ctrl) - roIdx := NewReadOnlyIndexProxy(idx) - - assert.Equal(t, errNamespaceIndexReadOnly, roIdx.WriteBatch(nil)) - assert.Equal(t, errNamespaceIndexReadOnly, roIdx.WritePending(nil)) -} - -func TestReadOnlyIndexProxySuppress(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - idx := NewMockNamespaceIndex(ctrl) - roIdx := NewReadOnlyIndexProxy(idx) - - roIdx.AssignShardSet(nil) - - assert.NoError(t, roIdx.Bootstrap(nil)) - - assert.NoError(t, roIdx.CleanupExpiredFileSets(time.Now())) - - assert.NoError(t, roIdx.CleanupDuplicateFileSets()) - - res, err := roIdx.Tick(nil, time.Now()) - assert.Equal(t, namespaceIndexTickResult{}, res) - assert.NoError(t, err) - - assert.NoError(t, roIdx.WarmFlush(nil, nil)) - - _, err = roIdx.ColdFlush(nil) - assert.NoError(t, err) - - assert.NoError(t, roIdx.Close()) -} - -func TestReadOnlyIndexProxyDelegate(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - idx := NewMockNamespaceIndex(ctrl) - roIdx := NewReadOnlyIndexProxy(idx) - - now := time.Now().Truncate(time.Hour) - later := xtime.ToUnixNano(now.Add(time.Hour)) - testErr := errors.New("test error") - - idx.EXPECT().BlockStartForWriteTime(now).Return(later) - assert.Equal(t, later, roIdx.BlockStartForWriteTime(now)) - - block := index.NewMockBlock(ctrl) - idx.EXPECT().BlockForBlockStart(now).Return(block, testErr) - res, err := roIdx.BlockForBlockStart(now) - assert.Equal(t, testErr, err) - assert.Equal(t, block, res) - - ctx := context.NewContext() - query := index.Query{} - queryOpts := index.QueryOptions{} - queryRes := index.QueryResult{} - - idx.EXPECT().Query(ctx, query, queryOpts).Return(queryRes, testErr) - qRes, err := roIdx.Query(ctx, query, queryOpts) - assert.Equal(t, testErr, err) - assert.Equal(t, queryRes, qRes) - - aggOpts := index.AggregationOptions{} - aggRes := index.AggregateQueryResult{} - idx.EXPECT().AggregateQuery(ctx, query, aggOpts).Return(aggRes, testErr) - aRes, err := roIdx.AggregateQuery(ctx, query, aggOpts) - 
assert.Equal(t, testErr, err) - assert.Equal(t, aggRes, aRes) - - wideOpts := index.WideQueryOptions{} - ch := make(chan *ident.IDBatch) - idx.EXPECT().WideQuery(ctx, query, ch, wideOpts).Return(testErr) - err = roIdx.WideQuery(ctx, query, ch, wideOpts) - assert.Equal(t, testErr, err) - close(ch) - - idx.EXPECT().Bootstrapped().Return(true) - assert.True(t, roIdx.Bootstrapped()) - - idx.EXPECT().SetExtendedRetentionPeriod(time.Minute) - roIdx.SetExtendedRetentionPeriod(time.Minute) - - debugOpts := DebugMemorySegmentsOptions{} - idx.EXPECT().DebugMemorySegments(debugOpts).Return(testErr) - assert.Equal(t, testErr, roIdx.DebugMemorySegments(debugOpts)) -} diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ff27a2feeb..3a3ef7e947 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/storage/types.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -2661,18 +2661,6 @@ func (mr *MockNamespaceIndexMockRecorder) ColdFlush(shards interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlush", reflect.TypeOf((*MockNamespaceIndex)(nil).ColdFlush), shards) } -// SetExtendedRetentionPeriod mocks base method -func (m *MockNamespaceIndex) SetExtendedRetentionPeriod(period time.Duration) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetExtendedRetentionPeriod", period) -} - -// SetExtendedRetentionPeriod indicates an expected call of SetExtendedRetentionPeriod -func (mr *MockNamespaceIndexMockRecorder) SetExtendedRetentionPeriod(period interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetExtendedRetentionPeriod", reflect.TypeOf((*MockNamespaceIndex)(nil).SetExtendedRetentionPeriod), period) -} - // DebugMemorySegments mocks base method func (m *MockNamespaceIndex) DebugMemorySegments(opts DebugMemorySegmentsOptions) error { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index afdbde4492..be916df8f3 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -778,9 +778,6 @@ type NamespaceIndex interface { // cold flushing completes to perform housekeeping. ColdFlush(shards []databaseShard) (OnColdFlushDone, error) - // SetExtendedRetentionPeriod allows to extend index retention beyond the retention of the namespace it belongs to. - SetExtendedRetentionPeriod(period time.Duration) - // DebugMemorySegments allows for debugging memory segments. DebugMemorySegments(opts DebugMemorySegmentsOptions) error From 7ef646d5aba22f3bfd9071a24549e7039b246bee Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Mon, 18 Jan 2021 08:55:14 -0800 Subject: [PATCH 4/6] Limit for time series read from disk (#3094) * Limit for time series read from disk There are additional memory costs when time series need to be loaded from disk (in addition to the actual bytes being read). This new limit allows docLimits to be higher so series already in memory can easily be served without hitting limits.
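As a rough sketch (not part of the patch itself), the snippet below wires up and exercises the new limit the same way the TestLimitSeriesReadFromDisk test added in this change does; the one-series limit and one-second lookback are illustrative values only.

    package main

    import (
        "fmt"
        "time"

        "github.com/m3db/m3/src/dbnode/storage/limits"
        "github.com/m3db/m3/src/x/instrument"
    )

    func main() {
        // Allow at most one series to be read from disk per one-second
        // lookback window (illustrative values).
        limitOpts := limits.NewOptions().
            SetInstrumentOptions(instrument.NewOptions()).
            SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()).
            SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()).
            SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{
                Limit:    1,
                Lookback: time.Second,
            })

        queryLimits, err := limits.NewQueryLimits(limitOpts)
        if err != nil {
            panic(err)
        }

        // The retriever increments this limit once per series streamed from
        // disk; the first increment stays within the limit, the second
        // exceeds it and returns a query-limit-exceeded error.
        fmt.Println(queryLimits.DiskSeriesReadLimit().Inc(1, nil)) // <nil>
        fmt.Println(queryLimits.DiskSeriesReadLimit().Inc(1, nil)) // error
    }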
--- .../services/m3dbnode/config/config_test.go | 1 + src/cmd/services/m3dbnode/config/limits.go | 6 +++ src/dbnode/persist/fs/retriever.go | 44 +++++++++++-------- src/dbnode/persist/fs/retriever_test.go | 38 +++++++++++++++- src/dbnode/server/server.go | 6 +++ .../storage/limits/noop_query_limits.go | 4 ++ src/dbnode/storage/limits/options.go | 21 +++++++-- src/dbnode/storage/limits/query_limits.go | 37 ++++++++++------ .../storage/limits/query_limits_test.go | 28 ++++++++++-- src/dbnode/storage/limits/types.go | 8 ++++ 10 files changed, 153 insertions(+), 40 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 02ca66b97e..717dfbb87c 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -730,6 +730,7 @@ func TestConfiguration(t *testing.T) { meta_event_reporting_enabled: false limits: maxRecentlyQueriedSeriesDiskBytesRead: null + maxRecentlyQueriedSeriesDiskRead: null maxRecentlyQueriedSeriesBlocks: null maxOutstandingWriteRequests: 0 maxOutstandingReadRequests: 0 diff --git a/src/cmd/services/m3dbnode/config/limits.go b/src/cmd/services/m3dbnode/config/limits.go index 29c6dafe78..eb26cb0de8 100644 --- a/src/cmd/services/m3dbnode/config/limits.go +++ b/src/cmd/services/m3dbnode/config/limits.go @@ -29,6 +29,12 @@ type LimitsConfiguration struct { // max is surpassed encounter an error. MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"` + // MaxRecentlyQueriedSeriesDiskRead sets the upper limit on time series read from disk within a given lookback + // period. Queries which are issued while this max is surpassed encounter an error. + // This is the number of time series, which is different from the number of bytes controlled by + // MaxRecentlyQueriedSeriesDiskBytesRead. + MaxRecentlyQueriedSeriesDiskRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskRead"` + // MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks // count within a given lookback period. Queries which are issued while this // max is surpassed encounter an error. 
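Note: for reference, a hypothetical m3dbnode YAML snippet enabling this knob could look as follows. This is a sketch only: the nested value/lookback field names are assumed from the MaxRecentQueryResourceLimitConfiguration fields read in server.go further down, and the numbers are illustrative.

    limits:
      maxRecentlyQueriedSeriesDiskRead:
        value: 1000000  # max series read from disk within the lookback window
        lookback: 15s   # window over which the recent count is tracked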
diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index a5bfab3511..23622ab484 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -50,6 +50,7 @@ import ( "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" + "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -88,11 +89,12 @@ const ( type blockRetriever struct { sync.RWMutex - opts BlockRetrieverOptions - fsOpts Options - logger *zap.Logger - queryLimits limits.QueryLimits - bytesReadLimit limits.LookbackLimit + opts BlockRetrieverOptions + fsOpts Options + logger *zap.Logger + queryLimits limits.QueryLimits + bytesReadLimit limits.LookbackLimit + seriesReadCount tally.Counter newSeekerMgrFn newSeekerMgrFn @@ -121,18 +123,21 @@ func NewBlockRetriever( return nil, err } + scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever") + return &blockRetriever{ - opts: opts, - fsOpts: fsOpts, - logger: fsOpts.InstrumentOptions().Logger(), - queryLimits: opts.QueryLimits(), - bytesReadLimit: opts.QueryLimits().BytesReadLimit(), - newSeekerMgrFn: NewSeekerManager, - reqPool: opts.RetrieveRequestPool(), - bytesPool: opts.BytesPool(), - idPool: opts.IdentifierPool(), - status: blockRetrieverNotOpen, - notifyFetch: make(chan struct{}, 1), + opts: opts, + fsOpts: fsOpts, + logger: fsOpts.InstrumentOptions().Logger(), + queryLimits: opts.QueryLimits(), + bytesReadLimit: opts.QueryLimits().BytesReadLimit(), + seriesReadCount: scope.Counter("series-read"), + newSeekerMgrFn: NewSeekerManager, + reqPool: opts.RetrieveRequestPool(), + bytesPool: opts.BytesPool(), + idPool: opts.IdentifierPool(), + status: blockRetrieverNotOpen, + notifyFetch: make(chan struct{}, 1), // We just close this channel when the fetchLoops should shutdown, so no // buffering is required fetchLoopsShouldShutdownCh: make(chan struct{}), @@ -564,6 +569,11 @@ func (r *blockRetriever) streamRequest( startTime time.Time, nsCtx namespace.Context, ) (bool, error) { + req.resultWg.Add(1) + r.seriesReadCount.Inc(1) + if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { + return false, err + } req.shard = shard // NB(r): If the ID is a ident.BytesID then we can just hold @@ -579,8 +589,6 @@ func (r *blockRetriever) streamRequest( req.start = startTime req.blockSize = r.blockSize - req.resultWg.Add(1) - // Ensure to finalize at the end of request ctx.RegisterFinalizer(req) diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 07754dc90e..54d57c0947 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -32,25 +32,29 @@ import ( "testing" "time" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/digest" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index/convert" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" - 
"github.com/m3db/m3/src/dbnode/namespace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -799,6 +803,38 @@ func TestBlockRetrieverHandlesSeekByIndexEntryErrors(t *testing.T) { testBlockRetrieverHandlesSeekErrors(t, ctrl, mockSeeker) } +func TestLimitSeriesReadFromDisk(t *testing.T) { + scope := tally.NewTestScope("test", nil) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)). + SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()). + SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()). + SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{ + Limit: 1, + Lookback: time.Second * 1, + }) + queryLimits, err := limits.NewQueryLimits(limitOpts) + require.NoError(t, err) + opts := NewBlockRetrieverOptions(). + SetBlockLeaseManager(&block.NoopLeaseManager{}). + SetQueryLimits(queryLimits) + publicRetriever, err := NewBlockRetriever(opts, NewOptions(). + SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope))) + require.NoError(t, err) + req := &retrieveRequest{} + retriever := publicRetriever.(*blockRetriever) + _, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) + _, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) + require.Error(t, err) + require.Contains(t, err.Error(), "query aborted due to limit") + + snapshot := scope.Snapshot() + seriesRead := snapshot.Counters()["test.retriever.series-read+"] + require.Equal(t, int64(2), seriesRead.Value()) + seriesLimit := snapshot.Counters()["test.query-limit.exceeded+limit=disk-series-read"] + require.Equal(t, int64(1), seriesLimit.Value()) +} + var errSeekErr = errors.New("some-error") func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller, mockSeeker ConcurrentDataFileSetSeeker) { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index ef21426ef7..7b50475ec1 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -450,6 +450,7 @@ func Run(runOpts RunOptions) { // Setup query stats tracking. docsLimit := limits.DefaultLookbackLimitOptions() bytesReadLimit := limits.DefaultLookbackLimitOptions() + diskSeriesReadLimit := limits.DefaultLookbackLimitOptions() if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil { docsLimit.Limit = limitConfig.Value docsLimit.Lookback = limitConfig.Lookback @@ -458,9 +459,14 @@ func Run(runOpts RunOptions) { bytesReadLimit.Limit = limitConfig.Value bytesReadLimit.Lookback = limitConfig.Lookback } + if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskRead; limitConfig != nil { + diskSeriesReadLimit.Limit = limitConfig.Value + diskSeriesReadLimit.Lookback = limitConfig.Lookback + } limitOpts := limits.NewOptions(). SetDocsLimitOpts(docsLimit). SetBytesReadLimitOpts(bytesReadLimit). + SetDiskSeriesReadLimitOpts(diskSeriesReadLimit). 
SetInstrumentOptions(iOpts) if builder := opts.SourceLoggerBuilder(); builder != nil { limitOpts = limitOpts.SetSourceLoggerBuilder(builder) diff --git a/src/dbnode/storage/limits/noop_query_limits.go b/src/dbnode/storage/limits/noop_query_limits.go index 672ea8d52d..cf0a9497a6 100644 --- a/src/dbnode/storage/limits/noop_query_limits.go +++ b/src/dbnode/storage/limits/noop_query_limits.go @@ -44,6 +44,10 @@ func (q *noOpQueryLimits) BytesReadLimit() LookbackLimit { return &noOpLookbackLimit{} } +func (q *noOpQueryLimits) DiskSeriesReadLimit() LookbackLimit { + return &noOpLookbackLimit{} +} + func (q *noOpQueryLimits) AnyExceeded() error { return nil } diff --git a/src/dbnode/storage/limits/options.go b/src/dbnode/storage/limits/options.go index 5f8c49a89b..cc82c121aa 100644 --- a/src/dbnode/storage/limits/options.go +++ b/src/dbnode/storage/limits/options.go @@ -28,10 +28,11 @@ import ( ) type limitOpts struct { - iOpts instrument.Options - docsLimitOpts LookbackLimitOptions - bytesReadLimitOpts LookbackLimitOptions - sourceLoggerBuilder SourceLoggerBuilder + iOpts instrument.Options + docsLimitOpts LookbackLimitOptions + bytesReadLimitOpts LookbackLimitOptions + diskSeriesReadLimitOpts LookbackLimitOptions + sourceLoggerBuilder SourceLoggerBuilder } // NewOptions creates limit options with default values. @@ -94,6 +95,18 @@ func (o *limitOpts) BytesReadLimitOpts() LookbackLimitOptions { return o.bytesReadLimitOpts } +// SetDiskSeriesReadLimitOpts sets the disk ts read limit options. +func (o *limitOpts) SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options { + opts := *o + opts.diskSeriesReadLimitOpts = value + return &opts +} + +// DiskSeriesReadLimitOpts returns the disk ts read limit options. +func (o *limitOpts) DiskSeriesReadLimitOpts() LookbackLimitOptions { + return o.diskSeriesReadLimitOpts +} + // SetSourceLoggerBuilder sets the source logger. func (o *limitOpts) SetSourceLoggerBuilder(value SourceLoggerBuilder) Options { opts := *o diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index 44643d40ec..3b4201729c 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -36,8 +36,9 @@ const ( ) type queryLimits struct { - docsLimit *lookbackLimit - bytesReadLimit *lookbackLimit + docsLimit *lookbackLimit + bytesReadLimit *lookbackLimit + seriesDiskReadLimit *lookbackLimit } type lookbackLimit struct { @@ -72,31 +73,30 @@ func DefaultLookbackLimitOptions() LookbackLimitOptions { } // NewQueryLimits returns a new query limits manager. 
-func NewQueryLimits( - options Options, - // docsLimitOpts LookbackLimitOptions, - // bytesReadLimitOpts LookbackLimitOptions, - // instrumentOpts instrument.Options, -) (QueryLimits, error) { +func NewQueryLimits(options Options) (QueryLimits, error) { if err := options.Validate(); err != nil { return nil, err } var ( - iOpts = options.InstrumentOptions() - docsLimitOpts = options.DocsLimitOpts() - bytesReadLimitOpts = options.BytesReadLimitOpts() - sourceLoggerBuilder = options.SourceLoggerBuilder() + iOpts = options.InstrumentOptions() + docsLimitOpts = options.DocsLimitOpts() + bytesReadLimitOpts = options.BytesReadLimitOpts() + diskSeriesReadLimitOpts = options.DiskSeriesReadLimitOpts() + sourceLoggerBuilder = options.SourceLoggerBuilder() docsLimit = newLookbackLimit( iOpts, docsLimitOpts, "docs-matched", sourceLoggerBuilder) bytesReadLimit = newLookbackLimit( iOpts, bytesReadLimitOpts, "disk-bytes-read", sourceLoggerBuilder) + seriesDiskReadLimit = newLookbackLimit( + iOpts, diskSeriesReadLimitOpts, "disk-series-read", sourceLoggerBuilder) ) return &queryLimits{ - docsLimit: docsLimit, - bytesReadLimit: bytesReadLimit, + docsLimit: docsLimit, + bytesReadLimit: bytesReadLimit, + seriesDiskReadLimit: seriesDiskReadLimit, }, nil } @@ -145,13 +145,19 @@ func (q *queryLimits) BytesReadLimit() LookbackLimit { return q.bytesReadLimit } +func (q *queryLimits) DiskSeriesReadLimit() LookbackLimit { + return q.seriesDiskReadLimit +} + func (q *queryLimits) Start() { q.docsLimit.start() + q.seriesDiskReadLimit.start() q.bytesReadLimit.start() } func (q *queryLimits) Stop() { q.docsLimit.stop() + q.seriesDiskReadLimit.stop() q.bytesReadLimit.stop() } @@ -159,6 +165,9 @@ func (q *queryLimits) AnyExceeded() error { if err := q.docsLimit.exceeded(); err != nil { return err } + if err := q.seriesDiskReadLimit.exceeded(); err != nil { + return err + } return q.bytesReadLimit.exceeded() } diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 7739812c62..23ee91c2ef 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -37,11 +37,13 @@ import ( func testQueryLimitOptions( docOpts LookbackLimitOptions, bytesOpts LookbackLimitOptions, + seriesOpts LookbackLimitOptions, iOpts instrument.Options, ) Options { return NewOptions(). SetDocsLimitOpts(docOpts). SetBytesReadLimitOpts(bytesOpts). + SetDiskSeriesReadLimitOpts(seriesOpts). 
SetInstrumentOptions(iOpts) } @@ -54,7 +56,11 @@ func TestQueryLimits(t *testing.T) { Limit: 1, Lookback: time.Second, } - opts := testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) + seriesOpts := LookbackLimitOptions{ + Limit: 1, + Lookback: time.Second, + } + opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) queryLimits, err := NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) @@ -69,7 +75,7 @@ func TestQueryLimits(t *testing.T) { require.True(t, xerrors.IsInvalidParams(err)) require.True(t, IsQueryLimitExceededError(err)) - opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) queryLimits, err = NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) @@ -84,6 +90,22 @@ func TestQueryLimits(t *testing.T) { require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) require.True(t, IsQueryLimitExceededError(err)) + + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) + queryLimits, err = NewQueryLimits(opts) + require.NoError(t, err) + require.NotNil(t, queryLimits) + + // No error yet. + err = queryLimits.AnyExceeded() + require.NoError(t, err) + + // Limit from series. + require.Error(t, queryLimits.DiskSeriesReadLimit().Inc(2, nil)) + err = queryLimits.AnyExceeded() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, IsQueryLimitExceededError(err)) } func TestLookbackLimit(t *testing.T) { @@ -298,7 +320,7 @@ func TestSourceLogger(t *testing.T) { } builder = &testBuilder{records: []testLoggerRecord{}} - opts = testQueryLimitOptions(noLimit, noLimit, iOpts). + opts = testQueryLimitOptions(noLimit, noLimit, noLimit, iOpts). SetSourceLoggerBuilder(builder) ) diff --git a/src/dbnode/storage/limits/types.go b/src/dbnode/storage/limits/types.go index d38db00452..aa87c493bf 100644 --- a/src/dbnode/storage/limits/types.go +++ b/src/dbnode/storage/limits/types.go @@ -39,6 +39,8 @@ type QueryLimits interface { DocsLimit() LookbackLimit // BytesReadLimit limits queries by a global concurrent count of bytes read from disk. BytesReadLimit() LookbackLimit + // DiskSeriesReadLimit limits queries by a global concurrent count of time series read from disk. + DiskSeriesReadLimit() LookbackLimit + // AnyExceeded returns an error if any of the query limits are exceeded. AnyExceeded() error @@ -97,6 +99,12 @@ type Options interface { // BytesReadLimitOpts returns the byte read limit options. BytesReadLimitOpts() LookbackLimitOptions + // SetDiskSeriesReadLimitOpts sets the disk series read limit options. + SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options + + // DiskSeriesReadLimitOpts returns the disk series read limit options. + DiskSeriesReadLimitOpts() LookbackLimitOptions + // SetSourceLoggerBuilder sets the source logger.
SetSourceLoggerBuilder(value SourceLoggerBuilder) Options From 2d68fad060e2cfc42ea3eb4dbc174157d94a66ed Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Tue, 19 Jan 2021 19:15:37 +1100 Subject: [PATCH 5/6] [x] Make serialize.TagValueFromEncodedTagsFast() faster (#3097) --- src/x/serialize/decoder_fast.go | 4 +- .../serialize/decoder_fast_benchmark_test.go | 118 ++++++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 src/x/serialize/decoder_fast_benchmark_test.go diff --git a/src/x/serialize/decoder_fast.go b/src/x/serialize/decoder_fast.go index ef5ac8d406..e9846ac8d1 100644 --- a/src/x/serialize/decoder_fast.go +++ b/src/x/serialize/decoder_fast.go @@ -51,7 +51,7 @@ func TagValueFromEncodedTagsFast( return nil, false, fmt.Errorf("missing size for tag name: index=%d", i) } numBytesName := int(byteOrder.Uint16(encodedTags[:2])) - if numBytesName <= 0 { + if numBytesName == 0 { return nil, false, errEmptyTagNameLiteral } encodedTags = encodedTags[2:] @@ -69,7 +69,7 @@ func TagValueFromEncodedTagsFast( bytesValue := encodedTags[:numBytesValue] encodedTags = encodedTags[numBytesValue:] - if bytes.Compare(bytesName, tagName) == 0 { + if bytes.Equal(bytesName, tagName) { return bytesValue, true, nil } } diff --git a/src/x/serialize/decoder_fast_benchmark_test.go b/src/x/serialize/decoder_fast_benchmark_test.go new file mode 100644 index 0000000000..79547207c6 --- /dev/null +++ b/src/x/serialize/decoder_fast_benchmark_test.go @@ -0,0 +1,118 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package serialize + +import ( + "encoding/base64" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/pool" +) + +type encodedTagsWithTagName struct { + encodedTags, tagName []byte +} + +// Samples of encoded tags, taken from metrics generated by promremotebench. 
+//nolint:lll +var samples = []string{ + "dScMAAgAX19uYW1lX18GAGRpc2tpbwQAYXJjaAMAeDY0CgBkYXRhY2VudGVyCgB1cy13ZXN0LTJjCABob3N0bmFtZQcAaG9zdF83OAsAbWVhc3VyZW1lbnQFAHJlYWRzAgBvcwsAVWJ1bnR1MTUuMTAEAHJhY2sCADg3BgByZWdpb24JAHVzLXdlc3QtMgcAc2VydmljZQIAMTETAHNlcnZpY2VfZW52aXJvbm1lbnQKAHByb2R1Y3Rpb24PAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQIAU0Y=", + "dScMAAgAX19uYW1lX18FAG5naW54BABhcmNoAwB4NjQKAGRhdGFjZW50ZXIKAHVzLXdlc3QtMWEIAGhvc3RuYW1lBwBob3N0XzM3CwBtZWFzdXJlbWVudAYAYWN0aXZlAgBvcwsAVWJ1bnR1MTYuMTAEAHJhY2sCADc4BgByZWdpb24JAHVzLXdlc3QtMQcAc2VydmljZQIAMTATAHNlcnZpY2VfZW52aXJvbm1lbnQEAHRlc3QPAHNlcnZpY2VfdmVyc2lvbgEAMAQAdGVhbQMATE9O", + "dScMAAgAX19uYW1lX18EAGRpc2sEAGFyY2gDAHg2NAoAZGF0YWNlbnRlcgoAc2EtZWFzdC0xYggAaG9zdG5hbWUHAGhvc3RfNTQLAG1lYXN1cmVtZW50DABpbm9kZXNfdG90YWwCAG9zCwBVYnVudHUxNi4xMAQAcmFjawIAODgGAHJlZ2lvbgkAc2EtZWFzdC0xBwBzZXJ2aWNlAgAxNRMAc2VydmljZV9lbnZpcm9ubWVudAoAcHJvZHVjdGlvbg8Ac2VydmljZV92ZXJzaW9uAQAwBAB0ZWFtAwBDSEk=", + "dScMAAgAX19uYW1lX18DAG5ldAQAYXJjaAMAeDg2CgBkYXRhY2VudGVyCgB1cy1lYXN0LTFiCABob3N0bmFtZQcAaG9zdF85MwsAbWVhc3VyZW1lbnQGAGVycl9pbgIAb3MLAFVidW50dTE1LjEwBAByYWNrAgAzNwYAcmVnaW9uCQB1cy1lYXN0LTEHAHNlcnZpY2UCADEyEwBzZXJ2aWNlX2Vudmlyb25tZW50CgBwcm9kdWN0aW9uDwBzZXJ2aWNlX3ZlcnNpb24BADEEAHRlYW0DAENISQ==", + "dScMAAgAX19uYW1lX18FAHJlZGlzBABhcmNoAwB4ODYKAGRhdGFjZW50ZXINAGV1LWNlbnRyYWwtMWEIAGhvc3RuYW1lBwBob3N0XzcwCwBtZWFzdXJlbWVudA8Aa2V5c3BhY2VfbWlzc2VzAgBvcw4AVWJ1bnR1MTYuMDRMVFMEAHJhY2sCADQ3BgByZWdpb24MAGV1LWNlbnRyYWwtMQcAc2VydmljZQIAMTITAHNlcnZpY2VfZW52aXJvbm1lbnQHAHN0YWdpbmcPAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQMATE9O", + "dScMAAgAX19uYW1lX18FAG5naW54BABhcmNoAwB4ODYKAGRhdGFjZW50ZXIKAHVzLWVhc3QtMWIIAGhvc3RuYW1lBwBob3N0Xzg0CwBtZWFzdXJlbWVudAgAcmVxdWVzdHMCAG9zDgBVYnVudHUxNi4wNExUUwQAcmFjawIAOTAGAHJlZ2lvbgkAdXMtZWFzdC0xBwBzZXJ2aWNlAgAxMxMAc2VydmljZV9lbnZpcm9ubWVudAQAdGVzdA8Ac2VydmljZV92ZXJzaW9uAQAwBAB0ZWFtAwBOWUM=", + "dScMAAgAX19uYW1lX18DAG1lbQQAYXJjaAMAeDY0CgBkYXRhY2VudGVyDQBldS1jZW50cmFsLTFiCABob3N0bmFtZQcAaG9zdF8yNwsAbWVhc3VyZW1lbnQIAGJ1ZmZlcmVkAgBvcw4AVWJ1bnR1MTYuMDRMVFMEAHJhY2sCADU4BgByZWdpb24MAGV1LWNlbnRyYWwtMQcAc2VydmljZQEAMBMAc2VydmljZV9lbnZpcm9ubWVudAQAdGVzdA8Ac2VydmljZV92ZXJzaW9uAQAwBAB0ZWFtAwBOWUM=", + "dScMAAgAX19uYW1lX18GAGtlcm5lbAQAYXJjaAMAeDg2CgBkYXRhY2VudGVyCgB1cy13ZXN0LTJhCABob3N0bmFtZQcAaG9zdF84MAsAbWVhc3VyZW1lbnQNAGRpc2tfcGFnZXNfaW4CAG9zCwBVYnVudHUxNi4xMAQAcmFjawIANDIGAHJlZ2lvbgkAdXMtd2VzdC0yBwBzZXJ2aWNlAgAxMxMAc2VydmljZV9lbnZpcm9ubWVudAQAdGVzdA8Ac2VydmljZV92ZXJzaW9uAQAxBAB0ZWFtAgBTRg==", + "dScMAAgAX19uYW1lX18EAGRpc2sEAGFyY2gDAHg2NAoAZGF0YWNlbnRlcg8AYXAtbm9ydGhlYXN0LTFjCABob3N0bmFtZQcAaG9zdF83NwsAbWVhc3VyZW1lbnQLAGlub2Rlc191c2VkAgBvcw4AVWJ1bnR1MTYuMDRMVFMEAHJhY2sCADg0BgByZWdpb24OAGFwLW5vcnRoZWFzdC0xBwBzZXJ2aWNlAQA1EwBzZXJ2aWNlX2Vudmlyb25tZW50CgBwcm9kdWN0aW9uDwBzZXJ2aWNlX3ZlcnNpb24BADAEAHRlYW0DAExPTg==", + "dScMAAgAX19uYW1lX18JAHBvc3RncmVzbAQAYXJjaAMAeDY0CgBkYXRhY2VudGVyDQBldS1jZW50cmFsLTFiCABob3N0bmFtZQcAaG9zdF8yNwsAbWVhc3VyZW1lbnQNAHhhY3Rfcm9sbGJhY2sCAG9zDgBVYnVudHUxNi4wNExUUwQAcmFjawIANTgGAHJlZ2lvbgwAZXUtY2VudHJhbC0xBwBzZXJ2aWNlAQAwEwBzZXJ2aWNlX2Vudmlyb25tZW50BAB0ZXN0DwBzZXJ2aWNlX3ZlcnNpb24BADAEAHRlYW0DAE5ZQw==", + "dScMAAgAX19uYW1lX18DAGNwdQQAYXJjaAMAeDY0CgBkYXRhY2VudGVyCgBzYS1lYXN0LTFiCABob3N0bmFtZQcAaG9zdF80MwsAbWVhc3VyZW1lbnQKAHVzYWdlX25pY2UCAG9zCwBVYnVudHUxNi4xMAQAcmFjawIAOTUGAHJlZ2lvbgkAc2EtZWFzdC0xBwBzZXJ2aWNlAQA0EwBzZXJ2aWNlX2Vudmlyb25tZW50BAB0ZXN0DwBzZXJ2aWNlX3ZlcnNpb24BADAEAHRlYW0CAFNG", + 
"dScMAAgAX19uYW1lX18EAGRpc2sEAGFyY2gDAHg2NAoAZGF0YWNlbnRlcg8AYXAtbm9ydGhlYXN0LTFjCABob3N0bmFtZQcAaG9zdF8xNwsAbWVhc3VyZW1lbnQMAGlub2Rlc190b3RhbAIAb3MLAFVidW50dTE2LjEwBAByYWNrAgA5NAYAcmVnaW9uDgBhcC1ub3J0aGVhc3QtMQcAc2VydmljZQEAORMAc2VydmljZV9lbnZpcm9ubWVudAcAc3RhZ2luZw8Ac2VydmljZV92ZXJzaW9uAQAwBAB0ZWFtAgBTRg==", + "dScMAAgAX19uYW1lX18FAHJlZGlzBABhcmNoAwB4ODYKAGRhdGFjZW50ZXIKAHVzLXdlc3QtMmEIAGhvc3RuYW1lBwBob3N0XzgwCwBtZWFzdXJlbWVudBAAc3luY19wYXJ0aWFsX2VycgIAb3MLAFVidW50dTE2LjEwBAByYWNrAgA0MgYAcmVnaW9uCQB1cy13ZXN0LTIHAHNlcnZpY2UCADEzEwBzZXJ2aWNlX2Vudmlyb25tZW50BAB0ZXN0DwBzZXJ2aWNlX3ZlcnNpb24BADEEAHRlYW0CAFNG", + "dScMAAgAX19uYW1lX18DAG5ldAQAYXJjaAMAeDg2CgBkYXRhY2VudGVyCgB1cy1lYXN0LTFhCABob3N0bmFtZQcAaG9zdF83OQsAbWVhc3VyZW1lbnQIAGRyb3Bfb3V0AgBvcw4AVWJ1bnR1MTYuMDRMVFMEAHJhY2sCADE3BgByZWdpb24JAHVzLWVhc3QtMQcAc2VydmljZQIAMTcTAHNlcnZpY2VfZW52aXJvbm1lbnQHAHN0YWdpbmcPAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQIAU0Y=", + "dScMAAgAX19uYW1lX18FAHJlZGlzBABhcmNoAwB4ODYKAGRhdGFjZW50ZXIPAGFwLXNvdXRoZWFzdC0yYggAaG9zdG5hbWUIAGhvc3RfMTAwCwBtZWFzdXJlbWVudBYAdXNlZF9jcHVfdXNlcl9jaGlsZHJlbgIAb3MOAFVidW50dTE2LjA0TFRTBAByYWNrAgA0MAYAcmVnaW9uDgBhcC1zb3V0aGVhc3QtMgcAc2VydmljZQIAMTQTAHNlcnZpY2VfZW52aXJvbm1lbnQHAHN0YWdpbmcPAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQMATllD", + "dScMAAgAX19uYW1lX18EAGRpc2sEAGFyY2gDAHg2NAoAZGF0YWNlbnRlcg8AYXAtc291dGhlYXN0LTFhCABob3N0bmFtZQcAaG9zdF84NwsAbWVhc3VyZW1lbnQMAGlub2Rlc190b3RhbAIAb3MLAFVidW50dTE1LjEwBAByYWNrAQAwBgByZWdpb24OAGFwLXNvdXRoZWFzdC0xBwBzZXJ2aWNlAgAxMRMAc2VydmljZV9lbnZpcm9ubWVudAcAc3RhZ2luZw8Ac2VydmljZV92ZXJzaW9uAQAwBAB0ZWFtAwBMT04=", + "dScMAAgAX19uYW1lX18DAGNwdQQAYXJjaAMAeDY0CgBkYXRhY2VudGVyCgB1cy13ZXN0LTJhCABob3N0bmFtZQYAaG9zdF82CwBtZWFzdXJlbWVudAoAdXNhZ2VfaWRsZQIAb3MLAFVidW50dTE2LjEwBAByYWNrAgAxMAYAcmVnaW9uCQB1cy13ZXN0LTIHAHNlcnZpY2UBADYTAHNlcnZpY2VfZW52aXJvbm1lbnQEAHRlc3QPAHNlcnZpY2VfdmVyc2lvbgEAMAQAdGVhbQMAQ0hJ", + "dScMAAgAX19uYW1lX18FAG5naW54BABhcmNoAwB4ODYKAGRhdGFjZW50ZXIKAHVzLWVhc3QtMWEIAGhvc3RuYW1lBwBob3N0XzQ0CwBtZWFzdXJlbWVudAcAaGFuZGxlZAIAb3MOAFVidW50dTE2LjA0TFRTBAByYWNrAgA2MQYAcmVnaW9uCQB1cy1lYXN0LTEHAHNlcnZpY2UBADITAHNlcnZpY2VfZW52aXJvbm1lbnQHAHN0YWdpbmcPAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQMATllD", + "dScMAAgAX19uYW1lX18FAG5naW54BABhcmNoAwB4ODYKAGRhdGFjZW50ZXIKAHVzLXdlc3QtMWEIAGhvc3RuYW1lBwBob3N0XzI5CwBtZWFzdXJlbWVudAcAd2FpdGluZwIAb3MLAFVidW50dTE1LjEwBAByYWNrAgAxNQYAcmVnaW9uCQB1cy13ZXN0LTEHAHNlcnZpY2UBADQTAHNlcnZpY2VfZW52aXJvbm1lbnQEAHRlc3QPAHNlcnZpY2VfdmVyc2lvbgEAMQQAdGVhbQMATllD", + "dScMAAgAX19uYW1lX18GAGRpc2tpbwQAYXJjaAMAeDY0CgBkYXRhY2VudGVyDwBhcC1ub3J0aGVhc3QtMWMIAGhvc3RuYW1lBwBob3N0XzM4CwBtZWFzdXJlbWVudAoAd3JpdGVfdGltZQIAb3MLAFVidW50dTE1LjEwBAByYWNrAgAyMAYAcmVnaW9uDgBhcC1ub3J0aGVhc3QtMQcAc2VydmljZQEAMBMAc2VydmljZV9lbnZpcm9ubWVudAcAc3RhZ2luZw8Ac2VydmljZV92ZXJzaW9uAQAwBAB0ZWFtAgBTRg==", +} + +// BenchmarkTagValueFromEncodedTagsFast-12 11756650 110 ns/op +func BenchmarkTagValueFromEncodedTagsFast(b *testing.B) { + testData := prepareData(b) + + b.ResetTimer() + for i := range testData { + _, _, err := TagValueFromEncodedTagsFast(testData[i].encodedTags, testData[i].tagName) + require.NoError(b, err) + } +} + +func prepareData(b *testing.B) []encodedTagsWithTagName { + encodedTags, err := base64.StdEncoding.DecodeString(samples[0]) + require.NoError(b, err) + // Extracting tag names. 
Each sample has the same set of tag names, so using any of them + tagNames, err := decodeTagNames(encodedTags) + require.NoError(b, err) + tagNames = append(tagNames, []byte("not_exist")) + + var ( + result = make([]encodedTagsWithTagName, 0, b.N) + rnd = rand.New(rand.NewSource(42)) //nolint:gosec + ) + for i := 0; i < b.N; i++ { + tagName := tagNames[rnd.Intn(len(tagNames))] + encodedTags, err = base64.StdEncoding.DecodeString(samples[rnd.Intn(len(samples))]) + require.NoError(b, err) + + result = append(result, encodedTagsWithTagName{ + encodedTags: encodedTags, + tagName: tagName, + }) + } + + return result +} + +func decodeTagNames(encodedTags []byte) ([][]byte, error) { + decoderPool := NewTagDecoderPool(NewTagDecoderOptions(TagDecoderOptionsConfig{}), + pool.NewObjectPoolOptions()) + decoderPool.Init() + decoder := decoderPool.Get() + defer decoder.Close() + + var tagNames [][]byte + decoder.Reset(checked.NewBytes(encodedTags, nil)) + for decoder.Next() { + tagNames = append(tagNames, decoder.Current().Name.Bytes()) + } + + if decoder.Err() != nil { + return nil, decoder.Err() + } + + return tagNames, nil +} From cdf5f1196534bb8891de58bace8848ba2e6036bf Mon Sep 17 00:00:00 2001 From: Gediminas Guoba Date: Tue, 19 Jan 2021 13:12:13 +0200 Subject: [PATCH 6/6] Revert "[aggregator] keep metric type during the aggregation" (#3099) --- .../m3_stack/m3coordinator-aggregator.yml | 2 - .../m3_stack/m3coordinator-standard.yml | 2 - .../aggregator/m3coordinator.yml | 2 - .../aggregator/test.sh | 52 -------- src/aggregator/aggregation/counter_test.go | 2 - src/aggregator/aggregator/aggregator.go | 1 - src/aggregator/aggregator/counter_elem_gen.go | 7 +- src/aggregator/aggregator/elem_base_test.go | 4 +- src/aggregator/aggregator/elem_test.go | 11 +- src/aggregator/aggregator/flush.go | 2 - src/aggregator/aggregator/gauge_elem_gen.go | 7 +- src/aggregator/aggregator/generic_elem.go | 6 +- .../aggregator/handler/writer/protobuf.go | 1 - src/aggregator/aggregator/list.go | 4 - src/aggregator/aggregator/list_test.go | 2 - src/aggregator/aggregator/timer_elem_gen.go | 7 +- src/aggregator/generated-source-files.mk | 1 - .../m3coordinator/ingest/m3msg/config.go | 5 +- .../m3coordinator/ingest/m3msg/ingest.go | 116 ++++++------------ .../m3coordinator/ingest/m3msg/ingest_test.go | 10 +- .../services/m3coordinator/ingest/write.go | 27 ++-- .../server/m3msg/protobuf_handler.go | 2 +- .../server/m3msg/protobuf_handler_test.go | 2 - .../m3coordinator/server/m3msg/types.go | 2 - src/metrics/aggregation/type.go | 2 +- .../encoding/protobuf/aggregated_decoder.go | 11 -- src/metrics/metric/aggregated/types.go | 1 - .../api/v1/handler/prometheus/remote/write.go | 24 +--- .../handler/prometheus/remote/write_test.go | 83 ------------- src/query/server/query.go | 7 +- src/query/server/query_test.go | 4 +- src/query/storage/converter.go | 13 +- src/query/storage/converter_test.go | 8 +- src/x/headers/headers.go | 5 - 34 files changed, 85 insertions(+), 350 deletions(-) diff --git a/scripts/development/m3_stack/m3coordinator-aggregator.yml b/scripts/development/m3_stack/m3coordinator-aggregator.yml index 2591de57d2..b622668a4a 100644 --- a/scripts/development/m3_stack/m3coordinator-aggregator.yml +++ b/scripts/development/m3_stack/m3coordinator-aggregator.yml @@ -80,5 +80,3 @@ carbon: tagOptions: idScheme: quoted - -storeMetricsType: true diff --git a/scripts/development/m3_stack/m3coordinator-standard.yml b/scripts/development/m3_stack/m3coordinator-standard.yml index 5da1d01b5b..16137c65b4 100644 --- 
a/scripts/development/m3_stack/m3coordinator-standard.yml +++ b/scripts/development/m3_stack/m3coordinator-standard.yml @@ -36,5 +36,3 @@ carbon: tagOptions: idScheme: quoted - -storeMetricsType: true diff --git a/scripts/docker-integration-tests/aggregator/m3coordinator.yml b/scripts/docker-integration-tests/aggregator/m3coordinator.yml index 59319cfde7..10ba278c91 100644 --- a/scripts/docker-integration-tests/aggregator/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator/m3coordinator.yml @@ -77,5 +77,3 @@ ingest: retry: maxBackoff: 10s jitter: true - -storeMetricsType: true diff --git a/scripts/docker-integration-tests/aggregator/test.sh b/scripts/docker-integration-tests/aggregator/test.sh index a0453ef12a..8d287df7e0 100755 --- a/scripts/docker-integration-tests/aggregator/test.sh +++ b/scripts/docker-integration-tests/aggregator/test.sh @@ -170,14 +170,12 @@ function prometheus_remote_write { local label1_value=${label1_value:-label1} local label2_name=${label2_name:-label2} local label2_value=${label2_value:-label2} - local metric_type=${metric_type:counter} network_name="aggregator" network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1) out=$((docker run -it --rm --network $network \ $PROMREMOTECLI_IMAGE \ -u http://m3coordinator01:7202/api/v1/prom/remote/write \ - -h M3-Prom-Type:${metric_type} \ -t __name__:${metric_name} \ -t ${label0_name}:${label0_value} \ -t ${label1_name}:${label1_value} \ @@ -219,22 +217,6 @@ function prometheus_query_native { return $? } -function dbnode_fetch { - local namespace=${namespace} - local id=${id} - local rangeStart=${rangeStart} - local rangeEnd=${rangeEnd} - local jq_path=${jq_path:-} - local expected_value=${expected_value:-} - - result=$(curl -s \ - "0.0.0.0:9002/fetch" \ - "-d" \ - "{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}") - test "$result" = "$expected_value" - return $? 
-} - function test_aggregated_rollup_rule { resolution_seconds="10" now=$(date +"%s") @@ -252,7 +234,6 @@ function test_aggregated_rollup_rule { label0_name="app" label0_value="nginx_edge" \ label1_name="status_code" label1_value="500" \ label2_name="endpoint" label2_value="/foo/bar" \ - metric_type="counter" \ prometheus_remote_write \ http_requests $write_at $value \ true "Expected request to succeed" \ @@ -270,7 +251,6 @@ function test_aggregated_rollup_rule { label0_name="app" label0_value="nginx_edge" \ label1_name="status_code" label1_value="500" \ label2_name="endpoint" label2_value="/foo/baz" \ - metric_type="gauge" \ prometheus_remote_write \ http_requests $write_at $value \ true "Expected request to succeed" \ @@ -304,38 +284,6 @@ function test_aggregated_rollup_rule { retry_with_backoff prometheus_query_native } -function test_metric_type_survives_aggregation { - now=$(date +"%s") - - echo "Test metric type should be kept after aggregation" - - # Emit values for endpoint /foo/bar (to ensure right values aggregated) - write_at="$now_truncated" - value="42" - - metric_type="counter" \ - prometheus_remote_write \ - metric_type_test $now $value \ - true "Expected request to succeed" \ - 200 "Expected request to return status code 200" - - start=$(( $now - 3600 )) - end=$(( $now + 3600 )) - jq_path=".datapoints[0].annotation" - - echo "Test query metric type" - - # Test by metric types are stored in aggregated namespace - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \ - namespace="agg" \ - id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \ - rangeStart=${start} \ - rangeEnd=${end} \ - jq_path="$jq_path" expected_value="CAEQAQ==" \ - retry_with_backoff dbnode_fetch -} - echo "Run tests" test_aggregated_graphite_metric test_aggregated_rollup_rule -test_metric_type_survives_aggregation diff --git a/src/aggregator/aggregation/counter_test.go b/src/aggregator/aggregation/counter_test.go index 57610479aa..47492459fe 100644 --- a/src/aggregator/aggregation/counter_test.go +++ b/src/aggregator/aggregation/counter_test.go @@ -70,8 +70,6 @@ func TestCounterCustomAggregationType(t *testing.T) { require.Equal(t, float64(338350), v) case aggregation.Stdev: require.InDelta(t, 29.01149, v, 0.001) - case aggregation.Last: - require.Equal(t, 0.0, v) default: require.Equal(t, float64(0), v) require.False(t, aggType.IsValidForCounter()) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 2e30c8b4e5..5e0df44e9c 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -287,7 +287,6 @@ func (agg *aggregator) AddPassthrough( ChunkedID: id.ChunkedID{ Data: []byte(metric.ID), }, - Type: metric.Type, TimeNanos: metric.TimeNanos, Value: metric.Value, }, diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index 040a988321..625ce7de2d 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -31,7 +31,6 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -481,10 +480,10 @@ func (e *CounterElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.CounterType, nil, 
point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.CounterType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index a13ab372bc..a277432a51 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -203,9 +203,9 @@ func TestCounterElemBaseResetSetData(t *testing.T) { func TestCounterElemBaseResetSetDataInvalidTypes(t *testing.T) { e := counterElemBase{} - err := e.ResetSetData(nil, maggregation.Types{maggregation.P10}, false) + err := e.ResetSetData(nil, maggregation.Types{maggregation.Last}, false) require.Error(t, err) - require.True(t, strings.Contains(err.Error(), "invalid aggregation types P10 for counter")) + require.True(t, strings.Contains(err.Error(), "invalid aggregation types Last for counter")) } func TestTimerElemBase(t *testing.T) { diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index a87130d10c..845c47c13a 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -158,10 +158,12 @@ func TestCounterResetSetData(t *testing.T) { func TestCounterResetSetDataInvalidAggregationType(t *testing.T) { opts := NewOptions() - ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes, - applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts) - err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.P10}, - applied.DefaultPipeline, 0, NoPrefixNoSuffix) + ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, + maggregation.DefaultTypes, applied.DefaultPipeline, + testNumForwardedTimes, NoPrefixNoSuffix, opts) + err := ce.ResetSetData(testCounterID, testStoragePolicy, + maggregation.Types{maggregation.Last}, applied.DefaultPipeline, + 0, NoPrefixNoSuffix) require.Error(t, err) } @@ -1812,7 +1814,6 @@ func testFlushLocalMetricFn() ( return func( idPrefix []byte, id id.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/flush.go b/src/aggregator/aggregator/flush.go index 1f93fa1621..e5d41e037e 100644 --- a/src/aggregator/aggregator/flush.go +++ b/src/aggregator/aggregator/flush.go @@ -23,7 +23,6 @@ package aggregator import ( "time" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/policy" ) @@ -84,7 +83,6 @@ const ( type flushLocalMetricFn func( idPrefix []byte, id id.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index a845a296a2..20efbc03fa 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -31,7 +31,6 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -481,10 +480,10 @@ func (e *GaugeElem) processValueWithAggregationLock( for _, point := range 
toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index 4a1370c83c..eab77d6b34 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -537,10 +537,10 @@ func (e *GenericElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/handler/writer/protobuf.go b/src/aggregator/aggregator/handler/writer/protobuf.go index abe84608f3..f00f8d3564 100644 --- a/src/aggregator/aggregator/handler/writer/protobuf.go +++ b/src/aggregator/aggregator/handler/writer/protobuf.go @@ -133,7 +133,6 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) ( w.m.ID = append(w.m.ID, mp.Suffix...) 
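	// NB: with the explicit Type field removed, the wire format identifies a
	// metric purely by its assembled ID: rollup prefix, chunked ID data, and
	// aggregation-type suffix, concatenated in that order. A hedged sketch of
	// what the surrounding prepare() does with it (w.shardFn and w.numShards
	// are the writer's own fields, used just below; the layout and the example
	// suffix are ours, for illustration only):
	//
	//   id := prefix ++ data ++ suffix      // e.g. "rolled_up.http_requests.upper"
	//   shard := w.shardFn(id, w.numShards) // shard on the full encoded ID
	//
	// so two aggregations of the same series map to stable, distinct shards.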
w.m.Metric.TimeNanos = mp.TimeNanos w.m.Metric.Value = mp.Value - w.m.Metric.Type = mp.Type w.m.StoragePolicy = mp.StoragePolicy shard := w.shardFn(w.m.ID, w.numShards) return w.m, shard diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go index 20f51b6d2c..ea38acfcc3 100644 --- a/src/aggregator/aggregator/list.go +++ b/src/aggregator/aggregator/list.go @@ -30,7 +30,6 @@ import ( "github.com/m3db/m3/src/aggregator/aggregator/handler" "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/aggregated" metricid "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/policy" @@ -435,7 +434,6 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) { func (l *baseMetricList) consumeLocalMetric( idPrefix []byte, id metricid.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, @@ -449,7 +447,6 @@ func (l *baseMetricList) consumeLocalMetric( chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{ ChunkedMetric: aggregated.ChunkedMetric{ ChunkedID: chunkedID, - Type: metricType, TimeNanos: timeNanos, Value: value, }, @@ -466,7 +463,6 @@ func (l *baseMetricList) consumeLocalMetric( func (l *baseMetricList) discardLocalMetric( idPrefix []byte, id metricid.RawID, - metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/list_test.go b/src/aggregator/aggregator/list_test.go index 9a523521d3..67e61f1c4b 100644 --- a/src/aggregator/aggregator/list_test.go +++ b/src/aggregator/aggregator/list_test.go @@ -604,7 +604,6 @@ func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) { ChunkedID: id.ChunkedID{ Data: ep.metric.ID, }, - Type: ep.metric.Type, TimeNanos: alignedStart, Value: ep.metric.Value, }, @@ -1057,7 +1056,6 @@ func TestForwardedMetricListLastStepLocalFlush(t *testing.T) { Prefix: ep.expectedPrefix, Data: ep.metric.ID, }, - Type: ep.metric.Type, TimeNanos: alignedStart, Value: ep.metric.Values[0], }, diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index cf46acc790..52e0d2be88 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -31,7 +31,6 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -481,10 +480,10 @@ func (e *TimerElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, - e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/generated-source-files.mk b/src/aggregator/generated-source-files.mk index d95dbd85d3..df9293c791 100644 --- a/src/aggregator/generated-source-files.mk +++ b/src/aggregator/generated-source-files.mk @@ -13,7 +13,6 @@ genny-all: genny-aggregator-counter-elem genny-aggregator-timer-elem 
genny-aggre genny-aggregator-counter-elem: cat $(m3db_package_path)/src/aggregator/aggregator/generic_elem.go \ | awk '/^package/{i++}i' \ - | sed 's/metric.GaugeType/metric.CounterType/' \ | genny -out=$(m3db_package_path)/src/aggregator/aggregator/counter_elem_gen.go -pkg=aggregator gen \ "timedAggregation=timedCounter lockedAggregation=lockedCounterAggregation typeSpecificAggregation=counterAggregation typeSpecificElemBase=counterElemBase genericElemPool=CounterElemPool GenericElem=CounterElem" diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/config.go b/src/cmd/services/m3coordinator/ingest/m3msg/config.go index 7ca43135d1..9d7c17cce2 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/config.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/config.go @@ -46,9 +46,8 @@ func (cfg Configuration) NewIngester( appender storage.Appender, tagOptions models.TagOptions, instrumentOptions instrument.Options, - storeMetricsType bool, ) (*Ingester, error) { - opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions, storeMetricsType) + opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions) if err != nil { return nil, err } @@ -59,7 +58,6 @@ func (cfg Configuration) newOptions( appender storage.Appender, tagOptions models.TagOptions, instrumentOptions instrument.Options, - storeMetricsType bool, ) (Options, error) { scope := instrumentOptions.MetricsScope().Tagged( map[string]string{"component": "ingester"}, @@ -100,6 +98,5 @@ func (cfg Configuration) newOptions( RetryOptions: cfg.Retry.NewOptions(scope), Sampler: sampler, InstrumentOptions: instrumentOptions, - StoreMetricsType: storeMetricsType, }, nil } diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index ca2920197f..8105bd9b3c 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -56,7 +56,6 @@ type Options struct { Sampler *sampler.Sampler InstrumentOptions instrument.Options TagOptions models.TagOptions - StoreMetricsType bool } type ingestMetrics struct { @@ -100,15 +99,14 @@ func NewIngester( // pooled, but currently this is the only way to get tag decoder. 
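	// The op built below is recycled through an object pool rather than
	// allocated per metric. A hedged sketch of the borrow/return cycle as it
	// appears in this file (Ingest borrows, ingest() returns on every exit
	// path):
	//
	//   op := i.p.Get().(*ingestOp) // borrow a pre-wired op
	//   op.id, op.value = id, value // reset per-metric fields
	//   op.ingestFn()               // write, fire callback
	//   op.p.Put(op)                // return for reuse
	//
	// Keeping the decoder and iterator on the op keeps the hot path
	// allocation-free once the pool is warm.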
tagDecoder := opts.TagDecoderPool.Get() op := ingestOp{ - storageAppender: opts.Appender, - retrier: retrier, - iter: serialize.NewMetricTagsIterator(tagDecoder, nil), - tagOpts: tagOpts, - pool: p, - metrics: m, - logger: opts.InstrumentOptions.Logger(), - sampler: opts.Sampler, - storeMetricsType: opts.StoreMetricsType, + s: opts.Appender, + r: retrier, + it: serialize.NewMetricTagsIterator(tagDecoder, nil), + tagOpts: tagOpts, + p: p, + m: m, + logger: opts.InstrumentOptions.Logger(), + sampler: opts.Sampler, } op.attemptFn = op.attempt op.ingestFn = op.ingest @@ -125,16 +123,14 @@ func NewIngester( func (i *Ingester) Ingest( ctx context.Context, id []byte, - metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, callback m3msg.Callbackable, ) { op := i.p.Get().(*ingestOp) - op.ctx = ctx + op.c = ctx op.id = id - op.metricType = metricType op.metricNanos = metricNanos op.value = value op.sp = sp @@ -143,28 +139,26 @@ func (i *Ingester) Ingest( } type ingestOp struct { - storageAppender storage.Appender - retrier retry.Retrier - iter id.SortedTagIterator - tagOpts models.TagOptions - pool pool.ObjectPool - metrics ingestMetrics - logger *zap.Logger - sampler *sampler.Sampler - attemptFn retry.Fn - ingestFn func() - storeMetricsType bool + s storage.Appender + r retry.Retrier + it id.SortedTagIterator + tagOpts models.TagOptions + p pool.ObjectPool + m ingestMetrics + logger *zap.Logger + sampler *sampler.Sampler + attemptFn retry.Fn + ingestFn func() - ctx context.Context + c context.Context id []byte - metricType ts.PromMetricType metricNanos int64 value float64 sp policy.StoragePolicy callback m3msg.Callbackable tags models.Tags datapoints ts.Datapoints - writeQuery storage.WriteQuery + q storage.WriteQuery } func (op *ingestOp) sample() bool { @@ -176,22 +170,22 @@ func (op *ingestOp) sample() bool { func (op *ingestOp) ingest() { if err := op.resetWriteQuery(); err != nil { - op.metrics.ingestInternalError.Inc(1) + op.m.ingestInternalError.Inc(1) op.callback.Callback(m3msg.OnRetriableError) - op.pool.Put(op) + op.p.Put(op) if op.sample() { op.logger.Error("could not reset ingest op", zap.Error(err)) } return } - if err := op.retrier.Attempt(op.attemptFn); err != nil { + if err := op.r.Attempt(op.attemptFn); err != nil { nonRetryableErr := xerrors.IsNonRetryableError(err) if nonRetryableErr { op.callback.Callback(m3msg.OnNonRetriableError) - op.metrics.ingestNonRetryableError.Inc(1) + op.m.ingestNonRetryableError.Inc(1) } else { op.callback.Callback(m3msg.OnRetriableError) - op.metrics.ingestInternalError.Inc(1) + op.m.ingestInternalError.Inc(1) } // NB(r): Always log non-retriable errors since they are usually @@ -203,16 +197,16 @@ func (op *ingestOp) ingest() { zap.Bool("retryableError", !nonRetryableErr)) } - op.pool.Put(op) + op.p.Put(op) return } - op.metrics.ingestSuccess.Inc(1) + op.m.ingestSuccess.Inc(1) op.callback.Callback(m3msg.OnSuccess) - op.pool.Put(op) + op.p.Put(op) } func (op *ingestOp) attempt() error { - return op.storageAppender.Write(op.ctx, &op.writeQuery) + return op.s.Write(op.c, &op.q) } func (op *ingestOp) resetWriteQuery() error { @@ -220,8 +214,7 @@ func (op *ingestOp) resetWriteQuery() error { return err } op.resetDataPoints() - - wq := storage.WriteQueryOptions{ + return op.q.Reset(storage.WriteQueryOptions{ Tags: op.tags, Datapoints: op.datapoints, Unit: convert.UnitForM3DB(op.sp.Resolution().Precision), @@ -230,50 +223,15 @@ func (op *ingestOp) resetWriteQuery() error { Resolution: 
op.sp.Resolution().Window, Retention: op.sp.Retention().Duration(), }, - } - - if op.storeMetricsType { - var err error - wq.Annotation, err = op.convertTypeToAnnotation(op.metricType) - if err != nil { - return err - } - } - - return op.writeQuery.Reset(wq) -} - -func (op *ingestOp) convertTypeToAnnotation(tp ts.PromMetricType) ([]byte, error) { - if tp == ts.PromMetricTypeUnknown { - return nil, nil - } - - handleValueResets := false - if tp == ts.PromMetricTypeCounter { - handleValueResets = true - } - - annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(tp, handleValueResets) - if err != nil { - return nil, err - } - annot, err := annotationPayload.Marshal() - if err != nil { - return nil, err - } - - if len(annot) == 0 { - annot = nil - } - return annot, nil + }) } func (op *ingestOp) resetTags() error { - op.iter.Reset(op.id) + op.it.Reset(op.id) op.tags.Tags = op.tags.Tags[:0] op.tags.Opts = op.tagOpts - for op.iter.Next() { - name, value := op.iter.Current() + for op.it.Next() { + name, value := op.it.Current() // TODO_FIX_GRAPHITE_TAGGING: Using this string constant to track // all places worth fixing this hack. There is at least one @@ -283,7 +241,7 @@ func (op *ingestOp) resetTags() error { if bytes.Equal(value, downsample.GraphiteIDSchemeTagValue) && op.tags.Opts.IDSchemeType() != models.TypeGraphite { // Restart iteration with graphite tag options parsing - op.iter.Reset(op.id) + op.it.Reset(op.id) op.tags.Tags = op.tags.Tags[:0] op.tags.Opts = op.tags.Opts.SetIDSchemeType(models.TypeGraphite) } @@ -298,7 +256,7 @@ func (op *ingestOp) resetTags() error { }.Clone()) } op.tags.Normalize() - return op.iter.Err() + return op.it.Err() } func (op *ingestOp) resetDataPoints() { diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go index 2e9336065f..5e9053231f 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go @@ -59,7 +59,7 @@ func TestIngest(t *testing.T) { } appender := &mockAppender{} ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), - instrument.NewOptions(), true) + instrument.NewOptions()) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") @@ -72,14 +72,14 @@ func TestIngest(t *testing.T) { callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg) m.EXPECT().Ack() - ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback) + ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback) for appender.cnt() != 1 { time.Sleep(100 * time.Millisecond) } expected, err := storage.NewWriteQuery(storage.WriteQueryOptions{ - Annotation: []byte{8, 2}, + Annotation: nil, Attributes: storagemetadata.Attributes{ MetricsType: storagemetadata.AggregatedMetricsType, Resolution: time.Minute, @@ -131,7 +131,7 @@ func TestIngestNonRetryableError(t *testing.T) { nonRetryableError := xerrors.NewNonRetryableError(errors.New("bad request error")) appender := &mockAppender{expectErr: nonRetryableError} ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), - instrumentOpts, true) + instrumentOpts) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") @@ -144,7 +144,7 @@ func TestIngestNonRetryableError(t *testing.T) { callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg) m.EXPECT().Ack() - ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, 
metricNanos, 0, val, sp, callback) + ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback) for appender.cntErr() != 1 { time.Sleep(100 * time.Millisecond) diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 6d131f8e15..27cd4b477a 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -22,7 +22,6 @@ package ingest import ( "context" - "fmt" "sync" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" @@ -496,26 +495,14 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } for _, dp := range value.Datapoints { - if value.Attributes.PromType != ts.PromMetricTypeUnknown { - switch value.Attributes.PromType { - case ts.PromMetricTypeCounter: - err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) - default: - err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) - } - } else { - switch value.Attributes.M3Type { - case ts.M3MetricTypeGauge: - err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) - case ts.M3MetricTypeCounter: - err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) - case ts.M3MetricTypeTimer: - err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) - default: - err = fmt.Errorf("unknown m3type '%v'", value.Attributes.M3Type) - } + switch value.Attributes.M3Type { + case ts.M3MetricTypeGauge: + err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) + case ts.M3MetricTypeCounter: + err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) + case ts.M3MetricTypeTimer: + err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) } - if err != nil { // If we see an error break out so we can try processing the // next datapoint. 
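With the Prometheus-type branch deleted above, writeAggregatedBatch dispatches timed samples on the M3 metric type alone. A minimal, self-contained sketch of that dispatch under stand-in types (Appender and the M3Type constants are illustrative substitutes for result.SamplesAppender and the real ts constants, not the m3 API):

// metrictype_dispatch.go: sketch of the post-change M3Type dispatch.
package main

import "fmt"

// M3Type mirrors the three M3 metric types the new switch dispatches on.
type M3Type int

const (
	M3Gauge M3Type = iota
	M3Counter
	M3Timer
)

// Appender is an illustrative stand-in for result.SamplesAppender.
type Appender interface {
	AppendGaugeTimedSample(timestamp int64, v float64) error
	AppendCounterTimedSample(timestamp int64, v int64) error
	AppendTimerTimedSample(timestamp int64, v float64) error
}

// appendByM3Type mirrors the post-change switch in writeAggregatedBatch:
// gauges and timers keep the float value, counters are truncated to int64.
// The diff's new switch has no default arm; the error below echoes the
// removed pre-change fallback and is kept only for safety in this sketch.
func appendByM3Type(a Appender, t M3Type, timestamp int64, v float64) error {
	switch t {
	case M3Gauge:
		return a.AppendGaugeTimedSample(timestamp, v)
	case M3Counter:
		return a.AppendCounterTimedSample(timestamp, int64(v))
	case M3Timer:
		return a.AppendTimerTimedSample(timestamp, v)
	default:
		return fmt.Errorf("unknown m3type '%v'", t)
	}
}

// printAppender just logs samples so the sketch runs standalone.
type printAppender struct{}

func (printAppender) AppendGaugeTimedSample(timestamp int64, v float64) error {
	fmt.Println("gauge", timestamp, v)
	return nil
}

func (printAppender) AppendCounterTimedSample(timestamp int64, v int64) error {
	fmt.Println("counter", timestamp, v)
	return nil
}

func (printAppender) AppendTimerTimedSample(timestamp int64, v float64) error {
	fmt.Println("timer", timestamp, v)
	return nil
}

func main() {
	// Counter values arrive as float64 datapoints and are truncated on append.
	_ = appendByM3Type(printAppender{}, M3Counter, 1610000000, 42.9) // counter 1610000000 42
}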
diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 65f2d6b6c5..ffca0e0c1d 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -122,7 +122,7 @@ func (h *pbHandler) Process(msg consumer.Message) { } } - h.writeFn(h.ctx, dec.ID(), dec.Type(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) + h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) } func (h *pbHandler) Close() { h.wg.Wait() } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index 1770a2b757..02e411749a 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -35,7 +35,6 @@ import ( "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/msg/generated/proto/msgpb" "github.com/m3db/m3/src/msg/protocol/proto" - "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/server" xtime "github.com/m3db/m3/src/x/time" @@ -234,7 +233,6 @@ type mockWriter struct { func (m *mockWriter) write( ctx context.Context, name []byte, - metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index 20c20c37fe..8d3a5f96be 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -24,14 +24,12 @@ import ( "context" "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/query/ts" ) // WriteFn is the function that writes a metric. type WriteFn func( ctx context.Context, id []byte, - metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, diff --git a/src/metrics/aggregation/type.go b/src/metrics/aggregation/type.go index 9716dd560d..db10e274ae 100644 --- a/src/metrics/aggregation/type.go +++ b/src/metrics/aggregation/type.go @@ -164,7 +164,7 @@ func (a Type) IsValidForGauge() bool { // IsValidForCounter if an Type is valid for Counter. func (a Type) IsValidForCounter() bool { switch a { - case Min, Max, Mean, Count, Sum, SumSq, Stdev, Last: + case Min, Max, Mean, Count, Sum, SumSq, Stdev: return true default: return false diff --git a/src/metrics/encoding/protobuf/aggregated_decoder.go b/src/metrics/encoding/protobuf/aggregated_decoder.go index a9ee58e9b8..ca9052c44c 100644 --- a/src/metrics/encoding/protobuf/aggregated_decoder.go +++ b/src/metrics/encoding/protobuf/aggregated_decoder.go @@ -23,7 +23,6 @@ package protobuf import ( "github.com/m3db/m3/src/metrics/generated/proto/metricpb" "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/query/ts" ) // AggregatedDecoder is a decoder for decoding aggregated metrics. @@ -53,16 +52,6 @@ func (d AggregatedDecoder) ID() []byte { return d.pb.Metric.TimedMetric.Id } -// Type returns the type of the metric. -func (d *AggregatedDecoder) Type() ts.PromMetricType { - switch d.pb.Metric.TimedMetric.Type { - case metricpb.MetricType_COUNTER: - return ts.PromMetricTypeCounter - default: - return ts.PromMetricTypeGauge - } -} - // TimeNanos returns the decoded timestamp. 
func (d AggregatedDecoder) TimeNanos() int64 { return d.pb.Metric.TimedMetric.TimeNanos diff --git a/src/metrics/metric/aggregated/types.go b/src/metrics/metric/aggregated/types.go index e4503a0c60..4415dfa039 100644 --- a/src/metrics/metric/aggregated/types.go +++ b/src/metrics/metric/aggregated/types.go @@ -81,7 +81,6 @@ func (m Metric) String() string { // ChunkedMetric is a metric with a chunked ID. type ChunkedMetric struct { id.ChunkedID - Type metric.Type TimeNanos int64 Value float64 } diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index 7e11b60130..61ec7918de 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -93,16 +93,6 @@ var ( Attributes: ts.DefaultSeriesAttributes(), Metadata: ts.Metadata{}, } - - headerToMetricType = map[string]prompb.MetricType{ - "counter": prompb.MetricType_COUNTER, - "gauge": prompb.MetricType_GAUGE, - "gauge-histogram": prompb.MetricType_GAUGE_HISTOGRAM, - "histogram": prompb.MetricType_HISTOGRAM, - "info": prompb.MetricType_INFO, - "stateset": prompb.MetricType_STATESET, - "summary": prompb.MetricType_SUMMARY, - } ) // PromWriteHandler represents a handler for prometheus write endpoint. @@ -471,16 +461,6 @@ func (h *PromWriteHandler) parseRequest( } } - if promType := r.Header.Get(headers.PromTypeHeader); promType != "" { - tp, ok := headerToMetricType[strings.ToLower(promType)] - if !ok { - return parseRequestResult{}, fmt.Errorf("unknown prom metric type %s", promType) - } - for i := range req.Timeseries { - req.Timeseries[i].Type = tp - } - } - return parseRequestResult{ Request: &req, Options: opts, @@ -621,9 +601,7 @@ func (i *promTSIter) Next() bool { return true } - annotationPayload, err := storage.SeriesAttributesToAnnotationPayload( - i.attributes[i.idx].PromType, - i.attributes[i.idx].HandleValueResets) + annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(i.attributes[i.idx]) if err != nil { i.err = err return false diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index 58b2a8d419..3f71305f35 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -86,89 +86,6 @@ func TestPromWriteParsing(t *testing.T) { require.Equal(t, ingest.WriteOptions{}, r.Options) } -func TestMetricTypeHeader(t *testing.T) { - tests := []struct { - headerValue string - expectedType prompb.MetricType - }{ - { - expectedType: prompb.MetricType_UNKNOWN, - }, - { - headerValue: "counter", - expectedType: prompb.MetricType_COUNTER, - }, - { - headerValue: "Counter", - expectedType: prompb.MetricType_COUNTER, - }, - { - headerValue: "gauge", - expectedType: prompb.MetricType_GAUGE, - }, - { - headerValue: "histogram", - expectedType: prompb.MetricType_HISTOGRAM, - }, - { - headerValue: "gauge-histogram", - expectedType: prompb.MetricType_GAUGE_HISTOGRAM, - }, - { - headerValue: "summary", - expectedType: prompb.MetricType_SUMMARY, - }, - { - headerValue: "info", - expectedType: prompb.MetricType_INFO, - }, - { - headerValue: "stateset", - expectedType: prompb.MetricType_STATESET, - }, - } - - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) - handlerOpts := makeOptions(mockDownsamplerAndWriter) - handler, err := NewPromWriteHandler(handlerOpts) - require.NoError(t, err) - - for 
_, testCase := range tests { - t.Run(testCase.headerValue, func(tt *testing.T) { - tc := testCase // nolint - promReq := test.GeneratePromWriteRequest() - promReqBody := test.GeneratePromWriteRequestBody(tt, promReq) - req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) - if tc.headerValue > "" { - req.Header.Add(headers.PromTypeHeader, tc.headerValue) - } - r, err := handler.(*PromWriteHandler).parseRequest(req) - require.NoError(tt, err) - require.Equal(tt, tc.expectedType, r.Request.Timeseries[0].Type) - }) - } -} - -func TestInvalidMetricTypeHeader(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) - handlerOpts := makeOptions(mockDownsamplerAndWriter) - handler, err := NewPromWriteHandler(handlerOpts) - require.NoError(t, err) - - promReq := test.GeneratePromWriteRequest() - promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) - req.Header.Add(headers.PromTypeHeader, "random") - _, err = handler.(*PromWriteHandler).parseRequest(req) - require.Error(t, err) -} - func TestPromWrite(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() diff --git a/src/query/server/query.go b/src/query/server/query.go index 21cd32561b..5b3aae5121 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -550,15 +550,10 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { - storeMetricsType := false - if cfg.StoreMetricsType != nil { - storeMetricsType = *cfg.StoreMetricsType - } - logger.Info("starting m3msg server", zap.String("address", cfg.Ingest.M3Msg.Server.ListenAddress)) ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, - tagOptions, instrumentOptions, storeMetricsType) + tagOptions, instrumentOptions) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index f79ccf05cb..d1f95e0171 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -116,8 +116,6 @@ writeWorkerPoolPolicy: size: 100 shards: 100 killProbability: 0.3 - -storeMetricsType: true ` func TestWrite(t *testing.T) { @@ -255,7 +253,7 @@ func TestIngest(t *testing.T) { gomock.Any(), 42.0, gomock.Any(), - []byte{8, 2}). + nil). Do(func(_, _, _, _, _, _, _ interface{}) { numWrites.Add(1) }) diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 9302266006..c39d69a4c7 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -149,12 +149,11 @@ func PromTimeSeriesToSeriesAttributes(series prompb.TimeSeries) (ts.SeriesAttrib }, nil } -// SeriesAttributesToAnnotationPayload converts passed arguments into an annotation.Payload. -func SeriesAttributesToAnnotationPayload( - promType ts.PromMetricType, - handleValueResets bool) (annotation.Payload, error) { +// SeriesAttributesToAnnotationPayload converts ts.SeriesAttributes into an annotation.Payload. 
+func SeriesAttributesToAnnotationPayload(seriesAttributes ts.SeriesAttributes) (annotation.Payload, error) { var metricType annotation.MetricType - switch promType { + + switch seriesAttributes.PromType { case ts.PromMetricTypeUnknown: metricType = annotation.MetricType_UNKNOWN @@ -180,12 +179,12 @@ func SeriesAttributesToAnnotationPayload( metricType = annotation.MetricType_STATESET default: - return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", promType) + return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", seriesAttributes.PromType) } return annotation.Payload{ MetricType: metricType, - HandleValueResets: handleValueResets, + HandleValueResets: seriesAttributes.HandleValueResets, }, nil } diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 88a68ef72e..ca808ab4f5 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -385,19 +385,19 @@ func TestSeriesAttributesToAnnotationPayload(t *testing.T) { } for promType, expected := range mapping { - payload, err := SeriesAttributesToAnnotationPayload(promType, false) + payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: promType}) require.NoError(t, err) assert.Equal(t, expected, payload.MetricType) } - _, err := SeriesAttributesToAnnotationPayload(math.MaxUint8, false) + _, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: math.MaxUint8}) require.Error(t, err) - payload, err := SeriesAttributesToAnnotationPayload(0, true) + payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: true}) require.NoError(t, err) assert.True(t, payload.HandleValueResets) - payload, err = SeriesAttributesToAnnotationPayload(0, false) + payload, err = SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: false}) require.NoError(t, err) assert.False(t, payload.HandleValueResets) } diff --git a/src/x/headers/headers.go b/src/x/headers/headers.go index 579ac13080..65a4a74fb8 100644 --- a/src/x/headers/headers.go +++ b/src/x/headers/headers.go @@ -43,11 +43,6 @@ const ( // Valid values are "unaggregated" or "aggregated". MetricsTypeHeader = M3HeaderPrefix + "Metrics-Type" - // PromTypeHeader sets the prometheus metric type. Valid values are - // "counter", "gauge", etc. (see src/query/api/v1/handler/prometheus/remote/write.go - // field `headerToMetricType`) - PromTypeHeader = M3HeaderPrefix + "Prom-Type" - // WriteTypeHeader is a header that controls if default // writes should be written to both unaggregated and aggregated // namespaces, or if unaggregated values are skipped and
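For reference, the converter change earlier in this patch folds the old (promType, handleValueResets) argument pair into a single ts.SeriesAttributes value. A hedged usage sketch against the new signature, assuming the m3 import paths that appear throughout this diff:

// annotation_sketch.go: calling the reworked converter from client code.
package main

import (
	"fmt"

	"github.com/m3db/m3/src/query/storage"
	"github.com/m3db/m3/src/query/ts"
)

func main() {
	// Counter resets now travel inside the attributes struct instead of a
	// separate handleValueResets parameter.
	payload, err := storage.SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{
		PromType:          ts.PromMetricTypeCounter,
		HandleValueResets: true,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(payload.MetricType, payload.HandleValueResets)
}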