Visibility into and ability to limit number of encoders per block (#2516)

* Max encoders per block setting
* New metric: histogram of number of active encoders per block
* New metric: counter of number of writes dropped due to max encoders setting
justinjc committed Aug 31, 2020
1 parent b429452 commit ac5d8d4
Showing 15 changed files with 690 additions and 9 deletions.
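The two new metrics from the commit summary are emitted through the series Stats helper used in src/dbnode/storage/series/buffer.go further down; the stats plumbing itself is not rendered in this diff, so the following is only a hedged sketch of the recording side using uber-go/tally (the type, field, and metric names here are assumptions, not the commit's actual code):

package series

import "github.com/uber-go/tally"

// statsSketch stands in for the series Stats type; only the two new
// metrics are sketched here.
type statsSketch struct {
	encodersPerBlock          tally.Histogram
	encoderLimitWriteRejected tally.Counter
}

// RecordEncodersPerBlock samples the number of active encoders in a block.
func (s statsSketch) RecordEncodersPerBlock(num int) {
	s.encodersPerBlock.RecordValue(float64(num))
}

// IncEncoderLimitWriteRejected counts a write dropped because the
// max-encoders-per-block limit was hit.
func (s statsSketch) IncEncoderLimitWriteRejected() {
	s.encoderLimitWriteRejected.Inc(1)
}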
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -714,6 +714,8 @@ func TestConfiguration(t *testing.T) {
maxOutstandingWriteRequests: 0
maxOutstandingReadRequests: 0
maxOutstandingRepairedBytes: 0
maxEncodersPerBlock: 0
tchannel: null
coordinator: null
`

6 changes: 6 additions & 0 deletions src/cmd/services/m3dbnode/config/limits.go
@@ -39,4 +39,10 @@ type Limits struct {
	// process would pause until some of the repaired bytes had been persisted to disk (and subsequently
	// evicted from memory) at which point it would resume.
	MaxOutstandingRepairedBytes int64 `yaml:"maxOutstandingRepairedBytes" validate:"min=0"`

	// MaxEncodersPerBlock is the maximum number of encoders permitted in a block.
	// When there are too many encoders, merging them (during a tick) puts a high
	// load on the CPU, which can prevent other DB operations.
	// A setting of 0 means there is no maximum.
	MaxEncodersPerBlock int `yaml:"maxEncodersPerBlock" validate:"min=0"`
}
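For reference, a sketch of where this knob sits in the node YAML config. The nesting under the db limits section is assumed from the struct above and the config test earlier; 512 is just an example value:

db:
  limits:
    maxEncodersPerBlock: 512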
118 changes: 118 additions & 0 deletions src/dbnode/integration/encoder_limit_test.go
@@ -0,0 +1,118 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
	"testing"
	"time"

	"github.com/m3db/m3/src/dbnode/client"
	"github.com/m3db/m3/src/x/ident"
	xtime "github.com/m3db/m3/src/x/time"

	"github.com/stretchr/testify/require"
)

func TestEncoderLimit(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	// We don't want a tick to happen during this test, since that will
	// interfere with testing encoders due to the tick merging them.
	testOpts := NewTestOptions(t).SetTickMinimumInterval(time.Minute)
	testSetup, err := NewTestSetup(t, testOpts, nil)
	require.NoError(t, err)
	defer testSetup.Close()

	log := testSetup.StorageOpts().InstrumentOptions().Logger()
	require.NoError(t, testSetup.StartServer())
	log.Info("server is now up")

	defer func() {
		require.NoError(t, testSetup.StopServer())
		log.Info("server is now down")
	}()

	now := testSetup.NowFn()()

	db := testSetup.DB()
	mgr := db.Options().RuntimeOptionsManager()
	encoderLimit := 5
	newRuntimeOpts := mgr.Get().SetEncodersPerBlockLimit(encoderLimit)
	mgr.Update(newRuntimeOpts)

	session, err := testSetup.M3DBClient().DefaultSession()
	require.NoError(t, err)
	nsID := testNamespaces[0]
	seriesID := ident.StringID("foo")

	for i := 0; i < encoderLimit+5; i++ {
		err = session.Write(
			nsID, seriesID,
			// Write backwards so that a new encoder gets created every write.
			now.Add(time.Duration(50-i)*time.Second),
			123, xtime.Second, nil,
		)

		if i >= encoderLimit {
			require.Error(t, err)
			// A rejected write due to hitting the max encoder limit should be
			// a bad request so that the client knows not to repeat the write
			// request, since that would exacerbate the problem.
			require.True(t, client.IsBadRequestError(err))
		} else {
			require.NoError(t, err)
		}
	}

	for i := 0; i < 10; i++ {
		err = session.Write(
			nsID, seriesID,
			now.Add(time.Duration(51+i)*time.Second),
			123, xtime.Second, nil,
		)

		// Even though we're doing more writes, these can fit into existing
		// encoders since they are all ahead of existing writes, so we expect
		// no write errors.
		require.NoError(t, err)
	}

	// Now allow an unlimited number of encoders.
	encoderLimit = 0
	newRuntimeOpts = mgr.Get().SetEncodersPerBlockLimit(encoderLimit)
	mgr.Update(newRuntimeOpts)

	for i := 0; i < 20; i++ {
		err = session.Write(
			nsID, seriesID,
			now.Add(time.Duration(20-i)*time.Second),
			123, xtime.Second, nil,
		)

		// Now there's no encoder limit, so no error even though each of these
		// additional writes creates a new encoder.
		require.NoError(t, err)
	}
}
5 changes: 5 additions & 0 deletions src/dbnode/kvconfig/keys.go
@@ -33,6 +33,11 @@ const (
	// configuration specifying a hard limit for a cluster's new series insertions.
	ClusterNewSeriesInsertLimitKey = "m3db.node.cluster-new-series-insert-limit"

	// EncodersPerBlockLimitKey is the KV config key for the runtime
	// configuration specifying a hard limit on the number of active encoders
	// per block.
	EncodersPerBlockLimitKey = "m3db.node.encoders-per-block-limit"

	// ClientBootstrapConsistencyLevel is the KV config key for the runtime
	// configuration specifying the client bootstrap consistency level.
	ClientBootstrapConsistencyLevel = "m3db.client.bootstrap-consistency-level"
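For illustration, a hedged sketch of how an operator-side tool might flip this key at runtime so that the watch in src/dbnode/server/server.go (below) picks up the change without a process restart. The import paths and the kv.Store.Set signature are assumptions based on m3's cluster kv package, not part of this commit:

package main

import (
	"github.com/m3db/m3/src/cluster/generated/proto/commonpb"
	"github.com/m3db/m3/src/cluster/kv"
	"github.com/m3db/m3/src/dbnode/kvconfig"
)

// setEncoderLimit writes the new limit to the KV store; node-side watchers
// apply it to runtime options. A value of 0 removes the limit.
func setEncoderLimit(store kv.Store, limit int64) error {
	_, err := store.Set(kvconfig.EncodersPerBlockLimitKey, &commonpb.Int64Proto{Value: limit})
	return err
}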
28 changes: 28 additions & 0 deletions src/dbnode/runtime/runtime_mock.go

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions src/dbnode/runtime/runtime_options.go
@@ -73,6 +73,7 @@ type options struct {
	writeNewSeriesAsync                  bool
	writeNewSeriesBackoffDuration        time.Duration
	writeNewSeriesLimitPerShardPerSecond int
	encodersPerBlockLimit                int
	tickSeriesBatchSize                  int
	tickPerSeriesSleepDuration           time.Duration
	tickMinimumInterval                  time.Duration
@@ -168,6 +169,16 @@ func (o *options) WriteNewSeriesLimitPerShardPerSecond() int {
	return o.writeNewSeriesLimitPerShardPerSecond
}

func (o *options) SetEncodersPerBlockLimit(value int) Options {
	opts := *o
	opts.encodersPerBlockLimit = value
	return &opts
}

func (o *options) EncodersPerBlockLimit() int {
	return o.encodersPerBlockLimit
}

func (o *options) SetTickSeriesBatchSize(value int) Options {
	opts := *o
	opts.tickSeriesBatchSize = value
14 changes: 14 additions & 0 deletions src/dbnode/runtime/types.go
@@ -77,6 +77,20 @@ type Options interface {
	// time series being inserted.
	WriteNewSeriesLimitPerShardPerSecond() int

	// SetEncodersPerBlockLimit sets the maximum number of encoders allowed
	// per block. Setting it to zero means an unlimited number of encoders is
	// permitted. This limit primarily defends against bursts of out-of-order
	// writes, which create many encoders and subsequently cause a large burst
	// in CPU load when trying to merge them.
	SetEncodersPerBlockLimit(value int) Options

	// EncodersPerBlockLimit returns the maximum number of encoders allowed
	// per block. Zero means an unlimited number of encoders is permitted.
	EncodersPerBlockLimit() int

	// SetTickSeriesBatchSize sets the batch size to process series together
	// during a tick before yielding and sleeping the per series duration
	// multiplied by the batch size.
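To make the copy-on-write options pattern above concrete, a short usage sketch, mirroring the integration test earlier; runtime.NewOptionsManager is assumed to be the dbnode runtime package's manager constructor:

package main

import (
	"log"

	"github.com/m3db/m3/src/dbnode/runtime"
)

func main() {
	// Each Set* call copies the options value; Update swaps it in and
	// notifies registered watchers (e.g. series buffers reading the limit).
	mgr := runtime.NewOptionsManager()
	opts := mgr.Get().SetEncodersPerBlockLimit(256)
	if err := mgr.Update(opts); err != nil {
		log.Fatal(err)
	}
}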
73 changes: 73 additions & 0 deletions src/dbnode/server/server.go
@@ -872,6 +872,8 @@ func Run(runOpts RunOptions) {
		// Only set the write new series limit after bootstrapping
		kvWatchNewSeriesLimitPerShard(syncCfg.KVStore, logger, topo,
			runtimeOptsMgr, cfg.WriteNewSeriesLimitPerSecond)
		kvWatchEncodersPerBlockLimit(syncCfg.KVStore, logger,
			runtimeOptsMgr, cfg.Limits.MaxEncodersPerBlock)
	}()

	// Wait for process interrupt.
@@ -988,6 +990,62 @@ func kvWatchNewSeriesLimitPerShard(
	}()
}

func kvWatchEncodersPerBlockLimit(
	store kv.Store,
	logger *zap.Logger,
	runtimeOptsMgr m3dbruntime.OptionsManager,
	defaultEncodersPerBlockLimit int,
) {
	var initEncoderLimit int

	value, err := store.Get(kvconfig.EncodersPerBlockLimitKey)
	if err == nil {
		protoValue := &commonpb.Int64Proto{}
		err = value.Unmarshal(protoValue)
		if err == nil {
			initEncoderLimit = int(protoValue.Value)
		}
	}

	if err != nil {
		if err != kv.ErrNotFound {
			logger.Warn("error resolving encoder per block limit", zap.Error(err))
		}
		initEncoderLimit = defaultEncodersPerBlockLimit
	}

	err = setEncodersPerBlockLimitOnChange(runtimeOptsMgr, initEncoderLimit)
	if err != nil {
		logger.Warn("unable to set encoder per block limit", zap.Error(err))
	}

	watch, err := store.Watch(kvconfig.EncodersPerBlockLimitKey)
	if err != nil {
		logger.Error("could not watch encoder per block limit", zap.Error(err))
		return
	}

	go func() {
		protoValue := &commonpb.Int64Proto{}
		for range watch.C() {
			value := defaultEncodersPerBlockLimit
			if newValue := watch.Get(); newValue != nil {
				if err := newValue.Unmarshal(protoValue); err != nil {
					logger.Warn("unable to parse new encoder per block limit", zap.Error(err))
					continue
				}
				value = int(protoValue.Value)
			}

			err = setEncodersPerBlockLimitOnChange(runtimeOptsMgr, value)
			if err != nil {
				logger.Warn("unable to set encoder per block limit", zap.Error(err))
				continue
			}
		}
	}()
}

func kvWatchClientConsistencyLevels(
	store kv.Store,
	logger *zap.Logger,
@@ -1147,6 +1205,21 @@ func clusterLimitToPlacedShardLimit(topo topology.Topology, clusterLimit int) in
	return nodeLimit
}

func setEncodersPerBlockLimitOnChange(
	runtimeOptsMgr m3dbruntime.OptionsManager,
	encoderLimit int,
) error {
	runtimeOpts := runtimeOptsMgr.Get()
	if runtimeOpts.EncodersPerBlockLimit() == encoderLimit {
		// Not changed, no need to set the value and trigger a runtime options update.
		return nil
	}

	newRuntimeOpts := runtimeOpts.
		SetEncodersPerBlockLimit(encoderLimit)
	return runtimeOptsMgr.Update(newRuntimeOpts)
}

// This function blocks for at most waitTimeout while trying to get an initial
// value before we kick off the bootstrap.
func kvWatchBootstrappers(
3 changes: 2 additions & 1 deletion src/dbnode/storage/options.go
@@ -111,7 +111,8 @@ func NewSeriesOptionsFromOptions(opts Options, ropts retention.Options) series.O
		SetMultiReaderIteratorPool(opts.MultiReaderIteratorPool()).
		SetIdentifierPool(opts.IdentifierPool()).
		SetBufferBucketPool(opts.BufferBucketPool()).
		SetBufferBucketVersionsPool(opts.BufferBucketVersionsPool()).
		SetRuntimeOptionsManager(opts.RuntimeOptionsManager())
}

type options struct {
21 changes: 20 additions & 1 deletion src/dbnode/storage/series/buffer.go
@@ -55,6 +55,7 @@ const (
var (
	timeZero           time.Time
	errIncompleteMerge = errors.New("bucket merge did not result in only one encoder")
	errTooManyEncoders = xerrors.NewInvalidParamsError(errors.New("too many encoders per block"))
)

const (
@@ -445,6 +446,8 @@ func (b *dbBuffer) Tick(blockStates ShardBlockStateSnapshot, nsCtx namespace.Con
			}
		}

		buckets.recordActiveEncoders()

		// Once we've evicted all eligible buckets, we merge duplicate encoders
		// in the remaining ones to try and reclaim memory.
		merges, err := buckets.merge(WarmWrite, nsCtx)
@@ -1063,6 +1066,16 @@ func (b *BufferBucketVersions) mergeToStreams(ctx context.Context, opts streamsO
	return res, nil
}

func (b *BufferBucketVersions) recordActiveEncoders() {
	var numActiveEncoders int
	for _, bucket := range b.buckets {
		if bucket.version == writableBucketVersion {
			numActiveEncoders += len(bucket.encoders)
		}
	}
	b.opts.Stats().RecordEncodersPerBlock(numActiveEncoders)
}

type streamsOptions struct {
	filterWriteType bool
	writeType       WriteType
@@ -1157,7 +1170,13 @@ func (b *BufferBucket) write(
		return err == nil, err
	}

	// Need a new encoder, we didn't find an encoder to write to.
	maxEncoders := b.opts.RuntimeOptionsManager().Get().EncodersPerBlockLimit()
	if maxEncoders != 0 && len(b.encoders) >= int(maxEncoders) {
		b.opts.Stats().IncEncoderLimitWriteRejected()
		return false, errTooManyEncoders
	}

	b.opts.Stats().IncCreatedEncoders()
	bopts := b.opts.DatabaseBlockOptions()
	blockSize := b.opts.RetentionOptions().BlockSize()
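Because the rejection is wrapped as an invalid-params error, it surfaces to clients as a bad request. A hedged sketch of the client-side contract follows; client.IsBadRequestError is used the same way in the integration test above, while the helper itself is hypothetical:

package main

import "github.com/m3db/m3/src/dbnode/client"

// shouldRetryWrite reports whether a failed write is worth retrying.
// Bad-request errors (such as errTooManyEncoders above) must not be
// retried: replaying the same out-of-order write only adds encoder pressure.
func shouldRetryWrite(err error) bool {
	return err != nil && !client.IsBadRequestError(err)
}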