Skip to content

Commit

Permalink
cmd/thanos: apply Bartek's comments
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <spasquie@redhat.com>
  • Loading branch information
simonpasquier committed Jan 8, 2020
1 parent d50e5fe commit 4f1d20c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
43 changes: 22 additions & 21 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {

queryConfig := extflag.RegisterPathOrContent(cmd, "query.config", "YAML file that contains query API servers configuration. See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--query' and '--query.sd-files' flags.", false)

fileSDFiles := cmd.Flag("query.sd-files", "Path to file that contain addresses of query peers. The path can be a glob pattern (repeatable).").
fileSDFiles := cmd.Flag("query.sd-files", "Path to file that contains addresses of query API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

fileSDInterval := modelDuration(cmd.Flag("query.sd-interval", "Refresh interval to re-read file SD files. (used as a fallback)").
Expand Down Expand Up @@ -318,6 +318,8 @@ func runRule(
return err
}
queryClients = append(queryClients, queryClient)
// Discover and resolve query addresses.
addDiscoveryGroups(g, queryClient, dnsSDInterval)
}

db, err := tsdb.Open(dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts)
Expand Down Expand Up @@ -367,13 +369,14 @@ func runRule(
if err != nil {
return err
}
c.Transport = tracing.HTTPTripperware(logger, c.Transport)
// Each Alertmanager client has a different list of targets thus each needs its own DNS provider.
amClient, err := http_util.NewFanoutClient(logger, cfg.EndpointsConfig, c, amProvider.Clone())
if err != nil {
return err
}
// Discover and resolve Alertmanager addresses.
addToGroup(g, amClient, alertmgrsDNSSDInterval)
addDiscoveryGroups(g, amClient, alertmgrsDNSSDInterval)

alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout)))
}
Expand Down Expand Up @@ -461,12 +464,6 @@ func runRule(
cancel()
})
}
// Discover and resolve query addresses.
{
for _, c := range queryClients {
addToGroup(g, c, dnsSDInterval)
}
}

// Handle reload and termination interrupts.
reload := make(chan struct{}, 1)
Expand Down Expand Up @@ -718,36 +715,40 @@ func queryFunc(
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(logger, q))
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
for _, i := range rand.Perm(len(queriers)) {
querier := queriers[i]
c := promclient.NewClient(logger, querier)
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, querier.Endpoints())
promClient := promClients[i]
endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints())
for _, i := range rand.Perm(len(endpoints)) {
span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := c.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
} else {
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, nil
continue
}
if len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, nil
}
}
return nil, errors.Errorf("no query peer reachable")
return nil, errors.Errorf("no query API server reachable")
}
}

func addToGroup(g *run.Group, c *http_util.FanoutClient, interval time.Duration) {
func addDiscoveryGroups(g *run.Group, c *http_util.FanoutClient, interval time.Duration) {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
c.Discover(ctx)
Expand Down
4 changes: 2 additions & 2 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ Flags:
If defined, it takes precedence over the
'--query' and '--query.sd-files' flags.
--query.sd-files=<path> ...
Path to file that contain addresses of query
peers. The path can be a glob pattern
Path to file that contains addresses of query
API servers. The path can be a glob pattern
(repeatable).
--query.sd-interval=5m Refresh interval to re-read file SD files.
(used as a fallback)
Expand Down

0 comments on commit 4f1d20c

Please sign in to comment.