-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Limit for time series read from disk #3094
Changes from all commits
e72a29f
e2b479c
df01e9b
c5e0951
4b38603
3d21e49
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,7 @@ import ( | |
"github.com/m3db/m3/src/x/ident" | ||
"github.com/m3db/m3/src/x/pool" | ||
|
||
"github.com/uber-go/tally" | ||
"go.uber.org/zap" | ||
) | ||
|
||
|
@@ -88,11 +89,12 @@ const ( | |
type blockRetriever struct { | ||
sync.RWMutex | ||
|
||
opts BlockRetrieverOptions | ||
fsOpts Options | ||
logger *zap.Logger | ||
queryLimits limits.QueryLimits | ||
bytesReadLimit limits.LookbackLimit | ||
opts BlockRetrieverOptions | ||
fsOpts Options | ||
logger *zap.Logger | ||
queryLimits limits.QueryLimits | ||
bytesReadLimit limits.LookbackLimit | ||
seriesReadCount tally.Counter | ||
|
||
newSeekerMgrFn newSeekerMgrFn | ||
|
||
|
@@ -121,18 +123,21 @@ func NewBlockRetriever( | |
return nil, err | ||
} | ||
|
||
scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever") | ||
|
||
return &blockRetriever{ | ||
opts: opts, | ||
fsOpts: fsOpts, | ||
logger: fsOpts.InstrumentOptions().Logger(), | ||
queryLimits: opts.QueryLimits(), | ||
bytesReadLimit: opts.QueryLimits().BytesReadLimit(), | ||
newSeekerMgrFn: NewSeekerManager, | ||
reqPool: opts.RetrieveRequestPool(), | ||
bytesPool: opts.BytesPool(), | ||
idPool: opts.IdentifierPool(), | ||
status: blockRetrieverNotOpen, | ||
notifyFetch: make(chan struct{}, 1), | ||
opts: opts, | ||
fsOpts: fsOpts, | ||
logger: fsOpts.InstrumentOptions().Logger(), | ||
queryLimits: opts.QueryLimits(), | ||
bytesReadLimit: opts.QueryLimits().BytesReadLimit(), | ||
seriesReadCount: scope.Counter("series-read"), | ||
newSeekerMgrFn: NewSeekerManager, | ||
reqPool: opts.RetrieveRequestPool(), | ||
bytesPool: opts.BytesPool(), | ||
idPool: opts.IdentifierPool(), | ||
status: blockRetrieverNotOpen, | ||
notifyFetch: make(chan struct{}, 1), | ||
// We just close this channel when the fetchLoops should shutdown, so no | ||
// buffering is required | ||
fetchLoopsShouldShutdownCh: make(chan struct{}), | ||
|
@@ -564,6 +569,11 @@ func (r *blockRetriever) streamRequest( | |
startTime time.Time, | ||
nsCtx namespace.Context, | ||
) (bool, error) { | ||
req.resultWg.Add(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any significant in moving this line up? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea since the function returns earlier now, it might block a reader. while a caller shouldn't be waiting on a result that returned an error, we saw it happen when testing. |
||
r.seriesReadCount.Inc(1) | ||
if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { | ||
return false, err | ||
} | ||
req.shard = shard | ||
|
||
// NB(r): If the ID is a ident.BytesID then we can just hold | ||
|
@@ -579,8 +589,6 @@ func (r *blockRetriever) streamRequest( | |
req.start = startTime | ||
req.blockSize = r.blockSize | ||
|
||
req.resultWg.Add(1) | ||
|
||
// Ensure to finalize at the end of request | ||
ctx.RegisterFinalizer(req) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,25 +32,29 @@ import ( | |
"testing" | ||
"time" | ||
|
||
"github.com/uber-go/tally" | ||
|
||
"github.com/m3db/m3/src/cluster/shard" | ||
"github.com/m3db/m3/src/dbnode/digest" | ||
"github.com/m3db/m3/src/dbnode/namespace" | ||
"github.com/m3db/m3/src/dbnode/persist" | ||
"github.com/m3db/m3/src/dbnode/retention" | ||
"github.com/m3db/m3/src/dbnode/sharding" | ||
"github.com/m3db/m3/src/dbnode/storage/block" | ||
"github.com/m3db/m3/src/dbnode/storage/index/convert" | ||
"github.com/m3db/m3/src/dbnode/storage/limits" | ||
"github.com/m3db/m3/src/dbnode/ts" | ||
"github.com/m3db/m3/src/dbnode/x/xio" | ||
"github.com/m3db/m3/src/x/checked" | ||
"github.com/m3db/m3/src/x/context" | ||
"github.com/m3db/m3/src/x/ident" | ||
"github.com/m3db/m3/src/x/instrument" | ||
"github.com/m3db/m3/src/x/pool" | ||
xsync "github.com/m3db/m3/src/x/sync" | ||
xtime "github.com/m3db/m3/src/x/time" | ||
|
||
"github.com/fortytw2/leaktest" | ||
"github.com/golang/mock/gomock" | ||
"github.com/m3db/m3/src/dbnode/namespace" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
@@ -799,6 +803,38 @@ func TestBlockRetrieverHandlesSeekByIndexEntryErrors(t *testing.T) { | |
testBlockRetrieverHandlesSeekErrors(t, ctrl, mockSeeker) | ||
} | ||
|
||
func TestLimitSeriesReadFromDisk(t *testing.T) { | ||
scope := tally.NewTestScope("test", nil) | ||
limitOpts := limits.NewOptions(). | ||
SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)). | ||
SetBytesReadLimitOpts(limits.DefaultLookbackLimitOptions()). | ||
SetDocsLimitOpts(limits.DefaultLookbackLimitOptions()). | ||
SetDiskSeriesReadLimitOpts(limits.LookbackLimitOptions{ | ||
Limit: 1, | ||
Lookback: time.Second * 1, | ||
}) | ||
queryLimits, err := limits.NewQueryLimits(limitOpts) | ||
require.NoError(t, err) | ||
opts := NewBlockRetrieverOptions(). | ||
SetBlockLeaseManager(&block.NoopLeaseManager{}). | ||
SetQueryLimits(queryLimits) | ||
publicRetriever, err := NewBlockRetriever(opts, NewOptions(). | ||
SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope))) | ||
require.NoError(t, err) | ||
req := &retrieveRequest{} | ||
retriever := publicRetriever.(*blockRetriever) | ||
_, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) | ||
_, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) | ||
require.Error(t, err) | ||
require.Contains(t, err.Error(), "query aborted due to limit") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we assert on any metrics being incremented? |
||
|
||
snapshot := scope.Snapshot() | ||
seriesRead := snapshot.Counters()["test.retriever.series-read+"] | ||
require.Equal(t, int64(2), seriesRead.Value()) | ||
seriesLimit := snapshot.Counters()["test.query-limit.exceeded+limit=disk-series-read"] | ||
require.Equal(t, int64(1), seriesLimit.Value()) | ||
} | ||
|
||
var errSeekErr = errors.New("some-error") | ||
|
||
func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller, mockSeeker ConcurrentDataFileSetSeeker) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,11 +37,13 @@ import ( | |
func testQueryLimitOptions( | ||
docOpts LookbackLimitOptions, | ||
bytesOpts LookbackLimitOptions, | ||
seriesOpts LookbackLimitOptions, | ||
iOpts instrument.Options, | ||
) Options { | ||
return NewOptions(). | ||
SetDocsLimitOpts(docOpts). | ||
SetBytesReadLimitOpts(bytesOpts). | ||
SetDiskSeriesReadLimitOpts(seriesOpts). | ||
SetInstrumentOptions(iOpts) | ||
} | ||
|
||
|
@@ -54,7 +56,11 @@ func TestQueryLimits(t *testing.T) { | |
Limit: 1, | ||
Lookback: time.Second, | ||
} | ||
opts := testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) | ||
seriesOpts := LookbackLimitOptions{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be a separate test case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i decided to be consistent with the existing test, that was testing all the limits (docs + bytes) |
||
Limit: 1, | ||
Lookback: time.Second, | ||
} | ||
opts := testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) | ||
queryLimits, err := NewQueryLimits(opts) | ||
require.NoError(t, err) | ||
require.NotNil(t, queryLimits) | ||
|
@@ -69,7 +75,7 @@ func TestQueryLimits(t *testing.T) { | |
require.True(t, xerrors.IsInvalidParams(err)) | ||
require.True(t, IsQueryLimitExceededError(err)) | ||
|
||
opts = testQueryLimitOptions(docOpts, bytesOpts, instrument.NewOptions()) | ||
opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) | ||
queryLimits, err = NewQueryLimits(opts) | ||
require.NoError(t, err) | ||
require.NotNil(t, queryLimits) | ||
|
@@ -84,6 +90,22 @@ func TestQueryLimits(t *testing.T) { | |
require.Error(t, err) | ||
require.True(t, xerrors.IsInvalidParams(err)) | ||
require.True(t, IsQueryLimitExceededError(err)) | ||
|
||
opts = testQueryLimitOptions(docOpts, bytesOpts, seriesOpts, instrument.NewOptions()) | ||
queryLimits, err = NewQueryLimits(opts) | ||
require.NoError(t, err) | ||
require.NotNil(t, queryLimits) | ||
|
||
// No error yet. | ||
err = queryLimits.AnyExceeded() | ||
require.NoError(t, err) | ||
|
||
// Limit from bytes. | ||
require.Error(t, queryLimits.DiskSeriesReadLimit().Inc(2, nil)) | ||
err = queryLimits.AnyExceeded() | ||
require.Error(t, err) | ||
require.True(t, xerrors.IsInvalidParams(err)) | ||
require.True(t, IsQueryLimitExceededError(err)) | ||
} | ||
|
||
func TestLookbackLimit(t *testing.T) { | ||
|
@@ -298,7 +320,7 @@ func TestSourceLogger(t *testing.T) { | |
} | ||
|
||
builder = &testBuilder{records: []testLoggerRecord{}} | ||
opts = testQueryLimitOptions(noLimit, noLimit, iOpts). | ||
opts = testQueryLimitOptions(noLimit, noLimit, noLimit, iOpts). | ||
SetSourceLoggerBuilder(builder) | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should this be "DiskSeriesRead" to fit more similarly to "DiskBytesRead"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SeriesDiskSeriesRead
just felt too weird