Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial query limit overriding #3090

Merged
merged 89 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
9e3b280
Initial query limit overriding
rallen090 Jan 14, 2021
d5ef8b4
Initial query limit overriding 2
rallen090 Jan 14, 2021
5ecdb41
Initial query limit overriding 3
rallen090 Jan 14, 2021
ab8384b
Initial query limit overriding 4
rallen090 Jan 14, 2021
06ad766
PR feedback
rallen090 Jan 14, 2021
6390dcf
Build fix
rallen090 Jan 14, 2021
c9b0ad0
Fix error wording
rallen090 Jan 14, 2021
0bba45e
Build fix 2
rallen090 Jan 14, 2021
b90cc5c
Fix limit test
rallen090 Jan 14, 2021
7916470
Fix limit test 2
rallen090 Jan 14, 2021
5c6ae76
Add Override test
rallen090 Jan 14, 2021
3285a62
Add Override test 2
rallen090 Jan 15, 2021
49b6513
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 15, 2021
6df725b
Back query limits by etcd
rallen090 Jan 16, 2021
e7e23aa
Back query limits by etcd 2
rallen090 Jan 16, 2021
6c4bca4
Back query limits by etcd 3
rallen090 Jan 16, 2021
8f281b9
Build fix
rallen090 Jan 18, 2021
7b0d797
Build fix 2
rallen090 Jan 18, 2021
520f2de
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 18, 2021
2ad8c09
Build fix 3
rallen090 Jan 18, 2021
bc50860
Build fix 4
rallen090 Jan 18, 2021
f2476c5
Lint
rallen090 Jan 18, 2021
9f4d9b1
Removing endpoint to instead use etcd directly
rallen090 Jan 18, 2021
33a7bf4
Removing endpoint to instead use etcd directly 2
rallen090 Jan 18, 2021
32f148f
Add back rpc mock
rallen090 Jan 18, 2021
66a0a42
Fix test
rallen090 Jan 18, 2021
a39700f
Fix test 2
rallen090 Jan 18, 2021
0e56187
Add back m3em_mock
rallen090 Jan 18, 2021
2dbef2c
Fix gen
rallen090 Jan 18, 2021
fb29267
Gen
rallen090 Jan 18, 2021
90bef8a
Lint
rallen090 Jan 18, 2021
d3747ec
Fix tests
rallen090 Jan 18, 2021
03f987f
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 18, 2021
053739b
Rebased
rallen090 Jan 18, 2021
2272d5e
Rebased 2
rallen090 Jan 18, 2021
dddecfb
Test fix
rallen090 Jan 18, 2021
0bc4306
Lint
rallen090 Jan 18, 2021
8e249e6
Test fix 2
rallen090 Jan 18, 2021
c8507c0
Fix logs
rallen090 Jan 18, 2021
027aaae
Add limit logs and metrics
rallen090 Jan 19, 2021
40948e0
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 19, 2021
8b9b602
Test reset
rallen090 Jan 19, 2021
98efb93
Add etcd kv update endpoint 1
rallen090 Jan 19, 2021
6be4a30
Add etcd kv update endpoint 2
rallen090 Jan 19, 2021
db34398
Add etcd kv update endpoint 3
rallen090 Jan 19, 2021
02fcd56
Add etcd kv update endpoint 4
rallen090 Jan 19, 2021
8351c89
Add etcd kv update endpoint 5
rallen090 Jan 19, 2021
731ec5f
Remove reset
rallen090 Jan 19, 2021
ffcea5e
Add back mock
rallen090 Jan 19, 2021
6c42ae2
Fixing tests
rallen090 Jan 19, 2021
a86d55f
More build fixes
rallen090 Jan 19, 2021
b86fdcf
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 19, 2021
7dc8689
Integration test fix
rallen090 Jan 19, 2021
3ec7673
Add back mock
rallen090 Jan 20, 2021
6a8f9ed
PR feedback 1
rallen090 Jan 20, 2021
c05b6c9
Comment
rallen090 Jan 20, 2021
79cd2e4
Test fix
rallen090 Jan 20, 2021
0a31c53
Cleanup
rallen090 Jan 20, 2021
38cabbd
Lint
rallen090 Jan 20, 2021
5f81a75
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 20, 2021
a9621f7
Lint 2
rallen090 Jan 20, 2021
f488d68
Lint 3
rallen090 Jan 20, 2021
c5e0683
Cleanup 2
rallen090 Jan 20, 2021
316a7d3
Dep order
rallen090 Jan 20, 2021
e3480a8
Fix stop race
rallen090 Jan 20, 2021
6e15b82
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 20, 2021
450acce
Fix integration test
rallen090 Jan 20, 2021
5208b2c
PR feedback
rallen090 Jan 20, 2021
f880627
PR feedback 2
rallen090 Jan 20, 2021
4cdea4c
Add kvstore test
rallen090 Jan 20, 2021
9fc90b8
More tests
rallen090 Jan 20, 2021
9b422ad
More tests 2
rallen090 Jan 20, 2021
a3c0857
More tests 3
rallen090 Jan 20, 2021
34a5e3e
Lint
rallen090 Jan 20, 2021
bdd3ad4
Fallback to config-based limits if unset in dynamic
rallen090 Jan 20, 2021
aa89830
Fixes from feedback
rallen090 Jan 20, 2021
314e43f
Add docs around dynamic limits
rallen090 Jan 20, 2021
24b7e87
More docs
rallen090 Jan 20, 2021
8596169
More doc updates
rallen090 Jan 21, 2021
bbc474e
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 21, 2021
0967ae6
Reorganize limit setting code
rallen090 Jan 21, 2021
6f540d6
Reorganize limit setting code 2
rallen090 Jan 21, 2021
21732ef
Add more comprehensive locking
rallen090 Jan 21, 2021
4a4d5cd
Update wording
rallen090 Jan 21, 2021
5e00b23
Update docs more
rallen090 Jan 21, 2021
a943da0
Update docs more 2
rallen090 Jan 21, 2021
b659237
PR feedback
rallen090 Jan 21, 2021
7fc778a
Merge remote-tracking branch 'origin/master' into ra/dynamic-limits
rallen090 Jan 21, 2021
e07440d
PR feedback 2
rallen090 Jan 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dbnode/generated/thrift/rpc/rpc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/integration/query_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ func newTestOptionsWithIndexedNamespace(t *testing.T) (TestOptions, namespace.Me

func newTestSetupWithQueryLimits(t *testing.T, opts TestOptions) TestSetup {
storageLimitsFn := func(storageOpts storage.Options) storage.Options {
limit := int64(1)
queryLookback := limits.DefaultLookbackLimitOptions()
queryLookback.Limit = 1
queryLookback.Limit = &limit
queryLookback.Lookback = time.Hour

limitOpts := limits.NewOptions().
Expand Down
12 changes: 12 additions & 0 deletions src/dbnode/kvconfig/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,16 @@ const (
// ClientWriteConsistencyLevel is the KV config key for the runtime
// configuration specifying the client write consistency level
ClientWriteConsistencyLevel = "m3db.client.write-consistency-level"

// DocsLimit is the KV config key for the docs matched query limit.
// Settings in string form "{limit},{lookback}", e.g. "1000,15s".
DocsLimit = "m3db.limits.docs"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

migrating these will probably be more annoying than migrating static config when we want to rename these. Should we land on the desired names now?

rallen090 marked this conversation as resolved.
Show resolved Hide resolved

// DiskBytesReadLimit is the KV config key for the disk bytes read query limit.
// Settings in string form "{limit},{lookback}", e.g. "1000,15s".
DiskBytesReadLimit = "m3db.limits.disk-bytes-read"

// DiskSeriesReadLimit is the KV config key for the disk series read query limit.
// Settings in string form "{limit},{lookback}", e.g. "1000,15s".
DiskSeriesReadLimit = "m3db.limits.disk-series-read"
)
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,12 +805,13 @@ func TestBlockRetrieverHandlesSeekByIndexEntryErrors(t *testing.T) {

func TestLimitSeriesReadFromDisk(t *testing.T) {
scope := tally.NewTestScope("test", nil)
limit := int64(2)
limitOpts := limits.NewOptions().
SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)).
SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()).
SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()).
SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{
Limit: 1,
Limit: &limit,
Lookback: time.Second * 1,
})
queryLimits, err := limits.NewQueryLimits(limitOpts)
Expand Down
89 changes: 86 additions & 3 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"path"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -452,15 +453,21 @@ func Run(runOpts RunOptions) {
bytesReadLimit := limits.DefaultLookbackLimitOptions()
diskSeriesReadLimit := limits.DefaultLookbackLimitOptions()
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil {
docsLimit.Limit = limitConfig.Value
if limitConfig.Value != 0 {
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
docsLimit.Limit = &limitConfig.Value
}
docsLimit.Lookback = limitConfig.Lookback
}
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskBytesRead; limitConfig != nil {
bytesReadLimit.Limit = limitConfig.Value
if limitConfig.Value != 0 {
bytesReadLimit.Limit = &limitConfig.Value
}
bytesReadLimit.Lookback = limitConfig.Lookback
}
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskRead; limitConfig != nil {
diskSeriesReadLimit.Limit = limitConfig.Value
if limitConfig.Value != 0 {
diskSeriesReadLimit.Limit = &limitConfig.Value
}
diskSeriesReadLimit.Lookback = limitConfig.Lookback
}
limitOpts := limits.NewOptions().
Expand Down Expand Up @@ -993,6 +1000,9 @@ func Run(runOpts RunOptions) {
runtimeOptsMgr, cfg.Limits.WriteNewSeriesPerSecond)
kvWatchEncodersPerBlockLimit(syncCfg.KVStore, logger,
runtimeOptsMgr, cfg.Limits.MaxEncodersPerBlock)
kvWatchQueryLimit(syncCfg.KVStore, logger, queryLimits.DocsLimit(), kvconfig.DocsLimit)
kvWatchQueryLimit(syncCfg.KVStore, logger, queryLimits.DiskSeriesReadLimit(), kvconfig.DiskBytesReadLimit)
kvWatchQueryLimit(syncCfg.KVStore, logger, queryLimits.BytesReadLimit(), kvconfig.DiskSeriesReadLimit)
}()

