diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ac666fc03dd..73b2f5afa89 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -99,7 +99,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { queryConfig := extflag.RegisterPathOrContent(cmd, "query.config", "YAML file that contains query API servers configuration. See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--query' and '--query.sd-files' flags.", false) - fileSDFiles := cmd.Flag("query.sd-files", "Path to file that contain addresses of query peers. The path can be a glob pattern (repeatable)."). + fileSDFiles := cmd.Flag("query.sd-files", "Path to file that contains addresses of query API servers. The path can be a glob pattern (repeatable)."). PlaceHolder("").Strings() fileSDInterval := modelDuration(cmd.Flag("query.sd-interval", "Refresh interval to re-read file SD files. (used as a fallback)"). @@ -318,6 +318,8 @@ func runRule( return err } queryClients = append(queryClients, queryClient) + // Discover and resolve query addresses. + addDiscoveryGroups(g, queryClient, dnsSDInterval) } db, err := tsdb.Open(dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts) @@ -367,13 +369,14 @@ func runRule( if err != nil { return err } + c.Transport = tracing.HTTPTripperware(logger, c.Transport) // Each Alertmanager client has a different list of targets thus each needs its own DNS provider. amClient, err := http_util.NewFanoutClient(logger, cfg.EndpointsConfig, c, amProvider.Clone()) if err != nil { return err } // Discover and resolve Alertmanager addresses. - addToGroup(g, amClient, alertmgrsDNSSDInterval) + addDiscoveryGroups(g, amClient, alertmgrsDNSSDInterval) alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout))) } @@ -461,12 +464,6 @@ func runRule( cancel() }) } - // Discover and resolve query addresses. - { - for _, c := range queryClients { - addToGroup(g, c, dnsSDInterval) - } - } // Handle reload and termination interrupts. reload := make(chan struct{}, 1) @@ -718,14 +715,18 @@ func queryFunc( panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) } + promClients := make([]*promclient.Client, 0, len(queriers)) + for _, q := range queriers { + promClients = append(promClients, promclient.NewClient(logger, q)) + } + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { for _, i := range rand.Perm(len(queriers)) { - querier := queriers[i] - c := promclient.NewClient(logger, querier) - endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, querier.Endpoints()) + promClient := promClients[i] + endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) for _, i := range rand.Perm(len(endpoints)) { span, ctx := tracing.StartSpan(ctx, spanID) - v, warns, err := c.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ + v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ Deduplicate: true, PartialResponseStrategy: partialResponseStrategy, }) @@ -733,21 +734,21 @@ func queryFunc( if err != nil { level.Error(logger).Log("err", err, "query", q) - } else { - if len(warns) > 0 { - ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() - // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): - level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) - } - return v, nil + continue + } + if len(warns) > 0 { + ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() + // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): + level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) } + return v, nil } } - return nil, errors.Errorf("no query peer reachable") + return nil, errors.Errorf("no query API server reachable") } } -func addToGroup(g *run.Group, c *http_util.FanoutClient, interval time.Duration) { +func addDiscoveryGroups(g *run.Group, c *http_util.FanoutClient, interval time.Duration) { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { c.Discover(ctx) diff --git a/docs/components/rule.md b/docs/components/rule.md index 73188b0aa41..a563d3907e9 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -373,8 +373,8 @@ Flags: If defined, it takes precedence over the '--query' and '--query.sd-files' flags. --query.sd-files= ... - Path to file that contain addresses of query - peers. The path can be a glob pattern + Path to file that contains addresses of query + API servers. The path can be a glob pattern (repeatable). --query.sd-interval=5m Refresh interval to re-read file SD files. (used as a fallback)