Skip to content

Commit

Permalink
Implement federated metric metadata API (#3686)
Browse files Browse the repository at this point in the history
* support federated metadata API

Signed-off-by: Ben Ye <yb532204897@gmail.com>

* update comments

Signed-off-by: yeya24 <yb532204897@gmail.com>

* use parseInt

Signed-off-by: yeya24 <yb532204897@gmail.com>

* address Prem's comments

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update proto comment

Signed-off-by: yeya24 <yb532204897@gmail.com>

* add changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 authored Feb 25, 2021
1 parent f969003 commit 46d0106
Show file tree
Hide file tree
Showing 24 changed files with 2,281 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks
- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request.
- [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction
- [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support.

### Fixed

Expand Down
36 changes: 36 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
Expand Down Expand Up @@ -96,6 +97,9 @@ func registerQuery(app *extkingpin.App) {
ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<rule>").Strings()

metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
Hidden().PlaceHolder("<metadata>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -123,6 +127,9 @@ func registerQuery(app *extkingpin.App) {
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
Hidden().Default("true").Bool()

enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

defaultEvaluationInterval := extkingpin.ModelDuration(cmd.Flag("query.default-evaluation-interval", "Set default evaluation interval for sub queries.").Default("1m"))

defaultRangeQueryStep := extkingpin.ModelDuration(cmd.Flag("query.default-step", "Set default step for range queries. Default step is only used when step is not set in UI. In such cases, Thanos UI will use default step to calculate resolution (resolution = max(rangeSeconds / 250, defaultStep)). This will not work from Grafana, but Grafana has __step variable which can be used.").
Expand All @@ -144,6 +151,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
}

if dup := firstDuplicate(*metadataEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --metadata flag.", dup)
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Expand Down Expand Up @@ -195,9 +206,11 @@ func registerQuery(app *extkingpin.App) {
getFlagsMap(cmd.Flags()),
*stores,
*ruleEndpoints,
*metadataEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
*enableRulePartialResponse,
*enableMetricMetadataPartialResponse,
fileSD,
time.Duration(*dnsSDInterval),
*dnsSDResolver,
Expand Down Expand Up @@ -246,9 +259,11 @@ func runQuery(
flagsMap map[string]string,
storeAddrs []string,
ruleAddrs []string,
metadataAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableMetricMetadataPartialResponse bool,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
dnsSDResolver string,
Expand Down Expand Up @@ -288,6 +303,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsMetadataProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
Expand All @@ -314,11 +335,19 @@ func runQuery(

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand Down Expand Up @@ -381,6 +410,7 @@ func runQuery(
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}

// Rules apis do not support file service discovery as of now.
case <-ctxUpdate.Done():
return nil
Expand All @@ -404,6 +434,9 @@ func runQuery(
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err)
}
return nil
})
}, func(error) {
Expand Down Expand Up @@ -454,9 +487,11 @@ func runQuery(
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
metadata.NewGRPCClient(metadataProxy),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
enableMetricMetadataPartialResponse,
queryReplicaLabels,
flagsMap,
defaultRangeQueryStep,
Expand Down Expand Up @@ -497,6 +532,7 @@ func runQuery(
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
thanoshttp "github.com/thanos-io/thanos/pkg/http"
meta "github.com/thanos-io/thanos/pkg/metadata"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -218,6 +219,7 @@ func runSidecar(
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
50 changes: 46 additions & 4 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
Expand Down Expand Up @@ -70,10 +72,12 @@ type QueryAPI struct {
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient
metadatas metadata.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
enableMetricMetadataPartialResponse bool

replicaLabels []string
storeSet *query.StoreSet
Expand All @@ -90,9 +94,11 @@ func NewQueryAPI(
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
metadatas metadata.UnaryClient,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableMetricMetadataPartialResponse bool,
replicaLabels []string,
flagsMap map[string]string,
defaultRangeQueryStep time.Duration,
Expand All @@ -107,10 +113,12 @@ func NewQueryAPI(
queryableCreate: c,
gate: gate,
ruleGroups: ruleGroups,
metadatas: metadatas,

enableAutodownsampling: enableAutodownsampling,
enableQueryPartialResponse: enableQueryPartialResponse,
enableRulePartialResponse: enableRulePartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
replicaLabels: replicaLabels,
storeSet: storeSet,
defaultRangeQueryStep: defaultRangeQueryStep,
Expand Down Expand Up @@ -142,6 +150,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
r.Get("/stores", instr("stores", qapi.stores))

r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))
}

type queryData struct {
Expand Down Expand Up @@ -630,7 +640,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return names, warnings, nil
}

func (qapi *QueryAPI) stores(r *http.Request) (interface{}, []error, *api.ApiError) {
func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError) {
statuses := make(map[string][]query.StoreStatus)
for _, status := range qapi.storeSet.GetStoreStatus() {
statuses[status.StoreType.String()] = append(statuses[status.StoreType.String()], status)
Expand Down Expand Up @@ -790,3 +800,35 @@ func labelValuesByMatchers(sets []storage.SeriesSet, name string) ([]string, sto
sort.Strings(labelValues)
return labelValues, warnings, nil
}

func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
req := &metadatapb.MetadataRequest{
// By default we use -1, which means no limit.
Limit: -1,
Metric: r.URL.Query().Get("metric"),
PartialResponseStrategy: ps,
}

limitStr := r.URL.Query().Get("limit")
if limitStr != "" {
limit, err := strconv.ParseInt(limitStr, 10, 32)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid metric metadata limit='%v'", limit)}
}
req.Limit = int32(limit)
}

t, warnings, err := client.Metadata(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")}
}

return t, warnings, nil
}
}
108 changes: 108 additions & 0 deletions pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"context"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
)

var _ UnaryClient = &GRPCClient{}

// UnaryClient is gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not
// support streaming.
type UnaryClient interface {
Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error)
}

// GRPCClient allows to retrieve metadata from local gRPC streaming server implementation.
// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available.
type GRPCClient struct {
proxy metadatapb.MetadataServer
}

func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient {
return &GRPCClient{
proxy: ts,
}
}

func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) {
srv := &metadataServer{ctx: ctx, metric: req.Metric, limit: int(req.Limit)}

if req.Limit >= 0 {
if req.Metric != "" {
srv.metadataMap = make(map[string][]metadatapb.Meta, 1)
} else if req.Limit <= 100 {
srv.metadataMap = make(map[string][]metadatapb.Meta, req.Limit)
} else {
srv.metadataMap = make(map[string][]metadatapb.Meta)
}
} else {
srv.metadataMap = make(map[string][]metadatapb.Meta)
}

if err := rr.proxy.Metadata(req, srv); err != nil {
return nil, nil, errors.Wrap(err, "proxy Metadata")
}

return srv.metadataMap, srv.warnings, nil
}

type metadataServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
metadatapb.Metadata_MetadataServer
ctx context.Context

metric string
limit int

warnings []error
metadataMap map[string][]metadatapb.Meta
}

func (srv *metadataServer) Send(res *metadatapb.MetadataResponse) error {
if res.GetWarning() != "" {
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}

if res.GetMetadata() == nil {
return errors.New("no metadata")
}

// If limit is set to 0, we don't need to add anything.
if srv.limit == 0 {
return nil
}

for k, v := range res.GetMetadata().Metadata {
if metadata, ok := srv.metadataMap[k]; !ok {
// If limit is set and it is positive, we limit the size of the map.
if srv.limit < 0 || srv.limit > 0 && len(srv.metadataMap) < srv.limit {
srv.metadataMap[k] = v.Metas
}
} else {
// There shouldn't be many metadata for one single metric.
Outer:
for _, meta := range v.Metas {
for _, m := range metadata {
if meta == m {
continue Outer
}
}
srv.metadataMap[k] = append(srv.metadataMap[k], meta)
}
}
}

return nil
}

func (srv *metadataServer) Context() context.Context {
return srv.ctx
}
28 changes: 28 additions & 0 deletions pkg/metadata/metadatapb/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadatapb

import (
"unsafe"
)

func NewMetadataResponse(metadata *MetricMetadata) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Metadata{
Metadata: metadata,
},
}
}

func NewWarningMetadataResponse(warning error) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Warning{
Warning: warning.Error(),
},
}
}

func FromMetadataMap(m map[string][]Meta) *MetricMetadata {
return &MetricMetadata{Metadata: *(*map[string]MetricMetadataEntry)(unsafe.Pointer(&m))}
}
Loading

0 comments on commit 46d0106

Please sign in to comment.