Commit

Moved Prometheus to 2.11.1 and TSDB to 0.9.1 (#1380)
Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka authored Aug 7, 2019
1 parent 477a720 commit 04e5e7d
Showing 9 changed files with 130 additions and 155 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
@@ -14,18 +14,21 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option that sets the minimum part size for HTTP multipart requests when using the S3 storage type

- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings

### Changed

- [#1380](https://github.com/thanos-io/thanos/pull/1380) Upgraded important dependencies: Prometheus to 2.11.1 and TSDB to 0.9.1. Notable changes affecting the Querier:
  - [ENHANCEMENT] Query performance improvement: Efficient iteration and search in HashForLabels and HashWithoutLabels. #5707
  - [ENHANCEMENT] Optimize queries using regexp for set lookups. tsdb#602
  - [BUGFIX] prometheus_tsdb_compactions_failed_total is now incremented on any compaction failure. tsdb#613
  - [BUGFIX] PromQL: Correctly display {__name__="a"}. #5552
- [#1338](https://github.com/thanos-io/thanos/pull/1338) Querier still warns on duplicated store APIs, but allows a single one from the duplicated set. This gracefully warns about the problematic setup instead of disrupting queries immediately.
- [#1297](https://github.com/improbable-eng/thanos/pull/1297) Added `/-/ready` and `/-/healthy` endpoints to Thanos compact.

### Fixed

- [#1327](https://github.com/thanos-io/thanos/pull/1327) `/series` API end-point now properly returns an empty array just like Prometheus if there are no results

- [#1302](https://github.com/thanos-io/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections

## [v0.6.0](https://github.com/thanos-io/thanos/releases/tag/v0.6.0) - 2019.07.18
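For context on the #1380 entry above: the storage interfaces bundled with Prometheus 2.11.1 and TSDB 0.9.1 return warnings from the select and label calls, and the PromQL engine propagates them on its result, which is what lets Thanos drop its custom `WarningReporter` callback in the diffs below. A minimal sketch of the resulting querier surface, matching the signatures visible in the `pkg/query` changes further down rather than quoting the upstream package verbatim:

```go
// Sketch only: the warning-related querier surface Thanos implements after
// this commit. Signatures mirror pkg/query/querier.go in this diff; the
// canonical definitions live in github.com/prometheus/prometheus/storage.
package queryexample

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

type warningAwareQuerier interface {
	// Select already returned warnings; Thanos now fills them in instead of
	// invoking a caller-supplied WarningReporter.
	Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error)
	// LabelValues and LabelNames gain a storage.Warnings return value.
	LabelValues(name string) ([]string, storage.Warnings, error)
	LabelNames() ([]string, storage.Warnings, error)
	Close() error
}
```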
5 changes: 3 additions & 2 deletions go.mod
@@ -4,6 +4,7 @@ require (
cloud.google.com/go v0.34.0
github.com/Azure/azure-storage-blob-go v0.7.0
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
github.com/cespare/xxhash v1.1.0
github.com/fatih/structtag v1.0.0
@@ -33,8 +34,8 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v2.9.2+incompatible
github.com/prometheus/tsdb v0.8.0
github.com/prometheus/prometheus v0.0.0-20190710134608-e5b22494857d
github.com/prometheus/tsdb v0.9.1
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
124 changes: 78 additions & 46 deletions go.sum

Large diffs are not rendered by default.

84 changes: 17 additions & 67 deletions pkg/query/api/v1.go
@@ -23,13 +23,11 @@ import (
"math"
"net/http"
"strconv"
"sync"
"time"

"github.com/NYTimes/gziphandler"

"github.com/go-kit/kit/log"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -279,22 +277,12 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

begin := api.now()
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
@@ -316,7 +304,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return &queryData{
ResultType: res.Value.Type(),
Result: res.Value,
}, warnings, nil
}, res.Warnings, nil
}

func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
@@ -377,23 +365,13 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_range_query")
defer span.Finish()

begin := api.now()
qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse, warningReporter),
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse),
r.FormValue("query"),
start,
end,
@@ -418,7 +396,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return &queryData{
ResultType: res.Value.Type(),
Result: res.Value,
}, warnings, nil
}, res.Warnings, nil
}

func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
@@ -434,25 +412,15 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelValues")

// TODO(fabxc): add back request context.

vals, err := q.LabelValues(name)
vals, warnings, err := q.LabelValues(name)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
@@ -515,42 +483,34 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable series")

var sets []storage.SeriesSet
var (
warnings []error
metrics = []labels.Labels{}
sets []storage.SeriesSet
)
for _, mset := range matcherSets {
s, _, err := q.Select(&storage.SelectParams{}, mset...)
s, warns, err := q.Select(&storage.SelectParams{}, mset...)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
warnings = append(warnings, warns...)
sets = append(sets, s)
}

set := storage.NewMergeSeriesSet(sets, nil)

metrics := []labels.Labels{}
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
if set.Err() != nil {
return nil, nil, &ApiError{errorExec, set.Err()}
}

return metrics, warnings, nil
}

@@ -627,23 +587,13 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames")

names, err := q.LabelNames()
names, warnings, err := q.LabelNames()
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
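The net effect of the `v1.go` changes above: the mutex-guarded `warningReporter` closures are gone, and warnings travel as ordinary return values, either on the PromQL result (`res.Warnings`) or as the second return value of `Select`, `LabelValues`, and `LabelNames`. A condensed, illustrative sketch of the instant-query flow under those assumptions; this is a standalone helper, not the literal Thanos handler:

```go
// Illustrative sketch of the post-change flow: the queryable is built without
// a WarningReporter, and the engine result itself carries the warnings
// gathered by the querier underneath.
package queryexample

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

// instantQuery runs expr against the engine and returns the value together
// with any warnings (e.g. partial-response notes from Thanos store APIs).
func instantQuery(ctx context.Context, engine *promql.Engine, queryable storage.Queryable,
	expr string, ts time.Time) (promql.Value, storage.Warnings, error) {

	qry, err := engine.NewInstantQuery(queryable, expr, ts)
	if err != nil {
		return nil, nil, err
	}
	defer qry.Close()

	res := qry.Exec(ctx)
	if res.Err != nil {
		return nil, nil, res.Err
	}
	// Before this commit these warnings had to be collected via a
	// mutex-guarded callback; now they are part of the engine result.
	return res.Value, res.Warnings, nil
}
```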
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
@@ -44,7 +44,7 @@ import (
)

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return func(_ bool, _ int64, _ bool) storage.Queryable {
return queryable
}
}
48 changes: 17 additions & 31 deletions pkg/query/querier.go
@@ -13,30 +13,22 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// WarningReporter allows to report warnings to frontend layer.
//
// Warning can include partial errors `partialResponse` is enabled. It occurs when only part of the results are ready and
// another is not available because of the failure.
// It is required to be thread-safe.
type WarningReporter func(error)

// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
warningReporter: r,
}
}
}
@@ -48,12 +40,11 @@ type queryable struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
warningReporter WarningReporter
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil
}

type querier struct {
@@ -66,7 +57,6 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
warningReporter WarningReporter
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
@@ -80,14 +70,10 @@ func newQuerier(
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
warningReporter WarningReporter,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
}
if warningReporter == nil {
warningReporter = func(error) {}
}
ctx, cancel := context.WithCancel(ctx)
return &querier{
ctx: ctx,
@@ -100,7 +86,6 @@
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
warningReporter: warningReporter,
}
}

@@ -191,10 +176,9 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
return nil, nil, errors.Wrap(err, "proxy Series()")
}

var warns storage.Warnings
for _, w := range resp.warnings {
// NOTE(bwplotka): We could use warnings return arguments here, however need reporter anyway for LabelValues and LabelNames method,
// so we choose to be consistent and keep reporter.
q.warningReporter(errors.New(w))
warns = append(warns, errors.New(w))
}

if !q.isDedupEnabled() {
@@ -204,7 +188,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
}, nil, nil
}, warns, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API
@@ -221,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
// The merged series set assembles all potentially-overlapping time ranges
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
return newDedupSeriesSet(set, q.replicaLabel), nil, nil
return newDedupSeriesSet(set, q.replicaLabel), warns, nil
}

// sortDedupLabels resorts the set so that the same series with different replica
@@ -247,37 +231,39 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelValues()")
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}

var warns storage.Warnings
for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
warns = append(warns, errors.New(w))
}

return resp.Values, nil
return resp.Values, warns, nil
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *querier) LabelNames() ([]string, error) {
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelNames()")
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}

var warns storage.Warnings
for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
warns = append(warns, errors.New(w))
}

return resp.Names, nil
return resp.Names, warns, nil
}

func (q *querier) Close() error {
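With the `WarningReporter` parameter removed, constructing and querying the Thanos queryable follows the plain `storage.Querier` contract. A hedged usage sketch: the logger, store proxy, and "replica" label are placeholders supplied by the caller, while `NewQueryableCreator` and the three-argument `QueryableCreator` are the ones defined in the diff above:

```go
// Usage sketch for the updated pkg/query API. logger and storeProxy are
// assumed to be provided elsewhere; "replica" is a placeholder replica label.
package queryexample

import (
	"context"
	"math"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/storage"

	"github.com/thanos-io/thanos/pkg/query"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// labelValuesWithWarnings shows the new flow: warnings come back from
// LabelValues instead of being pushed into a WarningReporter callback.
func labelValuesWithWarnings(ctx context.Context, logger log.Logger,
	storeProxy storepb.StoreServer, name string) ([]string, storage.Warnings, error) {

	// deduplicate=true, maxResolutionMillis=0, partialResponse=true; the
	// fourth WarningReporter argument no longer exists.
	queryable := query.NewQueryableCreator(logger, storeProxy, "replica")(true, 0, true)

	q, err := queryable.Querier(ctx, math.MinInt64, math.MaxInt64)
	if err != nil {
		return nil, nil, err
	}
	// The real code closes via runutil.CloseWithLogOnErr; errors are dropped
	// here for brevity.
	defer func() { _ = q.Close() }()

	return q.LabelValues(name)
}
```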