Skip to content

Commit

Permalink
Expose Endpoint Info timeout parameter (#5480)
Browse files Browse the repository at this point in the history
* Expose Endpoint Info timeout parameter

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix test timeout

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* update doc and changelog

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* make it a hidden flag

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* rebase main

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix tests

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

Signed-off-by: Ben Ye <ben.ye@bytedance.com>
Co-authored-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
Ben Ye and GiedriusS authored Aug 12, 2022
1 parent 292dbb7 commit cbe6657
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction.
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants
- [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions.
- [#5480](https://github.com/thanos-io/thanos/pull/5480) Query: Expose endpoint info timeout as a hidden flag.
- [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write.
- [#5520](https://github.com/thanos-io/thanos/pull/5520) Receive: Meta-monitoring based active series limiting.
- [#5555](https://github.com/thanos-io/thanos/pull/5555) Query: Added `--query.active-query-path` flag, allowing the user to configure the directory to create an active query tracking file, `queries.active`, for different resolution.
Expand Down
5 changes: 5 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func registerQuery(app *extkingpin.App) {

unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m"))

endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden())

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified.").
Default("false").Bool()

Expand Down Expand Up @@ -279,6 +281,7 @@ func registerQuery(app *extkingpin.App) {
time.Duration(*dnsSDInterval),
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*endpointInfoTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
*defaultMetadataTimeRange,
*strictStores,
Expand Down Expand Up @@ -347,6 +350,7 @@ func runQuery(
dnsSDInterval time.Duration,
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
endpointInfoTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
defaultMetadataTimeRange time.Duration,
strictStores []string,
Expand Down Expand Up @@ -459,6 +463,7 @@ func runQuery(
},
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
Expand Down
10 changes: 5 additions & 5 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ type EndpointSet struct {
// accessible and we close gRPC client for it, unless it is strict.
endpointSpec func() map[string]*GRPCEndpointSpec
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration
endpointInfoTimeout time.Duration
unhealthyEndpointTimeout time.Duration

updateMtx sync.Mutex
Expand All @@ -272,6 +272,7 @@ func NewEndpointSet(
endpointSpecs func() []*GRPCEndpointSpec,
dialOpts []grpc.DialOption,
unhealthyEndpointTimeout time.Duration,
endpointInfoTimeout time.Duration,
) *EndpointSet {
endpointsMetric := newEndpointSetNodeCollector()
if reg != nil {
Expand All @@ -292,9 +293,8 @@ func NewEndpointSet(
endpointsMetric: endpointsMetric,

dialOpts: dialOpts,
gRPCInfoCallTimeout: 5 * time.Second,
endpointInfoTimeout: endpointInfoTimeout,
unhealthyEndpointTimeout: unhealthyEndpointTimeout,

endpointSpec: func() map[string]*GRPCEndpointSpec {
specs := make(map[string]*GRPCEndpointSpec)
for _, s := range endpointSpecs() {
Expand Down Expand Up @@ -327,7 +327,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
wg.Add(1)
go func(spec *GRPCEndpointSpec) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, e.gRPCInfoCallTimeout)
ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout)
defer cancel()
e.updateEndpoint(ctx, spec, er)

Expand All @@ -342,7 +342,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
wg.Add(1)
go func(spec *GRPCEndpointSpec) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, e.gRPCInfoCallTimeout)
ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout)
defer cancel()

newRef, err := e.newEndpointRef(ctx, spec)
Expand Down
23 changes: 8 additions & 15 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) {
discoveredEndpointAddr = append(discoveredEndpointAddr, discoveredEndpointAddr[0])

endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second
defer endpointSet.Close()

endpointSet.Update(context.Background())
Expand All @@ -396,7 +395,6 @@ func TestEndpointSetUpdate_EndpointGoingAway(t *testing.T) {

discoveredEndpointAddr := endpoints.EndpointAddresses()
endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second
defer endpointSet.Close()

// Initial update.
Expand Down Expand Up @@ -559,7 +557,7 @@ func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) {
updateTime := time.Now()
discoveredEndpointAddr := endpoints.EndpointAddresses()
endpointSet := makeEndpointSet(discoveredEndpointAddr, false, func() time.Time { return updateTime })
endpointSet.gRPCInfoCallTimeout = 3 * time.Second
endpointSet.endpointInfoTimeout = 3 * time.Second
defer endpointSet.Close()

var wg sync.WaitGroup
Expand Down Expand Up @@ -648,8 +646,7 @@ func TestEndpointSet_Update(t *testing.T) {
}
return specs
},
testGRPCOpts, time.Minute)
endpointSet.gRPCInfoCallTimeout = 2 * time.Second
testGRPCOpts, time.Minute, 2*time.Second)
defer endpointSet.Close()

// Initial update.
Expand Down Expand Up @@ -1030,8 +1027,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) {
}
return specs
},
testGRPCOpts, time.Minute)
endpointSet.gRPCInfoCallTimeout = 2 * time.Second
testGRPCOpts, time.Minute, 2*time.Second)
defer endpointSet.Close()

// Should not matter how many of these we run.
Expand Down Expand Up @@ -1140,9 +1136,8 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) {
NewGRPCEndpointSpec(discoveredEndpointAddr[1], false),
NewGRPCEndpointSpec(discoveredEndpointAddr[2], true),
}
}, testGRPCOpts, time.Minute)
}, testGRPCOpts, time.Minute, 1*time.Second)
defer endpointSet.Close()
endpointSet.gRPCInfoCallTimeout = 1 * time.Second

// Initial update.
endpointSet.Update(context.Background())
Expand All @@ -1161,12 +1156,12 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) {
testutil.Equals(t, int64(54321), curMax, "got incorrect minimum time")

// Successfully retrieve the information and observe minTime/maxTime updating.
endpointSet.gRPCInfoCallTimeout = 3 * time.Second
endpointSet.endpointInfoTimeout = 3 * time.Second
endpointSet.Update(context.Background())
updatedCurMin, updatedCurMax := endpointSet.endpoints[slowStaticEndpointAddr].metadata.Store.MinTime, endpointSet.endpoints[slowStaticEndpointAddr].metadata.Store.MaxTime
testutil.Equals(t, int64(65644), updatedCurMin)
testutil.Equals(t, int64(77777), updatedCurMax)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second
endpointSet.endpointInfoTimeout = 1 * time.Second

// Turn off the endpoints.
endpoints.Close()
Expand Down Expand Up @@ -1320,7 +1315,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {

return tc.states[currentState].endpointSpec()
},
testGRPCOpts, time.Minute)
testGRPCOpts, time.Minute, 2*time.Second)

defer endpointSet.Close()

Expand Down Expand Up @@ -1506,9 +1501,7 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc)
}
return specs
},
testGRPCOpts, time.Minute)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second

testGRPCOpts, time.Minute, time.Second)
return endpointSet
}

Expand Down

0 comments on commit cbe6657

Please sign in to comment.