// Wait for process interrupt.
Expand Down Expand Up @@ -1165,6 +1175,79 @@ func kvWatchEncodersPerBlockLimit(
}()
}

func kvWatchQueryLimit(
store kv.Store,
logger *zap.Logger,
limit limits.LookbackLimit,
kvName string,
) {
options := limit.Options()

value, err := store.Get(kvName)
if err == nil {
protoValue := &commonpb.StringProto{}
err = value.Unmarshal(protoValue)
if err == nil {
options = parseLookbackLimitOptions(logger, kvName, protoValue.Value, options)
}
} else if errors.Is(err, kv.ErrNotFound) {
logger.Warn("error resolving query limit", zap.Error(err), zap.String("name", kvName))
}
rallen090 marked this conversation as resolved.
Show resolved Hide resolved

if err := limit.Update(options); err != nil {
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
logger.Warn("unable to set query limit", zap.Error(err), zap.String("name", kvName))
}

watch, err := store.Watch(kvName)
if err != nil {
logger.Error("could not watch query limit", zap.Error(err), zap.String("name", kvName))
return
}

go func() {
protoValue := &commonpb.StringProto{}
for range watch.C() {
value := options
if newValue := watch.Get(); newValue != nil {
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
if err := newValue.Unmarshal(protoValue); err != nil {
logger.Warn("unable to parse new query limit", zap.Error(err), zap.String("name", kvName))
continue
}
value = parseLookbackLimitOptions(logger, kvName, protoValue.Value, value)
}

if err := limit.Update(value); err != nil {
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
logger.Warn("unable to set query limit", zap.Error(err), zap.String("name", kvName))
}
}
}()
}

