Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishabh Kumar committed Sep 23, 2024
1 parent ab160f0 commit 711db92
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 40 deletions.
3 changes: 3 additions & 0 deletions cmd/thanos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/tenancy"
)

type grpcConfig struct {
Expand Down Expand Up @@ -80,12 +81,14 @@ type prometheusConfig struct {
getConfigInterval time.Duration
getConfigTimeout time.Duration
httpClient *extflag.PathOrContent
tenantHeader string
}

func (pc *prometheusConfig) registerFlag(cmd extkingpin.FlagClause) *prometheusConfig {
cmd.Flag("prometheus.url",
"URL at which to reach Prometheus's API. For better performance use local network.").
Default("http://localhost:9090").URLVar(&pc.url)
cmd.Flag("prometheus.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).StringVar(&pc.tenantHeader)
cmd.Flag("prometheus.ready_timeout",
"Maximum time to wait for the Prometheus instance to start up").
Default("10m").DurationVar(&pc.readyTimeout)
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, clientconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
errors.Wrap(err, "create Prometheus header "+conf.prometheus.tenantHeader)

Check failure on line 300 in cmd/thanos/sidecar.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

Error return value of `errors.Wrap` is not checked (errcheck)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.prometheus.tenantHeader)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down
46 changes: 29 additions & 17 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -107,6 +108,7 @@ func NewWithTracingClient(logger log.Logger, httpClient *http.Client, userAgent
// the raw query is encoded in the body and the appropriate Content-Type is set.
func (c *Client) req2xx(ctx context.Context, u *url.URL, method string, headers http.Header) (_ []byte, _ int, err error) {
var b io.Reader

if method == http.MethodPost {
rq := u.RawQuery
b = strings.NewReader(rq)
Expand Down Expand Up @@ -691,11 +693,14 @@ func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}

func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}) error {
func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string, u *url.URL, data interface{}, header string) error {
span, ctx := tracing.StartSpan(ctx, spanName)
defer span.Finish()

body, code, err := c.req2xx(ctx, u, http.MethodGet, nil)
var customheader = http.Header{}
customheader.Set(header, tenancy.GetTenantFromProvidedHeader(ctx, header))

body, code, err := c.req2xx(ctx, u, http.MethodGet, customheader)
if err != nil {
if code, exists := statusToCode[code]; exists && code != 0 {
return status.Error(code, err.Error())
Expand Down Expand Up @@ -734,27 +739,29 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}
u.RawQuery = q.Encode()

var m struct {
Data []map[string]string `json:"data"`
}

return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_series HTTP[client]", &u, &m, header)
}

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
Expand All @@ -764,18 +771,20 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit >= 0 {
q.Add("limit", strconv.Itoa(limit))
}
u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
}
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_names HTTP[client]", &u, &m, header)
}

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int, header string) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
Expand All @@ -785,13 +794,16 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
if limit >= 0 {
q.Add("limit", strconv.Itoa(limit))
}

u.RawQuery = q.Encode()

var m struct {
Data []string `json:"data"`
}
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m)
return m.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_label_values HTTP[client]", &u, &m, header)
}

