diff --git a/src/dbnode/client/aggregate_op.go b/src/dbnode/client/aggregate_op.go index 2acc042fb6..6fb5a79fa2 100644 --- a/src/dbnode/client/aggregate_op.go +++ b/src/dbnode/client/aggregate_op.go @@ -45,11 +45,11 @@ func (f *aggregateOp) update(req rpc.AggregateQueryRawRequest, fn completionFn) f.completionFn = fn } -func (f *aggregateOp) requestLimit(defaultValue int) int { - if f.request.Limit == nil { +func (f *aggregateOp) requestSeriesLimit(defaultValue int) int { + if f.request.SeriesLimit == nil { return defaultValue } - return int(*f.request.Limit) + return int(*f.request.SeriesLimit) } func (f *aggregateOp) close() { diff --git a/src/dbnode/client/fetch_state.go b/src/dbnode/client/fetch_state.go index c991e19db5..baa99daa97 100644 --- a/src/dbnode/client/fetch_state.go +++ b/src/dbnode/client/fetch_state.go @@ -205,7 +205,7 @@ func (f *fetchState) asTaggedIDsIterator( return nil, FetchResponseMetadata{}, err } - limit := f.fetchTaggedOp.requestLimit(maxInt) + limit := f.fetchTaggedOp.requestSeriesLimit(maxInt) return f.tagResultAccumulator.AsTaggedIDsIterator(limit, pools) } @@ -231,7 +231,7 @@ func (f *fetchState) asEncodingSeriesIterators( return nil, FetchResponseMetadata{}, err } - limit := f.fetchTaggedOp.requestLimit(maxInt) + limit := f.fetchTaggedOp.requestSeriesLimit(maxInt) return f.tagResultAccumulator.AsEncodingSeriesIterators(limit, pools, descr, opts) } @@ -253,7 +253,7 @@ func (f *fetchState) asAggregatedTagsIterator(pools fetchTaggedPools) (Aggregate return nil, FetchResponseMetadata{}, err } - limit := f.aggregateOp.requestLimit(maxInt) + limit := f.aggregateOp.requestSeriesLimit(maxInt) return f.tagResultAccumulator.AsAggregatedTagsIterator(limit, pools) } diff --git a/src/dbnode/client/fetch_tagged_op.go b/src/dbnode/client/fetch_tagged_op.go index b9996da9d9..6de8b0f4e3 100644 --- a/src/dbnode/client/fetch_tagged_op.go +++ b/src/dbnode/client/fetch_tagged_op.go @@ -45,11 +45,11 @@ func (f *fetchTaggedOp) update(req rpc.FetchTaggedRequest, fn completionFn) { f.completionFn = fn } -func (f *fetchTaggedOp) requestLimit(defaultValue int) int { - if f.request.Limit == nil { +func (f *fetchTaggedOp) requestSeriesLimit(defaultValue int) int { + if f.request.SeriesLimit == nil { return defaultValue } - return int(*f.request.Limit) + return int(*f.request.SeriesLimit) } func (f *fetchTaggedOp) close() { diff --git a/src/dbnode/client/fetch_tagged_results_accumulator.go b/src/dbnode/client/fetch_tagged_results_accumulator.go index 35256b903e..977d37dd81 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator.go @@ -482,6 +482,7 @@ func (accum *fetchTaggedResultAccumulator) AsAggregatedTagsIterator( } moreElems = hasMore + // Would count ever be above limit? return count < limit }) diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index e83cc7335f..e6876db393 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -178,7 +178,7 @@ struct FetchTaggedRequest { 3: required i64 rangeStart 4: required i64 rangeEnd 5: required bool fetchData - 6: optional i64 limit + 6: optional i64 seriesLimit 7: optional TimeType rangeTimeType = TimeType.UNIX_SECONDS 8: optional bool requireExhaustive = true 9: optional i64 docsLimit @@ -398,11 +398,12 @@ struct AggregateQueryRawRequest { 2: required i64 rangeStart 3: required i64 rangeEnd 4: required binary nameSpace - 5: optional i64 limit + 5: optional i64 seriesLimit 6: optional list tagNameFilter 7: optional AggregateQueryType aggregateQueryType = AggregateQueryType.AGGREGATE_BY_TAG_NAME_VALUE 8: optional TimeType rangeType = TimeType.UNIX_SECONDS 9: optional binary source + 10: optional i64 docsLimit } struct AggregateQueryRawResult { @@ -425,11 +426,12 @@ struct AggregateQueryRequest { 2: required i64 rangeStart 3: required i64 rangeEnd 4: required string nameSpace - 5: optional i64 limit + 5: optional i64 seriesLimit 6: optional list tagNameFilter 7: optional AggregateQueryType aggregateQueryType = AggregateQueryType.AGGREGATE_BY_TAG_NAME_VALUE 8: optional TimeType rangeType = TimeType.UNIX_SECONDS 9: optional binary source + 10: optional i64 docsLimit } struct AggregateQueryResult { diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index ea8421cec3..c15f440908 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -3277,7 +3277,7 @@ func (p *Segment) String() string { // - RangeStart // - RangeEnd // - FetchData -// - Limit +// - SeriesLimit // - RangeTimeType // - RequireExhaustive // - DocsLimit @@ -3288,7 +3288,7 @@ type FetchTaggedRequest struct { RangeStart int64 `thrift:"rangeStart,3,required" db:"rangeStart" json:"rangeStart"` RangeEnd int64 `thrift:"rangeEnd,4,required" db:"rangeEnd" json:"rangeEnd"` FetchData bool `thrift:"fetchData,5,required" db:"fetchData" json:"fetchData"` - Limit *int64 `thrift:"limit,6" db:"limit" json:"limit,omitempty"` + SeriesLimit *int64 `thrift:"seriesLimit,6" db:"seriesLimit" json:"seriesLimit,omitempty"` RangeTimeType TimeType `thrift:"rangeTimeType,7" db:"rangeTimeType" json:"rangeTimeType,omitempty"` RequireExhaustive bool `thrift:"requireExhaustive,8" db:"requireExhaustive" json:"requireExhaustive,omitempty"` DocsLimit *int64 `thrift:"docsLimit,9" db:"docsLimit" json:"docsLimit,omitempty"` @@ -3323,13 +3323,13 @@ func (p *FetchTaggedRequest) GetFetchData() bool { return p.FetchData } -var FetchTaggedRequest_Limit_DEFAULT int64 +var FetchTaggedRequest_SeriesLimit_DEFAULT int64 -func (p *FetchTaggedRequest) GetLimit() int64 { - if !p.IsSetLimit() { - return FetchTaggedRequest_Limit_DEFAULT +func (p *FetchTaggedRequest) GetSeriesLimit() int64 { + if !p.IsSetSeriesLimit() { + return FetchTaggedRequest_SeriesLimit_DEFAULT } - return *p.Limit + return *p.SeriesLimit } var FetchTaggedRequest_RangeTimeType_DEFAULT TimeType = 0 @@ -3358,8 +3358,8 @@ var FetchTaggedRequest_Source_DEFAULT []byte func (p *FetchTaggedRequest) GetSource() []byte { return p.Source } -func (p *FetchTaggedRequest) IsSetLimit() bool { - return p.Limit != nil +func (p *FetchTaggedRequest) IsSetSeriesLimit() bool { + return p.SeriesLimit != nil } func (p *FetchTaggedRequest) IsSetRangeTimeType() bool { @@ -3522,7 +3522,7 @@ func (p *FetchTaggedRequest) ReadField6(iprot thrift.TProtocol) error { if v, err := iprot.ReadI64(); err != nil { return thrift.PrependError("error reading field 6: ", err) } else { - p.Limit = &v + p.SeriesLimit = &v } return nil } @@ -3675,15 +3675,15 @@ func (p *FetchTaggedRequest) writeField5(oprot thrift.TProtocol) (err error) { } func (p *FetchTaggedRequest) writeField6(oprot thrift.TProtocol) (err error) { - if p.IsSetLimit() { - if err := oprot.WriteFieldBegin("limit", thrift.I64, 6); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:limit: ", p), err) + if p.IsSetSeriesLimit() { + if err := oprot.WriteFieldBegin("seriesLimit", thrift.I64, 6); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:seriesLimit: ", p), err) } - if err := oprot.WriteI64(int64(*p.Limit)); err != nil { - return thrift.PrependError(fmt.Sprintf("%T.limit (6) field write error: ", p), err) + if err := oprot.WriteI64(int64(*p.SeriesLimit)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.seriesLimit (6) field write error: ", p), err) } if err := oprot.WriteFieldEnd(); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field end error 6:limit: ", p), err) + return thrift.PrependError(fmt.Sprintf("%T write field end error 6:seriesLimit: ", p), err) } } return err @@ -9610,21 +9610,23 @@ func (p *HealthResult_) String() string { // - RangeStart // - RangeEnd // - NameSpace -// - Limit +// - SeriesLimit // - TagNameFilter // - AggregateQueryType // - RangeType // - Source +// - DocsLimit type AggregateQueryRawRequest struct { Query []byte `thrift:"query,1,required" db:"query" json:"query"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` RangeEnd int64 `thrift:"rangeEnd,3,required" db:"rangeEnd" json:"rangeEnd"` NameSpace []byte `thrift:"nameSpace,4,required" db:"nameSpace" json:"nameSpace"` - Limit *int64 `thrift:"limit,5" db:"limit" json:"limit,omitempty"` + SeriesLimit *int64 `thrift:"seriesLimit,5" db:"seriesLimit" json:"seriesLimit,omitempty"` TagNameFilter [][]byte `thrift:"tagNameFilter,6" db:"tagNameFilter" json:"tagNameFilter,omitempty"` AggregateQueryType AggregateQueryType `thrift:"aggregateQueryType,7" db:"aggregateQueryType" json:"aggregateQueryType,omitempty"` RangeType TimeType `thrift:"rangeType,8" db:"rangeType" json:"rangeType,omitempty"` Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` + DocsLimit *int64 `thrift:"docsLimit,10" db:"docsLimit" json:"docsLimit,omitempty"` } func NewAggregateQueryRawRequest() *AggregateQueryRawRequest { @@ -9651,13 +9653,13 @@ func (p *AggregateQueryRawRequest) GetNameSpace() []byte { return p.NameSpace } -var AggregateQueryRawRequest_Limit_DEFAULT int64 +var AggregateQueryRawRequest_SeriesLimit_DEFAULT int64 -func (p *AggregateQueryRawRequest) GetLimit() int64 { - if !p.IsSetLimit() { - return AggregateQueryRawRequest_Limit_DEFAULT +func (p *AggregateQueryRawRequest) GetSeriesLimit() int64 { + if !p.IsSetSeriesLimit() { + return AggregateQueryRawRequest_SeriesLimit_DEFAULT } - return *p.Limit + return *p.SeriesLimit } var AggregateQueryRawRequest_TagNameFilter_DEFAULT [][]byte @@ -9683,8 +9685,17 @@ var AggregateQueryRawRequest_Source_DEFAULT []byte func (p *AggregateQueryRawRequest) GetSource() []byte { return p.Source } -func (p *AggregateQueryRawRequest) IsSetLimit() bool { - return p.Limit != nil + +var AggregateQueryRawRequest_DocsLimit_DEFAULT int64 + +func (p *AggregateQueryRawRequest) GetDocsLimit() int64 { + if !p.IsSetDocsLimit() { + return AggregateQueryRawRequest_DocsLimit_DEFAULT + } + return *p.DocsLimit +} +func (p *AggregateQueryRawRequest) IsSetSeriesLimit() bool { + return p.SeriesLimit != nil } func (p *AggregateQueryRawRequest) IsSetTagNameFilter() bool { @@ -9703,6 +9714,10 @@ func (p *AggregateQueryRawRequest) IsSetSource() bool { return p.Source != nil } +func (p *AggregateQueryRawRequest) IsSetDocsLimit() bool { + return p.DocsLimit != nil +} + func (p *AggregateQueryRawRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -9762,6 +9777,10 @@ func (p *AggregateQueryRawRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField9(iprot); err != nil { return err } + case 10: + if err := p.ReadField10(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -9829,7 +9848,7 @@ func (p *AggregateQueryRawRequest) ReadField5(iprot thrift.TProtocol) error { if v, err := iprot.ReadI64(); err != nil { return thrift.PrependError("error reading field 5: ", err) } else { - p.Limit = &v + p.SeriesLimit = &v } return nil } @@ -9885,6 +9904,15 @@ func (p *AggregateQueryRawRequest) ReadField9(iprot thrift.TProtocol) error { return nil } +func (p *AggregateQueryRawRequest) ReadField10(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 10: ", err) + } else { + p.DocsLimit = &v + } + return nil +} + func (p *AggregateQueryRawRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("AggregateQueryRawRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -9917,6 +9945,9 @@ func (p *AggregateQueryRawRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField9(oprot); err != nil { return err } + if err := p.writeField10(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -9980,15 +10011,15 @@ func (p *AggregateQueryRawRequest) writeField4(oprot thrift.TProtocol) (err erro } func (p *AggregateQueryRawRequest) writeField5(oprot thrift.TProtocol) (err error) { - if p.IsSetLimit() { - if err := oprot.WriteFieldBegin("limit", thrift.I64, 5); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:limit: ", p), err) + if p.IsSetSeriesLimit() { + if err := oprot.WriteFieldBegin("seriesLimit", thrift.I64, 5); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:seriesLimit: ", p), err) } - if err := oprot.WriteI64(int64(*p.Limit)); err != nil { - return thrift.PrependError(fmt.Sprintf("%T.limit (5) field write error: ", p), err) + if err := oprot.WriteI64(int64(*p.SeriesLimit)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.seriesLimit (5) field write error: ", p), err) } if err := oprot.WriteFieldEnd(); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field end error 5:limit: ", p), err) + return thrift.PrependError(fmt.Sprintf("%T write field end error 5:seriesLimit: ", p), err) } } return err @@ -10062,6 +10093,21 @@ func (p *AggregateQueryRawRequest) writeField9(oprot thrift.TProtocol) (err erro return err } +func (p *AggregateQueryRawRequest) writeField10(oprot thrift.TProtocol) (err error) { + if p.IsSetDocsLimit() { + if err := oprot.WriteFieldBegin("docsLimit", thrift.I64, 10); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 10:docsLimit: ", p), err) + } + if err := oprot.WriteI64(int64(*p.DocsLimit)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.docsLimit (10) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 10:docsLimit: ", p), err) + } + } + return err +} + func (p *AggregateQueryRawRequest) String() string { if p == nil { return "" @@ -10492,21 +10538,23 @@ func (p *AggregateQueryRawResultTagValueElement) String() string { // - RangeStart // - RangeEnd // - NameSpace -// - Limit +// - SeriesLimit // - TagNameFilter // - AggregateQueryType // - RangeType // - Source +// - DocsLimit type AggregateQueryRequest struct { Query *Query `thrift:"query,1" db:"query" json:"query,omitempty"` RangeStart int64 `thrift:"rangeStart,2,required" db:"rangeStart" json:"rangeStart"` RangeEnd int64 `thrift:"rangeEnd,3,required" db:"rangeEnd" json:"rangeEnd"` NameSpace string `thrift:"nameSpace,4,required" db:"nameSpace" json:"nameSpace"` - Limit *int64 `thrift:"limit,5" db:"limit" json:"limit,omitempty"` + SeriesLimit *int64 `thrift:"seriesLimit,5" db:"seriesLimit" json:"seriesLimit,omitempty"` TagNameFilter []string `thrift:"tagNameFilter,6" db:"tagNameFilter" json:"tagNameFilter,omitempty"` AggregateQueryType AggregateQueryType `thrift:"aggregateQueryType,7" db:"aggregateQueryType" json:"aggregateQueryType,omitempty"` RangeType TimeType `thrift:"rangeType,8" db:"rangeType" json:"rangeType,omitempty"` Source []byte `thrift:"source,9" db:"source" json:"source,omitempty"` + DocsLimit *int64 `thrift:"docsLimit,10" db:"docsLimit" json:"docsLimit,omitempty"` } func NewAggregateQueryRequest() *AggregateQueryRequest { @@ -10538,13 +10586,13 @@ func (p *AggregateQueryRequest) GetNameSpace() string { return p.NameSpace } -var AggregateQueryRequest_Limit_DEFAULT int64 +var AggregateQueryRequest_SeriesLimit_DEFAULT int64 -func (p *AggregateQueryRequest) GetLimit() int64 { - if !p.IsSetLimit() { - return AggregateQueryRequest_Limit_DEFAULT +func (p *AggregateQueryRequest) GetSeriesLimit() int64 { + if !p.IsSetSeriesLimit() { + return AggregateQueryRequest_SeriesLimit_DEFAULT } - return *p.Limit + return *p.SeriesLimit } var AggregateQueryRequest_TagNameFilter_DEFAULT []string @@ -10570,12 +10618,21 @@ var AggregateQueryRequest_Source_DEFAULT []byte func (p *AggregateQueryRequest) GetSource() []byte { return p.Source } + +var AggregateQueryRequest_DocsLimit_DEFAULT int64 + +func (p *AggregateQueryRequest) GetDocsLimit() int64 { + if !p.IsSetDocsLimit() { + return AggregateQueryRequest_DocsLimit_DEFAULT + } + return *p.DocsLimit +} func (p *AggregateQueryRequest) IsSetQuery() bool { return p.Query != nil } -func (p *AggregateQueryRequest) IsSetLimit() bool { - return p.Limit != nil +func (p *AggregateQueryRequest) IsSetSeriesLimit() bool { + return p.SeriesLimit != nil } func (p *AggregateQueryRequest) IsSetTagNameFilter() bool { @@ -10594,6 +10651,10 @@ func (p *AggregateQueryRequest) IsSetSource() bool { return p.Source != nil } +func (p *AggregateQueryRequest) IsSetDocsLimit() bool { + return p.DocsLimit != nil +} + func (p *AggregateQueryRequest) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -10651,6 +10712,10 @@ func (p *AggregateQueryRequest) Read(iprot thrift.TProtocol) error { if err := p.ReadField9(iprot); err != nil { return err } + case 10: + if err := p.ReadField10(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -10714,7 +10779,7 @@ func (p *AggregateQueryRequest) ReadField5(iprot thrift.TProtocol) error { if v, err := iprot.ReadI64(); err != nil { return thrift.PrependError("error reading field 5: ", err) } else { - p.Limit = &v + p.SeriesLimit = &v } return nil } @@ -10770,6 +10835,15 @@ func (p *AggregateQueryRequest) ReadField9(iprot thrift.TProtocol) error { return nil } +func (p *AggregateQueryRequest) ReadField10(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 10: ", err) + } else { + p.DocsLimit = &v + } + return nil +} + func (p *AggregateQueryRequest) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("AggregateQueryRequest"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -10802,6 +10876,9 @@ func (p *AggregateQueryRequest) Write(oprot thrift.TProtocol) error { if err := p.writeField9(oprot); err != nil { return err } + if err := p.writeField10(oprot); err != nil { + return err + } } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) @@ -10867,15 +10944,15 @@ func (p *AggregateQueryRequest) writeField4(oprot thrift.TProtocol) (err error) } func (p *AggregateQueryRequest) writeField5(oprot thrift.TProtocol) (err error) { - if p.IsSetLimit() { - if err := oprot.WriteFieldBegin("limit", thrift.I64, 5); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:limit: ", p), err) + if p.IsSetSeriesLimit() { + if err := oprot.WriteFieldBegin("seriesLimit", thrift.I64, 5); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:seriesLimit: ", p), err) } - if err := oprot.WriteI64(int64(*p.Limit)); err != nil { - return thrift.PrependError(fmt.Sprintf("%T.limit (5) field write error: ", p), err) + if err := oprot.WriteI64(int64(*p.SeriesLimit)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.seriesLimit (5) field write error: ", p), err) } if err := oprot.WriteFieldEnd(); err != nil { - return thrift.PrependError(fmt.Sprintf("%T write field end error 5:limit: ", p), err) + return thrift.PrependError(fmt.Sprintf("%T write field end error 5:seriesLimit: ", p), err) } } return err @@ -10949,6 +11026,21 @@ func (p *AggregateQueryRequest) writeField9(oprot thrift.TProtocol) (err error) return err } +func (p *AggregateQueryRequest) writeField10(oprot thrift.TProtocol) (err error) { + if p.IsSetDocsLimit() { + if err := oprot.WriteFieldBegin("docsLimit", thrift.I64, 10); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 10:docsLimit: ", p), err) + } + if err := oprot.WriteI64(int64(*p.DocsLimit)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.docsLimit (10) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 10:docsLimit: ", p), err) + } + } + return err +} + func (p *AggregateQueryRequest) String() string { if p == nil { return "" diff --git a/src/dbnode/generated/thrift/rpc/rpc_mock.go b/src/dbnode/generated/thrift/rpc/rpc_mock.go index a7be5ca909..3d5f1f0ff3 100644 --- a/src/dbnode/generated/thrift/rpc/rpc_mock.go +++ b/src/dbnode/generated/thrift/rpc/rpc_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/generated/thrift/rpc/tchan-go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert.go b/src/dbnode/network/server/tchannelthrift/convert/convert.go index ed7b9a7fd0..f62e60101b 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert.go @@ -227,7 +227,7 @@ func FromRPCFetchTaggedRequest( EndExclusive: end, RequireExhaustive: req.RequireExhaustive, } - if l := req.Limit; l != nil { + if l := req.SeriesLimit; l != nil { opts.SeriesLimit = int(*l) } if l := req.DocsLimit; l != nil { @@ -285,7 +285,7 @@ func ToRPCFetchTaggedRequest( if opts.SeriesLimit > 0 { l := int64(opts.SeriesLimit) - request.Limit = &l + request.SeriesLimit = &l } if opts.DocsLimit > 0 { @@ -320,9 +320,13 @@ func FromRPCAggregateQueryRequest( EndExclusive: end, }, } - if l := req.Limit; l != nil { + if l := req.SeriesLimit; l != nil { opts.SeriesLimit = int(*l) } + if l := req.DocsLimit; l != nil { + opts.DocsLimit = int(*l) + } + if len(req.Source) > 0 { opts.Source = req.Source } @@ -368,12 +372,17 @@ func FromRPCAggregateQueryRawRequest( EndExclusive: end, }, } - if l := req.Limit; l != nil { + if l := req.SeriesLimit; l != nil { opts.SeriesLimit = int(*l) } + if l := req.DocsLimit; l != nil { + opts.DocsLimit = int(*l) + } + if len(req.Source) > 0 { opts.Source = req.Source } + query, err := idx.Unmarshal(req.Query) if err != nil { return nil, index.Query{}, index.AggregationOptions{}, err @@ -420,7 +429,11 @@ func ToRPCAggregateQueryRawRequest( if opts.SeriesLimit > 0 { l := int64(opts.SeriesLimit) - request.Limit = &l + request.SeriesLimit = &l + } + if opts.DocsLimit > 0 { + l := int64(opts.DocsLimit) + request.DocsLimit = &l } if len(opts.Source) > 0 { diff --git a/src/dbnode/network/server/tchannelthrift/convert/convert_test.go b/src/dbnode/network/server/tchannelthrift/convert/convert_test.go index 2c33c2dd06..f150827389 100644 --- a/src/dbnode/network/server/tchannelthrift/convert/convert_test.go +++ b/src/dbnode/network/server/tchannelthrift/convert/convert_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -101,20 +101,25 @@ func conjunctionQueryATestCase(t *testing.T) (idx.Query, []byte) { } func TestConvertFetchTaggedRequest(t *testing.T) { + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) ns := ident.StringID("abc") opts := index.QueryOptions{ StartInclusive: time.Now().Add(-900 * time.Hour), EndExclusive: time.Now(), - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), } fetchData := true - var limit int64 = 10 requestSkeleton := &rpc.FetchTaggedRequest{ - NameSpace: ns.Bytes(), - RangeStart: mustToRpcTime(t, opts.StartInclusive), - RangeEnd: mustToRpcTime(t, opts.EndExclusive), - FetchData: fetchData, - Limit: &limit, + NameSpace: ns.Bytes(), + RangeStart: mustToRpcTime(t, opts.StartInclusive), + RangeEnd: mustToRpcTime(t, opts.EndExclusive), + FetchData: fetchData, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, } requireEqual := func(a, b interface{}) { d := cmp.Diff(a, b) @@ -166,12 +171,17 @@ func TestConvertFetchTaggedRequest(t *testing.T) { } func TestConvertAggregateRawQueryRequest(t *testing.T) { - ns := ident.StringID("abc") + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ns = ident.StringID("abc") + ) opts := index.AggregationOptions{ QueryOptions: index.QueryOptions{ StartInclusive: time.Now().Add(-900 * time.Hour), EndExclusive: time.Now(), - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }, Type: index.AggregateTagNamesAndValues, FieldFilter: index.AggregateFieldFilter{ @@ -179,12 +189,12 @@ func TestConvertAggregateRawQueryRequest(t *testing.T) { []byte("string"), }, } - var limit int64 = 10 requestSkeleton := &rpc.AggregateQueryRawRequest{ - NameSpace: ns.Bytes(), - RangeStart: mustToRpcTime(t, opts.StartInclusive), - RangeEnd: mustToRpcTime(t, opts.EndExclusive), - Limit: &limit, + NameSpace: ns.Bytes(), + RangeStart: mustToRpcTime(t, opts.StartInclusive), + RangeEnd: mustToRpcTime(t, opts.EndExclusive), + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, TagNameFilter: [][]byte{ []byte("some"), []byte("string"), diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 1996795609..65567c26d8 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1632,7 +1632,10 @@ func TestServiceFetchTagged(t *testing.T) { index.QueryResultsOptions{}, testIndexOptions) resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2)) - + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) mockDB.EXPECT().QueryIDs( gomock.Any(), ident.NewIDMatcher(nsID), @@ -1640,23 +1643,25 @@ func TestServiceFetchTagged(t *testing.T) { index.QueryOptions{ StartInclusive: start, EndExclusive: end, - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil) startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + data, err := idx.Marshal(req) require.NoError(t, err) r, err := service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ - NameSpace: []byte(nsID), - Query: data, - RangeStart: startNanos, - RangeEnd: endNanos, - FetchData: true, - Limit: &limit, + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, }) require.NoError(t, err) @@ -1764,16 +1769,20 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) { require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) data, err := idx.Marshal(req) require.NoError(t, err) _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ - NameSpace: []byte(nsID), - Query: data, - RangeStart: startNanos, - RangeEnd: endNanos, - FetchData: true, - Limit: &limit, + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, }) require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } @@ -1804,17 +1813,21 @@ func TestServiceFetchTaggedDatabaseNotSet(t *testing.T) { require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) data, err := idx.Marshal(req) require.NoError(t, err) _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ - NameSpace: []byte(nsID), - Query: data, - RangeStart: startNanos, - RangeEnd: endNanos, - FetchData: true, - Limit: &limit, + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, }) require.Equal(t, tterrors.NewInternalError(errDatabaseIsNotInitializedYet), err) } @@ -1856,7 +1869,10 @@ func TestServiceFetchTaggedNoData(t *testing.T) { index.QueryResultsOptions{}, testIndexOptions) resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) resMap.Map().Set(md2.ID, doc.NewDocumentFromMetadata(md2)) - + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) mockDB.EXPECT().QueryIDs( ctx, ident.NewIDMatcher(nsID), @@ -1864,23 +1880,25 @@ func TestServiceFetchTaggedNoData(t *testing.T) { index.QueryOptions{ StartInclusive: start, EndExclusive: end, - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil) startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + data, err := idx.Marshal(req) require.NoError(t, err) r, err := service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ - NameSpace: []byte(nsID), - Query: data, - RangeStart: startNanos, - RangeEnd: endNanos, - FetchData: false, - Limit: &limit, + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: false, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, }) require.NoError(t, err) @@ -1922,8 +1940,10 @@ func TestServiceFetchTaggedErrs(t *testing.T) { require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 - + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*")) require.NoError(t, err) data, err := idx.Marshal(req) @@ -1937,15 +1957,17 @@ func TestServiceFetchTaggedErrs(t *testing.T) { index.QueryOptions{ StartInclusive: start, EndExclusive: end, - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }).Return(index.QueryResult{}, fmt.Errorf("random err")) _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ - NameSpace: []byte(nsID), - Query: data, - RangeStart: startNanos, - RangeEnd: endNanos, - FetchData: false, - Limit: &limit, + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: false, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, }) require.Error(t, err) } @@ -2022,7 +2044,10 @@ func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) { resMap := index.NewQueryResults(ident.StringID(nsID), index.QueryResultsOptions{}, testIndexOptions) resMap.Map().Set(md1.ID, doc.NewDocumentFromMetadata(md1)) - + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) mockDB.EXPECT().QueryIDs( gomock.Any(), ident.NewIDMatcher(nsID), @@ -2030,23 +2055,25 @@ func TestServiceFetchTaggedReturnOnFirstErr(t *testing.T) { index.QueryOptions{ StartInclusive: start, EndExclusive: end, - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil) startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + data, err := idx.Marshal(req) require.NoError(t, err) _, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{ - NameSpace: []byte(nsID), - Query: data, - RangeStart: startNanos, - RangeEnd: endNanos, - FetchData: true, - Limit: &limit, + NameSpace: []byte(nsID), + Query: data, + RangeStart: startNanos, + RangeEnd: endNanos, + FetchData: true, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, }) require.Error(t, err) } @@ -2080,6 +2107,11 @@ func TestServiceAggregate(t *testing.T) { resMap.Map().Set(ident.StringID("foo"), index.MustNewAggregateValues(testIndexOptions)) resMap.Map().Set(ident.StringID("bar"), index.MustNewAggregateValues(testIndexOptions, ident.StringID("baz"), ident.StringID("barf"))) + + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) mockDB.EXPECT().AggregateQuery( ctx, ident.NewIDMatcher(nsID), @@ -2088,7 +2120,8 @@ func TestServiceAggregate(t *testing.T) { QueryOptions: index.QueryOptions{ StartInclusive: start, EndExclusive: end, - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }, FieldFilter: index.AggregateFieldFilter{ []byte("foo"), []byte("bar"), @@ -2101,7 +2134,7 @@ func TestServiceAggregate(t *testing.T) { require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + data, err := idx.Marshal(req) require.NoError(t, err) r, err := service.AggregateRaw(tctx, &rpc.AggregateQueryRawRequest{ @@ -2109,7 +2142,8 @@ func TestServiceAggregate(t *testing.T) { Query: data, RangeStart: startNanos, RangeEnd: endNanos, - Limit: &limit, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, AggregateQueryType: rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME_VALUE, TagNameFilter: [][]byte{ []byte("foo"), []byte("bar"), @@ -2163,6 +2197,10 @@ func TestServiceAggregateNameOnly(t *testing.T) { index.AggregateResultsOptions{}, testIndexOptions) resMap.Map().Set(ident.StringID("foo"), index.AggregateValues{}) resMap.Map().Set(ident.StringID("bar"), index.AggregateValues{}) + var ( + seriesLimit int64 = 10 + docsLimit int64 = 10 + ) mockDB.EXPECT().AggregateQuery( ctx, ident.NewIDMatcher(nsID), @@ -2171,7 +2209,8 @@ func TestServiceAggregateNameOnly(t *testing.T) { QueryOptions: index.QueryOptions{ StartInclusive: start, EndExclusive: end, - SeriesLimit: 10, + SeriesLimit: int(seriesLimit), + DocsLimit: int(docsLimit), }, FieldFilter: index.AggregateFieldFilter{ []byte("foo"), []byte("bar"), @@ -2184,7 +2223,7 @@ func TestServiceAggregateNameOnly(t *testing.T) { require.NoError(t, err) endNanos, err := convert.ToValue(end, rpc.TimeType_UNIX_NANOSECONDS) require.NoError(t, err) - var limit int64 = 10 + data, err := idx.Marshal(req) require.NoError(t, err) r, err := service.AggregateRaw(tctx, &rpc.AggregateQueryRawRequest{ @@ -2192,7 +2231,8 @@ func TestServiceAggregateNameOnly(t *testing.T) { Query: data, RangeStart: startNanos, RangeEnd: endNanos, - Limit: &limit, + SeriesLimit: &seriesLimit, + DocsLimit: &docsLimit, AggregateQueryType: rpc.AggregateQueryType_AGGREGATE_BY_TAG_NAME, TagNameFilter: [][]byte{ []byte("foo"), []byte("bar"),