func parseLookbackLimitOptions(logger *zap.Logger,
kvName string,
val string,
defaultOpts limits.LookbackLimitOptions,
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
) limits.LookbackLimitOptions {
parts := strings.Split(val, ",")
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
if val == "" {
defaultOpts.Limit = nil
} else if len(parts) == 2 {
parsedLimit, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
logger.Warn("error parsing query limit value", zap.Error(err), zap.String("name", kvName))
} else {
defaultOpts.Limit = &parsedLimit
}
parsedLookback, err := time.ParseDuration(parts[1])
if err != nil {
logger.Warn("error parsing query limit lookback", zap.Error(err), zap.String("name", kvName))
} else {
defaultOpts.Lookback = parsedLookback
}
}
return defaultOpts
}

func kvWatchClientConsistencyLevels(
store kv.Store,
logger *zap.Logger,
Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/storage/limits/noop_query_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func (q *noOpQueryLimits) Stop() {
func (q *noOpQueryLimits) Start() {
}

func (q *noOpLookbackLimit) Options() LookbackLimitOptions {
return LookbackLimitOptions{}
}

func (q *noOpLookbackLimit) Update(LookbackLimitOptions) error {
return nil
}

func (q *noOpLookbackLimit) Inc(int, []byte) error {
return nil
}
69 changes: 61 additions & 8 deletions src/dbnode/storage/limits/query_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ package limits

import (
"fmt"
"sync"
"time"

xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
Expand All @@ -45,15 +47,19 @@ type lookbackLimit struct {
name string
options LookbackLimitOptions
metrics lookbackLimitMetrics
logger *zap.Logger
recent *atomic.Int64
stopCh chan struct{}
lock sync.RWMutex
}

type lookbackLimitMetrics struct {
recentCount tally.Gauge
recentMax tally.Gauge
total tally.Counter
exceeded tally.Counter
optionsMax tally.Gauge
optionsLookback tally.Gauge
recentCount tally.Gauge
recentMax tally.Gauge
total tally.Counter
exceeded tally.Counter

sourceLogger SourceLogger
}
Expand All @@ -67,7 +73,7 @@ var (
func DefaultLookbackLimitOptions() LookbackLimitOptions {
return LookbackLimitOptions{
// Default to no limit.
Limit: 0,
Limit: nil,
Lookback: defaultLookback,
}
}
Expand Down Expand Up @@ -110,6 +116,7 @@ func newLookbackLimit(
name: name,
options: opts,
metrics: newLookbackLimitMetrics(instrumentOpts, name, sourceLoggerBuilder),
logger: instrumentOpts.Logger(),
recent: atomic.NewInt64(0),
stopCh: make(chan struct{}),
}
Expand Down Expand Up @@ -171,6 +178,36 @@ func (q *queryLimits) AnyExceeded() error {
return q.bytesReadLimit.exceeded()
}

func (q *lookbackLimit) Options() LookbackLimitOptions {
return q.options
}

// Override overrides the limit set on construction.
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
func (q *lookbackLimit) Update(opts LookbackLimitOptions) error {
if err := opts.validate(); err != nil {
return err
}

q.lock.Lock()
defer q.lock.Unlock()

old := q.options
q.options = opts

// If the lookback changed, replace the background goroutine that manages the periodic resetting.
if q.options.Lookback != old.Lookback {
q.stop()
q.start()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could q.start possibly fail? Scary to imagine if the resetting goroutine doesn't get started, meaning the limits will run amok

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like it shouldn't be able to

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we change the tick value of the existing goroutine?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had tried to do this but ticker.Reset is go 1.15 https://golang.org/doc/go1.15#time

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just create a new ticker? don't really care, just throwing out ideas

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's what we're doing instead of .Reset(...). We just also call stop so the current ticker stops.

}

q.logger.Info("query limit options updated",
zap.String("name", q.name),
zap.Any("new", opts),
zap.Any("old", old))

return nil
}

// Inc increments the current value and returns an error if above the limit.
func (q *lookbackLimit) Inc(val int, source []byte) error {
if val < 0 {
Expand Down Expand Up @@ -199,7 +236,15 @@ func (q *lookbackLimit) exceeded() error {
}

func (q *lookbackLimit) checkLimit(recent int64) error {
if q.options.Limit > 0 && recent > q.options.Limit {
q.lock.RLock()
limit := q.options.Limit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is a lock necessary here if we grab a snapshot and use the snapshot? I imagine there will be a ton of activity on this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just a read lock, which should be fine. and that's basically what is happening, snapshoting the current value with the lock.

q.lock.RUnlock()

if limit == nil {
return nil
}

if recent >= *limit {
q.metrics.exceeded.Inc(1)
return xerrors.NewInvalidParamsError(NewQueryLimitExceededError(fmt.Sprintf(
"query aborted due to limit: name=%s, limit=%d, current=%d, within=%s",
Expand All @@ -210,7 +255,9 @@ func (q *lookbackLimit) checkLimit(recent int64) error {

func (q *lookbackLimit) start() {
ticker := time.NewTicker(q.options.Lookback)
ticker.Reset(q.options.Lookback)
go func() {
q.logger.Info("query limit interval started", zap.String("name", q.name))
for {
select {
case <-ticker.C:
Expand All @@ -221,10 +268,16 @@ func (q *lookbackLimit) start() {
}
}
}()

q.metrics.optionsMax.Update(float64(*q.options.Limit))
q.metrics.optionsLookback.Update(q.options.Lookback.Seconds())
}

func (q *lookbackLimit) stop() {
close(q.stopCh)
q.stopCh = make(chan struct{})

q.logger.Info("query limit interval stopped", zap.String("name", q.name))
}

func (q *lookbackLimit) current() int64 {
Expand All @@ -244,8 +297,8 @@ func (q *lookbackLimit) reset() {
}

func (opts LookbackLimitOptions) validate() error {
if opts.Limit < 0 {
return fmt.Errorf("query limit requires limit >= 0 (%d)", opts.Limit)
if opts.Limit != nil && *opts.Limit < 0 {
return fmt.Errorf("query limit requires limit >= 0 or nil (%d)", *opts.Limit)
}
if opts.Lookback <= 0 {
return fmt.Errorf("query limit requires lookback > 0 (%d)", opts.Lookback)
Expand Down
Loading