diff --git a/src/dbnode/client/fetch_state.go b/src/dbnode/client/fetch_state.go
index 46ca48c846..b198b66580 100644
--- a/src/dbnode/client/fetch_state.go
+++ b/src/dbnode/client/fetch_state.go
@@ -241,7 +241,8 @@ func (f *fetchState) asEncodingSeriesIterators(
 	return f.tagResultAccumulator.AsEncodingSeriesIterators(limit, pools, descr, opts)
 }
 
-func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools) (AggregatedTagsIterator, FetchResponseMetadata, error) {
+func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools, limit int) (
+	AggregatedTagsIterator, FetchResponseMetadata, error) {
 	f.Lock()
 	defer f.Unlock()
 
@@ -259,7 +260,9 @@ func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools) (Aggregate
 		return nil, FetchResponseMetadata{}, err
 	}
 
-	limit := f.aggregateOp.requestSeriesLimit(maxInt)
+	if limit == 0 {
+		limit = maxInt
+	}
 	return f.tagResultAccumulator.AsAggregatedTagsIterator(limit, pools)
 }
 
diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go
index 7eef51042e..7f77ba05de 100644
--- a/src/dbnode/client/session.go
+++ b/src/dbnode/client/session.go
@@ -1459,6 +1459,14 @@ func (s *session) aggregateAttempt(
 		nsClone.Finalize()
 		return nil, FetchResponseMetadata{}, xerrors.NewNonRetryableError(err)
 	}
+	if req.SeriesLimit != nil && opts.InstanceMultiple > 0 {
+		topo := s.state.topoMap
+		iPerReplica := int64(len(topo.Hosts()) / topo.Replicas())
+		iSeriesLimit := int64(float32(opts.SeriesLimit)*opts.InstanceMultiple) / iPerReplica
+		if iSeriesLimit < *req.SeriesLimit {
+			req.SeriesLimit = &iSeriesLimit
+		}
+	}
 
 	fetchState, err := s.newFetchStateWithRLock(ctx, nsClone, newFetchStateOpts{
 		stateType: aggregateFetchState,
@@ -1479,7 +1487,7 @@ func (s *session) aggregateAttempt(
 	// must Unlock before calling `asEncodingSeriesIterators` as the latter needs to acquire
 	// the fetchState Lock
 	fetchState.Unlock()
-	iters, meta, err := fetchState.asAggregatedTagsIterator(s.pools)
+	iters, meta, err := fetchState.asAggregatedTagsIterator(s.pools, opts.SeriesLimit)
 
 	// must Unlock() before decRef'ing, as the latter releases the fetchState back into a
 	// pool if ref count == 0.
@@ -1553,11 +1561,11 @@ func (s *session) fetchTaggedAttempt(
 		nsClone.Finalize()
 		return nil, FetchResponseMetadata{}, xerrors.NewNonRetryableError(err)
 	}
-	if opts.InstanceMultiple > 0 {
+	if req.SeriesLimit != nil && opts.InstanceMultiple > 0 {
 		topo := s.state.topoMap
 		iPerReplica := int64(len(topo.Hosts()) / topo.Replicas())
 		iSeriesLimit := int64(float32(opts.SeriesLimit)*opts.InstanceMultiple) / iPerReplica
-		if req.SeriesLimit != nil && iSeriesLimit < *req.SeriesLimit {
+		if iSeriesLimit < *req.SeriesLimit {
 			req.SeriesLimit = &iSeriesLimit
 		}
 	}
@@ -1618,6 +1626,14 @@ func (s *session) fetchTaggedIDsAttempt(
 		nsClone.Finalize()
 		return nil, FetchResponseMetadata{}, xerrors.NewNonRetryableError(err)
 	}
+	if req.SeriesLimit != nil && opts.InstanceMultiple > 0 {
+		topo := s.state.topoMap
+		iPerReplica := int64(len(topo.Hosts()) / topo.Replicas())
+		iSeriesLimit := int64(float32(opts.SeriesLimit)*opts.InstanceMultiple) / iPerReplica
+		if iSeriesLimit < *req.SeriesLimit {
+			req.SeriesLimit = &iSeriesLimit
+		}
+	}
 
 	fetchState, err := s.newFetchStateWithRLock(ctx, nsClone, newFetchStateOpts{
 		stateType: fetchTaggedFetchState,
diff --git a/src/dbnode/client/session_test.go b/src/dbnode/client/session_test.go
index ea94d0bc96..43e5642c16 100644
--- a/src/dbnode/client/session_test.go
+++ b/src/dbnode/client/session_test.go
@@ -280,34 +280,13 @@ func TestIteratorPools(t *testing.T) {
 	assert.Equal(t, idPool, itPool.ID())
 }
 
-func TestSeriesLimit(t *testing.T) {
+//nolint:dupl
+func TestSeriesLimit_FetchTagged(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	opts := newSessionTestOptions()
-	shardSet := sessionTestShardSet()
-	var hostShardSets []topology.HostShardSet
-	// setup 9 hosts so there are 3 instances per replica. Each instance has a single shard.
-	for i := 0; i < sessionTestReplicas; i++ {
-		for j := 0; j < sessionTestShards; j++ {
-			id := fmt.Sprintf("testhost-%d-%d", i, j)
-			host := topology.NewHost(id, fmt.Sprintf("%s:9000", id))
-			hostShard, _ := sharding.NewShardSet([]shard.Shard{shardSet.All()[j]}, shardSet.HashFn())
-			hostShardSet := topology.NewHostShardSet(host, hostShard)
-			hostShardSets = append(hostShardSets, hostShardSet)
-		}
-	}
-
-	opts = opts.SetTopologyInitializer(topology.NewStaticInitializer(
-		topology.NewStaticOptions().
-			SetReplicas(sessionTestReplicas).
-			SetShardSet(shardSet).
-			SetHostShardSets(hostShardSets)))
-	s, err := newSession(opts)
-	assert.NoError(t, err)
-	sess := s.(*session)
 	// mock the host queue to return a result with a single series, this results in 3 series total, one per shard.
-	sess.newHostQueueFn = mockHostQueuesWithFn(ctrl, func(op op, host topology.Host) {
+	sess := setupMultipleInstanceCluster(t, ctrl, func(op op, host topology.Host) {
 		fOp := op.(*fetchTaggedOp)
 		assert.Equal(t, int64(2), *fOp.request.SeriesLimit)
 		shardID := strings.Split(host.ID(), "-")[2]
@@ -325,7 +304,6 @@ func TestSeriesLimit(t *testing.T) {
 		}, nil)
 	})
 
-	require.NoError(t, sess.Open())
 	iters, meta, err := sess.fetchTaggedAttempt(context.TODO(), ident.StringID("ns"),
 		index.Query{Query: idx.NewAllQuery()},
 		index.QueryOptions{
@@ -341,6 +319,89 @@
 	require.NoError(t, sess.Close())
 }
 
+//nolint:dupl
+func TestSeriesLimit_FetchTaggedIDs(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	// mock the host queue to return a result with a single series, this results in 3 series total, one per shard.
+	sess := setupMultipleInstanceCluster(t, ctrl, func(op op, host topology.Host) {
+		fOp := op.(*fetchTaggedOp)
+		assert.Equal(t, int64(2), *fOp.request.SeriesLimit)
+		shardID := strings.Split(host.ID(), "-")[2]
+		op.CompletionFn()(fetchTaggedResultAccumulatorOpts{
+			host: host,
+			response: &rpc.FetchTaggedResult_{
+				Exhaustive: true,
+				Elements: []*rpc.FetchTaggedIDResult_{
+					{
+						// use shard id for the metric id so it's stable across replicas.
+						ID: []byte(shardID),
+					},
+				},
+			},
+		}, nil)
+	})
+
+	iter, meta, err := sess.fetchTaggedIDsAttempt(context.TODO(), ident.StringID("ns"),
+		index.Query{Query: idx.NewAllQuery()},
+		index.QueryOptions{
+			// set to 6 so we can test that the per-instance series limit is 2 (6 / 3 instances per replica * InstanceMultiple of 1).
+			SeriesLimit: 6,
+			InstanceMultiple: 1,
+		})
+	require.NoError(t, err)
+	require.NotNil(t, iter)
+	// expect a series per shard.
+	require.Equal(t, 3, iter.Remaining())
+	require.True(t, meta.Exhaustive)
+	require.NoError(t, sess.Close())
+}
+
+//nolint:dupl
+func TestSeriesLimit_Aggregate(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	// mock the host queue to return a result with a single series, this results in 3 series total, one per shard.
+	sess := setupMultipleInstanceCluster(t, ctrl, func(op op, host topology.Host) {
+		aOp := op.(*aggregateOp)
+		assert.Equal(t, int64(2), *aOp.request.SeriesLimit)
+		shardID := strings.Split(host.ID(), "-")[2]
+		op.CompletionFn()(aggregateResultAccumulatorOpts{
+			host: host,
+			response: &rpc.AggregateQueryRawResult_{
+				Exhaustive: true,
+				Results: []*rpc.AggregateQueryRawResultTagNameElement{
+					{
+						// use shard id for the tag value so it's stable across replicas.
+						TagName: []byte(shardID),
+						TagValues: []*rpc.AggregateQueryRawResultTagValueElement{
+							{
+								TagValue: []byte("value"),
+							},
+						},
+					},
+				},
+			},
+		}, nil)
+	})
+	iter, meta, err := sess.aggregateAttempt(context.TODO(), ident.StringID("ns"),
+		index.Query{Query: idx.NewAllQuery()},
+		index.AggregationOptions{
+			QueryOptions: index.QueryOptions{
+				// set to 6 so we can test that the per-instance series limit is 2 (6 / 3 instances per replica * InstanceMultiple of 1).
+				SeriesLimit: 6,
+				InstanceMultiple: 1,
+			},
+		})
+	require.NoError(t, err)
+	require.NotNil(t, iter)
+	require.Equal(t, 3, iter.Remaining())
+	require.True(t, meta.Exhaustive)
+	require.NoError(t, sess.Close())
+}
+
 func TestSessionClusterConnectConsistencyLevelAny(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -442,11 +503,35 @@ func testSessionClusterConnectConsistencyLevel(
 	}
 }
 
-// mockHostQueuesWithFn mocks every host queue and calls the provided fn when an operation is enqueued. the provided
-// fn is dispatched in a separate goroutine to simulate the queue processing. this also allows the function to access
-// the state locks.
-func mockHostQueuesWithFn(ctrl *gomock.Controller, fn func(op op, host topology.Host)) newHostQueueFn {
-	return func(host topology.Host, hostQueueOpts hostQueueOpts) (hostQueue, error) {
+// setupMultipleInstanceCluster sets up a db cluster with 3 shards and 3 replicas. The 3 shards are distributed across
+// 9 hosts, so each host has 1 replica of 1 shard.
+// The provided fn is executed whenever an operation is enqueued; it is dispatched in a separate goroutine to simulate
+// the queue processing, which also allows the fn to access the state locks.
+func setupMultipleInstanceCluster(t *testing.T, ctrl *gomock.Controller, fn func(op op, host topology.Host)) *session {
+	opts := newSessionTestOptions()
+	shardSet := sessionTestShardSet()
+	var hostShardSets []topology.HostShardSet
+	// set up 9 hosts so there are 3 instances per replica. Each instance has a single shard.
+	for i := 0; i < sessionTestReplicas; i++ {
+		for j := 0; j < sessionTestShards; j++ {
+			id := fmt.Sprintf("testhost-%d-%d", i, j)
+			host := topology.NewHost(id, fmt.Sprintf("%s:9000", id))
+			hostShard, _ := sharding.NewShardSet([]shard.Shard{shardSet.All()[j]}, shardSet.HashFn())
+			hostShardSet := topology.NewHostShardSet(host, hostShard)
+			hostShardSets = append(hostShardSets, hostShardSet)
+		}
+	}
+
+	opts = opts.SetTopologyInitializer(topology.NewStaticInitializer(
+		topology.NewStaticOptions().
+			SetReplicas(sessionTestReplicas).
+			SetShardSet(shardSet).
+			SetHostShardSets(hostShardSets)))
+	s, err := newSession(opts)
+	assert.NoError(t, err)
+	sess := s.(*session)
+
+	sess.newHostQueueFn = func(host topology.Host, hostQueueOpts hostQueueOpts) (hostQueue, error) {
 		q := NewMockhostQueue(ctrl)
 		q.EXPECT().Open()
 		q.EXPECT().ConnectionCount().Return(hostQueueOpts.opts.MinConnectionCount()).AnyTimes()
@@ -460,6 +545,9 @@ func mockHostQueuesWithFn(ctrl *gomock.Controller, fn func(op op, host topology.
 		q.EXPECT().Close()
 		return q, nil
 	}
+
+	require.NoError(t, sess.Open())
+	return sess
 }
 
 func mockHostQueues(
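
A minimal standalone sketch (not part of the change) of the per-instance series limit arithmetic that the three attempt paths above now share and that the tests assert; the perInstanceSeriesLimit helper and the main harness below are illustrative names only:

package main

import "fmt"

// perInstanceSeriesLimit mirrors the arithmetic in aggregateAttempt,
// fetchTaggedAttempt and fetchTaggedIDsAttempt: the query's SeriesLimit is
// scaled by InstanceMultiple and divided by the number of instances per
// replica; the request limit is only ever lowered, never raised.
func perInstanceSeriesLimit(seriesLimit int, instanceMultiple float32, hosts, replicas int) int64 {
	instancesPerReplica := int64(hosts / replicas)
	return int64(float32(seriesLimit)*instanceMultiple) / instancesPerReplica
}

func main() {
	// The test topology above: 9 hosts, 3 replicas => 3 instances per replica.
	// SeriesLimit 6 with InstanceMultiple 1 yields a per-instance limit of 2,
	// which is the value the tests assert on *request.SeriesLimit.
	fmt.Println(perInstanceSeriesLimit(6, 1, 9, 3)) // prints 2
}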