Limit for time series read from disk

There are additional memory costs when time series need to be loaded from
disk (beyond the actual bytes being read). This new limit allows
docLimits to be set higher, so series already in memory can be served
without hitting limits.
ryanhall07 committed Jan 15, 2021
1 parent 5b4f793 commit e72a29f
Showing 9 changed files with 142 additions and 40 deletions.
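
As the commit message above notes, the point of the new limit is that the docs limit can stay high (series already in memory are cheap to serve) while the more expensive disk loads are capped separately. Below is a minimal sketch of wiring the two limits with the options API introduced in this change; the numeric values, lookback window, and the main wrapper are illustrative assumptions, not recommendations.

package main

import (
	"time"

	"github.com/m3db/m3/src/dbnode/storage/limits"
	"github.com/m3db/m3/src/x/instrument"
)

func main() {
	// Hypothetical values: a generous docs limit for series served from
	// memory, and a much lower cap on series that must be read from disk.
	docsOpts := limits.DefaultLookbackLimitOptions()
	docsOpts.Limit = 1000000
	docsOpts.Lookback = 15 * time.Second

	diskSeriesOpts := limits.DefaultLookbackLimitOptions()
	diskSeriesOpts.Limit = 50000
	diskSeriesOpts.Lookback = 15 * time.Second

	opts := limits.NewOptions().
		SetInstrumentOptions(instrument.NewOptions()).
		SetDocsLimitOpts(docsOpts).
		SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()).
		SetDiskSeriesReadLimitOpts(diskSeriesOpts)

	queryLimits, err := limits.NewQueryLimits(opts)
	if err != nil {
		panic(err)
	}

	// The limits would then be handed to the block retriever, e.g. via
	// BlockRetrieverOptions.SetQueryLimits as in the test added below.
	_ = queryLimits
}
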
6 changes: 6 additions & 0 deletions src/cmd/services/m3dbnode/config/limits.go
@@ -29,6 +29,12 @@ type LimitsConfiguration struct {
// max is surpassed encounter an error.
MaxRecentlyQueriedSeriesDiskBytesRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskBytesRead"`

// MaxRecentlyQueriedSeriesDiskRead sets the upper limit on time series read from disk within a given lookback
// period. Queries which are issued while this max is surpassed encounter an error.
// This is the number of time series, which is different from the number of bytes controlled by
// MaxRecentlyQueriedSeriesDiskBytesRead.
MaxRecentlyQueriedSeriesDiskRead *MaxRecentQueryResourceLimitConfiguration `yaml:"maxRecentlyQueriedSeriesDiskRead"`

// MaxRecentlyQueriedSeriesBlocks sets the upper limit on time series blocks
// count within a given lookback period. Queries which are issued while this
// max is surpassed encounter an error.
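
A hedged sketch of the corresponding configuration, expressed against the config struct above. Only the maxRecentlyQueriedSeriesDiskRead key is defined by this change; the illustrative values and the assumption that the field types are int64 and time.Duration follow the existing limit fields.

package main

import (
	"time"

	"github.com/m3db/m3/src/cmd/services/m3dbnode/config"
)

func main() {
	// Illustrative values only: allow roughly 50k series to be read from
	// disk per 15s lookback window. In YAML this corresponds to the
	// maxRecentlyQueriedSeriesDiskRead key shown above (assumed to sit in
	// the node's limits section alongside the existing byte-read limit).
	cfg := config.LimitsConfiguration{
		MaxRecentlyQueriedSeriesDiskRead: &config.MaxRecentQueryResourceLimitConfiguration{
			Value:    50000,
			Lookback: 15 * time.Second,
		},
	}
	_ = cfg
}
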
44 changes: 26 additions & 18 deletions src/dbnode/persist/fs/retriever.go
@@ -50,6 +50,7 @@ import (
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"

"github.com/uber-go/tally"
"go.uber.org/zap"
)

@@ -88,11 +89,12 @@ const (
type blockRetriever struct {
sync.RWMutex

opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
queryLimits limits.QueryLimits
bytesReadLimit limits.LookbackLimit
opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
queryLimits limits.QueryLimits
bytesReadLimit limits.LookbackLimit
seriesReadCount tally.Counter

newSeekerMgrFn newSeekerMgrFn

@@ -121,18 +123,21 @@ func NewBlockRetriever(
return nil, err
}

scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever")

return &blockRetriever{
opts: opts,
fsOpts: fsOpts,
logger: fsOpts.InstrumentOptions().Logger(),
queryLimits: opts.QueryLimits(),
bytesReadLimit: opts.QueryLimits().BytesReadLimit(),
newSeekerMgrFn: NewSeekerManager,
reqPool: opts.RetrieveRequestPool(),
bytesPool: opts.BytesPool(),
idPool: opts.IdentifierPool(),
status: blockRetrieverNotOpen,
notifyFetch: make(chan struct{}, 1),
opts: opts,
fsOpts: fsOpts,
logger: fsOpts.InstrumentOptions().Logger(),
queryLimits: opts.QueryLimits(),
bytesReadLimit: opts.QueryLimits().BytesReadLimit(),
seriesReadCount: scope.Counter("series-read"),
newSeekerMgrFn: NewSeekerManager,
reqPool: opts.RetrieveRequestPool(),
bytesPool: opts.BytesPool(),
idPool: opts.IdentifierPool(),
status: blockRetrieverNotOpen,
notifyFetch: make(chan struct{}, 1),
// We just close this channel when the fetchLoops should shutdown, so no
// buffering is required
fetchLoopsShouldShutdownCh: make(chan struct{}),
@@ -564,6 +569,11 @@ func (r *blockRetriever) streamRequest(
startTime time.Time,
nsCtx namespace.Context,
) (bool, error) {
req.resultWg.Add(1)
r.seriesReadCount.Inc(1)
if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil {
return false, err
}
req.shard = shard

// NB(r): If the ID is a ident.BytesID then we can just hold
@@ -579,8 +589,6 @@ func (r *blockRetriever) streamRequest(
req.start = startTime
req.blockSize = r.blockSize

req.resultWg.Add(1)

// Ensure to finalize at the end of request
ctx.RegisterFinalizer(req)

28 changes: 27 additions & 1 deletion src/dbnode/persist/fs/retriever_test.go
@@ -34,23 +34,25 @@ import (

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/digest"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/limits"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

"github.com/fortytw2/leaktest"
"github.com/golang/mock/gomock"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -799,6 +801,30 @@ func TestBlockRetrieverHandlesSeekByIndexEntryErrors(t *testing.T) {
testBlockRetrieverHandlesSeekErrors(t, ctrl, mockSeeker)
}

func TestLimitSeriesReadFromDisk(t *testing.T) {
limitOpts := limits.NewOptions().
SetInstrumentOptions(instrument.NewOptions()).
SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()).
SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()).
SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{
Limit: 1,
Lookback: time.Second * 1,
})
queryLimits, err := limits.NewQueryLimits(limitOpts)
require.NoError(t, err)
opts := NewBlockRetrieverOptions().
SetBlockLeaseManager(&block.NoopLeaseManager{}).
SetQueryLimits(queryLimits)
publicRetriever, err := NewBlockRetriever(opts, NewOptions())
require.NoError(t, err)
req := &retrieveRequest{}
retriever := publicRetriever.(*blockRetriever)
_, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
_, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
require.Error(t, err)
require.Contains(t, err.Error(), "query aborted due to limit")
}

var errSeekErr = errors.New("some-error")

func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller, mockSeeker ConcurrentDataFileSetSeeker) {
6 changes: 6 additions & 0 deletions src/dbnode/server/server.go
@@ -450,6 +450,7 @@ func Run(runOpts RunOptions) {
// Setup query stats tracking.
docsLimit := limits.DefaultLookbackLimitOptions()
bytesReadLimit := limits.DefaultLookbackLimitOptions()
diskSeriesReadLimit := limits.DefaultLookbackLimitOptions()
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesBlocks; limitConfig != nil {
docsLimit.Limit = limitConfig.Value
docsLimit.Lookback = limitConfig.Lookback
@@ -458,9 +459,14 @@
bytesReadLimit.Limit = limitConfig.Value
bytesReadLimit.Lookback = limitConfig.Lookback
}
if limitConfig := runOpts.Config.Limits.MaxRecentlyQueriedSeriesDiskRead; limitConfig != nil {
diskSeriesReadLimit.Limit = limitConfig.Value
diskSeriesReadLimit.Lookback = limitConfig.Lookback
}
limitOpts := limits.NewOptions().
SetDocsLimitOpts(docsLimit).
SetBytesReadLimitOpts(bytesReadLimit).
SetDiskSeriesReadLimitOpts(diskSeriesReadLimit).
SetInstrumentOptions(iOpts)
if builder := opts.SourceLoggerBuilder(); builder != nil {
limitOpts = limitOpts.SetSourceLoggerBuilder(builder)
4 changes: 4 additions & 0 deletions src/dbnode/storage/limits/noop_query_limits.go
@@ -44,6 +44,10 @@ func (q *noOpQueryLimits) BytesReadLimit() LookbackLimit {
return &noOpLookbackLimit{}
}

func (q *noOpQueryLimits) DiskSeriesReadLimit() LookbackLimit {
return &noOpLookbackLimit{}
}

func (q *noOpQueryLimits) AnyExceeded() error {
return nil
}
21 changes: 17 additions & 4 deletions src/dbnode/storage/limits/options.go
@@ -28,10 +28,11 @@
)

type limitOpts struct {
iOpts instrument.Options
docsLimitOpts LookbackLimitOptions
bytesReadLimitOpts LookbackLimitOptions
sourceLoggerBuilder SourceLoggerBuilder
iOpts instrument.Options
docsLimitOpts LookbackLimitOptions
bytesReadLimitOpts LookbackLimitOptions
diskSeriesReadLimitOpts LookbackLimitOptions
sourceLoggerBuilder SourceLoggerBuilder
}

// NewOptions creates limit options with default values.
@@ -94,6 +95,18 @@ func (o *limitOpts) BytesReadLimitOpts() LookbackLimitOptions {
return o.bytesReadLimitOpts
}

// SetDiskSeriesReadLimitOpts sets the disk series read limit options.
func (o *limitOpts) SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options {
opts := *o
opts.diskSeriesReadLimitOpts = value
return &opts
}

// DiskSeriesReadLimitOpts returns the disk series read limit options.
func (o *limitOpts) DiskSeriesReadLimitOpts() LookbackLimitOptions {
return o.diskSeriesReadLimitOpts
}

// SetSourceLoggerBuilder sets the source logger.
func (o *limitOpts) SetSourceLoggerBuilder(value SourceLoggerBuilder) Options {
opts := *o
37 changes: 23 additions & 14 deletions src/dbnode/storage/limits/query_limits.go
@@ -36,8 +36,9 @@ const (
)

type queryLimits struct {
docsLimit *lookbackLimit
bytesReadLimit *lookbackLimit
docsLimit *lookbackLimit
bytesReadLimit *lookbackLimit
seriesDiskReadLimit *lookbackLimit
}

type lookbackLimit struct {
@@ -72,31 +73,30 @@ func DefaultLookbackLimitOptions() LookbackLimitOptions {
}

// NewQueryLimits returns a new query limits manager.
func NewQueryLimits(
options Options,
// docsLimitOpts LookbackLimitOptions,
// bytesReadLimitOpts LookbackLimitOptions,
// instrumentOpts instrument.Options,
) (QueryLimits, error) {
func NewQueryLimits(options Options) (QueryLimits, error) {
if err := options.Validate(); err != nil {
return nil, err
}

var (
iOpts = options.InstrumentOptions()
docsLimitOpts = options.DocsLimitOpts()
bytesReadLimitOpts = options.BytesReadLimitOpts()
sourceLoggerBuilder = options.SourceLoggerBuilder()
iOpts = options.InstrumentOptions()
docsLimitOpts = options.DocsLimitOpts()
bytesReadLimitOpts = options.BytesReadLimitOpts()
diskSeriesReadLimitOpts = options.DiskSeriesReadLimitOpts()
sourceLoggerBuilder = options.SourceLoggerBuilder()

docsLimit = newLookbackLimit(
iOpts, docsLimitOpts, "docs-matched", sourceLoggerBuilder)
bytesReadLimit = newLookbackLimit(
iOpts, bytesReadLimitOpts, "disk-bytes-read", sourceLoggerBuilder)
seriesDiskReadLimit = newLookbackLimit(
iOpts, diskSeriesReadLimitOpts, "disk-series-read", sourceLoggerBuilder)
)

return &queryLimits{
docsLimit: docsLimit,
bytesReadLimit: bytesReadLimit,
docsLimit: docsLimit,
bytesReadLimit: bytesReadLimit,
seriesDiskReadLimit: seriesDiskReadLimit,
}, nil
}

@@ -145,20 +145,29 @@ func (q *queryLimits) BytesReadLimit() LookbackLimit {
return q.bytesReadLimit
}

func (q *queryLimits) DiskSeriesReadLimit() LookbackLimit {
return q.seriesDiskReadLimit
}

func (q *queryLimits) Start() {
q.docsLimit.start()
q.seriesDiskReadLimit.start()
q.bytesReadLimit.start()
}

func (q *queryLimits) Stop() {
q.docsLimit.stop()
q.seriesDiskReadLimit.stop()
q.bytesReadLimit.stop()
}

func (q *queryLimits) AnyExceeded() error {
if err := q.docsLimit.exceeded(); err != nil {
return err
}
if err := q.seriesDiskReadLimit.exceeded(); err != nil {
return err
}
return q.bytesReadLimit.exceeded()
}

28 changes: 25 additions & 3 deletions src/dbnode/storage/limits/query_limits_test.go
@@ -37,11 +37,13 @@
func testQueryLimitOptions(
docOpts LookbackLimitOptions,
bytesOpts LookbackLimitOptions,
seriesOpts LookbackLimitOptions,
iOpts instrument.Options,
) Options {
return NewOptions().
SetDocsLimitOpts(docOpts).
SetBytesReadLimitOpts(bytesOpts).
SetDiskSeriesReadLimitOpts(seriesOpts).
SetInstrumentOptions(iOpts)
}

@@ -54,7 +56,11 @@ func TestQueryLimits(t *testing.T) {
Limit: 1,
Lookback: time.Second,
}
opts := testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions())
seriesOpts := LookbackLimitOptions{
Limit: 1,
Lookback: time.Second,
}
opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions())
queryLimits, err := NewQueryLimits(opts)
require.NoError(t, err)
require.NotNil(t, queryLimits)
@@ -69,7 +75,7 @@ func TestQueryLimits(t *testing.T) {
require.True(t, xerrors.IsInvalidParams(err))
require.True(t, IsQueryLimitExceededError(err))

opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions())
opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions())
queryLimits, err = NewQueryLimits(opts)
require.NoError(t, err)
require.NotNil(t, queryLimits)
@@ -84,6 +90,22 @@ func TestQueryLimits(t *testing.T) {
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
require.True(t, IsQueryLimitExceededError(err))

opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions())
queryLimits, err = NewQueryLimits(opts)
require.NoError(t, err)
require.NotNil(t, queryLimits)

// No error yet.
err = queryLimits.AnyExceeded()
require.NoError(t, err)

// Limit from series.
require.Error(t, queryLimits.DiskSeriesReadLimit().Inc(2, nil))
err = queryLimits.AnyExceeded()
require.Error(t, err)
require.True(t, xerrors.IsInvalidParams(err))
require.True(t, IsQueryLimitExceededError(err))
}

func TestLookbackLimit(t *testing.T) {
@@ -298,7 +320,7 @@ func TestSourceLogger(t *testing.T) {
}

builder = &testBuilder{records: []testLoggerRecord{}}
opts = testQueryLimitOptions(noLimit, noLimit, iOpts).
opts = testQueryLimitOptions(noLimit, noLimit, noLimit, iOpts).
SetSourceLoggerBuilder(builder)
)

Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/storage/limits/types.go
@@ -39,6 +39,8 @@ type QueryLimits interface {
DocsLimit() LookbackLimit
// BytesReadLimit limits queries by a global concurrent count of bytes read from disk.
BytesReadLimit() LookbackLimit
// DiskSeriesReadLimit limits queries by a global concurrent count of time series read from disk.
DiskSeriesReadLimit() LookbackLimit

// AnyExceeded returns an error if any of the query limits are exceeded.
AnyExceeded() error
@@ -97,6 +99,12 @@ type Options interface {
// BytesReadLimitOpts returns the byte read limit options.
BytesReadLimitOpts() LookbackLimitOptions

// SetDiskSeriesReadLimitOpts sets the disk series read limit options.
SetDiskSeriesReadLimitOpts(value LookbackLimitOptions) Options

// DiskSeriesReadLimitOpts returns the disk series read limit options.
DiskSeriesReadLimitOpts() LookbackLimitOptions

// SetSourceLoggerBuilder sets the source logger.
SetSourceLoggerBuilder(value SourceLoggerBuilder) Options

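
For completeness, a small sketch of how a caller might consult the new limit through this interface, mirroring the pattern in blockRetriever.streamRequest above. The helper name is hypothetical, and the assumption that source carries the raw query-source bytes follows the nil argument used in the tests.

package example

import "github.com/m3db/m3/src/dbnode/storage/limits"

// checkDiskSeriesRead is a hypothetical enforcement point: count one series
// about to be loaded from disk against the lookback limit, then verify no
// other limit has already been exceeded.
func checkDiskSeriesRead(ql limits.QueryLimits, source []byte) error {
	if err := ql.DiskSeriesReadLimit().Inc(1, source); err != nil {
		// Surfaces to the caller as a "query aborted due to limit" error.
		return err
	}
	return ql.AnyExceeded()
}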
