Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Enable exemplars ingestion and querying #4292

Merged
merged 11 commits into from
Jun 17, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#4299](https://github.com/thanos-io/thanos/pull/4299) Tracing: Add tracing to exemplar APIs.
- [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags.
- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor.
- [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying.

### Fixed

Expand Down
10 changes: 10 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/logging"

Expand Down Expand Up @@ -70,6 +71,7 @@ func registerReceive(app *extkingpin.App) {
NoLockfile: conf.noLockFile,
WALCompression: conf.walCompression,
AllowOverlappingBlocks: conf.tsdbAllowOverlappingBlocks,
MaxExemplars: conf.tsdbMaxExemplars,
}

// Enable ingestion if endpoint is specified or if both the hashrings configs are empty.
Expand Down Expand Up @@ -319,6 +321,7 @@ func setupAndRunGRPCServer(g *run.Group,
s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(rw)),
grpcserver.WithServer(store.RegisterWritableStoreServer(rw)),
grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))),
grpcserver.WithListen(*conf.grpcBindAddr),
grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -687,6 +690,7 @@ type receiveConfig struct {
tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
tsdbAllowOverlappingBlocks bool
tsdbMaxExemplars int

walCompression bool
noLockFile bool
Expand Down Expand Up @@ -760,6 +764,12 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").BoolVar(&rc.noLockFile)

cmd.Flag("tsdb.max-exemplars",
"Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant."+
" In case the exemplar storage becomes full (number of stored exemplars becomes equal to max-exemplars),"+
" ingesting a new exemplar will evict the oldest exemplar from storage. 0 (or less) value of this flag disables exemplars storage.").
Default("0").IntVar(&rc.tsdbMaxExemplars)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&rc.hashFunc, "SHA256", "")

Expand Down
12 changes: 11 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ We recommend this component to users who can only push into a Thanos due to air-

Thanos Receive supports multi-tenancy by using labels. See [Multitenancy documentation here](../operating/multi-tenancy.md).

