From f4c26947ba41aee0fbe7b978f0a42bab6a58e5c0 Mon Sep 17 00:00:00 2001 From: arnikola Date: Thu, 30 May 2019 15:09:19 -0400 Subject: [PATCH] [dbnode] Add forward index write capability (#1613) --- src/dbnode/client/types.go | 2 +- src/dbnode/storage/forward_index_dice.go | 102 ++++ src/dbnode/storage/forward_index_dice_test.go | 157 +++++ src/dbnode/storage/index.go | 57 +- src/dbnode/storage/index/block_bench_test.go | 13 +- src/dbnode/storage/index/index_mock.go | 26 + src/dbnode/storage/index/types.go | 22 + .../storage/index_queue_forward_write_test.go | 572 ++++++++++++++++++ src/dbnode/storage/series/truncate_type.go | 2 +- src/x/dice/dice.go | 61 ++ src/x/dice/dice_test.go | 46 ++ 11 files changed, 1043 insertions(+), 17 deletions(-) create mode 100644 src/dbnode/storage/forward_index_dice.go create mode 100644 src/dbnode/storage/forward_index_dice_test.go create mode 100644 src/dbnode/storage/index_queue_forward_write_test.go create mode 100644 src/x/dice/dice.go create mode 100644 src/x/dice/dice_test.go diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index 1059101e86..206c160b85 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -32,12 +32,12 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" xretry "github.com/m3db/m3/src/x/retry" + "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" tchannel "github.com/uber/tchannel-go" diff --git a/src/dbnode/storage/forward_index_dice.go b/src/dbnode/storage/forward_index_dice.go new file mode 100644 index 0000000000..30ba4061aa --- /dev/null +++ b/src/dbnode/storage/forward_index_dice.go @@ -0,0 +1,102 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "fmt" + "time" + + "github.com/m3db/m3/src/x/dice" +) + +// forwardIndexDice is a die roll that adds a chance for incoming index writes +// arriving near a block boundary to be duplicated and written to the next block +// index, adding jitter and smoothing index load so that block boundaries do not +// cause a huge influx of new documents that all need to be indexed at once. 
+type forwardIndexDice struct { + enabled bool + blockSize time.Duration + + forwardIndexThreshold time.Duration + forwardIndexDice dice.Dice +} + +func newForwardIndexDice( + opts Options, +) (forwardIndexDice, error) { + var ( + indexOpts = opts.IndexOptions() + seriesOpts = opts.SeriesOptions() + + probability = indexOpts.ForwardIndexProbability() + ) + + // NB: if not enabled, return a no-op forward index dice. + if probability == 0 { + return forwardIndexDice{}, nil + } + + var ( + threshold = indexOpts.ForwardIndexThreshold() + + retention = seriesOpts.RetentionOptions() + bufferFuture = retention.BufferFuture() + blockSize = retention.BlockSize() + + forwardIndexThreshold time.Duration + ) + + if threshold < 0 || threshold > 1 { + return forwardIndexDice{}, + fmt.Errorf("invalid forward write threshold %f", threshold) + } + + bufferFragment := float64(bufferFuture) * threshold + forwardIndexThreshold = blockSize - time.Duration(bufferFragment) + + dice, err := dice.NewDice(probability) + if err != nil { + return forwardIndexDice{}, + fmt.Errorf("cannot create forward write dice: %s", err) + } + + return forwardIndexDice{ + enabled: true, + blockSize: blockSize, + + forwardIndexThreshold: forwardIndexThreshold, + forwardIndexDice: dice, + }, nil +} + +// roll decides if a timestamp is eligible for forward index writes. +func (o *forwardIndexDice) roll(timestamp time.Time) bool { + if !o.enabled { + return false + } + + threshold := timestamp.Truncate(o.blockSize).Add(o.forwardIndexThreshold) + if !timestamp.Before(threshold) { + return o.forwardIndexDice.Roll() + } + + return false +} diff --git a/src/dbnode/storage/forward_index_dice_test.go b/src/dbnode/storage/forward_index_dice_test.go new file mode 100644 index 0000000000..0f184b9d48 --- /dev/null +++ b/src/dbnode/storage/forward_index_dice_test.go @@ -0,0 +1,157 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/retention" + + "github.com/stretchr/testify/require" +) + +func optionsWithIndexValues( + indexProb float64, + indexThreshold float64, + retOpts retention.Options, +) Options { + opts := DefaultTestOptions() + + idxOpts := opts.IndexOptions() + idxOpts = idxOpts. + SetForwardIndexProbability(indexProb). + SetForwardIndexThreshold(indexThreshold) + + opts = DefaultTestOptions(). 
+ SetIndexOptions(idxOpts) + + if retOpts != nil { + seriesOpts := opts.SeriesOptions(). + SetRetentionOptions(retOpts) + return opts.SetSeriesOptions(seriesOpts) + } + + return opts +} + +func TestDisabledForwardIndexDice(t *testing.T) { + opts := optionsWithIndexValues(0, 0, nil) + dice, err := newForwardIndexDice(opts) + require.NoError(t, err) + require.False(t, dice.enabled) + + start := time.Now().Truncate(time.Hour) + end := start.Add(time.Hour) + + for ts := start; ts.Before(end); ts = ts.Add(time.Second) { + require.False(t, dice.roll(ts)) + } +} + +func TestInvalidForwardIndexDice(t *testing.T) { + // Index probability < 0 and > 1 cases. + invalidIndexProbabilities := []float64{-10, 10} + for _, prob := range invalidIndexProbabilities { + opts := optionsWithIndexValues(prob, 0, nil) + _, err := newForwardIndexDice(opts) + require.Error(t, err) + } + + // Index threshold < 0 and > 1 cases. + invalidIndexThresholds := []float64{-10, 10} + for _, threshold := range invalidIndexThresholds { + opts := optionsWithIndexValues(0.5, threshold, nil) + _, err := newForwardIndexDice(opts) + require.Error(t, err) + } +} + +func TestAlwaysOnForwardIndexDice(t *testing.T) { + retOpts := retention.NewOptions(). + SetBlockSize(time.Hour). + SetBufferFuture(time.Minute * 10) + opts := optionsWithIndexValues(1, 0.9, retOpts) + + dice, err := newForwardIndexDice(opts) + require.NoError(t, err) + require.True(t, dice.enabled) + + var ( + start = time.Now().Truncate(time.Hour) + threshold = start.Add(time.Minute * 51) + end = start.Add(time.Hour) + ) + + for ts := start; ts.Before(end); ts = ts.Add(time.Second) { + indexing := dice.roll(ts) + if ts.Before(threshold) { + require.False(t, indexing) + } else { + require.True(t, indexing) + } + } +} + +type trackingDice struct { + rolled int + success int +} + +func (d *trackingDice) Rate() float64 { return 0 } +func (d *trackingDice) Roll() bool { + d.rolled++ + return d.rolled%d.success == 0 +} + +func TestCustomDice(t *testing.T) { + d := &trackingDice{ + success: 10, + } + + dice := forwardIndexDice{ + enabled: true, + blockSize: time.Hour, + + forwardIndexThreshold: time.Minute * 50, + forwardIndexDice: d, + } + + var ( + start = time.Now().Truncate(time.Hour) + threshold = start.Add(time.Minute * 50) + end = start.Add(time.Hour) + ) + + sample := 0 + for ts := start; ts.Before(end); ts = ts.Add(time.Second) { + indexing := dice.roll(ts) + + if ts.Before(threshold) { + require.False(t, indexing) + } else { + sample++ + require.Equal(t, sample%10 == 0, indexing) + } + } + + require.Equal(t, 600, d.rolled) +} diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 04b5336f0f..4f9ac49f29 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -29,6 +29,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" @@ -39,7 +40,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/convert" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" @@ -113,6 +113,10 @@ type nsIndex struct { queriesWg sync.WaitGroup metrics nsIndexMetrics + + // forwardIndexDice determines if an incoming index write should be dual + // written to the next 
block. + forwardIndexDice forwardIndexDice } type nsIndexState struct { @@ -292,6 +296,14 @@ func newNamespaceIndexWithOptions( idx.runtimeOptsListener = runtimeOptsMgr.RegisterListener(idx) } + // set up forward index dice. + dice, err := newForwardIndexDice(newIndexOpts.opts) + if err != nil { + return nil, err + } + + idx.forwardIndexDice = dice + // allocate indexing queue and start it up. queue := newIndexQueueFn(idx.writeBatches, nsMD, nowFn, scope) if err := queue.Start(); err != nil { @@ -302,7 +314,7 @@ func newNamespaceIndexWithOptions( // allocate the current block to ensure we're able to index as soon as we return currentBlock := nowFn().Truncate(idx.blockSize) idx.state.RLock() - _, err := idx.ensureBlockPresentWithRLock(currentBlock) + _, err = idx.ensureBlockPresentWithRLock(currentBlock) idx.state.RUnlock() if err != nil { return nil, err @@ -501,31 +513,60 @@ func (i *nsIndex) writeBatches( // on the provided inserts to terminate quicker during shutdown. return } - now := i.nowFn() - futureLimit := now.Add(1 * i.bufferFuture) - pastLimit := now.Add(-1 * i.bufferPast) + var ( + now = i.nowFn() + blockSize = i.blockSize + futureLimit = now.Add(1 * i.bufferFuture) + pastLimit = now.Add(-1 * i.bufferPast) + batchOptions = batch.Options() + forwardIndexDice = i.forwardIndexDice + forwardIndexEnabled = forwardIndexDice.enabled + + forwardIndexBatch *index.WriteBatch + ) // NB(r): Release lock early to avoid writing batches impacting ticking // speed, etc. // Sometimes foreground compaction can take a long time during heavy inserts. // Each lookup to ensureBlockPresent checks that index is still open, etc. i.state.RUnlock() + if forwardIndexEnabled { + // NB(arnikola): Don't initialize forward index batch if forward indexing + // is not enabled. + forwardIndexBatch = index.NewWriteBatch(batchOptions) + } // Ensure timestamp is not too old/new based on retention policies and that - // doc is valid. + // doc is valid. Add potential forward writes to the forwardWriteBatch. batch.ForEach( func(idx int, entry index.WriteBatchEntry, d doc.Document, _ index.WriteBatchEntryResult) { - if !futureLimit.After(entry.Timestamp) { + ts := entry.Timestamp + if !futureLimit.After(ts) { batch.MarkUnmarkedEntryError(m3dberrors.ErrTooFuture, idx) return } - if !entry.Timestamp.After(pastLimit) { + if !ts.After(pastLimit) { batch.MarkUnmarkedEntryError(m3dberrors.ErrTooPast, idx) return } + + if forwardIndexDice.roll(ts) { + forwardEntryTimestamp := ts.Truncate(blockSize).Add(blockSize) + xNanoTimestamp := xtime.ToUnixNano(forwardEntryTimestamp) + if entry.OnIndexSeries.NeedsIndexUpdate(xNanoTimestamp) { + forwardIndexEntry := entry + forwardIndexEntry.Timestamp = forwardEntryTimestamp + forwardIndexEntry.OnIndexSeries.OnIndexPrepare() + forwardIndexBatch.Append(forwardIndexEntry, d) + } + } }) + if forwardIndexEnabled && forwardIndexBatch.Len() > 0 { + batch.AppendAll(forwardIndexBatch) + } + // Sort the inserts by which block they're applicable for, and do the inserts // for each block, making sure to not try to insert any entries already marked // with a result. 
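To make the writeBatches change above concrete: with the values exercised by the tests in this diff (1h block size, 10m buffer future, ForwardIndexThreshold of 0.9), newForwardIndexDice leaves the last 9 minutes of each index block eligible for a forward write, and writeBatches only duplicates an entry into the next block when both the dice roll and NeedsIndexUpdate pass. A small standalone Go sketch of that arithmetic follows; it is illustrative only and not part of the patch, and its variable names are not from the committed code.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Values matching TestAlwaysOnForwardIndexDice in this diff.
	var (
		blockSize     = time.Hour
		bufferFuture  = 10 * time.Minute
		thresholdFrac = 0.9 // IndexOptions().ForwardIndexThreshold()
	)

	// Mirrors newForwardIndexDice: the eligible window is the tail of each
	// block, sized as a fraction of the buffer-future window.
	bufferFragment := float64(bufferFuture) * thresholdFrac
	forwardIndexThreshold := blockSize - time.Duration(bufferFragment)
	fmt.Println(forwardIndexThreshold) // 51m0s, i.e. the last 9 minutes of each block

	// Mirrors forwardIndexDice.roll: a write at 14:55 is not before the 14:51
	// cutoff (block start 14:00 + 51m), so it is a candidate for also being
	// indexed into the 15:00 block, subject to the probability dice.
	write := time.Date(2019, 5, 30, 14, 55, 0, 0, time.UTC)
	cutoff := write.Truncate(blockSize).Add(forwardIndexThreshold)
	fmt.Println(!write.Before(cutoff)) // true
}

Whether the duplicate entry is then actually appended still depends on the probability dice and on OnIndexSeries.NeedsIndexUpdate, as in the hunk above.
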
diff --git a/src/dbnode/storage/index/block_bench_test.go b/src/dbnode/storage/index/block_bench_test.go index c280e044e5..c3d8eb20bf 100644 --- a/src/dbnode/storage/index/block_bench_test.go +++ b/src/dbnode/storage/index/block_bench_test.go @@ -112,14 +112,13 @@ func BenchmarkBlockWrite(b *testing.B) { // mockOnIndexSeries is a by hand generated struct since using the // gomock generated ones is really slow so makes them almost // useless to use in benchmarks -type mockOnIndexSeries struct { -} +type mockOnIndexSeries struct{} var _ OnIndexSeries = mockOnIndexSeries{} -func (m mockOnIndexSeries) OnIndexSuccess(blockStart xtime.UnixNano) { - -} -func (m mockOnIndexSeries) OnIndexFinalize(blockStart xtime.UnixNano) { - +func (m mockOnIndexSeries) OnIndexSuccess(blockStart xtime.UnixNano) {} +func (m mockOnIndexSeries) OnIndexFinalize(blockStart xtime.UnixNano) {} +func (m mockOnIndexSeries) OnIndexPrepare() {} +func (m mockOnIndexSeries) NeedsIndexUpdate(indexBlockStartForWrite xtime.UnixNano) bool { + return false } diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index a09657e6ca..9fec0aeb9c 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -602,6 +602,32 @@ func (mr *MockOnIndexSeriesMockRecorder) OnIndexFinalize(blockStart interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexFinalize", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexFinalize), blockStart) } +// OnIndexPrepare mocks base method +func (m *MockOnIndexSeries) OnIndexPrepare() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnIndexPrepare") +} + +// OnIndexPrepare indicates an expected call of OnIndexPrepare +func (mr *MockOnIndexSeriesMockRecorder) OnIndexPrepare() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnIndexPrepare", reflect.TypeOf((*MockOnIndexSeries)(nil).OnIndexPrepare)) +} + +// NeedsIndexUpdate mocks base method +func (m *MockOnIndexSeries) NeedsIndexUpdate(indexBlockStartForWrite time0.UnixNano) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NeedsIndexUpdate", indexBlockStartForWrite) + ret0, _ := ret[0].(bool) + return ret0 +} + +// NeedsIndexUpdate indicates an expected call of NeedsIndexUpdate +func (mr *MockOnIndexSeriesMockRecorder) NeedsIndexUpdate(indexBlockStartForWrite interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedsIndexUpdate", reflect.TypeOf((*MockOnIndexSeries)(nil).NeedsIndexUpdate), indexBlockStartForWrite) +} + // MockBlock is a mock of Block interface type MockBlock struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index c6a3a1be68..2bab2eaa30 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -269,6 +269,23 @@ type OnIndexSeries interface { // during the course of indexing. `blockStart` is the startTime of the index // block for which the write was attempted. OnIndexFinalize(blockStart xtime.UnixNano) + + // OnIndexPrepare prepares the Entry to be handed off to the indexing sub-system. + // NB(prateek): we retain the ref count on the entry while the indexing is pending, + // the callback executed on the entry once the indexing is completed releases this + // reference. + OnIndexPrepare() + + // NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed + // for the provided blockStart. 
It only allows a single index attempt at a time + // for a single entry. + // NB(prateek): NeedsIndexUpdate is a CAS, i.e. when this method returns true, it + // also sets state on the entry to indicate that a write for the given blockStart + // is going to be sent to the index, and other go routines should not attempt the + // same write. Callers are expected to ensure they follow this guideline. + // Further, every call to NeedsIndexUpdate which returns true needs to have a corresponding + // OnIndexFinalze() call. This is required for correct lifecycle maintenance. + NeedsIndexUpdate(indexBlockStartForWrite xtime.UnixNano) bool } // Block represents a collection of segments. Each `Block` is a complete reverse @@ -435,6 +452,11 @@ func (b *WriteBatch) Append( b.appendWithResult(entry, doc, &entry.resultVal) } +// Options returns the WriteBatchOptions for this batch. +func (b *WriteBatch) Options() WriteBatchOptions { + return b.opts +} + // AppendAll appends all entries from another batch to this batch // and ensures they share the same result struct. func (b *WriteBatch) AppendAll(from *WriteBatch) { diff --git a/src/dbnode/storage/index_queue_forward_write_test.go b/src/dbnode/storage/index_queue_forward_write_test.go new file mode 100644 index 0000000000..a4cf8956dd --- /dev/null +++ b/src/dbnode/storage/index_queue_forward_write_test.go @@ -0,0 +1,572 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/runtime" + "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/series" + xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + "github.com/m3db/m3/src/m3ninx/doc" + m3ninxidx "github.com/m3db/m3/src/m3ninx/idx" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" + xtest "github.com/m3db/m3/src/x/test" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/fortytw2/leaktest" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" +) + +func generateOptionsNowAndBlockSize() (Options, time.Time, time.Duration) { + idxOpts := testNamespaceIndexOptions(). + SetInsertMode(index.InsertSync). + SetForwardIndexProbability(1). 
+ SetForwardIndexThreshold(1) + + opts := DefaultTestOptions(). + SetIndexOptions(idxOpts) + + var ( + retOpts = opts.SeriesOptions().RetentionOptions() + blockSize = retOpts.BlockSize() + bufferFuture = retOpts.BufferFuture() + bufferFragment = blockSize - time.Duration(float64(bufferFuture)*0.5) + now = time.Now().Truncate(blockSize).Add(bufferFragment) + + clockOptions = opts.ClockOptions() + ) + + clockOptions = clockOptions.SetNowFn(func() time.Time { return now }) + opts = opts.SetClockOptions(clockOptions) + + return opts, now, blockSize +} + +func setupForwardIndex( + t *testing.T, + ctrl *gomock.Controller, +) (namespaceIndex, time.Time, time.Duration) { + newFn := func( + fn nsIndexInsertBatchFn, + md namespace.Metadata, + nowFn clock.NowFn, + s tally.Scope, + ) namespaceIndexInsertQueue { + q := newNamespaceIndexInsertQueue(fn, md, nowFn, s) + q.(*nsIndexInsertQueue).indexBatchBackoff = 10 * time.Millisecond + return q + } + + md, err := namespace.NewMetadata(defaultTestNs1ID, defaultTestNs1Opts) + require.NoError(t, err) + + opts, now, blockSize := generateOptionsNowAndBlockSize() + idx, err := newNamespaceIndexWithInsertQueueFn(md, newFn, opts) + assert.NoError(t, err) + + var ( + ts = idx.(*nsIndex).state.latestBlock.StartTime() + nextTs = ts.Add(blockSize) + next = ts.Truncate(blockSize).Add(blockSize) + id = ident.StringID("foo") + tags = ident.NewTags( + ident.StringTag("name", "value"), + ) + lifecycle = index.NewMockOnIndexSeries(ctrl) + ) + + gomock.InOrder( + lifecycle.EXPECT().NeedsIndexUpdate(xtime.ToUnixNano(next)).Return(true), + lifecycle.EXPECT().OnIndexPrepare(), + + lifecycle.EXPECT().OnIndexSuccess(xtime.ToUnixNano(ts)), + lifecycle.EXPECT().OnIndexFinalize(xtime.ToUnixNano(ts)), + + lifecycle.EXPECT().OnIndexSuccess(xtime.ToUnixNano(nextTs)), + lifecycle.EXPECT().OnIndexFinalize(xtime.ToUnixNano(nextTs)), + ) + + entry, doc := testWriteBatchEntry(id, tags, now, lifecycle) + batch := testWriteBatch(entry, doc, testWriteBatchBlockSizeOption(blockSize)) + assert.NoError(t, idx.WriteBatch(batch)) + + return idx, now, blockSize +} + +func TestNamespaceForwardIndexInsertQuery(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + defer leaktest.CheckTimeout(t, 2*time.Second)() + + ctx := context.NewContext() + idx, now, blockSize := setupForwardIndex(t, ctrl) + defer idx.Close() + + reQuery, err := m3ninxidx.NewRegexpQuery([]byte("name"), []byte("val.*")) + assert.NoError(t, err) + + // NB: query both the current and the next index block to ensure that the + // write was correctly indexed to both. 
+ nextBlockTime := now.Add(blockSize) + queryTimes := []time.Time{now, nextBlockTime} + for _, ts := range queryTimes { + res, err := idx.Query(ctx, index.Query{Query: reQuery}, index.QueryOptions{ + StartInclusive: ts.Add(-1 * time.Minute), + EndExclusive: ts.Add(1 * time.Minute), + }) + require.NoError(t, err) + + assert.True(t, res.Exhaustive) + results := res.Results + assert.Equal(t, "testns1", results.Namespace().String()) + + tags, ok := results.Map().Get(ident.StringID("foo")) + assert.True(t, ok) + assert.True(t, ident.NewTagIterMatcher( + ident.MustNewTagStringsIterator("name", "value")).Matches( + ident.NewTagsIterator(tags))) + } +} + +func TestNamespaceForwardIndexAggregateQuery(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + defer leaktest.CheckTimeout(t, 2*time.Second)() + + ctx := context.NewContext() + idx, now, blockSize := setupForwardIndex(t, ctrl) + defer idx.Close() + + reQuery, err := m3ninxidx.NewRegexpQuery([]byte("name"), []byte("val.*")) + assert.NoError(t, err) + + // NB: query both the current and the next index block to ensure that the + // write was correctly indexed to both. + nextBlockTime := now.Add(blockSize) + queryTimes := []time.Time{now, nextBlockTime} + for _, ts := range queryTimes { + res, err := idx.AggregateQuery(ctx, index.Query{Query: reQuery}, + index.AggregationOptions{ + QueryOptions: index.QueryOptions{ + StartInclusive: ts.Add(-1 * time.Minute), + EndExclusive: ts.Add(1 * time.Minute), + }, + }, + ) + require.NoError(t, err) + + assert.True(t, res.Exhaustive) + results := res.Results + assert.Equal(t, "testns1", results.Namespace().String()) + + rMap := results.Map() + require.Equal(t, 1, rMap.Len()) + seenIters, found := rMap.Get(ident.StringID("name")) + require.True(t, found) + + vMap := seenIters.Map() + require.Equal(t, 1, vMap.Len()) + assert.True(t, vMap.Contains(ident.StringID("value"))) + } +} + +func setupMockBlock( + t *testing.T, + bl *index.MockBlock, + ts time.Time, + id ident.ID, + tag ident.Tag, + lifecycle index.OnIndexSeries, +) { + bl.EXPECT(). + WriteBatch(gomock.Any()). + Return(index.WriteBatchResult{}, nil). 
+ Do(func(batch *index.WriteBatch) { + docs := batch.PendingDocs() + require.Equal(t, 1, len(docs)) + require.Equal(t, doc.Document{ + ID: id.Bytes(), + Fields: doc.Fields{{Name: tag.Name.Bytes(), Value: tag.Value.Bytes()}}, + }, docs[0]) + entries := batch.PendingEntries() + require.Equal(t, 1, len(entries)) + require.True(t, entries[0].Timestamp.Equal(ts)) + require.True(t, entries[0].OnIndexSeries == lifecycle) // Just ptr equality + }) +} + +func createMockBlocks( + ctrl *gomock.Controller, + blockStart time.Time, + nextBlockStart time.Time, +) (*index.MockBlock, *index.MockBlock, newBlockFn) { + mockBlock := index.NewMockBlock(ctrl) + mockBlock.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() + mockBlock.EXPECT().Close().Return(nil) + mockBlock.EXPECT().StartTime().Return(blockStart).AnyTimes() + + futureBlock := index.NewMockBlock(ctrl) + futureBlock.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() + futureBlock.EXPECT().Close().Return(nil) + futureBlock.EXPECT().StartTime().Return(nextBlockStart).AnyTimes() + + var madeBlock, madeFuture bool + newBlockFn := func( + ts time.Time, + md namespace.Metadata, + _ index.BlockOptions, + io index.Options, + ) (index.Block, error) { + if ts.Equal(blockStart) { + if madeBlock { + return mockBlock, errors.New("already created initial block") + } + madeBlock = true + return mockBlock, nil + } else if ts.Equal(nextBlockStart) { + if madeFuture { + return nil, errors.New("already created forward block") + } + madeFuture = true + return futureBlock, nil + } + return nil, fmt.Errorf("no block starting at %s; must start at %s or %s", + ts, blockStart, nextBlockStart) + } + + return mockBlock, futureBlock, newBlockFn +} + +func TestNamespaceIndexForwardWrite(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{T: t}) + defer ctrl.Finish() + + opts, now, blockSize := generateOptionsNowAndBlockSize() + blockStart := now.Truncate(blockSize) + futureStart := blockStart.Add(blockSize) + mockBlock, futureBlock, newBlockFn := createMockBlocks(ctrl, blockStart, futureStart) + + md := testNamespaceMetadata(blockSize, 4*time.Hour) + idx, err := newNamespaceIndexWithNewBlockFn(md, newBlockFn, opts) + require.NoError(t, err) + + defer func() { + require.NoError(t, idx.Close()) + }() + + id := ident.StringID("foo") + tag := ident.StringTag("name", "value") + tags := ident.NewTags(tag) + lifecycle := index.NewMockOnIndexSeries(ctrl) + + var ( + ts = idx.(*nsIndex).state.latestBlock.StartTime() + next = ts.Truncate(blockSize).Add(blockSize) + ) + + lifecycle.EXPECT().NeedsIndexUpdate(xtime.ToUnixNano(next)).Return(true) + lifecycle.EXPECT().OnIndexPrepare() + + setupMockBlock(t, mockBlock, now, id, tag, lifecycle) + setupMockBlock(t, futureBlock, futureStart, id, tag, lifecycle) + + batch := index.NewWriteBatch(index.WriteBatchOptions{ + IndexBlockSize: blockSize, + }) + batch.Append(testWriteBatchEntry(id, tags, now, lifecycle)) + require.NoError(t, idx.WriteBatch(batch)) +} + +func TestNamespaceIndexForwardWriteCreatesBlock(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{T: t}) + defer ctrl.Finish() + + opts, now, blockSize := generateOptionsNowAndBlockSize() + blockStart := now.Truncate(blockSize) + futureStart := blockStart.Add(blockSize) + mockBlock, futureBlock, newBlockFn := createMockBlocks(ctrl, blockStart, futureStart) + + md := testNamespaceMetadata(blockSize, 4*time.Hour) + idx, err := newNamespaceIndexWithNewBlockFn(md, newBlockFn, opts) + require.NoError(t, err) + + defer func() { + require.NoError(t, idx.Close()) + }() + + 
id := ident.StringID("foo") + tag := ident.StringTag("name", "value") + tags := ident.NewTags(tag) + lifecycle := index.NewMockOnIndexSeries(ctrl) + + var ( + ts = idx.(*nsIndex).state.latestBlock.StartTime() + next = ts.Truncate(blockSize).Add(blockSize) + ) + + lifecycle.EXPECT().NeedsIndexUpdate(xtime.ToUnixNano(next)).Return(true) + lifecycle.EXPECT().OnIndexPrepare() + + setupMockBlock(t, mockBlock, now, id, tag, lifecycle) + setupMockBlock(t, futureBlock, futureStart, id, tag, lifecycle) + + entry, doc := testWriteBatchEntry(id, tags, now, lifecycle) + batch := testWriteBatch(entry, doc, testWriteBatchBlockSizeOption(blockSize)) + require.NoError(t, idx.WriteBatch(batch)) +} + +func TestShardForwardWriteTaggedSyncRefCountSyncIndex(t *testing.T) { + testShardForwardWriteTaggedRefCountIndex(t, index.InsertSync, false) +} + +func TestShardForwardWriteTaggedAsyncRefCountSyncIndex(t *testing.T) { + testShardForwardWriteTaggedRefCountIndex(t, index.InsertAsync, true) +} + +func testShardForwardWriteTaggedRefCountIndex( + t *testing.T, + syncType index.InsertMode, + async bool, +) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + newFn := func( + fn nsIndexInsertBatchFn, + md namespace.Metadata, + nowFn clock.NowFn, + s tally.Scope, + ) namespaceIndexInsertQueue { + q := newNamespaceIndexInsertQueue(fn, md, nowFn, s) + q.(*nsIndexInsertQueue).indexBatchBackoff = 10 * time.Millisecond + return q + } + md, err := namespace.NewMetadata(defaultTestNs1ID, defaultTestNs1Opts) + require.NoError(t, err) + + opts, now, blockSize := generateOptionsNowAndBlockSize() + opts = opts.SetIndexOptions(opts.IndexOptions().SetInsertMode(syncType)) + + idx, err := newNamespaceIndexWithInsertQueueFn(md, newFn, opts) + assert.NoError(t, err) + + defer func() { + assert.NoError(t, idx.Close()) + }() + + next := now.Truncate(blockSize).Add(blockSize) + if async { + testShardForwardWriteTaggedAsyncRefCount(t, now, next, idx, opts) + } else { + testShardForwardWriteTaggedSyncRefCount(t, now, next, idx, opts) + } +} + +func writeToShard( + ctx context.Context, + t *testing.T, + shard *dbShard, + now time.Time, + id string, + shouldWrite bool, +) { + tag := ident.Tag{Name: ident.StringID(id), Value: ident.StringID("")} + idTags := ident.NewTags(tag) + iter := ident.NewTagsIterator(idTags) + _, wasWritten, err := shard.WriteTagged(ctx, ident.StringID(id), iter, now, + 1.0, xtime.Second, nil, series.WriteOptions{ + TruncateType: series.TypeBlock, + TransformOptions: series.WriteTransformOptions{ + ForceValueEnabled: true, + ForceValue: 1, + }, + }) + assert.NoError(t, err) + assert.Equal(t, shouldWrite, wasWritten) +} + +func verifyShard( + ctx context.Context, + t *testing.T, + idx namespaceIndex, + now time.Time, + next time.Time, + id string, +) { + query := m3ninxidx.NewFieldQuery([]byte(id)) + // check current index block for series + res, err := idx.Query(ctx, index.Query{Query: query}, index.QueryOptions{ + StartInclusive: now, + EndExclusive: next, + }) + require.NoError(t, err) + require.Equal(t, 1, res.Results.Size()) + + // check next index block for series + res, err = idx.Query(ctx, index.Query{Query: query}, index.QueryOptions{ + StartInclusive: next.Add(1 * time.Minute), + EndExclusive: next.Add(5 * time.Minute), + }) + require.NoError(t, err) + require.Equal(t, 1, res.Results.Size()) + + // check accross both index blocks to ensure only a single ID is returned. 
+ res, err = idx.Query(ctx, index.Query{Query: query}, index.QueryOptions{ + StartInclusive: now, + EndExclusive: next.Add(5 * time.Minute), + }) + require.NoError(t, err) + require.Equal(t, 1, res.Results.Size()) +} + +func writeToShardAndVerify( + ctx context.Context, + t *testing.T, + shard *dbShard, + idx namespaceIndex, + now time.Time, + next time.Time, + id string, + shouldWrite bool, +) { + writeToShard(ctx, t, shard, now, id, shouldWrite) + verifyShard(ctx, t, idx, now, next, id) +} + +func testShardForwardWriteTaggedSyncRefCount( + t *testing.T, + now time.Time, + next time.Time, + idx namespaceIndex, + opts Options, +) { + shard := testDatabaseShardWithIndexFn(t, opts, idx) + shard.SetRuntimeOptions(runtime.NewOptions(). + SetWriteNewSeriesAsync(false)) + defer shard.Close() + + ctx := context.NewContext() + defer ctx.Close() + + writeToShardAndVerify(ctx, t, shard, idx, now, next, "foo", true) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "bar", true) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "baz", true) + + // ensure all entries have no references left + for _, id := range []string{"foo", "bar", "baz"} { + shard.Lock() + entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + shard.Unlock() + assert.NoError(t, err) + assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) + } + + // move the time the point is written to ensure truncation works. + now = now.Add(1) + // write already inserted series + writeToShardAndVerify(ctx, t, shard, idx, now, next, "foo", false) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "bar", false) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "baz", false) + + // // ensure all entries have no references left + for _, id := range []string{"foo", "bar", "baz"} { + shard.Lock() + entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + shard.Unlock() + assert.NoError(t, err) + assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) + } +} + +func testShardForwardWriteTaggedAsyncRefCount( + t *testing.T, + now time.Time, + next time.Time, + idx namespaceIndex, + opts Options, +) { + testReporterOpts := xmetrics.NewTestStatsReporterOptions() + testReporter := xmetrics.NewTestStatsReporter(testReporterOpts) + scope, closer := tally.NewRootScope(tally.ScopeOptions{ + Reporter: testReporter, + }, 100*time.Millisecond) + defer closer.Close() + opts = opts.SetInstrumentOptions( + opts.InstrumentOptions(). + SetMetricsScope(scope). + SetReportInterval(100 * time.Millisecond)) + + shard := testDatabaseShardWithIndexFn(t, opts, idx) + shard.SetRuntimeOptions(runtime.NewOptions(). + SetWriteNewSeriesAsync(true)) + defer shard.Close() + + ctx := context.NewContext() + defer ctx.Close() + + writeToShard(ctx, t, shard, now, "foo", true) + writeToShard(ctx, t, shard, now, "bar", true) + writeToShard(ctx, t, shard, now, "baz", true) + + inserted := xclock.WaitUntil(func() bool { + counter, ok := testReporter.Counters()["dbshard.insert-queue.inserts"] + return ok && counter == 3 + }, 5*time.Second) + assert.True(t, inserted) + + verifyShard(ctx, t, idx, now, next, "foo") + verifyShard(ctx, t, idx, now, next, "bar") + verifyShard(ctx, t, idx, now, next, "baz") + + // ensure all entries have no references left + for _, id := range []string{"foo", "bar", "baz"} { + shard.Lock() + entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + shard.Unlock() + assert.NoError(t, err) + assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) + } + + // write already inserted series. This should have no effect. 
+ now = now.Add(1) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "foo", false) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "bar", false) + writeToShardAndVerify(ctx, t, shard, idx, now, next, "baz", false) + + // ensure all entries have no references left + for _, id := range []string{"foo", "bar", "baz"} { + shard.Lock() + entry, _, err := shard.lookupEntryWithLock(ident.StringID(id)) + shard.Unlock() + assert.NoError(t, err) + assert.Equal(t, int32(0), entry.ReaderWriterCount(), id) + } +} diff --git a/src/dbnode/storage/series/truncate_type.go b/src/dbnode/storage/series/truncate_type.go index 8a2665fda6..3f0850111e 100644 --- a/src/dbnode/storage/series/truncate_type.go +++ b/src/dbnode/storage/series/truncate_type.go @@ -36,7 +36,7 @@ var validTruncationTypes = []TruncateType{ // TypeNone indicates that no truncation occurs. TypeNone, // TypeBlock truncates incoming writes to the block boundary immediately - // preceeding this point's timestamp. + // preceding this point's timestamp. TypeBlock, } diff --git a/src/x/dice/dice.go b/src/x/dice/dice.go new file mode 100644 index 0000000000..fde1a26385 --- /dev/null +++ b/src/x/dice/dice.go @@ -0,0 +1,61 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dice + +import ( + "fmt" + + "github.com/MichaelTJones/pcg" +) + +// Dice is an interface that allows for random sampling. +type Dice interface { + // Rate returns the sampling rate of this Dice: a number in (0.0, 1.0]. + Rate() float64 + + // Roll returns whether the dice roll succeeded. + Roll() bool +} + +// NewDice constructs a new Dice based on a given success rate. +func NewDice(rate float64) (Dice, error) { + if rate <= 0.0 || rate > 1.0 { + return nil, fmt.Errorf("invalid sample rate %f", rate) + } + + return &epoch{ + r: uint64(1.0 / rate), + rng: pcg.NewPCG64(), + }, nil +} + +type epoch struct { + r uint64 + rng *pcg.PCG64 +} + +func (d *epoch) Rate() float64 { + return 1 / float64(d.r) +} + +func (d *epoch) Roll() bool { + return d.rng.Random()%d.r == 0 +} diff --git a/src/x/dice/dice_test.go b/src/x/dice/dice_test.go new file mode 100644 index 0000000000..523c2b7410 --- /dev/null +++ b/src/x/dice/dice_test.go @@ -0,0 +1,46 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dice + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiceConstructor(t *testing.T) { + dice, err := NewDice(0) + require.Error(t, err) + require.Nil(t, dice) + + dice, err = NewDice(2) + require.Error(t, err) + require.Nil(t, dice) +} + +func TestDice(t *testing.T) { + r, err := NewDice(1) + require.NoError(t, err) + + assert.Equal(t, float64(1.0), r.Rate()) + assert.True(t, r.Roll()) +}
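
The forwardIndexDice added in this patch delegates its probability check to the new x/dice package above. A minimal usage sketch, illustrative only and not part of the patch, assuming the m3 module is on the import path:

package main

import (
	"fmt"

	"github.com/m3db/m3/src/x/dice"
)

func main() {
	// A dice with a 1-in-4 success rate; NewDice rejects rates outside (0, 1].
	d, err := dice.NewDice(0.25)
	if err != nil {
		panic(err)
	}

	successes := 0
	for i := 0; i < 100000; i++ {
		if d.Roll() {
			successes++
		}
	}

	// Expect roughly 25,000 successes; the exact count varies between runs
	// because rolls are driven by a PCG64 generator.
	fmt.Printf("rate=%v successes=%d\n", d.Rate(), successes)
}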