Skip to content

Commit

Permalink
support federated metadata API
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Dec 30, 2020
1 parent 9228b90 commit 0f791fa
Show file tree
Hide file tree
Showing 23 changed files with 2,282 additions and 27 deletions.
39 changes: 39 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
Expand Down Expand Up @@ -96,6 +97,9 @@ func registerQuery(app *extkingpin.App) {
ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<rule>").Strings()

metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups.").
Hidden().PlaceHolder("<metadata>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -123,6 +127,9 @@ func registerQuery(app *extkingpin.App) {
enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
Hidden().Default("true").Bool()

enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

defaultEvaluationInterval := extkingpin.ModelDuration(cmd.Flag("query.default-evaluation-interval", "Set default evaluation interval for sub queries.").Default("1m"))

storeResponseTimeout := extkingpin.ModelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))
Expand All @@ -141,6 +148,10 @@ func registerQuery(app *extkingpin.App) {
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
}

if dup := firstDuplicate(*metadataEndpoints); dup != "" {
return errors.Errorf("Address %s is duplicated for --target flag.", dup)
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Expand Down Expand Up @@ -191,9 +202,11 @@ func registerQuery(app *extkingpin.App) {
getFlagsMap(cmd.Flags()),
*stores,
*ruleEndpoints,
*metadataEndpoints,
*enableAutodownsampling,
*enableQueryPartialResponse,
*enableRulePartialResponse,
*enableMetricMetadataPartialResponse,
fileSD,
time.Duration(*dnsSDInterval),
*dnsSDResolver,
Expand Down Expand Up @@ -241,9 +254,11 @@ func runQuery(
flagsMap map[string]string,
storeAddrs []string,
ruleAddrs []string,
metadataAddrs []string,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableMetricMetadataPartialResponse bool,
fileSD *file.Discovery,
dnsSDInterval time.Duration,
dnsSDResolver string,
Expand Down Expand Up @@ -283,6 +298,12 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

dnsMetadataProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
Expand All @@ -309,11 +330,22 @@ func runQuery(

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

// NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis.
// hence, any duplicates will be tracked in the store api set.

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand Down Expand Up @@ -376,6 +408,7 @@ func runQuery(
if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}

// Rules apis do not support file service discovery as of now.
case <-ctxUpdate.Done():
return nil
Expand All @@ -399,6 +432,9 @@ func runQuery(
if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
return nil
})
}, func(error) {
Expand Down Expand Up @@ -449,9 +485,11 @@ func runQuery(
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
metadata.NewGRPCClient(metadataProxy),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
enableMetricMetadataPartialResponse,
queryReplicaLabels,
flagsMap,
instantDefaultMaxSourceResolution,
Expand Down Expand Up @@ -491,6 +529,7 @@ func runQuery(
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
thanoshttp "github.com/thanos-io/thanos/pkg/http"
meta "github.com/thanos-io/thanos/pkg/metadata"
thanosmodel "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -218,6 +219,7 @@ func runSidecar(
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
50 changes: 46 additions & 4 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
Expand All @@ -69,10 +71,12 @@ type QueryAPI struct {
// queryEngine returns appropriate promql.Engine for a query with a given step.
queryEngine func(int64) *promql.Engine
ruleGroups rules.UnaryClient
metadatas metadata.UnaryClient

enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
enableAutodownsampling bool
enableQueryPartialResponse bool
enableRulePartialResponse bool
enableMetricMetadataPartialResponse bool

replicaLabels []string
storeSet *query.StoreSet
Expand All @@ -88,9 +92,11 @@ func NewQueryAPI(
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
metadatas metadata.UnaryClient,
enableAutodownsampling bool,
enableQueryPartialResponse bool,
enableRulePartialResponse bool,
enableMetricMetadataPartialResponse bool,
replicaLabels []string,
flagsMap map[string]string,
defaultInstantQueryMaxSourceResolution time.Duration,
Expand All @@ -104,10 +110,12 @@ func NewQueryAPI(
queryableCreate: c,
gate: gate,
ruleGroups: ruleGroups,
metadatas: metadatas,

enableAutodownsampling: enableAutodownsampling,
enableQueryPartialResponse: enableQueryPartialResponse,
enableRulePartialResponse: enableRulePartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
replicaLabels: replicaLabels,
storeSet: storeSet,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
Expand Down Expand Up @@ -138,6 +146,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge
r.Get("/stores", instr("stores", qapi.stores))

r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse)))

r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))
}

type queryData struct {
Expand Down Expand Up @@ -611,7 +621,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return names, warnings, nil
}

func (qapi *QueryAPI) stores(r *http.Request) (interface{}, []error, *api.ApiError) {
func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError) {
statuses := make(map[string][]query.StoreStatus)
for _, status := range qapi.storeSet.GetStoreStatus() {
statuses[status.StoreType.String()] = append(statuses[status.StoreType.String()], status)
Expand Down Expand Up @@ -771,3 +781,35 @@ func labelValuesByMatchers(sets []storage.SeriesSet, name string) ([]string, sto
sort.Strings(labelValues)
return labelValues, warnings, nil
}

func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) {
ps := storepb.PartialResponseStrategy_ABORT
if enablePartialResponse {
ps = storepb.PartialResponseStrategy_WARN
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
req := &metadatapb.MetadataRequest{
// By default we use -1, which means no limit.
Limit: -1,
Metric: r.URL.Query().Get("metric"),
PartialResponseStrategy: ps,
}

limitStr := r.URL.Query().Get("limit")
if limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid metric metadata limit='%v'", limit)}
}
req.Limit = int32(limit)
}

t, warnings, err := client.Metadata(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")}
}

return t, warnings, nil
}
}
108 changes: 108 additions & 0 deletions pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"context"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
)

