Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[thanos] Changes to address querying multi cluster reads using thanos query #7781

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ build: check-git deps $(PROMU)
@$(PROMU) build --prefix $(PREFIX)

GIT_BRANCH=$(shell $(GIT) rev-parse --abbrev-ref HEAD)
GIT_REVISION := $(shell git rev-parse --short HEAD)
IMAGE_TAG ?= $(subst /,-,$(GIT_BRANCH))-$(GIT_REVISION)
.PHONY: crossbuild
crossbuild: ## Builds all binaries for all platforms.
ifeq ($(GIT_BRANCH), main)
Expand Down Expand Up @@ -197,7 +199,7 @@ docker: build
@echo ">> copying Thanos from $(PREFIX) to ./thanos_tmp_for_docker"
@cp $(PREFIX)/thanos ./thanos_tmp_for_docker
@echo ">> building docker image 'thanos'"
@docker build -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) .
@docker build -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) -t thanos:$(IMAGE_TAG) .
@rm ./thanos_tmp_for_docker
else
docker: docker-multi-stage
Expand All @@ -207,7 +209,7 @@ endif
docker-multi-stage: ## Builds 'thanos' docker image using multi-stage.
docker-multi-stage:
@echo ">> building docker image 'thanos' with Dockerfile.multi-stage"
@docker build -f Dockerfile.multi-stage -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) .
@docker build -f Dockerfile.multi-stage -t "thanos" -t thanos:$(IMAGE_TAG) --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) .

# docker-build builds docker images with multiple architectures.
.PHONY: docker-build $(BUILD_DOCKER_ARCHS)
Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/tenancy"
)

type grpcConfig struct {
Expand Down Expand Up @@ -80,12 +81,14 @@ type prometheusConfig struct {
getConfigInterval time.Duration
getConfigTimeout time.Duration
httpClient *extflag.PathOrContent
tenantHeader string
}

func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig {
cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&pc.url)
cmd.Flag("prometheus.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).StringVar(&pc.tenantHeader)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&pc.readyTimeout)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.prometheus.tenantHeader)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
3 changes: 3 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ prometheus \
```bash
thanos sidecar \
--tsdb.path "/path/to/prometheus/data/dir" \
--prometheus.tenant-header="THANOS-TENANT" \
--prometheus.url "http://localhost:9090" \
--objstore.config-file "bucket.yml"
```
Expand Down Expand Up @@ -170,6 +171,8 @@ Flags:
--prometheus.ready_timeout=10m
Maximum time to wait for the Prometheus
instance to start up
--prometheus.tenant-header="THANOS-TENANT"
HTTP header to determine tenant.
--prometheus.url=http://localhost:9090
URL at which to reach Prometheus's API.
For better performance use local network.
Expand Down
47 changes: 31 additions & 16 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -107,6 +108,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent
// the raw query is encoded in the body and the appropriate Content-Type is set.
func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) {
var b io.Reader

if method == http.MethodPost {
rq := u.RawQuery
b = strings.NewReader(rq)
Expand Down Expand Up @@ -691,11 +693,17 @@ func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}

func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}) error {
func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}, tenantHeader string) error {
span, ctx := tracing.StartSpan(ctx, spanName)
defer span.Finish()

body, code, err := c.req2xx(ctx, u, http.MethodGet, nil)
var customheader http.Header = nil
if len(tenantHeader) > 0 {
customheader = http.Header{}
customheader.Set(tenantHeader, tenancy.GetTenantFromProvidedHeader(ctx, tenantHeader))
}

body, code, err := c.req2xx(ctx, u, http.MethodGet, customheader)
if err != nil {
if code, exists := statusToCode[code]; exists && code != 0 {
return status.Error(code, err.Error())
Expand Down Expand Up @@ -734,27 +742,29 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}
u.RawQuery = q.Encode()

var m struct {
Data []map[string]string `json:"data"`
}

return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m, tenantHeader)
}

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
Expand All @@ -764,18 +774,20 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
}
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m, tenantHeader)
}

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
Expand All @@ -785,13 +797,16 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}

u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
}
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m, tenantHeader)
}

// RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors.
Expand All @@ -810,7 +825,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
Data *rulespb.RuleGroups `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -833,7 +848,7 @@ func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.Al
} `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand Down Expand Up @@ -863,7 +878,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric
var v struct {
Data map[string][]*metadatapb.Meta `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v, "")
}

// ExemplarsInGRPC returns the exemplars from Prometheus exemplars API. It uses gRPC errors.
Expand All @@ -882,7 +897,7 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin
Data []*exemplarspb.ExemplarData `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -902,5 +917,5 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets
var v struct {
Data *targetspb.TargetDiscovery `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v, "")
}
2 changes: 1 addition & 1 deletion pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return extLset },
func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) },
func() string { return version })
func() string { return version }, "")
testutil.Ok(tt, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down
21 changes: 14 additions & 7 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -61,6 +62,7 @@ type PrometheusStore struct {
framesRead prometheus.Histogram

storepb.UnimplementedStoreServer
tenantHeader string
}

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
Expand All @@ -81,6 +83,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
tenantHeader string,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -105,6 +108,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
tenantHeader: tenantHeader,
}
return p, nil
}
Expand Down Expand Up @@ -153,7 +157,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

if r.SkipChunks {
finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit))
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit), p.tenantHeader)
if err != nil {
return err
}
Expand Down Expand Up @@ -471,7 +475,9 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que
preq.Header.Add("Content-Encoding", "snappy")
preq.Header.Set("Content-Type", "application/x-stream-protobuf")
preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

if len(p.tenantHeader) > 0 {
preq.Header.Set(p.tenantHeader, tenancy.GetTenantFromProvidedHeader(ctx, p.tenantHeader))
}
preq.Header.Set("User-Agent", clientconfig.ThanosUserAgent)
presp, err = p.client.Do(preq.WithContext(ctx))
if err != nil {
Expand Down Expand Up @@ -551,12 +557,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR

var lbls []string
if len(matchers) == 0 || p.labelCallsSupportMatchers() {
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
} else {
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -622,7 +628,8 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if len(matchers) == 0 {
return &storepb.LabelValuesResponse{Values: []string{val}}, nil
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))

sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand All @@ -633,12 +640,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
}

if len(matchers) == 0 || p.labelCallsSupportMatchers() {
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit))
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
} else {
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 },
nil,
nil, "",
) // MaxTime does not matter.
testutil.Ok(t, err)

Expand Down Expand Up @@ -200,7 +200,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 },
nil,
nil, "",
)
testutil.Ok(t, err)

Expand Down Expand Up @@ -374,7 +374,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
nil, "")
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -437,7 +437,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
nil, "")
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down
Loading
Loading