Skip to content

Commit

Permalink
[dbnode] Add forward index write capability (#1613)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored May 30, 2019
1 parent 744e0fd commit f4c2694
Show file tree
Hide file tree
Showing 11 changed files with 1,043 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xretry "github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"

tchannel "github.com/uber/tchannel-go"
Expand Down
102 changes: 102 additions & 0 deletions src/dbnode/storage/forward_index_dice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package storage

import (
"fmt"
"time"

"github.com/m3db/m3/src/x/dice"
)

// forwardIndexDice is a die roll that adds a chance for incoming index writes
// arriving near a block boundary to be duplicated and written to the next block
// index, adding jitter and smoothing index load so that block boundaries do not
// cause a huge influx of new documents that all need to be indexed at once.
type forwardIndexDice struct {
enabled bool
blockSize time.Duration

forwardIndexThreshold time.Duration
forwardIndexDice dice.Dice
}

func newForwardIndexDice(
opts Options,
) (forwardIndexDice, error) {
var (
indexOpts = opts.IndexOptions()
seriesOpts = opts.SeriesOptions()

probability = indexOpts.ForwardIndexProbability()
)

// NB: if not enabled, return a no-op forward index dice.
if probability == 0 {
return forwardIndexDice{}, nil
}

var (
threshold = indexOpts.ForwardIndexThreshold()

retention = seriesOpts.RetentionOptions()
bufferFuture = retention.BufferFuture()
blockSize = retention.BlockSize()

forwardIndexThreshold time.Duration
)

if threshold < 0 || threshold > 1 {
return forwardIndexDice{},
fmt.Errorf("invalid forward write threshold %f", threshold)
}

bufferFragment := float64(bufferFuture) * threshold
forwardIndexThreshold = blockSize - time.Duration(bufferFragment)

dice, err := dice.NewDice(probability)
if err != nil {
return forwardIndexDice{},
fmt.Errorf("cannot create forward write dice: %s", err)
}

return forwardIndexDice{
enabled: true,
blockSize: blockSize,

forwardIndexThreshold: forwardIndexThreshold,
forwardIndexDice: dice,
}, nil
}

// roll decides if a timestamp is eligible for forward index writes.
func (o *forwardIndexDice) roll(timestamp time.Time) bool {
if !o.enabled {
return false
}

threshold := timestamp.Truncate(o.blockSize).Add(o.forwardIndexThreshold)
if !timestamp.Before(threshold) {
return o.forwardIndexDice.Roll()
}

return false
}
157 changes: 157 additions & 0 deletions src/dbnode/storage/forward_index_dice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package storage

import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/retention"

"github.com/stretchr/testify/require"
)

func optionsWithIndexValues(
indexProb float64,
indexThreshold float64,
retOpts retention.Options,
) Options {
opts := DefaultTestOptions()

idxOpts := opts.IndexOptions()
idxOpts = idxOpts.
SetForwardIndexProbability(indexProb).
SetForwardIndexThreshold(indexThreshold)

opts = DefaultTestOptions().
SetIndexOptions(idxOpts)

if retOpts != nil {
seriesOpts := opts.SeriesOptions().
SetRetentionOptions(retOpts)
return opts.SetSeriesOptions(seriesOpts)
}

return opts
}

func TestDisabledForwardIndexDice(t *testing.T) {
opts := optionsWithIndexValues(0, 0, nil)
dice, err := newForwardIndexDice(opts)
require.NoError(t, err)
require.False(t, dice.enabled)

start := time.Now().Truncate(time.Hour)
end := start.Add(time.Hour)

for ts := start; ts.Before(end); ts = ts.Add(time.Second) {
require.False(t, dice.roll(ts))
}
}

func TestInvalidForwardIndexDice(t *testing.T) {
// Index probability < 0 and > 1 cases.
invalidIndexProbabilities := []float64{-10, 10}
for _, prob := range invalidIndexProbabilities {
opts := optionsWithIndexValues(prob, 0, nil)
_, err := newForwardIndexDice(opts)
require.Error(t, err)
}

// Index threshold < 0 and > 1 cases.
invalidIndexThresholds := []float64{-10, 10}
for _, threshold := range invalidIndexThresholds {
opts := optionsWithIndexValues(0.5, threshold, nil)
_, err := newForwardIndexDice(opts)
require.Error(t, err)
}
}

func TestAlwaysOnForwardIndexDice(t *testing.T) {
retOpts := retention.NewOptions().
SetBlockSize(time.Hour).
SetBufferFuture(time.Minute * 10)
opts := optionsWithIndexValues(1, 0.9, retOpts)

dice, err := newForwardIndexDice(opts)
require.NoError(t, err)
require.True(t, dice.enabled)

var (
start = time.Now().Truncate(time.Hour)
threshold = start.Add(time.Minute * 51)
end = start.Add(time.Hour)
)

for ts := start; ts.Before(end); ts = ts.Add(time.Second) {
indexing := dice.roll(ts)
if ts.Before(threshold) {
require.False(t, indexing)
} else {
require.True(t, indexing)
}
}
}