var _ UnaryClient = &GRPCClient{}

// UnaryClient is gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not
// support streaming.
type UnaryClient interface {
Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error)
}

// GRPCClient allows to retrieve metadata from local gRPC streaming server implementation.
// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available.
type GRPCClient struct {
proxy metadatapb.MetadataServer
}

func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient {
return &GRPCClient{
proxy: ts,
}
}

func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) {
srv := &metadataServer{ctx: ctx, metric: req.Metric, limit: int(req.Limit)}

if req.Limit >= 0 {
if req.Metric != "" {
srv.metadataMap = make(map[string][]metadatapb.Meta, 1)
} else if req.Limit <= 100 {
srv.metadataMap = make(map[string][]metadatapb.Meta, req.Limit)
} else {
srv.metadataMap = make(map[string][]metadatapb.Meta)
}
} else {
srv.metadataMap = make(map[string][]metadatapb.Meta)
}

if err := rr.proxy.Metadata(req, srv); err != nil {
return nil, nil, errors.Wrap(err, "proxy Metadata")
}

return srv.metadataMap, srv.warnings, nil
}

type metadataServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
metadatapb.Metadata_MetadataServer
ctx context.Context

metric string
limit int

warnings []error
metadataMap map[string][]metadatapb.Meta
}

func (srv *metadataServer) Send(res *metadatapb.MetadataResponse) error {
if res.GetWarning() != "" {
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}

if res.GetMetadata() == nil {
return errors.New("no metadata")
}

// If limit is set to 0, we don't need to add anything.
if srv.limit == 0 {
return nil
}

for k, v := range res.GetMetadata().Metadata {
if metadata, ok := srv.metadataMap[k]; !ok {
// If limit is set and it is positive, we limit the size of the map.
if srv.limit < 0 || srv.limit > 0 && len(srv.metadataMap) < srv.limit {
srv.metadataMap[k] = v.Metas
}
} else {
// There shouldn't be many metadata for one single metric.
Outer:
for _, meta := range v.Metas {
for _, m := range metadata {
if meta == m {
continue Outer
}
}
srv.metadataMap[k] = append(srv.metadataMap[k], meta)
}
}
}

return nil
}

func (srv *metadataServer) Context() context.Context {
return srv.ctx
}
28 changes: 28 additions & 0 deletions pkg/metadata/metadatapb/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadatapb

import (
"unsafe"
)

func NewMetadataResponse(metadata *MetricMetadata) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Metadata{
Metadata: metadata,
},
}
}

func NewWarningMetadataResponse(warning error) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Warning{
Warning: warning.Error(),
},
}
}

func FromMetadataMap(m map[string][]Meta) *MetricMetadata {
return &MetricMetadata{Metadata: *(*map[string]MetricMetadataEntry)(unsafe.Pointer(&m))}
}
Loading

0 comments on commit 0f791fa

Please sign in to comment.