Skip to content

Commit

Permalink
lint fixes and empty header issues
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Kumar <rishabh.kumar@airbnb.com>
  • Loading branch information
Rishabh Kumar committed Sep 25, 2024
1 parent 8147a7b commit d68e2f2
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 40 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ build: check-git deps $(PROMU)
@$(PROMU) build --prefix $(PREFIX)

GIT_BRANCH=$(shell $(GIT) rev-parse --abbrev-ref HEAD)
GIT_REVISION := $(shell git rev-parse --short HEAD)
IMAGE_TAG ?= $(subst /,-,$(GIT_BRANCH))-$(GIT_REVISION)
.PHONY: crossbuild
crossbuild: ## Builds all binaries for all platforms.
ifeq ($(GIT_BRANCH), main)
Expand Down Expand Up @@ -197,7 +199,7 @@ docker: build
@echo ">> copying Thanos from $(PREFIX) to ./thanos_tmp_for_docker"
@cp $(PREFIX)/thanos ./thanos_tmp_for_docker
@echo ">> building docker image 'thanos'"
@docker build -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) .
@docker build -t "thanos" --build-arg BASE_DOCKER_SHA=$(BASE_DOCKER_SHA) -t thanos:$(IMAGE_TAG) .
@rm ./thanos_tmp_for_docker
else
docker: docker-multi-stage
Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/tenancy"
)

type grpcConfig struct {
Expand Down Expand Up @@ -80,12 +81,14 @@ type prometheusConfig struct {
getConfigInterval time.Duration
getConfigTimeout time.Duration
httpClient *extflag.PathOrContent
tenantHeader string
}

func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig {
cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&pc.url)
cmd.Flag("prometheus.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).StringVar(&pc.tenantHeader)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&pc.readyTimeout)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.prometheus.tenantHeader)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
3 changes: 3 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ prometheus \
```bash
thanos sidecar \
--tsdb.path "/path/to/prometheus/data/dir" \
--prometheus.tenant-header="THANOS-TENANT" \
--prometheus.url "http://localhost:9090" \
--objstore.config-file "bucket.yml"
```
Expand Down Expand Up @@ -170,6 +171,8 @@ Flags:
--prometheus.ready_timeout=10m
Maximum time to wait for the Prometheus
instance to start up
--prometheus.tenant-header="THANOS-TENANT"
HTTP header to determine tenant.
--prometheus.url=http://localhost:9090
URL at which to reach Prometheus's API.
For better performance use local network.
Expand Down
47 changes: 31 additions & 16 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -107,6 +108,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent
// the raw query is encoded in the body and the appropriate Content-Type is set.
func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) {
var b io.Reader

if method == http.MethodPost {
rq := u.RawQuery
b = strings.NewReader(rq)
Expand Down Expand Up @@ -691,11 +693,17 @@ func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}

func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}) error {
func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}, tenantHeader string) error {
span, ctx := tracing.StartSpan(ctx, spanName)
defer span.Finish()

body, code, err := c.req2xx(ctx, u, http.MethodGet, nil)
var customheader http.Header = nil
if len(tenantHeader) > 0 {
customheader = http.Header{}
customheader.Set(tenantHeader, tenancy.GetTenantFromProvidedHeader(ctx, tenantHeader))
}

body, code, err := c.req2xx(ctx, u, http.MethodGet, customheader)
if err != nil {
if code, exists := statusToCode[code]; exists && code != 0 {
return status.Error(code, err.Error())
Expand Down Expand Up @@ -734,27 +742,29 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}
u.RawQuery = q.Encode()

var m struct {
Data []map[string]string `json:"data"`
}

return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m, tenantHeader)
}

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
Expand All @@ -764,18 +774,20 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
}
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m, tenantHeader)
}

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int, tenantHeader string) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
Expand All @@ -785,13 +797,16 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}

u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
}
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m, tenantHeader)
}

// RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors.
Expand All @@ -810,7 +825,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
Data *rulespb.RuleGroups `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -833,7 +848,7 @@ func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.Al
} `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand Down Expand Up @@ -863,7 +878,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric
var v struct {
Data map[string][]*metadatapb.Meta `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v, "")
}

// ExemplarsInGRPC returns the exemplars from Prometheus exemplars API. It uses gRPC errors.
Expand All @@ -882,7 +897,7 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin
Data []*exemplarspb.ExemplarData `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -902,5 +917,5 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets
var v struct {
Data *targetspb.TargetDiscovery `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v, "")
}
2 changes: 1 addition & 1 deletion pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return extLset },
func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) },
func() string { return version })
func() string { return version }, "")
testutil.Ok(tt, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down
21 changes: 14 additions & 7 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -61,6 +62,7 @@ type PrometheusStore struct {
framesRead prometheus.Histogram

storepb.UnimplementedStoreServer
tenantHeader string
}

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
Expand All @@ -81,6 +83,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
tenantHeader string,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -105,6 +108,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
tenantHeader: tenantHeader,
}
return p, nil
}
Expand Down Expand Up @@ -153,7 +157,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

if r.SkipChunks {
finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit))
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit), p.tenantHeader)
if err != nil {
return err
}
Expand Down Expand Up @@ -471,7 +475,9 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que
preq.Header.Add("Content-Encoding", "snappy")
preq.Header.Set("Content-Type", "application/x-stream-protobuf")
preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

if len(p.tenantHeader) > 0 {
preq.Header.Set(p.tenantHeader, tenancy.GetTenantFromProvidedHeader(ctx, p.tenantHeader))
}
preq.Header.Set("User-Agent", clientconfig.ThanosUserAgent)
presp, err = p.client.Do(preq.WithContext(ctx))
if err != nil {
Expand Down Expand Up @@ -551,12 +557,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR

var lbls []string
if len(matchers) == 0 || p.labelCallsSupportMatchers() {
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
} else {
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -622,7 +628,8 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if len(matchers) == 0 {
return &storepb.LabelValuesResponse{Values: []string{val}}, nil
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))

sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand All @@ -633,12 +640,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
}

if len(matchers) == 0 || p.labelCallsSupportMatchers() {
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit))
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
} else {
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 },
nil,
nil, "",
) // MaxTime does not matter.
testutil.Ok(t, err)

Expand Down Expand Up @@ -200,7 +200,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 },
nil,
nil, "",
)
testutil.Ok(t, err)

Expand Down Expand Up @@ -374,7 +374,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
nil, "")
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -437,7 +437,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
nil, "")
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down
Loading

0 comments on commit d68e2f2

Please sign in to comment.