type trackingDice struct {
rolled int
success int
}

func (d *trackingDice) Rate() float64 { return 0 }
func (d *trackingDice) Roll() bool {
d.rolled++
return d.rolled%d.success == 0
}

func TestCustomDice(t *testing.T) {
d := &trackingDice{
success: 10,
}

dice := forwardIndexDice{
enabled: true,
blockSize: time.Hour,

forwardIndexThreshold: time.Minute * 50,
forwardIndexDice: d,
}

var (
start = time.Now().Truncate(time.Hour)
threshold = start.Add(time.Minute * 50)
end = start.Add(time.Hour)
)

sample := 0
for ts := start; ts.Before(end); ts = ts.Add(time.Second) {
indexing := dice.roll(ts)

if ts.Before(threshold) {
require.False(t, indexing)
} else {
sample++
require.Equal(t, sample%10 == 0, indexing)
}
}

require.Equal(t, 600, d.rolled)
}
57 changes: 49 additions & 8 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
Expand All @@ -39,7 +40,6 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
Expand Down Expand Up @@ -113,6 +113,10 @@ type nsIndex struct {
queriesWg sync.WaitGroup

metrics nsIndexMetrics

// forwardIndexDice determines if an incoming index write should be dual
// written to the next block.
forwardIndexDice forwardIndexDice
}

type nsIndexState struct {
Expand Down Expand Up @@ -292,6 +296,14 @@ func newNamespaceIndexWithOptions(
idx.runtimeOptsListener = runtimeOptsMgr.RegisterListener(idx)
}

// set up forward index dice.
dice, err := newForwardIndexDice(newIndexOpts.opts)
if err != nil {
return nil, err
}

idx.forwardIndexDice = dice

// allocate indexing queue and start it up.
queue := newIndexQueueFn(idx.writeBatches, nsMD, nowFn, scope)
if err := queue.Start(); err != nil {
Expand All @@ -302,7 +314,7 @@ func newNamespaceIndexWithOptions(
// allocate the current block to ensure we're able to index as soon as we return
currentBlock := nowFn().Truncate(idx.blockSize)
idx.state.RLock()
_, err := idx.ensureBlockPresentWithRLock(currentBlock)
_, err = idx.ensureBlockPresentWithRLock(currentBlock)
idx.state.RUnlock()
if err != nil {
return nil, err
Expand Down Expand Up @@ -501,31 +513,60 @@ func (i *nsIndex) writeBatches(
// on the provided inserts to terminate quicker during shutdown.
return
}
now := i.nowFn()
futureLimit := now.Add(1 * i.bufferFuture)
pastLimit := now.Add(-1 * i.bufferPast)
var (
now = i.nowFn()
blockSize = i.blockSize
futureLimit = now.Add(1 * i.bufferFuture)
pastLimit = now.Add(-1 * i.bufferPast)
batchOptions = batch.Options()
forwardIndexDice = i.forwardIndexDice
forwardIndexEnabled = forwardIndexDice.enabled

forwardIndexBatch *index.WriteBatch
)
// NB(r): Release lock early to avoid writing batches impacting ticking
// speed, etc.
// Sometimes foreground compaction can take a long time during heavy inserts.
// Each lookup to ensureBlockPresent checks that index is still open, etc.
i.state.RUnlock()

if forwardIndexEnabled {
// NB(arnikola): Don't initialize forward index batch if forward indexing
// is not enabled.
forwardIndexBatch = index.NewWriteBatch(batchOptions)
}
// Ensure timestamp is not too old/new based on retention policies and that
// doc is valid.
// doc is valid. Add potential forward writes to the forwardWriteBatch.
batch.ForEach(
func(idx int, entry index.WriteBatchEntry,
d doc.Document, _ index.WriteBatchEntryResult) {
if !futureLimit.After(entry.Timestamp) {
ts := entry.Timestamp
if !futureLimit.After(ts) {
batch.MarkUnmarkedEntryError(m3dberrors.ErrTooFuture, idx)
return
}

if !entry.Timestamp.After(pastLimit) {
if !ts.After(pastLimit) {
batch.MarkUnmarkedEntryError(m3dberrors.ErrTooPast, idx)
return
}

if forwardIndexDice.roll(ts) {
forwardEntryTimestamp := ts.Truncate(blockSize).Add(blockSize)
xNanoTimestamp := xtime.ToUnixNano(forwardEntryTimestamp)
if entry.OnIndexSeries.NeedsIndexUpdate(xNanoTimestamp) {
forwardIndexEntry := entry
forwardIndexEntry.Timestamp = forwardEntryTimestamp
forwardIndexEntry.OnIndexSeries.OnIndexPrepare()
forwardIndexBatch.Append(forwardIndexEntry, d)
}
}
})

if forwardIndexEnabled && forwardIndexBatch.Len() > 0 {
batch.AppendAll(forwardIndexBatch)
}

// Sort the inserts by which block they're applicable for, and do the inserts
// for each block, making sure to not try to insert any entries already marked
// with a result.
Expand Down
Loading

0 comments on commit f4c2694

Please sign in to comment.