// RulesInGRPC returns the rules from Prometheus rules API. It uses gRPC errors.
Expand All @@ -810,7 +822,7 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
Data *rulespb.RuleGroups `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_rules HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -833,7 +845,7 @@ func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.Al
} `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -854,7 +866,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric
q.Add("metric", metric)
}
// We only set limit when it is >= 0.
if limit >= 0 {
if limit > 0 {
q.Add("limit", strconv.Itoa(limit))
}

Expand All @@ -863,7 +875,7 @@ func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric
var v struct {
Data map[string][]*metadatapb.Meta `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v, "")
}

// ExemplarsInGRPC returns the exemplars from Prometheus exemplars API. It uses gRPC errors.
Expand All @@ -882,7 +894,7 @@ func (c *Client) ExemplarsInGRPC(ctx context.Context, base *url.URL, query strin
Data []*exemplarspb.ExemplarData `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m); err != nil {
if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_exemplars HTTP[client]", &u, &m, ""); err != nil {
return nil, err
}

Expand All @@ -902,5 +914,5 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets
var v struct {
Data *targetspb.TargetDiscovery `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v, "")
}
2 changes: 1 addition & 1 deletion pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ func TestPrometheusStore_Acceptance(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return extLset },
func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) },
func() string { return version })
func() string { return version }, "")
testutil.Ok(tt, err)

// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.
Expand Down
20 changes: 13 additions & 7 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand All @@ -61,6 +62,7 @@ type PrometheusStore struct {
framesRead prometheus.Histogram

storepb.UnimplementedStoreServer
tenantHeader string
}

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
Expand All @@ -81,6 +83,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
tenantHeader string,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -105,6 +108,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
tenantHeader: tenantHeader,
}
return p, nil
}
Expand Down Expand Up @@ -153,7 +157,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

if r.SkipChunks {
finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit))
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit), p.tenantHeader)
if err != nil {
return err
}
Expand Down Expand Up @@ -468,10 +472,11 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que
if err != nil {
return nil, errors.Wrap(err, "unable to create request")
}
tenantName, _ := tenancy.GetTenantFromGRPCMetadata(ctx)
preq.Header.Add("Content-Encoding", "snappy")
preq.Header.Set("Content-Type", "application/x-stream-protobuf")
preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

preq.Header.Set(p.tenantHeader, tenantName)
preq.Header.Set("User-Agent", clientconfig.ThanosUserAgent)
presp, err = p.client.Do(preq.WithContext(ctx))
if err != nil {
Expand Down Expand Up @@ -551,12 +556,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR

var lbls []string
if len(matchers) == 0 || p.labelCallsSupportMatchers() {
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
} else {
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -622,7 +627,8 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if len(matchers) == 0 {
return &storepb.LabelValuesResponse{Values: []string{val}}, nil
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))

sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand All @@ -633,12 +639,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
}

if len(matchers) == 0 || p.labelCallsSupportMatchers() {
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit))
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
} else {
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit), p.tenantHeader)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return limitMinT, -1 },
nil,
nil, "",
) // MaxTime does not matter.
testutil.Ok(t, err)

Expand Down Expand Up @@ -200,7 +200,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 },
nil,
nil, "",
)
testutil.Ok(t, err)

Expand Down Expand Up @@ -374,7 +374,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
nil, "")
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down Expand Up @@ -437,7 +437,7 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) {
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
func() (int64, int64) { return 0, math.MaxInt64 },
nil)
nil, "")
testutil.Ok(t, err)
srv := newStoreSeriesServer(ctx)

Expand Down
6 changes: 5 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
}

ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant)
ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant)
level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant)

stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.MinTime, originalRequest.MaxTime, matchers)
Expand Down Expand Up @@ -361,6 +362,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La
}

ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant)
ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant)
level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant)

stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers)
Expand All @@ -375,6 +377,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La
End: originalRequest.End,
Matchers: append(storeMatchers, MatchersForLabelSets(storeLabelSets)...),
WithoutReplicaLabels: originalRequest.WithoutReplicaLabels,
Limit: originalRequest.Limit,
Hints: originalRequest.Hints,
}

Expand Down Expand Up @@ -464,7 +467,8 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L
}

ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant)
level.Debug(reqLogger).Log("msg", "Tenant info in LabelValues()", "tenant", tenant)
ctx = metadata.AppendToOutgoingContext(ctx, tenancy.CustomHeader, tenant)
level.Debug(s.logger).Log("msg", "Tenant info in LabelNames()", "tenant", tenant)

stores, storeLabelSets, storeDebugMsgs := s.matchingStores(ctx, originalRequest.Start, originalRequest.End, matchers)
if len(stores) == 0 {
Expand Down
Loading

0 comments on commit 711db92

Please sign in to comment.