Skip to content

Commit

Permalink
[dbnode][coordinator] Ensure docs limit is propagated for search and …
Browse files Browse the repository at this point in the history
…aggregate RPCs (#3108)

* [dbnode][coordinator] Ensure docs limit is propagated for search and aggregate RPCs

* Fixup convert_test

* Fixup compilation issues

* Update mocks

* Fixup service test

Co-authored-by: Wesley Kim <wesley@chronosphere.io>
Co-authored-by: arnikola <artem@chronosphere.io>
  • Loading branch information
3 people authored Jan 21, 2021
1 parent 9be8ef9 commit 45e5e4a
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 137 deletions.
6 changes: 3 additions & 3 deletions src/dbnode/client/aggregate_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func (f *aggregateOp) update(req rpc.AggregateQueryRawRequest, fn completionFn)
f.completionFn = fn
}

func (f *aggregateOp) requestLimit(defaultValue int) int {
if f.request.Limit == nil {
func (f *aggregateOp) requestSeriesLimit(defaultValue int) int {
if f.request.SeriesLimit == nil {
return defaultValue
}
return int(*f.request.Limit)
return int(*f.request.SeriesLimit)
}

func (f *aggregateOp) close() {
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/client/fetch_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (f *fetchState) asTaggedIDsIterator(
return nil, FetchResponseMetadata{}, err
}

limit := f.fetchTaggedOp.requestLimit(maxInt)
limit := f.fetchTaggedOp.requestSeriesLimit(maxInt)
return f.tagResultAccumulator.AsTaggedIDsIterator(limit, pools)
}

Expand All @@ -231,7 +231,7 @@ func (f *fetchState) asEncodingSeriesIterators(
return nil, FetchResponseMetadata{}, err
}

limit := f.fetchTaggedOp.requestLimit(maxInt)
limit := f.fetchTaggedOp.requestSeriesLimit(maxInt)
return f.tagResultAccumulator.AsEncodingSeriesIterators(limit, pools, descr, opts)
}

Expand All @@ -253,7 +253,7 @@ func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools) (Aggregate
return nil, FetchResponseMetadata{}, err
}

limit := f.aggregateOp.requestLimit(maxInt)
limit := f.aggregateOp.requestSeriesLimit(maxInt)
return f.tagResultAccumulator.AsAggregatedTagsIterator(limit, pools)
}

Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/client/fetch_tagged_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func (f *fetchTaggedOp) update(req rpc.FetchTaggedRequest, fn completionFn) {
f.completionFn = fn
}

func (f *fetchTaggedOp) requestLimit(defaultValue int) int {
if f.request.Limit == nil {
func (f *fetchTaggedOp) requestSeriesLimit(defaultValue int) int {
if f.request.SeriesLimit == nil {
return defaultValue
}
return int(*f.request.Limit)
return int(*f.request.SeriesLimit)
}

func (f *fetchTaggedOp) close() {
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/client/fetch_tagged_results_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (accum *fetchTaggedResultAccumulator) AsAggregatedTagsIterator(
}

moreElems = hasMore
// Would count ever be above limit?
return count < limit
})

Expand Down
8 changes: 5 additions & 3 deletions src/dbnode/generated/thrift/rpc.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ struct FetchTaggedRequest {
3: required i64 rangeStart
4: required i64 rangeEnd
5: required bool fetchData
6: optional i64 limit
6: optional i64 seriesLimit
7: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS
8: optional bool requireExhaustive = true
9: optional i64 docsLimit
Expand Down Expand Up @@ -398,11 +398,12 @@ struct AggregateQueryRawRequest {
2: required i64 rangeStart
3: required i64 rangeEnd
4: required binary nameSpace
5: optional i64 limit
5: optional i64 seriesLimit
6: optional list<binary> tagNameFilter
7: optional AggregateQueryType aggregateQueryType = AggregateQueryType.AGGREGATE_BY_TAG_NAME_VALUE
8: optional TimeType rangeType = TimeType.UNIX_SECONDS
9: optional binary source
10: optional i64 docsLimit
}

struct AggregateQueryRawResult {
Expand All @@ -425,11 +426,12 @@ struct AggregateQueryRequest {
2: required i64 rangeStart
3: required i64 rangeEnd
4: required string nameSpace
5: optional i64 limit
5: optional i64 seriesLimit
6: optional list<string> tagNameFilter
7: optional AggregateQueryType aggregateQueryType = AggregateQueryType.AGGREGATE_BY_TAG_NAME_VALUE
8: optional TimeType rangeType = TimeType.UNIX_SECONDS
9: optional binary source
10: optional i64 docsLimit
}

struct AggregateQueryResult {
Expand Down
Loading

0 comments on commit 45e5e4a

Please sign in to comment.