Query: allow multiple deduplication label. #1362

Merged
merged 17 commits into from
Sep 16, 2019
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -14,9 +14,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type
- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided, it overrides the `query.replica-label` CLI flags.

### Changed

- [#1362](https://github.com/thanos-io/thanos/pull/1362) `query.replica-label` configuration can be provided more than once for multiple deduplication labels like: `--query.replica-label=prometheus_replica --query.replica-label=service`.

- [#1338](https://github.com/thanos-io/thanos/pull/1338) Querier still warns on store API duplicate, but allows a single one from duplicated set. This is to gracefully warn about the problematic logic rather than disrupt immediately.

### Fixed
10 changes: 5 additions & 5 deletions cmd/thanos/query.go
@@ -64,8 +64,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

replicaLabel := cmd.Flag("query.replica-label", "Label to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
String()
replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
Strings()

selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
PlaceHolder("<name>=\"<value>\"").Strings()
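The switch from `String()` to `Strings()` is what lets the flag be repeated. As a rough illustration of the same idea, here is a minimal sketch that uses only the Go standard library `flag` package instead of kingpin; the `stringList` type and the `parseReplicaLabels` helper are hypothetical names introduced for this example and are not part of Thanos.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// stringList collects every occurrence of a repeated flag.
// It implements the standard library's flag.Value interface.
type stringList []string

func (s *stringList) String() string { return strings.Join(*s, ",") }

// Set is called once per occurrence of the flag and appends the value.
func (s *stringList) Set(v string) error {
	*s = append(*s, v)
	return nil
}

// parseReplicaLabels is a hypothetical helper: it parses args and returns
// all values passed via --query.replica-label, in order.
func parseReplicaLabels(args []string) ([]string, error) {
	var labels stringList
	fs := flag.NewFlagSet("query", flag.ContinueOnError)
	fs.Var(&labels, "query.replica-label", "replica label (repeatable)")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return labels, nil
}

func main() {
	labels, err := parseReplicaLabels([]string{
		"--query.replica-label=prometheus_replica",
		"--query.replica-label=service",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(labels) // [prometheus_replica service]
}
```

kingpin's `Strings()` gives the same accumulate-on-repeat behaviour without a custom type, which is presumably why the diff needs no extra parsing code.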
@@ -145,7 +145,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
*replicaLabels,
selectorLset,
*stores,
*enableAutodownsampling,
@@ -262,7 +262,7 @@ func runQuery(
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
replicaLabels []string,
selectorLset labels.Labels,
storeAddrs []string,
enableAutodownsampling bool,
@@ -309,7 +309,7 @@ func runQuery(
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabels)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
39 changes: 33 additions & 6 deletions docs/components/query.md
@@ -23,10 +23,12 @@ $ thanos query \
## Deduplication

The query layer can deduplicate series that were collected from high-availability pairs of data sources such as Prometheus.
A fixed replica label must be chosen for the entire cluster and can then be passed to query nodes on startup.
One or more fixed replica labels must be chosen for the entire cluster and can then be passed to query nodes on startup.

Two or more series that are distinguished only by the given replica label will be merged into a single time series.
This also hides gaps in collection of a single data source. For example:
This also hides gaps in collection of a single data source.

### An example with a single replica label:

* Prometheus + sidecar "A": `cluster=1,env=2,replica=A`
* Prometheus + sidecar "B": `cluster=1,env=2,replica=B`
@@ -47,12 +49,28 @@ And we query for metric `up{job="prometheus",env="2"}` with this option we will
* `up{job="prometheus",env="2",cluster="1"} 1`
* `up{job="prometheus",env="2",cluster="2"} 1`

WITHOUT this replica flag (so deduplication turned off), we will get 3 results:
WITHOUT this replica flag (deduplication turned off), we will get 3 results:

* `up{job="prometheus",env="2",cluster="1",replica="A"} 1`
* `up{job="prometheus",env="2",cluster="1",replica="B"} 1`
* `up{job="prometheus",env="2",cluster="2",replica="A"} 1`

### The same example with multiple replica labels:

* Prometheus + sidecar "A": `cluster=1,env=2,replica=A,replicaX=A`
* Prometheus + sidecar "B": `cluster=1,env=2,replica=B,replicaX=B`
* Prometheus + sidecar "A" in different cluster: `cluster=2,env=2,replica=A,replicaX=A`

```
$ thanos query \
--http-address "0.0.0.0:9090" \
--query.replica-label "replica" \
--query.replica-label "replicaX" \
--store "<store-api>:<grpc-port>" \
--store "<store-api2>:<grpc-port>"
```
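To make the expected outcome of the multi-label example concrete, the following is a minimal sketch of the grouping idea only: drop every configured replica label and count the distinct label sets that remain. It is not the Thanos implementation; `dedupKeys` is a hypothetical helper, and series are modelled as plain maps for brevity.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// dedupKeys drops the replica labels from each series and returns the
// distinct remaining label sets, rendered as sorted "name=value" strings.
func dedupKeys(series []map[string]string, replicaLabels []string) []string {
	replica := make(map[string]struct{}, len(replicaLabels))
	for _, name := range replicaLabels {
		replica[name] = struct{}{}
	}

	seen := map[string]struct{}{}
	for _, lset := range series {
		var pairs []string
		for name, value := range lset {
			if _, isReplica := replica[name]; isReplica {
				continue // replica labels do not take part in series identity
			}
			pairs = append(pairs, name+"="+value)
		}
		sort.Strings(pairs) // map iteration order is random, so sort for a stable key
		seen[strings.Join(pairs, ",")] = struct{}{}
	}

	keys := make([]string, 0, len(seen))
	for k := range seen {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	// The three external label sets from the example above.
	series := []map[string]string{
		{"cluster": "1", "env": "2", "replica": "A", "replicaX": "A"},
		{"cluster": "1", "env": "2", "replica": "B", "replicaX": "B"},
		{"cluster": "2", "env": "2", "replica": "A", "replicaX": "A"},
	}
	for _, k := range dedupKeys(series, []string{"replica", "replicaX"}) {
		fmt.Println(k)
	}
	// Output:
	// cluster=1,env=2
	// cluster=2,env=2
}
```

With both `replica` and `replicaX` treated as replica labels, the three inputs collapse into two groups, matching the two results of the single-label example. Passing no replica labels leaves all three distinct.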


This logic can also be controlled via parameter on QueryAPI. More details below.

## Query API
@@ -87,14 +105,23 @@ Querier also allows to configure different timeouts:
If you prefer availability over accuracy you can set tighter timeout to underlying StoreAPI than overall query timeout. If partial response
strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warning with 200 status code response.

### Deduplication replica labels

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
| `replicaLabels[]` | `[]string` | `query.replica-label` flag (default: empty). | `replicaLabels[]=replicaA&replicaLabels[]=replicaB` |
| | | | |

This overrides the `query.replica-label` CLI flag to allow dynamic replica labels at query time.
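A client passes several replica labels by repeating the parameter. The sketch below builds such a URL with `net/url`. It makes several assumptions that are not confirmed by the docs text: the form key is `replicaLabels[]` (the key the handler in this PR reads), the endpoint path is `/api/v1/query`, the querier listens on `localhost:9090`, and `buildQueryURL` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildQueryURL is a hypothetical helper. It returns an instant-query URL
// that repeats paramName once for every replica label.
func buildQueryURL(base, paramName, query string, replicaLabels []string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = "/api/v1/query" // assumed endpoint path

	v := url.Values{}
	v.Set("query", query)
	for _, l := range replicaLabels {
		v.Add(paramName, l) // Add appends, so the key appears once per label
	}
	u.RawQuery = v.Encode() // Encode percent-escapes the brackets in the key
	return u.String(), nil
}

func main() {
	s, err := buildQueryURL("http://localhost:9090", "replicaLabels[]", "up",
		[]string{"replica", "replicaX"})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

The parameter name is an argument here precisely because the table and the handler code may not agree on it; adjust it to whatever the running querier accepts.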

### Deduplication Enabled

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
| `dedup` | `Boolean` | True, but effect depends on `query.replica-label` configuration flag. | `1, t, T, TRUE, true, True` for "True" |
| | | | |

This controls if query should use `replica` label for deduplication or not.
This controls if query results should be deduplicated using the replica labels.

### Auto downsampling

@@ -230,8 +257,8 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.replica-label=QUERY.REPLICA-LABEL
Label to treat as a replica indicator along
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
able to query without deduplication using
'dedup=false' parameter.
1 change: 1 addition & 0 deletions docs/getting-started.md
@@ -166,6 +166,7 @@ thanos query \
--store 1.2.3.4:19090 \
--store 1.2.3.5:19090 \
--query.replica-label replica \
--query.replica-label replicaX # Replica labels for de-duplication (the flag can be repeated)
```

Go to the configured HTTP address, and you should now be able to query across all Prometheus instances and receive de-duplicated data.
36 changes: 31 additions & 5 deletions pkg/query/api/v1.go
@@ -204,6 +204,17 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
return enableDeduplication, nil
}

func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) {
if err := r.ParseForm(); err != nil {
return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")}
}

if len(r.Form["replicaLabels[]"]) > 0 {
// Read the same form key that was just checked, not "match[]".
replicaLabels = r.Form["replicaLabels[]"]
}
return replicaLabels, nil
}
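The parsing step can be exercised in isolation with `net/http/httptest`. The sketch below is a simplified, self-contained variant and not the function from the diff: `resolveReplicaLabels` and `newRequest` are hypothetical helpers, and the fallback to flag-provided defaults is an assumption about how the override described in the docs could be wired, since that part is not visible in this hunk.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// replicaLabelsParam is the form key read by the handler in this PR.
const replicaLabelsParam = "replicaLabels[]"

// resolveReplicaLabels is a hypothetical helper: it returns the labels sent
// with the request, or defaults (for example the CLI flag values) if none were sent.
func resolveReplicaLabels(r *http.Request, defaults []string) ([]string, error) {
	if err := r.ParseForm(); err != nil {
		return nil, err
	}
	if vals := r.Form[replicaLabelsParam]; len(vals) > 0 {
		return vals, nil
	}
	return defaults, nil
}

// newRequest builds a GET request that repeats the parameter once per label.
func newRequest(labels ...string) *http.Request {
	v := url.Values{"query": {"up"}}
	for _, l := range labels {
		v.Add(replicaLabelsParam, l)
	}
	return httptest.NewRequest(http.MethodGet, "/api/v1/query?"+v.Encode(), nil)
}

func main() {
	defaults := []string{"replica"}

	withParam, err := resolveReplicaLabels(newRequest("replicaA", "replicaB"), defaults)
	if err != nil {
		panic(err)
	}
	withoutParam, err := resolveReplicaLabels(newRequest(), defaults)
	if err != nil {
		panic(err)
	}
	fmt.Println(withParam, withoutParam) // [replicaA replicaB] [replica]
}
```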

func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second
@@ -274,6 +285,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
@@ -294,7 +310,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

begin := api.now()
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
@@ -367,6 +383,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step)
if apiErr != nil {
return nil, nil, apiErr
@@ -393,7 +414,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {

begin := api.now()
qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse, warningReporter),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, warningReporter),
r.FormValue("query"),
start,
end,
@@ -444,7 +465,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
@@ -510,6 +531,11 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
@@ -526,7 +552,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
@@ -637,7 +663,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
@@ -44,7 +44,7 @@ import (
)

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return func(_ bool, _ []string, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return queryable
}
}
12 changes: 6 additions & 6 deletions pkg/query/iter.go
@@ -292,17 +292,17 @@ func (it *chunkSeriesIterator) Err() error {
}

type dedupSeriesSet struct {
set storage.SeriesSet
replicaLabel string
set storage.SeriesSet
replicaLabels map[string]struct{}

replicas []storage.Series
lset labels.Labels
peek storage.Series
ok bool
}

func newDedupSeriesSet(set storage.SeriesSet, replicaLabel string) storage.SeriesSet {
s := &dedupSeriesSet{set: set, replicaLabel: replicaLabel}
func newDedupSeriesSet(set storage.SeriesSet, replicaLabels map[string]struct{}) storage.SeriesSet {
s := &dedupSeriesSet{set: set, replicaLabels: replicaLabels}
s.ok = s.set.Next()
if s.ok {
s.peek = s.set.At()
@@ -325,10 +325,10 @@ func (s *dedupSeriesSet) Next() bool {
// replica label if it exists
func (s *dedupSeriesSet) peekLset() labels.Labels {
lset := s.peek.Labels()
if lset[len(lset)-1].Name != s.replicaLabel {
if _, ok := s.replicaLabels[lset[len(lset)-1].Name]; !ok {
Review comment (Member): I would have expected that we need to verify each replica label here. I feel like I'm missing something about this logic though, can you explain what's happening here?

Reply (Contributor Author): If the last label is not in the replicaLabels list, then no replica labels are set.
Replica labels are always at the end of the list, so if the last one is not present in the s.replicaLabels map, this guarantees that no replica labels need to be stripped.

If the last label in the full list is a replica label, then strip all replica labels by removing len(s.replicaLabels) labels from the full labels list.

Reply (Member): Worth adding a comment about this indeed.

return lset
}
return lset[:len(lset)-1]
return lset[:len(lset)-len(s.replicaLabels)]
}
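The suffix-stripping logic discussed in the review thread can be tried out on its own. This is a minimal sketch under the same stated assumption (replica labels, when present, are all present and sit at the end of the label list); `Label` is a simplified stand-in for the Prometheus label type, `stripReplicaSuffix` is a hypothetical name, and the two length guards are additions for this sketch that do not appear in the diff.

```go
package main

import "fmt"

// Label is a simplified stand-in for a Prometheus label pair.
type Label struct{ Name, Value string }

// stripReplicaSuffix mirrors the idea from the diff: if the last label is a
// replica label, cut len(replicaLabels) labels off the end of lset.
func stripReplicaSuffix(lset []Label, replicaLabels map[string]struct{}) []Label {
	if len(lset) == 0 || len(replicaLabels) == 0 {
		return lset // guard added for this sketch; not in the diff
	}
	if _, ok := replicaLabels[lset[len(lset)-1].Name]; !ok {
		return lset // last label is not a replica label, nothing to strip
	}
	n := len(replicaLabels)
	if n > len(lset) {
		n = len(lset) // guard added for this sketch; not in the diff
	}
	return lset[:len(lset)-n]
}

func main() {
	replicaLabels := map[string]struct{}{"replica": {}, "replicaX": {}}
	lset := []Label{
		{"cluster", "1"}, {"env", "2"}, {"replica", "A"}, {"replicaX", "A"},
	}
	fmt.Println(stripReplicaSuffix(lset, replicaLabels)) // [{cluster 1} {env 2}]
}
```

Note what the assumption implies: if a series carried only one of the two configured replica labels, this truncation would still remove two labels and so drop a non-replica label as well. That appears to be the case the reviewer's question about verifying each replica label is probing.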

func (s *dedupSeriesSet) next() bool {