Thanos Receive supports ingesting [exemplars](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars) via remote-write. By default, the exemplars are silently discarded as `--tsdb.max-exemplars` is set to `0`. To enable exemplars storage, set the `--tsdb.max-exemplars` flag to a non-zero value. It exposes the ExemplarsAPI so that the [Thanos Queriers](./query.md) can query the stored exemplars. Take a look at the documentation for [exemplars storage in Prometheus](https://prometheus.io/docs/prometheus/latest/disabled_features/#exemplars-storage) to know more about it.

For more information please check out [initial design proposal](../proposals/201812_thanos-remote-receive.md).
For further information on tuning Prometheus Remote Write [see remote write tuning document](https://prometheus.io/docs/practices/remote_write/).

> NOTE: As the block producer it's important to set correct "external labels" that will identify data block across Thanos clusters. See [external labels](../storage.md#external-labels) docs for details.

# Example
## Example

```bash
thanos receive \
Expand Down Expand Up @@ -196,6 +198,14 @@ Flags:
--tsdb.allow-overlapping-blocks
Allow overlapping blocks, which in turn enables
vertical compaction and vertical query merge.
--tsdb.max-exemplars=0 Enables support for ingesting exemplars and
sets the maximum number of exemplars that will
be stored per tenant. In case the exemplar
storage becomes full (number of stored
exemplars becomes equal to max-exemplars),
ingesting a new exemplar will evict the oldest
exemplar from storage. 0 (or less) value of
this flag disables exemplars storage.
--tsdb.no-lockfile Do not create lockfile in TSDB data directory.
In any case, the lockfiles will be deleted on
next startup.
Expand Down
13 changes: 13 additions & 0 deletions pkg/exemplars/exemplarspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/big"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)
Expand Down Expand Up @@ -95,3 +96,15 @@ func (e1 *Exemplar) Compare(e2 *Exemplar) int {
}
return big.NewFloat(e1.Value).Cmp(big.NewFloat(e2.Value))
}

func ExemplarsFromPromExemplars(exemplars []exemplar.Exemplar) []*Exemplar {
ex := make([]*Exemplar, 0, len(exemplars))
for _, e := range exemplars {
ex = append(ex, &Exemplar{
Labels: labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(e.Labels)},
Value: e.Value,
Ts: e.Ts,
})
}
return ex
}
41 changes: 41 additions & 0 deletions pkg/exemplars/multitsdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package exemplars

import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql/parser"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
)

// MultiTSDB implements exemplarspb.ExemplarsServer that allows to fetch exemplars a MultiTSDB instance.
type MultiTSDB struct {
tsdbExemplarsServers func() map[string]*TSDB
}

// NewMultiTSDB creates new exemplars.MultiTSDB.
func NewMultiTSDB(tsdbExemplarsServers func() map[string]*TSDB) *MultiTSDB {
return &MultiTSDB{
tsdbExemplarsServers: tsdbExemplarsServers,
}
}

// Exemplars returns all specified exemplars from a MultiTSDB instance.
func (m *MultiTSDB) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Exemplars_ExemplarsServer) error {
expr, err := parser.ParseExpr(r.Query)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
matchers := parser.ExtractSelectors(expr)

for tenant, es := range m.tsdbExemplarsServers() {
onprem marked this conversation as resolved.
Show resolved Hide resolved
if err := es.Exemplars(matchers, r.Start, r.End, s); err != nil {
return status.Error(codes.Aborted, errors.Wrapf(err, "get exemplars for tenant %s", tenant).Error())
}
}
return nil
}
17 changes: 7 additions & 10 deletions pkg/exemplars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,17 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
return err
}

selectors := parser.ExtractSelectors(expr)
match, selectors := selectorsMatchesExternalLabels(parser.ExtractSelectors(expr), s.selectorLabels)

newSelectors := make([][]*labels.Matcher, 0, len(selectors))
for _, matchers := range selectors {
matched, newMatchers := matchesExternalLabels(matchers, s.selectorLabels)
if matched {
newSelectors = append(newSelectors, newMatchers)
}
}
// There is no matched selectors for this thanos query.
if len(newSelectors) == 0 {
if !match {
return nil
}

if len(selectors) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

var (
g, gctx = errgroup.WithContext(ctx)
respChan = make(chan *exemplarspb.ExemplarData, 10)
Expand All @@ -84,7 +81,7 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
for _, st := range s.exemplars() {
query := ""
Matchers:
for _, matchers := range newSelectors {
for _, matchers := range selectors {
metricsSelector := ""
for _, m := range matchers {
for _, ls := range st.LabelSets {
Expand Down
83 changes: 83 additions & 0 deletions pkg/exemplars/tsdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package exemplars

import (
"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)

// TSDB allows fetching exemplars from a TSDB instance.
type TSDB struct {
db storage.ExemplarQueryable
extLabels labels.Labels
}

// NewTSDB creates new exemplars.TSDB.
func NewTSDB(db storage.ExemplarQueryable, extLabels labels.Labels) *TSDB {
return &TSDB{
db: db,
extLabels: extLabels,
}
}

// Exemplars returns all specified exemplars from a TSDB instance.
func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemplarspb.Exemplars_ExemplarsServer) error {
match, selectors := selectorsMatchesExternalLabels(matchers, t.extLabels)

if !match {
return nil
}

if len(selectors) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}
onprem marked this conversation as resolved.
Show resolved Hide resolved

eq, err := t.db.ExemplarQuerier(s.Context())
if err != nil {
return status.Error(codes.Internal, err.Error())
}

exemplars, err := eq.Select(start, end, selectors...)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
onprem marked this conversation as resolved.
Show resolved Hide resolved

for _, e := range exemplars {
exd := exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{
Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.extLabels)),
},
Exemplars: exemplarspb.ExemplarsFromPromExemplars(e.Exemplars),
}
if err := s.Send(exemplarspb.NewExemplarsResponse(&exd)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
}
return nil
}

// selectorsMatchesExternalLabels returns false if none of the selectors matches the external labels.
// If true, it also returns an array of non-empty Prometheus matchers.
func selectorsMatchesExternalLabels(selectors [][]*labels.Matcher, externalLabels labels.Labels) (bool, [][]*labels.Matcher) {
matchedOnce := false

var newSelectors [][]*labels.Matcher
for _, m := range selectors {
match, m := matchesExternalLabels(m, externalLabels)

matchedOnce = matchedOnce || match
if match && len(m) > 0 {
newSelectors = append(newSelectors, m)
}
}

return matchedOnce, newSelectors
}
95 changes: 95 additions & 0 deletions pkg/exemplars/tsdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package exemplars

import (
"testing"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestSelectorsMatchExternalLabels(t *testing.T) {
t.Parallel()

tests := map[string]struct {
selectors [][]*labels.Matcher
extLabels labels.Labels
shouldMatch bool
expectedSelectors [][]*labels.Matcher
}{
"should return true for matching labels": {
selectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
labels.MustNewMatcher(labels.MatchEqual, "receive", "true"),
},
},
extLabels: labels.FromStrings("receive", "true"),
shouldMatch: true,
expectedSelectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
},
},
},
"should return true when the external labels are not present in input at all": {
selectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
},
},
extLabels: labels.FromStrings("receive", "true"),
shouldMatch: true,
expectedSelectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
},
},
},
"should return true when only some of matchers slice are matching": {
selectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
labels.MustNewMatcher(labels.MatchRegexp, "receive", "false"),
},
{
labels.MustNewMatcher(labels.MatchEqual, "code", "500"),
labels.MustNewMatcher(labels.MatchRegexp, "receive", "true"),
},
},
extLabels: labels.FromStrings("receive", "true", "replica", "0"),
shouldMatch: true,
expectedSelectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "500"),
},
},
},
"should return false when the external labels are not matching": {
selectors: [][]*labels.Matcher{
{
labels.MustNewMatcher(labels.MatchEqual, "code", "200"),
labels.MustNewMatcher(labels.MatchNotEqual, "replica", "1"),
},
{
labels.MustNewMatcher(labels.MatchEqual, "op", "get"),
labels.MustNewMatcher(labels.MatchEqual, "replica", "0"),
},
},
extLabels: labels.FromStrings("replica", "1"),
shouldMatch: false,
expectedSelectors: nil,
},
}

for name, tdata := range tests {
t.Run(name, func(t *testing.T) {
match, selectors := selectorsMatchesExternalLabels(tdata.selectors, tdata.extLabels)
testutil.Equals(t, tdata.shouldMatch, match)

testutil.Equals(t, tdata.expectedSelectors, selectors)
})
}
}
2 changes: 1 addition & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
tenant = h.options.DefaultTenantID
}

// TODO(yeya24): handle remote write metadata and exemplars.
// TODO(yeya24): handle remote write metadata.
// exit early if the request contained no data
if len(wreq.Timeseries) == 0 {
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
level.Debug(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant)
Expand Down
Loading