Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Endpoint Info timeout parameter #5480

Merged
merged 7 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency`, allowing users to configure the number of goroutines used to download/upload block files during compaction.
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Expose TSDB stats for all tenants.
- [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency`, allowing users to configure the number of goroutines used to download blocks during compaction.
- [#5480](https://github.com/thanos-io/thanos/pull/5480) Query: Expose endpoint info timeout as a hidden flag.
- [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write.
- [#5520](https://github.com/thanos-io/thanos/pull/5520) Receive: Meta-monitoring based active series limiting.
- [#5555](https://github.com/thanos-io/thanos/pull/5555) Query: Added `--query.active-query-path` flag, allowing the user to configure the directory to create an active query tracking file, `queries.active`, for different resolution.
Expand Down
5 changes: 5 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func registerQuery(app *extkingpin.App) {

unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m"))

endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden())

enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified.").
Default("false").Bool()

Expand Down Expand Up @@ -279,6 +281,7 @@ func registerQuery(app *extkingpin.App) {
time.Duration(*dnsSDInterval),
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*endpointInfoTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
*defaultMetadataTimeRange,
*strictStores,
Expand Down Expand Up @@ -347,6 +350,7 @@ func runQuery(
dnsSDInterval time.Duration,
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
endpointInfoTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
defaultMetadataTimeRange time.Duration,
strictStores []string,
Expand Down Expand Up @@ -459,6 +463,7 @@ func runQuery(
},
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
Expand Down
10 changes: 5 additions & 5 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ type EndpointSet struct {
// accessible and we close gRPC client for it, unless it is strict.
endpointSpec func() map[string]*GRPCEndpointSpec
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration
endpointInfoTimeout time.Duration
unhealthyEndpointTimeout time.Duration

updateMtx sync.Mutex
Expand All @@ -272,6 +272,7 @@ func NewEndpointSet(
endpointSpecs func() []*GRPCEndpointSpec,
dialOpts []grpc.DialOption,
unhealthyEndpointTimeout time.Duration,
endpointInfoTimeout time.Duration,
) *EndpointSet {
endpointsMetric := newEndpointSetNodeCollector()
if reg != nil {
Expand All @@ -292,9 +293,8 @@ func NewEndpointSet(
endpointsMetric: endpointsMetric,

dialOpts: dialOpts,
gRPCInfoCallTimeout: 5 * time.Second,
endpointInfoTimeout: endpointInfoTimeout,
unhealthyEndpointTimeout: unhealthyEndpointTimeout,

endpointSpec: func() map[string]*GRPCEndpointSpec {
specs := make(map[string]*GRPCEndpointSpec)
for _, s := range endpointSpecs() {
Expand Down Expand Up @@ -327,7 +327,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
wg.Add(1)
go func(spec *GRPCEndpointSpec) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, e.gRPCInfoCallTimeout)
ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout)
defer cancel()
e.updateEndpoint(ctx, spec, er)

Expand All @@ -342,7 +342,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
wg.Add(1)
go func(spec *GRPCEndpointSpec) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, e.gRPCInfoCallTimeout)
ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout)
defer cancel()

newRef, err := e.newEndpointRef(ctx, spec)
Expand Down
23 changes: 8 additions & 15 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) {
discoveredEndpointAddr = append(discoveredEndpointAddr, discoveredEndpointAddr[0])

endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second
defer endpointSet.Close()

endpointSet.Update(context.Background())
Expand All @@ -396,7 +395,6 @@ func TestEndpointSetUpdate_EndpointGoingAway(t *testing.T) {

discoveredEndpointAddr := endpoints.EndpointAddresses()
endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second
defer endpointSet.Close()

// Initial update.
Expand Down Expand Up @@ -559,7 +557,7 @@ func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) {
updateTime := time.Now()
discoveredEndpointAddr := endpoints.EndpointAddresses()
endpointSet := makeEndpointSet(discoveredEndpointAddr, false, func() time.Time { return updateTime })
endpointSet.gRPCInfoCallTimeout = 3 * time.Second
endpointSet.endpointInfoTimeout = 3 * time.Second
defer endpointSet.Close()

var wg sync.WaitGroup
Expand Down Expand Up @@ -648,8 +646,7 @@ func TestEndpointSet_Update(t *testing.T) {
}
return specs
},
testGRPCOpts, time.Minute)
endpointSet.gRPCInfoCallTimeout = 2 * time.Second
testGRPCOpts, time.Minute, 2*time.Second)
defer endpointSet.Close()

// Initial update.
Expand Down Expand Up @@ -1030,8 +1027,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) {
}
return specs
},
testGRPCOpts, time.Minute)
endpointSet.gRPCInfoCallTimeout = 2 * time.Second
testGRPCOpts, time.Minute, 2*time.Second)
defer endpointSet.Close()

// Should not matter how many of these we run.
Expand Down Expand Up @@ -1140,9 +1136,8 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) {
NewGRPCEndpointSpec(discoveredEndpointAddr[1], false),
NewGRPCEndpointSpec(discoveredEndpointAddr[2], true),
}
}, testGRPCOpts, time.Minute)
}, testGRPCOpts, time.Minute, 1*time.Second)
defer endpointSet.Close()
endpointSet.gRPCInfoCallTimeout = 1 * time.Second

// Initial update.
endpointSet.Update(context.Background())
Expand All @@ -1161,12 +1156,12 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) {
testutil.Equals(t, int64(54321), curMax, "got incorrect minimum time")

// Successfully retrieve the information and observe minTime/maxTime updating.
endpointSet.gRPCInfoCallTimeout = 3 * time.Second
endpointSet.endpointInfoTimeout = 3 * time.Second
endpointSet.Update(context.Background())
updatedCurMin, updatedCurMax := endpointSet.endpoints[slowStaticEndpointAddr].metadata.Store.MinTime, endpointSet.endpoints[slowStaticEndpointAddr].metadata.Store.MaxTime
testutil.Equals(t, int64(65644), updatedCurMin)
testutil.Equals(t, int64(77777), updatedCurMax)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second
endpointSet.endpointInfoTimeout = 1 * time.Second

// Turn off the endpoints.
endpoints.Close()
Expand Down Expand Up @@ -1320,7 +1315,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {

return tc.states[currentState].endpointSpec()
},
testGRPCOpts, time.Minute)
testGRPCOpts, time.Minute, 2*time.Second)

defer endpointSet.Close()

Expand Down Expand Up @@ -1506,9 +1501,7 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc)
}
return specs
},
testGRPCOpts, time.Minute)
endpointSet.gRPCInfoCallTimeout = 1 * time.Second

testGRPCOpts, time.Minute, time.Second)
return endpointSet
}

Expand Down