diff --git a/src/cmd/services/m3dbnode/config/limits.go b/src/cmd/services/m3dbnode/config/limits.go index 29c6dafe78..eb26cb0de8 100644 --- a/src/cmd/services/m3dbnode/config/limits.go +++ b/src/cmd/services/m3dbnode/config/limits.go @@ -29,6 +29,12 @@ type LimitsConfiguration struct { // max is surpassed encounter an error. MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"` + // MaxRecentlyQueriedSeriesDiskRead sets the upper limit on time series read from disk within a given lookback + // period. Queries which are issued while this max is surpassed encounter an error. + // This is the number of time series, which is different from the number of bytes controlled by + // MaxRecentlyQueriedSeriesDiskBytesRead. + MaxRecentlyQueriedSeriesDiskRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskRead"` + // MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks // count within a given lookback period. Queries which are issued while this // max is surpassed encounter an error. diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index a5bfab3511..23622ab484 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -50,6 +50,7 @@ import ( "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" + "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -88,11 +89,12 @@ const ( type blockRetriever struct { sync.RWMutex - opts BlockRetrieverOptions - fsOpts Options - logger *zap.Logger - queryLimits limits.QueryLimits - bytesReadLimit limits.LookbackLimit + opts BlockRetrieverOptions + fsOpts Options + logger *zap.Logger + queryLimits limits.QueryLimits + bytesReadLimit limits.LookbackLimit + seriesReadCount tally.Counter newSeekerMgrFn newSeekerMgrFn @@ -121,18 +123,21 @@ func NewBlockRetriever( return nil, err } + scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever") + return &blockRetriever{ - opts: opts, - fsOpts: fsOpts, - logger: fsOpts.InstrumentOptions().Logger(), - queryLimits: opts.QueryLimits(), - bytesReadLimit: opts.QueryLimits().BytesReadLimit(), - newSeekerMgrFn: NewSeekerManager, - reqPool: opts.RetrieveRequestPool(), - bytesPool: opts.BytesPool(), - idPool: opts.IdentifierPool(), - status: blockRetrieverNotOpen, - notifyFetch: make(chan struct{}, 1), + opts: opts, + fsOpts: fsOpts, + logger: fsOpts.InstrumentOptions().Logger(), + queryLimits: opts.QueryLimits(), + bytesReadLimit: opts.QueryLimits().BytesReadLimit(), + seriesReadCount: scope.Counter("series-read"), + newSeekerMgrFn: NewSeekerManager, + reqPool: opts.RetrieveRequestPool(), + bytesPool: opts.BytesPool(), + idPool: opts.IdentifierPool(), + status: blockRetrieverNotOpen, + notifyFetch: make(chan struct{}, 1), // We just close this channel when the fetchLoops should shutdown, so no // buffering is required fetchLoopsShouldShutdownCh: make(chan struct{}), @@ -564,6 +569,11 @@ func (r *blockRetriever) streamRequest( startTime time.Time, nsCtx namespace.Context, ) (bool, error) { + req.resultWg.Add(1) + r.seriesReadCount.Inc(1) + if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { + return false, err + } req.shard = shard // NB(r): If the ID is a ident.BytesID then we can just hold @@ -579,8 +589,6 @@ func (r *blockRetriever) streamRequest( req.start = startTime req.blockSize = r.blockSize - req.resultWg.Add(1) - // Ensure to finalize at the end of request ctx.RegisterFinalizer(req) diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 07754dc90e..56195a5d9c 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -34,23 +34,25 @@ import ( "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/digest" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index/convert" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -799,6 +801,30 @@ func TestBlockRetrieverHandlesSeekByIndexEntryErrors(t *testing.T) { testBlockRetrieverHandlesSeekErrors(t, ctrl, mockSeeker) } +func TestLimitSeriesReadFromDisk(t *testing.T) { + limitOpts := limits.NewOptions(). + SetInstrumentOptions(instrument.NewOptions()). + SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()). + SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()). + SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{ + Limit: 1, + Lookback: time.Second * 1, + }) + queryLimits, err := limits.NewQueryLimits(limitOpts) + require.NoError(t, err) + opts := NewBlockRetrieverOptions(). + SetBlockLeaseManager(&block.NoopLeaseManager{}). + SetQueryLimits(queryLimits) + publicRetriever, err := NewBlockRetriever(opts, NewOptions()) + require.NoError(t, err) + req := &retrieveRequest{} + retriever := publicRetriever.(*blockRetriever) + _, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) + _, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) + require.Error(t, err) + require.Contains(t, err.Error(), "query aborted due to limit") +} + var errSeekErr = errors.New("some-error") func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller, mockSeeker ConcurrentDataFileSetSeeker) { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index ef21426ef7..7b50475ec1 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -450,6 +450,7 @@ func Run(runOpts RunOptions) { // Setup query stats tracking. docsLimit := limits.DefaultLookbackLimitOptions() bytesReadLimit := limits.DefaultLookbackLimitOptions() + diskSeriesReadLimit := limits.DefaultLookbackLimitOptions() if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil { docsLimit.Limit = limitConfig.Value docsLimit.Lookback = limitConfig.Lookback @@ -458,9 +459,14 @@ func Run(runOpts RunOptions) { bytesReadLimit.Limit = limitConfig.Value bytesReadLimit.Lookback = limitConfig.Lookback } + if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskRead; limitConfig != nil { + diskSeriesReadLimit.Limit = limitConfig.Value + diskSeriesReadLimit.Lookback = limitConfig.Lookback + } limitOpts := limits.NewOptions(). SetDocsLimitOpts(docsLimit). SetBytesReadLimitOpts(bytesReadLimit). + SetDiskSeriesReadLimitOpts(diskSeriesReadLimit). SetInstrumentOptions(iOpts) if builder := opts.SourceLoggerBuilder(); builder != nil { limitOpts = limitOpts.SetSourceLoggerBuilder(builder) diff --git a/src/dbnode/storage/limits/noop_query_limits.go b/src/dbnode/storage/limits/noop_query_limits.go index 672ea8d52d..cf0a9497a6 100644 --- a/src/dbnode/storage/limits/noop_query_limits.go +++ b/src/dbnode/storage/limits/noop_query_limits.go @@ -44,6 +44,10 @@ func (q *noOpQueryLimits) BytesReadLimit() LookbackLimit { return &noOpLookbackLimit{} } +func (q *noOpQueryLimits) DiskSeriesReadLimit() LookbackLimit { + return &noOpLookbackLimit{} +} + func (q *noOpQueryLimits) AnyExceeded() error { return nil } diff --git a/src/dbnode/storage/limits/options.go b/src/dbnode/storage/limits/options.go index 5f8c49a89b..cc82c121aa 100644 --- a/src/dbnode/storage/limits/options.go +++ b/src/dbnode/storage/limits/options.go @@ -28,10 +28,11 @@ import ( ) type limitOpts struct { - iOpts instrument.Options - docsLimitOpts LookbackLimitOptions - bytesReadLimitOpts LookbackLimitOptions - sourceLoggerBuilder SourceLoggerBuilder + iOpts instrument.Options + docsLimitOpts LookbackLimitOptions + bytesReadLimitOpts LookbackLimitOptions + diskSeriesReadLimitOpts LookbackLimitOptions + sourceLoggerBuilder SourceLoggerBuilder } // NewOptions creates limit options with default values. @@ -94,6 +95,18 @@ func (o *limitOpts) BytesReadLimitOpts() LookbackLimitOptions { return o.bytesReadLimitOpts } +// SetDiskSeriesReadLimitOpts sets the disk ts read limit options. +func (o *limitOpts) SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options { + opts := *o + opts.diskSeriesReadLimitOpts = value + return &opts +} + +// DiskSeriesReadLimitOpts returns the disk ts read limit options. +func (o *limitOpts) DiskSeriesReadLimitOpts() LookbackLimitOptions { + return o.diskSeriesReadLimitOpts +} + // SetSourceLoggerBuilder sets the source logger. func (o *limitOpts) SetSourceLoggerBuilder(value SourceLoggerBuilder) Options { opts := *o diff --git a/src/dbnode/storage/limits/query_limits.go b/src/dbnode/storage/limits/query_limits.go index 44643d40ec..3b4201729c 100644 --- a/src/dbnode/storage/limits/query_limits.go +++ b/src/dbnode/storage/limits/query_limits.go @@ -36,8 +36,9 @@ const ( ) type queryLimits struct { - docsLimit *lookbackLimit - bytesReadLimit *lookbackLimit + docsLimit *lookbackLimit + bytesReadLimit *lookbackLimit + seriesDiskReadLimit *lookbackLimit } type lookbackLimit struct { @@ -72,31 +73,30 @@ func DefaultLookbackLimitOptions() LookbackLimitOptions { } // NewQueryLimits returns a new query limits manager. -func NewQueryLimits( - options Options, - // docsLimitOpts LookbackLimitOptions, - // bytesReadLimitOpts LookbackLimitOptions, - // instrumentOpts instrument.Options, -) (QueryLimits, error) { +func NewQueryLimits(options Options) (QueryLimits, error) { if err := options.Validate(); err != nil { return nil, err } var ( - iOpts = options.InstrumentOptions() - docsLimitOpts = options.DocsLimitOpts() - bytesReadLimitOpts = options.BytesReadLimitOpts() - sourceLoggerBuilder = options.SourceLoggerBuilder() + iOpts = options.InstrumentOptions() + docsLimitOpts = options.DocsLimitOpts() + bytesReadLimitOpts = options.BytesReadLimitOpts() + diskSeriesReadLimitOpts = options.DiskSeriesReadLimitOpts() + sourceLoggerBuilder = options.SourceLoggerBuilder() docsLimit = newLookbackLimit( iOpts, docsLimitOpts, "docs-matched", sourceLoggerBuilder) bytesReadLimit = newLookbackLimit( iOpts, bytesReadLimitOpts, "disk-bytes-read", sourceLoggerBuilder) + seriesDiskReadLimit = newLookbackLimit( + iOpts, diskSeriesReadLimitOpts, "disk-series-read", sourceLoggerBuilder) ) return &queryLimits{ - docsLimit: docsLimit, - bytesReadLimit: bytesReadLimit, + docsLimit: docsLimit, + bytesReadLimit: bytesReadLimit, + seriesDiskReadLimit: seriesDiskReadLimit, }, nil } @@ -145,13 +145,19 @@ func (q *queryLimits) BytesReadLimit() LookbackLimit { return q.bytesReadLimit } +func (q *queryLimits) DiskSeriesReadLimit() LookbackLimit { + return q.seriesDiskReadLimit +} + func (q *queryLimits) Start() { q.docsLimit.start() + q.seriesDiskReadLimit.start() q.bytesReadLimit.start() } func (q *queryLimits) Stop() { q.docsLimit.stop() + q.seriesDiskReadLimit.stop() q.bytesReadLimit.stop() } @@ -159,6 +165,9 @@ func (q *queryLimits) AnyExceeded() error { if err := q.docsLimit.exceeded(); err != nil { return err } + if err := q.seriesDiskReadLimit.exceeded(); err != nil { + return err + } return q.bytesReadLimit.exceeded() } diff --git a/src/dbnode/storage/limits/query_limits_test.go b/src/dbnode/storage/limits/query_limits_test.go index 7739812c62..23ee91c2ef 100644 --- a/src/dbnode/storage/limits/query_limits_test.go +++ b/src/dbnode/storage/limits/query_limits_test.go @@ -37,11 +37,13 @@ import ( func testQueryLimitOptions( docOpts LookbackLimitOptions, bytesOpts LookbackLimitOptions, + seriesOpts LookbackLimitOptions, iOpts instrument.Options, ) Options { return NewOptions(). SetDocsLimitOpts(docOpts). SetBytesReadLimitOpts(bytesOpts). + SetDiskSeriesReadLimitOpts(seriesOpts). SetInstrumentOptions(iOpts) } @@ -54,7 +56,11 @@ func TestQueryLimits(t *testing.T) { Limit: 1, Lookback: time.Second, } - opts := testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) + seriesOpts := LookbackLimitOptions{ + Limit: 1, + Lookback: time.Second, + } + opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) queryLimits, err := NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) @@ -69,7 +75,7 @@ func TestQueryLimits(t *testing.T) { require.True(t, xerrors.IsInvalidParams(err)) require.True(t, IsQueryLimitExceededError(err)) - opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) queryLimits, err = NewQueryLimits(opts) require.NoError(t, err) require.NotNil(t, queryLimits) @@ -84,6 +90,22 @@ func TestQueryLimits(t *testing.T) { require.Error(t, err) require.True(t, xerrors.IsInvalidParams(err)) require.True(t, IsQueryLimitExceededError(err)) + + opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) + queryLimits, err = NewQueryLimits(opts) + require.NoError(t, err) + require.NotNil(t, queryLimits) + + // No error yet. + err = queryLimits.AnyExceeded() + require.NoError(t, err) + + // Limit from bytes. + require.Error(t, queryLimits.DiskSeriesReadLimit().Inc(2, nil)) + err = queryLimits.AnyExceeded() + require.Error(t, err) + require.True(t, xerrors.IsInvalidParams(err)) + require.True(t, IsQueryLimitExceededError(err)) } func TestLookbackLimit(t *testing.T) { @@ -298,7 +320,7 @@ func TestSourceLogger(t *testing.T) { } builder = &testBuilder{records: []testLoggerRecord{}} - opts = testQueryLimitOptions(noLimit, noLimit, iOpts). + opts = testQueryLimitOptions(noLimit, noLimit, noLimit, iOpts). SetSourceLoggerBuilder(builder) ) diff --git a/src/dbnode/storage/limits/types.go b/src/dbnode/storage/limits/types.go index d38db00452..aa87c493bf 100644 --- a/src/dbnode/storage/limits/types.go +++ b/src/dbnode/storage/limits/types.go @@ -39,6 +39,8 @@ type QueryLimits interface { DocsLimit() LookbackLimit // BytesReadLimit limits queries by a global concurrent count of bytes read from disk. BytesReadLimit() LookbackLimit + // DiskSeriesReadLimit limits queries by a global concurrent count of time series read from disk. + DiskSeriesReadLimit() LookbackLimit // AnyExceeded returns an error if any of the query limits are exceeded. AnyExceeded() error @@ -97,6 +99,12 @@ type Options interface { // BytesReadLimitOpts returns the byte read limit options. BytesReadLimitOpts() LookbackLimitOptions + // SetDiskSeriesReadLimitOpts sets the disk series read limit options. + SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options + + // DiskSeriesReadLimitOpts returns the disk series read limit options. + DiskSeriesReadLimitOpts() LookbackLimitOptions + // SetSourceLoggerBuilder sets the source logger. SetSourceLoggerBuilder(value SourceLoggerBuilder) Options