From 31cb39475df6f76e6d5462372c0893caf04950f6 Mon Sep 17 00:00:00 2001 From: Alexander Tunik <2braven@gmail.com> Date: Fri, 31 Jul 2020 12:42:25 +0300 Subject: [PATCH] federated targets functionality (#1375) Signed-off-by: Alexander Tunik <2braven@gmail.com> --- cmd/thanos/query.go | 36 + cmd/thanos/sidecar.go | 2 + pkg/api/query/v1.go | 40 + pkg/promclient/promclient.go | 17 + pkg/query/storeset.go | 56 +- pkg/query/storeset_test.go | 8 + pkg/targets/prometheus.go | 67 ++ pkg/targets/prometheus_test.go | 137 +++ pkg/targets/proxy.go | 139 +++ pkg/targets/targets.go | 207 ++++ pkg/targets/targets_test.go | 280 ++++++ pkg/targets/targetspb/custom.go | 129 +++ pkg/targets/targetspb/rpc.pb.go | 1668 +++++++++++++++++++++++++++++++ pkg/targets/targetspb/rpc.proto | 78 ++ scripts/genproto.sh | 2 +- test/e2e/compact_test.go | 2 +- test/e2e/e2ethanos/services.go | 6 +- test/e2e/metadata_api_test.go | 1 + test/e2e/query_frontend_test.go | 4 +- test/e2e/query_test.go | 11 +- test/e2e/receive_test.go | 10 +- test/e2e/rule_test.go | 6 +- test/e2e/rules_api_test.go | 1 + test/e2e/store_gateway_test.go | 2 +- test/e2e/targets_api_test.go | 140 +++ 25 files changed, 3032 insertions(+), 17 deletions(-) create mode 100644 pkg/targets/prometheus.go create mode 100644 pkg/targets/prometheus_test.go create mode 100644 pkg/targets/proxy.go create mode 100644 pkg/targets/targets.go create mode 100644 pkg/targets/targets_test.go create mode 100644 pkg/targets/targetspb/custom.go create mode 100644 pkg/targets/targetspb/rpc.pb.go create mode 100644 pkg/targets/targetspb/rpc.proto create mode 100644 test/e2e/targets_api_test.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 2da6f1085df..df1e7e601e4 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -46,6 +46,7 @@ import ( grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/ui" ) @@ -103,6 +104,10 @@ func registerQuery(app *extkingpin.App) { metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). Hidden().PlaceHolder("").Strings() + // TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. + targetEndpoints := cmd.Flag("target", "Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). + Hidden().PlaceHolder("").Strings() + strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). PlaceHolder("").Strings() @@ -130,6 +135,9 @@ func registerQuery(app *extkingpin.App) { enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling."). Hidden().Default("true").Bool() + enableTargetPartialResponse := cmd.Flag("target.partial-response", "Enable partial response for targets endpoint. --no-target.partial-response for disabling."). + Hidden().Default("true").Bool() + enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling."). Hidden().Default("true").Bool() @@ -169,6 +177,10 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } + if dup := firstDuplicate(*targetEndpoints); dup != "" { + return errors.Errorf("Address %s is duplicated for --target flag.", dup) + } + var fileSD *file.Discovery if len(*fileSDFiles) > 0 { conf := &file.SDConfig{ @@ -222,10 +234,12 @@ func registerQuery(app *extkingpin.App) { getFlagsMap(cmd.Flags()), *stores, *ruleEndpoints, + *targetEndpoints, *metadataEndpoints, *enableAutodownsampling, *enableQueryPartialResponse, *enableRulePartialResponse, + *enableTargetPartialResponse, *enableMetricMetadataPartialResponse, fileSD, time.Duration(*dnsSDInterval), @@ -278,10 +292,12 @@ func runQuery( flagsMap map[string]string, storeAddrs []string, ruleAddrs []string, + targetAddrs []string, metadataAddrs []string, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, + enableTargetPartialResponse bool, enableMetricMetadataPartialResponse bool, fileSD *file.Discovery, dnsSDInterval time.Duration, @@ -323,6 +339,12 @@ func runQuery( dns.ResolverType(dnsSDResolver), ) + dnsTargetProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg), + dns.ResolverType(dnsSDResolver), + ) + dnsMetadataProvider := dns.NewProvider( logger, extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), @@ -355,6 +377,13 @@ func runQuery( return specs }, + func() (specs []query.TargetSpec) { + for _, addr := range dnsTargetProvider.Addresses() { + specs = append(specs, query.NewGRPCStoreSpec(addr, false)) + } + + return specs + }, func() (specs []query.MetadataSpec) { for _, addr := range dnsMetadataProvider.Addresses() { specs = append(specs, query.NewGRPCStoreSpec(addr, false)) @@ -367,6 +396,7 @@ func runQuery( ) proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) + targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients) metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients) queryableCreator = query.NewQueryableCreator( logger, @@ -454,6 +484,9 @@ func runQuery( if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) } + if err := dnsTargetProvider.Resolve(ctx, targetAddrs); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) + } if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) } @@ -504,10 +537,12 @@ func runQuery( queryableCreator, // NOTE: Will share the same replica label as the query for now. rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels), + targets.NewGRPCClientWithDedup(targetsProxy, queryReplicaLabels), metadata.NewGRPCClient(metadataProxy), enableAutodownsampling, enableQueryPartialResponse, enableRulePartialResponse, + enableTargetPartialResponse, enableMetricMetadataPartialResponse, queryReplicaLabels, flagsMap, @@ -550,6 +585,7 @@ func runQuery( s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(store.RegisterStoreServer(proxy)), grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), + grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)), grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)), grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index dd93addc855..62f2484bde0 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -42,6 +42,7 @@ import ( httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -229,6 +230,7 @@ func runSidecar( s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(store.RegisterStoreServer(promStore)), grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), + grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))), grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))), grpcserver.WithListen(conf.grpc.bindAddress), grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 00671eb85f3..8e93748b352 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -50,6 +50,8 @@ import ( "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/targets" + "github.com/thanos-io/thanos/pkg/targets/targetspb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -72,11 +74,13 @@ type QueryAPI struct { // queryEngine returns appropriate promql.Engine for a query with a given step. queryEngine func(int64) *promql.Engine ruleGroups rules.UnaryClient + targets targets.UnaryClient metadatas metadata.UnaryClient enableAutodownsampling bool enableQueryPartialResponse bool enableRulePartialResponse bool + enableTargetPartialResponse bool enableMetricMetadataPartialResponse bool disableCORS bool @@ -95,10 +99,12 @@ func NewQueryAPI( qe func(int64) *promql.Engine, c query.QueryableCreator, ruleGroups rules.UnaryClient, + targets targets.UnaryClient, metadatas metadata.UnaryClient, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, + enableTargetPartialResponse bool, enableMetricMetadataPartialResponse bool, replicaLabels []string, flagsMap map[string]string, @@ -115,11 +121,13 @@ func NewQueryAPI( queryableCreate: c, gate: gate, ruleGroups: ruleGroups, + targets: targets, metadatas: metadatas, enableAutodownsampling: enableAutodownsampling, enableQueryPartialResponse: enableQueryPartialResponse, enableRulePartialResponse: enableRulePartialResponse, + enableTargetPartialResponse: enableTargetPartialResponse, enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse, replicaLabels: replicaLabels, storeSet: storeSet, @@ -154,6 +162,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse))) + r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse))) + r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse))) } @@ -652,6 +662,36 @@ func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiErr return statuses, nil, nil } +func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) { + ps := storepb.PartialResponseStrategy_ABORT + if enablePartialResponse { + ps = storepb.PartialResponseStrategy_WARN + } + + return func(r *http.Request) (interface{}, []error, *api.ApiError) { + stateParam := r.URL.Query().Get("state") + state, ok := targetspb.TargetsRequest_State_value[strings.ToUpper(stateParam)] + if !ok { + if stateParam != "" { + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid targets parameter state='%v'", stateParam)} + } + state = int32(targetspb.TargetsRequest_ANY) + } + + req := &targetspb.TargetsRequest{ + State: targetspb.TargetsRequest_State(state), + PartialResponseStrategy: ps, + } + + t, warnings, err := client.Targets(r.Context(), req) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")} + } + + return t, warnings, nil + } +} + // NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules // which uses gRPC Unary Rules API. func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) { diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 58fb296a039..313a6f7b7d7 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -35,6 +35,7 @@ import ( "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" "github.com/thanos-io/thanos/pkg/tracing" "google.golang.org/grpc/codes" yaml "gopkg.in/yaml.v2" @@ -758,3 +759,19 @@ func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric strin } return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/metadata HTTP[client]", &u, &v) } + +func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets string) (*targetspb.TargetDiscovery, error) { + u := *base + u.Path = path.Join(u.Path, "/api/v1/targets") + + if stateTargets != "" { + q := u.Query() + q.Add("state", stateTargets) + u.RawQuery = q.Encode() + } + + var v struct { + Data *targetspb.TargetDiscovery `json:"data"` + } + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/targets HTTP[client]", &u, &v) +} diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index b014f683e3a..5a5d5cb6d45 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" ) const ( @@ -50,6 +51,11 @@ type RuleSpec interface { Addr() string } +type TargetSpec interface { + // Addr returns TargetsAPI Address for the targets spec. It is used as its ID. + Addr() string +} + type MetadataSpec interface { // Addr returns MetadataAPI Address for the metadata spec. It is used as its ID. Addr() string @@ -187,6 +193,7 @@ type StoreSet struct { // accessible and we close gRPC client for it. storeSpecs func() []StoreSpec ruleSpecs func() []RuleSpec + targetSpecs func() []TargetSpec metadataSpecs func() []MetadataSpec dialOpts []grpc.DialOption gRPCInfoCallTimeout time.Duration @@ -210,6 +217,7 @@ func NewStoreSet( reg *prometheus.Registry, storeSpecs func() []StoreSpec, ruleSpecs func() []RuleSpec, + targetSpecs func() []TargetSpec, metadataSpecs func() []MetadataSpec, dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, @@ -228,6 +236,9 @@ func NewStoreSet( if ruleSpecs == nil { ruleSpecs = func() []RuleSpec { return nil } } + if targetSpecs == nil { + targetSpecs = func() []TargetSpec { return nil } + } if metadataSpecs == nil { metadataSpecs = func() []MetadataSpec { return nil } } @@ -236,6 +247,7 @@ func NewStoreSet( logger: log.With(logger, "component", "storeset"), storeSpecs: storeSpecs, ruleSpecs: ruleSpecs, + targetSpecs: targetSpecs, metadataSpecs: metadataSpecs, dialOpts: dialOpts, storesMetric: storesMetric, @@ -258,6 +270,9 @@ type storeRef struct { rule rulespb.RulesClient metadata metadatapb.MetadataClient + // If target is not nil, then this store also supports targets API. + target targetspb.TargetsClient + // Meta (can change during runtime). labelSets []labels.Labels storeType component.StoreAPI @@ -267,7 +282,7 @@ type storeRef struct { logger log.Logger } -func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, metadata metadatapb.MetadataClient) { +func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, target targetspb.TargetsClient, metadata metadatapb.MetadataClient) { s.mtx.Lock() defer s.mtx.Unlock() @@ -276,6 +291,7 @@ func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int6 s.minTime = minTime s.maxTime = maxTime s.rule = rule + s.target = target s.metadata = metadata } @@ -293,6 +309,13 @@ func (s *storeRef) HasRulesAPI() bool { return s.rule != nil } +func (s *storeRef) HasTargetsAPI() bool { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.target != nil +} + func (s *storeRef) HasMetadataAPI() bool { s.mtx.RLock() defer s.mtx.RUnlock() @@ -405,6 +428,10 @@ func (s *StoreSet) Update(ctx context.Context) { level.Info(s.logger).Log("msg", "adding new rulesAPI to query storeset", "address", addr) } + if st.HasTargetsAPI() { + level.Info(s.logger).Log("msg", "adding new targetsAPI to query storeset", "address", addr) + } + level.Info(s.logger).Log("msg", "adding new storeAPI to query storeset", "address", addr, "extLset", extLset) } @@ -425,6 +452,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store storeAddrSet = make(map[string]struct{}) ruleAddrSet = make(map[string]struct{}) + targetAddrSet = make(map[string]struct{}) metadataAddrSet = make(map[string]struct{}) ) @@ -433,6 +461,11 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store ruleAddrSet[ruleSpec.Addr()] = struct{}{} } + // Gather active targets map concurrently. Add a new target if it does not exist already. + for _, targetSpec := range s.targetSpecs() { + targetAddrSet[targetSpec.Addr()] = struct{}{} + } + // Gather active stores map concurrently. Build new store if does not exist already. for _, metadataSpec := range s.metadataSpecs() { metadataAddrSet[metadataSpec.Addr()] = struct{}{} @@ -473,6 +506,11 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store rule = rulespb.NewRulesClient(st.cc) } + var target targetspb.TargetsClient + if _, ok := targetAddrSet[addr]; ok { + target = targetspb.NewTargetsClient(st.cc) + } + var metadata metadatapb.MetadataClient if _, ok := metadataAddrSet[addr]; ok { metadata = metadatapb.NewMetadataClient(st.cc) @@ -502,7 +540,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store } s.updateStoreStatus(st, nil) - st.Update(labelSets, minTime, maxTime, storeType, rule, metadata) + st.Update(labelSets, minTime, maxTime, storeType, rule, target, metadata) mtx.Lock() defer mtx.Unlock() @@ -586,6 +624,20 @@ func (s *StoreSet) GetRulesClients() []rulespb.RulesClient { return rules } +// GetTargetsClients returns a list of all active targets clients. +func (s *StoreSet) GetTargetsClients() []targetspb.TargetsClient { + s.storesMtx.RLock() + defer s.storesMtx.RUnlock() + + targets := make([]targetspb.TargetsClient, 0, len(s.stores)) + for _, st := range s.stores { + if st.HasTargetsAPI() { + targets = append(targets, st.target) + } + } + return targets +} + // GetMetadataClients returns a list of all active metadata clients. func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient { s.storesMtx.RLock() diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index 7de48faafbf..3f416a75fe3 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -196,6 +196,9 @@ func TestStoreSet_Update(t *testing.T) { func() (specs []RuleSpec) { return nil }, + func() (specs []TargetSpec) { + return nil + }, func() (specs []MetadataSpec) { return nil }, @@ -548,6 +551,7 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { return specs }, func() (specs []RuleSpec) { return nil }, + func() (specs []TargetSpec) { return nil }, func() (specs []MetadataSpec) { return nil }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second @@ -633,6 +637,8 @@ func TestQuerierStrict(t *testing.T) { } }, func() []RuleSpec { return nil + }, func() []TargetSpec { + return nil }, func() (specs []MetadataSpec) { return nil }, testGRPCOpts, time.Minute) @@ -764,6 +770,7 @@ func TestStoreSet_Update_Rules(t *testing.T) { storeSet := NewStoreSet(nil, nil, tc.storeSpecs, tc.ruleSpecs, + func() []TargetSpec { return nil }, func() []MetadataSpec { return nil }, testGRPCOpts, time.Minute) @@ -937,6 +944,7 @@ func TestStoreSet_Rules_Discovery(t *testing.T) { return tc.states[currentState].ruleSpecs() }, + func() []TargetSpec { return nil }, func() []MetadataSpec { return nil }, diff --git a/pkg/targets/prometheus.go b/pkg/targets/prometheus.go new file mode 100644 index 00000000000..042dcaa636a --- /dev/null +++ b/pkg/targets/prometheus.go @@ -0,0 +1,67 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targets + +import ( + "net/url" + "strings" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" +) + +// Prometheus implements targetspb.Targets gRPC that allows to fetch targets from Prometheus HTTP api/v1/targets endpoint. +type Prometheus struct { + base *url.URL + client *promclient.Client + + extLabels func() labels.Labels +} + +// NewPrometheus creates new targets.Prometheus. +func NewPrometheus(base *url.URL, client *promclient.Client, extLabels func() labels.Labels) *Prometheus { + return &Prometheus{ + base: base, + client: client, + extLabels: extLabels, + } +} + +// Targets returns all specified targets from Prometheus. +func (p *Prometheus) Targets(r *targetspb.TargetsRequest, s targetspb.Targets_TargetsServer) error { + var stateTargets string + if r.State != targetspb.TargetsRequest_ANY { + stateTargets = strings.ToLower(r.State.String()) + } + targets, err := p.client.TargetsInGRPC(s.Context(), p.base, stateTargets) + if err != nil { + return err + } + + // Prometheus does not add external labels, so we need to add on our own. + enrichTargetsWithExtLabels(targets, p.extLabels()) + + if err := s.Send(&targetspb.TargetsResponse{Result: &targetspb.TargetsResponse_Targets{Targets: targets}}); err != nil { + return err + } + + return nil +} + +func enrichTargetsWithExtLabels(targets *targetspb.TargetDiscovery, extLset labels.Labels) { + for i, target := range targets.ActiveTargets { + target.SetDiscoveredLabels(labelpb.ExtendSortedLabels(target.DiscoveredLabels.PromLabels(), extLset)) + target.SetLabels(labelpb.ExtendSortedLabels(target.Labels.PromLabels(), extLset)) + + targets.ActiveTargets[i] = target + } + + for i, target := range targets.DroppedTargets { + target.SetDiscoveredLabels(labelpb.ExtendSortedLabels(target.DiscoveredLabels.PromLabels(), extLset)) + + targets.DroppedTargets[i] = target + } +} diff --git a/pkg/targets/prometheus_test.go b/pkg/targets/prometheus_test.go new file mode 100644 index 00000000000..b98ec7d1030 --- /dev/null +++ b/pkg/targets/prometheus_test.go @@ -0,0 +1,137 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targets + +import ( + "context" + "fmt" + "net/url" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" +) + +func TestPrometheus_Targets_e2e(t *testing.T) { + p, err := e2eutil.NewPrometheus() + testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() + + testutil.Ok(t, p.SetConfig(` +global: + external_labels: + region: eu-west + +scrape_configs: +- job_name: 'myself' + # Quick scrapes for test purposes. + scrape_interval: 1s + scrape_timeout: 1s + static_configs: + - targets: ['localhost:9090','localhost:80'] + relabel_configs: + - source_labels: ['__address__'] + regex: '^.+:80$' + action: drop +`)) + testutil.Ok(t, p.Start()) + + // For some reason it's better to wait much more than a few scrape intervals. + time.Sleep(5 * time.Second) + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + promTargets := NewPrometheus(u, promclient.NewDefaultClient(), func() labels.Labels { + return labels.FromStrings("replica", "test1") + }) + + expected := &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "replica", Value: "test1"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "replica", Value: "test1"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_DOWN, + LastScrape: time.Time{}, + LastScrapeDuration: 0, + }, + }, + DroppedTargets: []*targetspb.DroppedTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:80"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "replica", Value: "test1"}, + }}, + }, + }, + } + + for _, tcase := range []struct { + requestedState targetspb.TargetsRequest_State + expectedErr error + }{ + { + requestedState: targetspb.TargetsRequest_ANY, + }, + { + requestedState: targetspb.TargetsRequest_ACTIVE, + }, + { + requestedState: targetspb.TargetsRequest_DROPPED, + }, + } { + t.Run(tcase.requestedState.String(), func(t *testing.T) { + targets, w, err := NewGRPCClientWithDedup(promTargets, nil).Targets(context.Background(), &targetspb.TargetsRequest{ + State: tcase.requestedState, + }) + testutil.Equals(t, storage.Warnings(nil), w) + if tcase.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, err) + + expectedTargets := proto.Clone(expected).(*targetspb.TargetDiscovery) + + switch tcase.requestedState { + case targetspb.TargetsRequest_ACTIVE: + expectedTargets.DroppedTargets = expectedTargets.DroppedTargets[:0] + case targetspb.TargetsRequest_DROPPED: + expectedTargets.ActiveTargets = expectedTargets.ActiveTargets[:0] + } + + for i := range targets.ActiveTargets { + targets.ActiveTargets[i].LastScrapeDuration = 0 + targets.ActiveTargets[i].LastScrape = time.Time{} + targets.ActiveTargets[i].LastError = "" + } + + testutil.Equals(t, expectedTargets.ActiveTargets, targets.ActiveTargets) + testutil.Equals(t, expectedTargets.DroppedTargets, targets.DroppedTargets) + }) + } +} diff --git a/pkg/targets/proxy.go b/pkg/targets/proxy.go new file mode 100644 index 00000000000..d92ffa6c026 --- /dev/null +++ b/pkg/targets/proxy.go @@ -0,0 +1,139 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targets + +import ( + "context" + "io" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" +) + +// Proxy implements targetspb.Targets gRPC that fans out requests to given targetspb.Targets. +type Proxy struct { + logger log.Logger + targets func() []targetspb.TargetsClient +} + +func RegisterTargetsServer(targetsSrv targetspb.TargetsServer) func(*grpc.Server) { + return func(s *grpc.Server) { + targetspb.RegisterTargetsServer(s, targetsSrv) + } +} + +// NewProxy returns new targets.Proxy. +func NewProxy(logger log.Logger, targets func() []targetspb.TargetsClient) *Proxy { + return &Proxy{ + logger: logger, + targets: targets, + } +} + +func (s *Proxy) Targets(req *targetspb.TargetsRequest, srv targetspb.Targets_TargetsServer) error { + var ( + g, gctx = errgroup.WithContext(srv.Context()) + respChan = make(chan *targetspb.TargetDiscovery, 10) + targets []*targetspb.TargetDiscovery + ) + + for _, targetsClient := range s.targets() { + rs := &targetsStream{ + client: targetsClient, + request: req, + channel: respChan, + server: srv, + } + g.Go(func() error { return rs.receive(gctx) }) + } + + go func() { + _ = g.Wait() + close(respChan) + }() + + for resp := range respChan { + // TODO: Stream it + targets = append(targets, resp) + } + + if err := g.Wait(); err != nil { + level.Error(s.logger).Log("err", err) + return err + } + + for _, t := range targets { + if err := srv.Send(targetspb.NewTargetsResponse(t)); err != nil { + return status.Error(codes.Unknown, errors.Wrap(err, "send targets response").Error()) + } + } + + return nil +} + +type targetsStream struct { + client targetspb.TargetsClient + request *targetspb.TargetsRequest + channel chan<- *targetspb.TargetDiscovery + server targetspb.Targets_TargetsServer +} + +func (stream *targetsStream) receive(ctx context.Context) error { + targets, err := stream.client.Targets(ctx, stream.request) + if err != nil { + err = errors.Wrapf(err, "fetching targets from targets client %v", stream.client) + + if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + return err + } + + if serr := stream.server.Send(targetspb.NewWarningTargetsResponse(err)); serr != nil { + return serr + } + // Not an error if response strategy is warning. + return nil + } + + for { + target, err := targets.Recv() + if err == io.EOF { + return nil + } + + if err != nil { + err = errors.Wrapf(err, "receiving targets from targets client %v", stream.client) + + if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + return err + } + + if err := stream.server.Send(targetspb.NewWarningTargetsResponse(err)); err != nil { + return errors.Wrapf(err, "sending targets error to server %v", stream.server) + } + + continue + } + + if w := target.GetWarning(); w != "" { + if err := stream.server.Send(targetspb.NewWarningTargetsResponse(errors.New(w))); err != nil { + return errors.Wrapf(err, "sending targets warning to server %v", stream.server) + } + continue + } + + select { + case stream.channel <- target.GetTargets(): + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/pkg/targets/targets.go b/pkg/targets/targets.go new file mode 100644 index 00000000000..c62b74a7f36 --- /dev/null +++ b/pkg/targets/targets.go @@ -0,0 +1,207 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targets + +import ( + "context" + "sort" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" +) + +var _ UnaryClient = &GRPCClient{} + +// UnaryClient is gRPC targetspb.Targets client which expands streaming targets API. Useful for consumers that does not +// support streaming. +type UnaryClient interface { + Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) +} + +// GRPCClient allows to retrieve targets from local gRPC streaming server implementation. +// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available. +type GRPCClient struct { + proxy targetspb.TargetsServer + + replicaLabels map[string]struct{} +} + +func NewGRPCClient(ts targetspb.TargetsServer) *GRPCClient { + return NewGRPCClientWithDedup(ts, nil) +} + +func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string) *GRPCClient { + c := &GRPCClient{ + proxy: ts, + replicaLabels: map[string]struct{}{}, + } + + for _, label := range replicaLabels { + c.replicaLabels[label] = struct{}{} + } + return c +} + +func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) { + resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{ + ActiveTargets: make([]*targetspb.ActiveTarget, 0), + DroppedTargets: make([]*targetspb.DroppedTarget, 0), + }} + + if err := rr.proxy.Targets(req, resp); err != nil { + return nil, nil, errors.Wrap(err, "proxy Targets") + } + + resp.targets = dedupTargets(resp.targets, rr.replicaLabels) + + return resp.targets, resp.warnings, nil +} + +// dedupTargets re-sorts the set so that the same target with different replica +// labels are coming right after each other. +func dedupTargets(targets *targetspb.TargetDiscovery, replicaLabels map[string]struct{}) *targetspb.TargetDiscovery { + if targets == nil { + return nil + } + + targets.ActiveTargets = dedupActiveTargets(targets.ActiveTargets, replicaLabels) + targets.DroppedTargets = dedupDroppedTargets(targets.DroppedTargets, replicaLabels) + + return targets +} + +func dedupDroppedTargets(droppedTargets []*targetspb.DroppedTarget, replicaLabels map[string]struct{}) []*targetspb.DroppedTarget { + if len(droppedTargets) == 0 { + return droppedTargets + } + + // Sort each target's label names such that they are comparable. + for _, t := range droppedTargets { + sort.Slice(t.DiscoveredLabels.Labels, func(i, j int) bool { + return t.DiscoveredLabels.Labels[i].Name < t.DiscoveredLabels.Labels[j].Name + }) + } + + // Sort targets globally based on synthesized deduplication labels, also considering replica labels and their values. + sort.Slice(droppedTargets, func(i, j int) bool { + return droppedTargets[i].Compare(droppedTargets[j]) < 0 + }) + + // Remove targets based on synthesized deduplication labels, this time ignoring replica labels + i := 0 + droppedTargets[i].DiscoveredLabels.Labels = removeReplicaLabels( + droppedTargets[i].DiscoveredLabels.Labels, + replicaLabels, + ) + for j := 1; j < len(droppedTargets); j++ { + droppedTargets[j].DiscoveredLabels.Labels = removeReplicaLabels( + droppedTargets[j].DiscoveredLabels.Labels, + replicaLabels, + ) + if droppedTargets[i].Compare(droppedTargets[j]) != 0 { + // Effectively retain targets[j] in the resulting slice. + i++ + droppedTargets[i] = droppedTargets[j] + continue + } + } + + return droppedTargets[:i+1] +} + +func dedupActiveTargets(activeTargets []*targetspb.ActiveTarget, replicaLabels map[string]struct{}) []*targetspb.ActiveTarget { + if len(activeTargets) == 0 { + return activeTargets + } + + // Sort each target's label names such that they are comparable. + for _, t := range activeTargets { + sort.Slice(t.DiscoveredLabels.Labels, func(i, j int) bool { + return t.DiscoveredLabels.Labels[i].Name < t.DiscoveredLabels.Labels[j].Name + }) + } + + // Sort targets globally based on synthesized deduplication labels, also considering replica labels and their values. + sort.Slice(activeTargets, func(i, j int) bool { + return activeTargets[i].Compare(activeTargets[j]) < 0 + }) + + // Remove targets based on synthesized deduplication labels, this time ignoring replica labels and last scrape. + i := 0 + activeTargets[i].DiscoveredLabels.Labels = removeReplicaLabels( + activeTargets[i].DiscoveredLabels.Labels, + replicaLabels, + ) + activeTargets[i].Labels.Labels = removeReplicaLabels( + activeTargets[i].Labels.Labels, + replicaLabels, + ) + for j := 1; j < len(activeTargets); j++ { + activeTargets[j].DiscoveredLabels.Labels = removeReplicaLabels( + activeTargets[j].DiscoveredLabels.Labels, + replicaLabels, + ) + activeTargets[j].Labels.Labels = removeReplicaLabels( + activeTargets[j].Labels.Labels, + replicaLabels, + ) + + if activeTargets[i].Compare(activeTargets[j]) != 0 { + // Effectively retain targets[j] in the resulting slice. + i++ + activeTargets[i] = activeTargets[j] + continue + } + + if activeTargets[i].CompareState(activeTargets[j]) <= 0 { + continue + } + + // Swap if we found a younger target. + activeTargets[i] = activeTargets[j] + } + + return activeTargets[:i+1] +} + +func removeReplicaLabels(labels []storepb.Label, replicaLabels map[string]struct{}) []storepb.Label { + newLabels := make([]storepb.Label, 0, len(labels)) + for _, l := range labels { + if _, ok := replicaLabels[l.Name]; !ok { + newLabels = append(newLabels, l) + } + } + + return newLabels +} + +type targetsServer struct { + // This field just exist to pseudo-implement the unused methods of the interface. + targetspb.Targets_TargetsServer + ctx context.Context + + warnings []error + targets *targetspb.TargetDiscovery +} + +func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error { + if res.GetWarning() != "" { + srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + return nil + } + + if res.GetTargets() == nil { + return errors.New("no targets") + } + srv.targets.ActiveTargets = append(srv.targets.ActiveTargets, res.GetTargets().ActiveTargets...) + srv.targets.DroppedTargets = append(srv.targets.DroppedTargets, res.GetTargets().DroppedTargets...) + + return nil +} + +func (srv *targetsServer) Context() context.Context { + return srv.ctx +} diff --git a/pkg/targets/targets_test.go b/pkg/targets/targets_test.go new file mode 100644 index 00000000000..f6be94a0840 --- /dev/null +++ b/pkg/targets/targets_test.go @@ -0,0 +1,280 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targets + +import ( + "testing" + "time" + + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestDedupTargets(t *testing.T) { + for _, tc := range []struct { + name string + targets, want *targetspb.TargetDiscovery + replicaLabels []string + }{ + { + name: "nil slice", + targets: nil, + want: nil, + }, + { + name: "dropped", + replicaLabels: []string{"replica"}, + targets: &targetspb.TargetDiscovery{ + DroppedTargets: []*targetspb.DroppedTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:80"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + }, + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:80"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + }, + }, + }, + want: &targetspb.TargetDiscovery{ + DroppedTargets: []*targetspb.DroppedTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:80"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + }, + }, + }, + }, + { + name: "active simple", + replicaLabels: []string{"replica"}, + targets: &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + }, + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + }, + }, + }, + want: &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + }, + }, + }, + }, + { + name: "active unhealth first", + replicaLabels: []string{"replica"}, + targets: &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + }, + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_DOWN, + }, + }, + }, + want: &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_DOWN, + }, + }, + }, + }, + { + name: "active latest scrape first", + replicaLabels: []string{"replica"}, + targets: &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "0"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + LastScrape: time.Unix(1, 0), + }, + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + {Name: "replica", Value: "1"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + LastScrape: time.Unix(2, 0), + }, + }, + }, + want: &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + LastScrape: time.Unix(2, 0), + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + replicaLabels := make(map[string]struct{}) + for _, lbl := range tc.replicaLabels { + replicaLabels[lbl] = struct{}{} + } + testutil.Equals(t, tc.want, dedupTargets(tc.targets, replicaLabels)) + }) + } +} diff --git a/pkg/targets/targetspb/custom.go b/pkg/targets/targetspb/custom.go new file mode 100644 index 00000000000..0c0ea1f6f52 --- /dev/null +++ b/pkg/targets/targetspb/custom.go @@ -0,0 +1,129 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targetspb + +import ( + "strconv" + "strings" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/store/labelpb" +) + +func NewTargetsResponse(targets *TargetDiscovery) *TargetsResponse { + return &TargetsResponse{ + Result: &TargetsResponse_Targets{ + Targets: targets, + }, + } +} + +func NewWarningTargetsResponse(warning error) *TargetsResponse { + return &TargetsResponse{ + Result: &TargetsResponse_Warning{ + Warning: warning.Error(), + }, + } +} + +func (x *TargetHealth) UnmarshalJSON(entry []byte) error { + fieldStr, err := strconv.Unquote(string(entry)) + if err != nil { + return errors.Wrapf(err, "targetHealth: unquote %v", string(entry)) + } + + if len(fieldStr) == 0 { + return errors.New("empty targetHealth") + } + + state, ok := TargetHealth_value[strings.ToUpper(fieldStr)] + if !ok { + return errors.Errorf("unknown targetHealth: %v", string(entry)) + } + *x = TargetHealth(state) + return nil +} + +func (x *TargetHealth) MarshalJSON() ([]byte, error) { + return []byte(strconv.Quote(strings.ToLower(x.String()))), nil +} + +func (x TargetHealth) Compare(y TargetHealth) int { + return int(x) - int(y) +} + +func (t1 *ActiveTarget) Compare(t2 *ActiveTarget) int { + if d := strings.Compare(t1.ScrapeUrl, t2.ScrapeUrl); d != 0 { + return d + } + + if d := strings.Compare(t1.ScrapePool, t2.ScrapePool); d != 0 { + return d + } + + if d := labels.Compare(t1.DiscoveredLabels.PromLabels(), t2.DiscoveredLabels.PromLabels()); d != 0 { + return d + } + + if d := labels.Compare(t1.Labels.PromLabels(), t2.Labels.PromLabels()); d != 0 { + return d + } + + return 0 +} + +func (t1 *ActiveTarget) CompareState(t2 *ActiveTarget) int { + if d := t1.Health.Compare(t2.Health); d != 0 { + return d + } + + if t1.LastScrape.Before(t2.LastScrape) { + return 1 + } + + if t1.LastScrape.After(t2.LastScrape) { + return -1 + } + + return 0 +} + +func (t1 *DroppedTarget) Compare(t2 *DroppedTarget) int { + if d := labels.Compare(t1.DiscoveredLabels.PromLabels(), t2.DiscoveredLabels.PromLabels()); d != 0 { + return d + } + + return 0 +} + +func (t *ActiveTarget) SetLabels(ls labels.Labels) { + var result labelpb.ZLabelSet + + if len(ls) > 0 { + result = labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(ls)} + } + + t.Labels = result +} + +func (t *ActiveTarget) SetDiscoveredLabels(ls labels.Labels) { + var result labelpb.ZLabelSet + + if len(ls) > 0 { + result = labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(ls)} + } + + t.DiscoveredLabels = result +} + +func (t *DroppedTarget) SetDiscoveredLabels(ls labels.Labels) { + var result labelpb.ZLabelSet + + if len(ls) > 0 { + result = labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(ls)} + } + + t.DiscoveredLabels = result +} diff --git a/pkg/targets/targetspb/rpc.pb.go b/pkg/targets/targetspb/rpc.pb.go new file mode 100644 index 00000000000..11f1bc98b2b --- /dev/null +++ b/pkg/targets/targetspb/rpc.pb.go @@ -0,0 +1,1668 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: targets/targetspb/rpc.proto + +package targetspb + +import ( + context "context" + encoding_binary "encoding/binary" + fmt "fmt" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + + io "io" + math "math" + math_bits "math/bits" + time "time" + + labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" + storepb "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type TargetHealth int32 + +const ( + TargetHealth_DOWN TargetHealth = 0 + TargetHealth_UP TargetHealth = 1 +) + +var TargetHealth_name = map[int32]string{ + 0: "DOWN", + 1: "UP", +} + +var TargetHealth_value = map[string]int32{ + "DOWN": 0, + "UP": 1, +} + +func (x TargetHealth) String() string { + return proto.EnumName(TargetHealth_name, int32(x)) +} + +func (TargetHealth) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{0} +} + +type TargetsRequest_State int32 + +const ( + TargetsRequest_ANY TargetsRequest_State = 0 + /// This will make sure strings.ToLower(.String()) will match 'active' and 'dropped' values for + /// Prometheus HTTP API. + TargetsRequest_ACTIVE TargetsRequest_State = 1 + TargetsRequest_DROPPED TargetsRequest_State = 2 +) + +var TargetsRequest_State_name = map[int32]string{ + 0: "ANY", + 1: "ACTIVE", + 2: "DROPPED", +} + +var TargetsRequest_State_value = map[string]int32{ + "ANY": 0, + "ACTIVE": 1, + "DROPPED": 2, +} + +func (x TargetsRequest_State) String() string { + return proto.EnumName(TargetsRequest_State_name, int32(x)) +} + +func (TargetsRequest_State) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{0, 0} +} + +type TargetsRequest struct { + State TargetsRequest_State `protobuf:"varint,1,opt,name=state,proto3,enum=thanos.TargetsRequest_State" json:"state,omitempty"` + PartialResponseStrategy storepb.PartialResponseStrategy `protobuf:"varint,2,opt,name=partial_response_strategy,json=partialResponseStrategy,proto3,enum=thanos.PartialResponseStrategy" json:"partial_response_strategy,omitempty"` +} + +func (m *TargetsRequest) Reset() { *m = TargetsRequest{} } +func (m *TargetsRequest) String() string { return proto.CompactTextString(m) } +func (*TargetsRequest) ProtoMessage() {} +func (*TargetsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{0} +} +func (m *TargetsRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TargetsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TargetsRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TargetsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TargetsRequest.Merge(m, src) +} +func (m *TargetsRequest) XXX_Size() int { + return m.Size() +} +func (m *TargetsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TargetsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TargetsRequest proto.InternalMessageInfo + +type TargetsResponse struct { + // Types that are valid to be assigned to Result: + // *TargetsResponse_Targets + // *TargetsResponse_Warning + Result isTargetsResponse_Result `protobuf_oneof:"result"` +} + +func (m *TargetsResponse) Reset() { *m = TargetsResponse{} } +func (m *TargetsResponse) String() string { return proto.CompactTextString(m) } +func (*TargetsResponse) ProtoMessage() {} +func (*TargetsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{1} +} +func (m *TargetsResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TargetsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TargetsResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TargetsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TargetsResponse.Merge(m, src) +} +func (m *TargetsResponse) XXX_Size() int { + return m.Size() +} +func (m *TargetsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TargetsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TargetsResponse proto.InternalMessageInfo + +type isTargetsResponse_Result interface { + isTargetsResponse_Result() + MarshalTo([]byte) (int, error) + Size() int +} + +type TargetsResponse_Targets struct { + Targets *TargetDiscovery `protobuf:"bytes,1,opt,name=targets,proto3,oneof" json:"targets,omitempty"` +} +type TargetsResponse_Warning struct { + Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof" json:"warning,omitempty"` +} + +func (*TargetsResponse_Targets) isTargetsResponse_Result() {} +func (*TargetsResponse_Warning) isTargetsResponse_Result() {} + +func (m *TargetsResponse) GetResult() isTargetsResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (m *TargetsResponse) GetTargets() *TargetDiscovery { + if x, ok := m.GetResult().(*TargetsResponse_Targets); ok { + return x.Targets + } + return nil +} + +func (m *TargetsResponse) GetWarning() string { + if x, ok := m.GetResult().(*TargetsResponse_Warning); ok { + return x.Warning + } + return "" +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*TargetsResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*TargetsResponse_Targets)(nil), + (*TargetsResponse_Warning)(nil), + } +} + +type TargetDiscovery struct { + ActiveTargets []*ActiveTarget `protobuf:"bytes,1,rep,name=activeTargets,proto3" json:"activeTargets"` + DroppedTargets []*DroppedTarget `protobuf:"bytes,2,rep,name=droppedTargets,proto3" json:"droppedTargets"` +} + +func (m *TargetDiscovery) Reset() { *m = TargetDiscovery{} } +func (m *TargetDiscovery) String() string { return proto.CompactTextString(m) } +func (*TargetDiscovery) ProtoMessage() {} +func (*TargetDiscovery) Descriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{2} +} +func (m *TargetDiscovery) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TargetDiscovery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TargetDiscovery.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TargetDiscovery) XXX_Merge(src proto.Message) { + xxx_messageInfo_TargetDiscovery.Merge(m, src) +} +func (m *TargetDiscovery) XXX_Size() int { + return m.Size() +} +func (m *TargetDiscovery) XXX_DiscardUnknown() { + xxx_messageInfo_TargetDiscovery.DiscardUnknown(m) +} + +var xxx_messageInfo_TargetDiscovery proto.InternalMessageInfo + +type ActiveTarget struct { + DiscoveredLabels labelpb.ZLabelSet `protobuf:"bytes,1,opt,name=discoveredLabels,proto3" json:"discoveredLabels"` + Labels labelpb.ZLabelSet `protobuf:"bytes,2,opt,name=labels,proto3" json:"labels"` + ScrapePool string `protobuf:"bytes,3,opt,name=scrapePool,proto3" json:"scrapePool"` + ScrapeUrl string `protobuf:"bytes,4,opt,name=scrapeUrl,proto3" json:"scrapeUrl"` + LastError string `protobuf:"bytes,5,opt,name=lastError,proto3" json:"lastError"` + LastScrape time.Time `protobuf:"bytes,6,opt,name=lastScrape,proto3,stdtime" json:"lastScrape"` + LastScrapeDuration float64 `protobuf:"fixed64,7,opt,name=lastScrapeDuration,proto3" json:"lastScrapeDuration"` + Health TargetHealth `protobuf:"varint,8,opt,name=health,proto3,enum=thanos.TargetHealth" json:"health"` +} + +func (m *ActiveTarget) Reset() { *m = ActiveTarget{} } +func (m *ActiveTarget) String() string { return proto.CompactTextString(m) } +func (*ActiveTarget) ProtoMessage() {} +func (*ActiveTarget) Descriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{3} +} +func (m *ActiveTarget) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ActiveTarget) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ActiveTarget.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ActiveTarget) XXX_Merge(src proto.Message) { + xxx_messageInfo_ActiveTarget.Merge(m, src) +} +func (m *ActiveTarget) XXX_Size() int { + return m.Size() +} +func (m *ActiveTarget) XXX_DiscardUnknown() { + xxx_messageInfo_ActiveTarget.DiscardUnknown(m) +} + +var xxx_messageInfo_ActiveTarget proto.InternalMessageInfo + +type DroppedTarget struct { + DiscoveredLabels labelpb.ZLabelSet `protobuf:"bytes,1,opt,name=discoveredLabels,proto3" json:"discoveredLabels"` +} + +func (m *DroppedTarget) Reset() { *m = DroppedTarget{} } +func (m *DroppedTarget) String() string { return proto.CompactTextString(m) } +func (*DroppedTarget) ProtoMessage() {} +func (*DroppedTarget) Descriptor() ([]byte, []int) { + return fileDescriptor_b5cdaee03579e907, []int{4} +} +func (m *DroppedTarget) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DroppedTarget) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DroppedTarget.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DroppedTarget) XXX_Merge(src proto.Message) { + xxx_messageInfo_DroppedTarget.Merge(m, src) +} +func (m *DroppedTarget) XXX_Size() int { + return m.Size() +} +func (m *DroppedTarget) XXX_DiscardUnknown() { + xxx_messageInfo_DroppedTarget.DiscardUnknown(m) +} + +var xxx_messageInfo_DroppedTarget proto.InternalMessageInfo + +func init() { + proto.RegisterEnum("thanos.TargetHealth", TargetHealth_name, TargetHealth_value) + proto.RegisterEnum("thanos.TargetsRequest_State", TargetsRequest_State_name, TargetsRequest_State_value) + proto.RegisterType((*TargetsRequest)(nil), "thanos.TargetsRequest") + proto.RegisterType((*TargetsResponse)(nil), "thanos.TargetsResponse") + proto.RegisterType((*TargetDiscovery)(nil), "thanos.TargetDiscovery") + proto.RegisterType((*ActiveTarget)(nil), "thanos.ActiveTarget") + proto.RegisterType((*DroppedTarget)(nil), "thanos.DroppedTarget") +} + +func init() { proto.RegisterFile("targets/targetspb/rpc.proto", fileDescriptor_b5cdaee03579e907) } + +var fileDescriptor_b5cdaee03579e907 = []byte{ + // 679 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xc1, 0x4e, 0xdb, 0x4c, + 0x10, 0xb6, 0x03, 0x38, 0x30, 0x40, 0xfe, 0xb0, 0xe2, 0x07, 0x93, 0x56, 0x31, 0xca, 0xa5, 0xb4, + 0x95, 0xec, 0x2a, 0x5c, 0x5a, 0xa9, 0x17, 0xdc, 0xd0, 0x52, 0xa9, 0x85, 0x74, 0x13, 0x8a, 0x4a, + 0x0f, 0x68, 0x93, 0x6c, 0x9d, 0x48, 0x26, 0xeb, 0xee, 0x6e, 0xa8, 0xb8, 0xf6, 0x09, 0x78, 0x97, + 0xbe, 0x04, 0x87, 0x1e, 0x38, 0xf6, 0xe4, 0xb6, 0x70, 0xcb, 0x53, 0x54, 0x5e, 0xdb, 0x89, 0x03, + 0xe9, 0xb1, 0x97, 0xec, 0xcc, 0x37, 0xdf, 0x7c, 0x33, 0xbb, 0x99, 0x31, 0xdc, 0x93, 0x84, 0x7b, + 0x54, 0x0a, 0x27, 0x39, 0x83, 0x96, 0xc3, 0x83, 0xb6, 0x1d, 0x70, 0x26, 0x19, 0x32, 0x64, 0x97, + 0xf4, 0x99, 0x28, 0x6d, 0x08, 0xc9, 0x38, 0x75, 0xd4, 0x6f, 0xd0, 0x72, 0xe4, 0x79, 0x40, 0x45, + 0x4c, 0x29, 0xad, 0x7a, 0xcc, 0x63, 0xca, 0x74, 0x22, 0x2b, 0x41, 0x93, 0x04, 0x9f, 0xb4, 0xa8, + 0x7f, 0x2b, 0xc1, 0xf2, 0x18, 0xf3, 0x7c, 0xea, 0x28, 0xaf, 0x35, 0xf8, 0xe4, 0xc8, 0xde, 0x29, + 0x15, 0x92, 0x9c, 0x06, 0x31, 0xa1, 0xf2, 0x5d, 0x87, 0x42, 0x33, 0x6e, 0x06, 0xd3, 0xcf, 0x03, + 0x2a, 0x24, 0xaa, 0xc2, 0x9c, 0x90, 0x44, 0x52, 0x53, 0xdf, 0xd4, 0xb7, 0x0a, 0xd5, 0xfb, 0x76, + 0xdc, 0x97, 0x3d, 0x49, 0xb3, 0x1b, 0x11, 0x07, 0xc7, 0x54, 0xf4, 0x11, 0x36, 0x02, 0xc2, 0x65, + 0x8f, 0xf8, 0x27, 0x9c, 0x8a, 0x80, 0xf5, 0x05, 0x3d, 0x11, 0x92, 0x13, 0x49, 0xbd, 0x73, 0x33, + 0xa7, 0x74, 0xac, 0x54, 0xa7, 0x1e, 0x13, 0x71, 0xc2, 0x6b, 0x24, 0x34, 0xbc, 0x1e, 0x4c, 0x0f, + 0x54, 0x1e, 0xc2, 0x9c, 0x2a, 0x86, 0xf2, 0x30, 0xb3, 0xb3, 0xff, 0xa1, 0xa8, 0x21, 0x00, 0x63, + 0xe7, 0x45, 0xf3, 0xf5, 0xfb, 0xdd, 0xa2, 0x8e, 0x16, 0x21, 0x5f, 0xc3, 0x07, 0xf5, 0xfa, 0x6e, + 0xad, 0x98, 0xab, 0xf8, 0xf0, 0xdf, 0xa8, 0xcd, 0x58, 0x05, 0x6d, 0x43, 0x3e, 0x79, 0x6d, 0x75, + 0xa1, 0xc5, 0xea, 0xfa, 0xe4, 0x85, 0x6a, 0x3d, 0xd1, 0x66, 0x67, 0x94, 0x9f, 0xef, 0x69, 0x38, + 0x65, 0xa2, 0x12, 0xe4, 0xbf, 0x10, 0xde, 0xef, 0xf5, 0x3d, 0xd5, 0xfd, 0x42, 0x14, 0x4b, 0x00, + 0x77, 0x1e, 0x0c, 0x4e, 0xc5, 0xc0, 0x97, 0x95, 0x6f, 0x7a, 0x5a, 0x6e, 0x24, 0x82, 0xde, 0xc2, + 0x32, 0x69, 0xcb, 0xde, 0x19, 0x6d, 0x8e, 0x8a, 0xce, 0x6c, 0x2d, 0x56, 0x57, 0xd3, 0xa2, 0x3b, + 0x99, 0xa0, 0xbb, 0x32, 0x0c, 0xad, 0x49, 0x3a, 0x9e, 0x74, 0xd1, 0x3b, 0x28, 0x74, 0x38, 0x0b, + 0x02, 0xda, 0x49, 0xf5, 0x72, 0x4a, 0xef, 0xff, 0x54, 0xaf, 0x96, 0x8d, 0xba, 0x68, 0x18, 0x5a, + 0xb7, 0x12, 0xf0, 0x2d, 0xbf, 0xf2, 0x75, 0x16, 0x96, 0xb2, 0x5d, 0xa0, 0x23, 0x28, 0x76, 0x92, + 0xfe, 0x69, 0xe7, 0x4d, 0x34, 0x45, 0xe9, 0x53, 0xad, 0xa4, 0x55, 0x8e, 0x15, 0xdc, 0xa0, 0xd2, + 0x35, 0x2f, 0x43, 0x4b, 0x1b, 0x86, 0xd6, 0x9d, 0x14, 0x7c, 0x07, 0x41, 0xcf, 0xc0, 0xf0, 0x63, + 0xb9, 0xdc, 0xdf, 0xe4, 0x0a, 0x89, 0x5c, 0x42, 0xc4, 0xc9, 0x89, 0x6c, 0x00, 0xd1, 0xe6, 0x24, + 0xa0, 0x75, 0xc6, 0x7c, 0x73, 0x26, 0xfa, 0x0f, 0xdc, 0xc2, 0x30, 0xb4, 0x32, 0x28, 0xce, 0xd8, + 0xe8, 0x31, 0x2c, 0xc4, 0xde, 0x21, 0xf7, 0xcd, 0x59, 0x45, 0x5f, 0x1e, 0x86, 0xd6, 0x18, 0xc4, + 0x63, 0x33, 0x22, 0xfb, 0x44, 0xc8, 0x5d, 0xce, 0x19, 0x37, 0xe7, 0xc6, 0xe4, 0x11, 0x88, 0xc7, + 0x26, 0xc2, 0x00, 0x91, 0xd3, 0x50, 0xd9, 0xa6, 0xa1, 0x2e, 0x52, 0xb2, 0xe3, 0xbd, 0xb2, 0xd3, + 0xbd, 0xb2, 0x9b, 0xe9, 0x5e, 0xb9, 0x6b, 0xc9, 0x8d, 0x32, 0x59, 0x17, 0x3f, 0x2d, 0x1d, 0x67, + 0x7c, 0xf4, 0x12, 0xd0, 0xd8, 0xab, 0x0d, 0x38, 0x91, 0x3d, 0xd6, 0x37, 0xf3, 0x9b, 0xfa, 0x96, + 0xee, 0xae, 0x0d, 0x43, 0x6b, 0x4a, 0x14, 0x4f, 0xc1, 0xd0, 0x53, 0x30, 0xba, 0x94, 0xf8, 0xb2, + 0x6b, 0xce, 0xab, 0x1d, 0x5b, 0x9d, 0x1c, 0xed, 0x3d, 0x15, 0x73, 0x21, 0x7a, 0xdf, 0x98, 0x87, + 0x93, 0xb3, 0xd2, 0x85, 0xe5, 0x89, 0xc9, 0xf9, 0x67, 0x43, 0xf0, 0x68, 0x13, 0x96, 0xb2, 0xdd, + 0xa0, 0x79, 0x98, 0xad, 0x1d, 0x1c, 0xed, 0x17, 0x35, 0x64, 0x40, 0xee, 0xb0, 0x5e, 0xd4, 0xab, + 0xaf, 0x20, 0x9f, 0x8e, 0xfb, 0xf3, 0xb1, 0xb9, 0x36, 0xfd, 0xbb, 0x53, 0x5a, 0xbf, 0x83, 0xc7, + 0x8b, 0xfe, 0x44, 0x77, 0x1f, 0x5c, 0xfe, 0x2e, 0x6b, 0x97, 0xd7, 0x65, 0xfd, 0xea, 0xba, 0xac, + 0xff, 0xba, 0x2e, 0xeb, 0x17, 0x37, 0x65, 0xed, 0xea, 0xa6, 0xac, 0xfd, 0xb8, 0x29, 0x6b, 0xc7, + 0x0b, 0xa3, 0x8f, 0x6e, 0xcb, 0x50, 0xff, 0xdb, 0xf6, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x7c, + 0xd7, 0xb0, 0x1f, 0x90, 0x05, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// TargetsClient is the client API for Targets service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type TargetsClient interface { + /// Targets has info for all targets. + /// Returned targets are expected to include external labels. + Targets(ctx context.Context, in *TargetsRequest, opts ...grpc.CallOption) (Targets_TargetsClient, error) +} + +type targetsClient struct { + cc *grpc.ClientConn +} + +func NewTargetsClient(cc *grpc.ClientConn) TargetsClient { + return &targetsClient{cc} +} + +func (c *targetsClient) Targets(ctx context.Context, in *TargetsRequest, opts ...grpc.CallOption) (Targets_TargetsClient, error) { + stream, err := c.cc.NewStream(ctx, &_Targets_serviceDesc.Streams[0], "/thanos.Targets/Targets", opts...) + if err != nil { + return nil, err + } + x := &targetsTargetsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Targets_TargetsClient interface { + Recv() (*TargetsResponse, error) + grpc.ClientStream +} + +type targetsTargetsClient struct { + grpc.ClientStream +} + +func (x *targetsTargetsClient) Recv() (*TargetsResponse, error) { + m := new(TargetsResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TargetsServer is the server API for Targets service. +type TargetsServer interface { + /// Targets has info for all targets. + /// Returned targets are expected to include external labels. + Targets(*TargetsRequest, Targets_TargetsServer) error +} + +// UnimplementedTargetsServer can be embedded to have forward compatible implementations. +type UnimplementedTargetsServer struct { +} + +func (*UnimplementedTargetsServer) Targets(req *TargetsRequest, srv Targets_TargetsServer) error { + return status.Errorf(codes.Unimplemented, "method Targets not implemented") +} + +func RegisterTargetsServer(s *grpc.Server, srv TargetsServer) { + s.RegisterService(&_Targets_serviceDesc, srv) +} + +func _Targets_Targets_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(TargetsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TargetsServer).Targets(m, &targetsTargetsServer{stream}) +} + +type Targets_TargetsServer interface { + Send(*TargetsResponse) error + grpc.ServerStream +} + +type targetsTargetsServer struct { + grpc.ServerStream +} + +func (x *targetsTargetsServer) Send(m *TargetsResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _Targets_serviceDesc = grpc.ServiceDesc{ + ServiceName: "thanos.Targets", + HandlerType: (*TargetsServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Targets", + Handler: _Targets_Targets_Handler, + ServerStreams: true, + }, + }, + Metadata: "targets/targetspb/rpc.proto", +} + +func (m *TargetsRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TargetsRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TargetsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.PartialResponseStrategy != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.PartialResponseStrategy)) + i-- + dAtA[i] = 0x10 + } + if m.State != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.State)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *TargetsResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TargetsResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TargetsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Result != nil { + { + size := m.Result.Size() + i -= size + if _, err := m.Result.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *TargetsResponse_Targets) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TargetsResponse_Targets) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Targets != nil { + { + size, err := m.Targets.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} +func (m *TargetsResponse_Warning) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TargetsResponse_Warning) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Warning) + copy(dAtA[i:], m.Warning) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Warning))) + i-- + dAtA[i] = 0x12 + return len(dAtA) - i, nil +} +func (m *TargetDiscovery) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TargetDiscovery) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TargetDiscovery) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.DroppedTargets) > 0 { + for iNdEx := len(m.DroppedTargets) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.DroppedTargets[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.ActiveTargets) > 0 { + for iNdEx := len(m.ActiveTargets) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.ActiveTargets[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *ActiveTarget) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ActiveTarget) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ActiveTarget) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Health != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.Health)) + i-- + dAtA[i] = 0x40 + } + if m.LastScrapeDuration != 0 { + i -= 8 + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.LastScrapeDuration)))) + i-- + dAtA[i] = 0x39 + } + n2, err2 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.LastScrape, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.LastScrape):]) + if err2 != nil { + return 0, err2 + } + i -= n2 + i = encodeVarintRpc(dAtA, i, uint64(n2)) + i-- + dAtA[i] = 0x32 + if len(m.LastError) > 0 { + i -= len(m.LastError) + copy(dAtA[i:], m.LastError) + i = encodeVarintRpc(dAtA, i, uint64(len(m.LastError))) + i-- + dAtA[i] = 0x2a + } + if len(m.ScrapeUrl) > 0 { + i -= len(m.ScrapeUrl) + copy(dAtA[i:], m.ScrapeUrl) + i = encodeVarintRpc(dAtA, i, uint64(len(m.ScrapeUrl))) + i-- + dAtA[i] = 0x22 + } + if len(m.ScrapePool) > 0 { + i -= len(m.ScrapePool) + copy(dAtA[i:], m.ScrapePool) + i = encodeVarintRpc(dAtA, i, uint64(len(m.ScrapePool))) + i-- + dAtA[i] = 0x1a + } + { + size, err := m.Labels.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + { + size, err := m.DiscoveredLabels.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} + +func (m *DroppedTarget) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DroppedTarget) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DroppedTarget) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + { + size, err := m.DiscoveredLabels.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} + +func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { + offset -= sovRpc(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *TargetsRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.State != 0 { + n += 1 + sovRpc(uint64(m.State)) + } + if m.PartialResponseStrategy != 0 { + n += 1 + sovRpc(uint64(m.PartialResponseStrategy)) + } + return n +} + +func (m *TargetsResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *TargetsResponse_Targets) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Targets != nil { + l = m.Targets.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} +func (m *TargetsResponse_Warning) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warning) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *TargetDiscovery) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.ActiveTargets) > 0 { + for _, e := range m.ActiveTargets { + l = e.Size() + n += 1 + l + sovRpc(uint64(l)) + } + } + if len(m.DroppedTargets) > 0 { + for _, e := range m.DroppedTargets { + l = e.Size() + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + +func (m *ActiveTarget) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.DiscoveredLabels.Size() + n += 1 + l + sovRpc(uint64(l)) + l = m.Labels.Size() + n += 1 + l + sovRpc(uint64(l)) + l = len(m.ScrapePool) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + l = len(m.ScrapeUrl) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + l = len(m.LastError) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.LastScrape) + n += 1 + l + sovRpc(uint64(l)) + if m.LastScrapeDuration != 0 { + n += 9 + } + if m.Health != 0 { + n += 1 + sovRpc(uint64(m.Health)) + } + return n +} + +func (m *DroppedTarget) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.DiscoveredLabels.Size() + n += 1 + l + sovRpc(uint64(l)) + return n +} + +func sovRpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozRpc(x uint64) (n int) { + return sovRpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *TargetsRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TargetsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TargetsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field State", wireType) + } + m.State = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.State |= TargetsRequest_State(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PartialResponseStrategy", wireType) + } + m.PartialResponseStrategy = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PartialResponseStrategy |= storepb.PartialResponseStrategy(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TargetsResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TargetsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TargetsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Targets", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &TargetDiscovery{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &TargetsResponse_Targets{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warning", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Result = &TargetsResponse_Warning{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TargetDiscovery) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TargetDiscovery: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TargetDiscovery: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ActiveTargets", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ActiveTargets = append(m.ActiveTargets, &ActiveTarget{}) + if err := m.ActiveTargets[len(m.ActiveTargets)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DroppedTargets", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DroppedTargets = append(m.DroppedTargets, &DroppedTarget{}) + if err := m.DroppedTargets[len(m.DroppedTargets)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ActiveTarget) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ActiveTarget: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ActiveTarget: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DiscoveredLabels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.DiscoveredLabels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Labels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ScrapePool", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ScrapePool = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ScrapeUrl", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ScrapeUrl = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LastError", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.LastError = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LastScrape", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.LastScrape, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 7: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field LastScrapeDuration", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.LastScrapeDuration = float64(math.Float64frombits(v)) + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Health", wireType) + } + m.Health = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Health |= TargetHealth(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DroppedTarget) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DroppedTarget: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DroppedTarget: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DiscoveredLabels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.DiscoveredLabels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthRpc + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupRpc + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthRpc + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupRpc = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/targets/targetspb/rpc.proto b/pkg/targets/targetspb/rpc.proto new file mode 100644 index 00000000000..e030a2c89d7 --- /dev/null +++ b/pkg/targets/targetspb/rpc.proto @@ -0,0 +1,78 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +syntax = "proto3"; +package thanos; + +import "store/storepb/types.proto"; +import "gogoproto/gogo.proto"; +import "store/labelpb/types.proto"; +import "google/protobuf/timestamp.proto"; + +option go_package = "targetspb"; + +option (gogoproto.sizer_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// Do not generate XXX fields to reduce memory footprint and opening a door +// for zero-copy casts to/from prometheus data types. +option (gogoproto.goproto_unkeyed_all) = false; +option (gogoproto.goproto_unrecognized_all) = false; +option (gogoproto.goproto_sizecache_all) = false; + +/// Targets represents API that is responsible for gathering targets and their states. +service Targets { + /// Targets has info for all targets. + /// Returned targets are expected to include external labels. + rpc Targets (TargetsRequest) returns (stream TargetsResponse); +} + +message TargetsRequest { + enum State { + ANY = 0; + /// This will make sure strings.ToLower(.String()) will match 'active' and 'dropped' values for + /// Prometheus HTTP API. + ACTIVE = 1; + DROPPED = 2; + } + State state = 1; + PartialResponseStrategy partial_response_strategy = 2; +} + +message TargetsResponse { + oneof result { + /// structure with targets. It is up to server implementation to decide how many of those to put here within single frame. + TargetDiscovery targets = 1; + + /// warning is considered an information piece in place of series for warning purposes. + /// It is used to warn rule API users about suspicious cases or partial response (if enabled). + string warning = 2; + } +} + +message TargetDiscovery { + repeated ActiveTarget activeTargets = 1 [(gogoproto.jsontag) = "activeTargets"]; + repeated DroppedTarget droppedTargets = 2 [(gogoproto.jsontag) = "droppedTargets"]; +} + +enum TargetHealth { + DOWN = 0; + UP = 1; +} + +message ActiveTarget { + ZLabelSet discoveredLabels = 1 [(gogoproto.jsontag) = "discoveredLabels", (gogoproto.nullable) = false]; + ZLabelSet labels = 2 [(gogoproto.jsontag) = "labels", (gogoproto.nullable) = false]; + string scrapePool = 3 [(gogoproto.jsontag) = "scrapePool"]; + string scrapeUrl = 4 [(gogoproto.jsontag) = "scrapeUrl"]; + string lastError = 5 [(gogoproto.jsontag) = "lastError"]; + google.protobuf.Timestamp lastScrape = 6 [(gogoproto.jsontag) = "lastScrape", (gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + double lastScrapeDuration = 7 [(gogoproto.jsontag) = "lastScrapeDuration"]; + TargetHealth health = 8 [(gogoproto.jsontag) = "health"]; +} + +message DroppedTarget { + ZLabelSet discoveredLabels = 1 [(gogoproto.jsontag) = "discoveredLabels", (gogoproto.nullable) = false]; +} diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 83f1f7da904..9ec3eb2c726 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -25,7 +25,7 @@ PATH=${PATH}:/tmp/protobin GOGOPROTO_ROOT="$(GO111MODULE=on go list -modfile=.bingo/protoc-gen-gogofast.mod -f '{{ .Dir }}' -m github.com/gogo/protobuf)" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" -DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb store/hintspb queryfrontend metadata/metadatapb" +DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb targets/targetspb store/hintspb queryfrontend metadata/metadatapb" echo "generating code" pushd "pkg" for dir in ${DIRS}; do diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index a1e206f08b6..1934af23935 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -442,7 +442,7 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 2cb490fb1a2..1e481beec17 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -114,7 +114,7 @@ func NewPrometheusWithSidecar(sharedDir string, netName string, name string, con return prom, sidecar, nil } -func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ruleAddresses, metadataAddresses []string, routePrefix, externalPrefix string) (*Service, error) { +func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ruleAddresses, targetAddresses []string, metadataAddresses []string, routePrefix, externalPrefix string) (*Service, error) { const replicaLabel = "replica" args := e2e.BuildArgs(map[string]string{ @@ -136,6 +136,10 @@ func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ru args = append(args, "--rule="+addr) } + for _, addr := range targetAddresses { + args = append(args, "--target="+addr) + } + for _, addr := range metadataAddresses { args = append(args, "--metadata="+addr) } diff --git a/test/e2e/metadata_api_test.go b/test/e2e/metadata_api_test.go index 6211525f0cf..ef1c7eedeb3 100644 --- a/test/e2e/metadata_api_test.go +++ b/test/e2e/metadata_api_test.go @@ -50,6 +50,7 @@ func TestMetadataAPI_Fanout(t *testing.T) { []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()}, nil, nil, + nil, []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()}, "", "", diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 27dd9b123d0..d80f7168f76 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -34,7 +34,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -388,7 +388,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 00215e01f6c..089db1a72fa 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -55,6 +55,10 @@ scrape_configs: scrape_timeout: 1s static_configs: - targets: [%s] + relabel_configs: + - source_labels: ['__address__'] + regex: '^.+:80$' + action: drop `, name, replica, targets) if remoteWriteEndpoint != "" { @@ -108,7 +112,7 @@ func TestQuery(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) // Querier. Both fileSD and directly by flags. - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -189,6 +193,7 @@ func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) { nil, nil, nil, + nil, "", externalPrefix, ) @@ -213,6 +218,7 @@ func TestQueryExternalPrefix(t *testing.T) { nil, nil, nil, + nil, "", externalPrefix, ) @@ -243,6 +249,7 @@ func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) { nil, nil, nil, + nil, routePrefix, externalPrefix, ) @@ -281,6 +288,7 @@ func TestQueryLabelNames(t *testing.T) { []string{}, nil, nil, + nil, "", "", ) @@ -340,6 +348,7 @@ func TestQueryLabelValues(t *testing.T) { []string{}, nil, nil, + nil, "", "", ) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 9aa599f40c7..fffef5b65c6 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -102,7 +102,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -178,7 +178,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -253,7 +253,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -325,7 +325,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -400,7 +400,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1)) testutil.Ok(t, s.StartAndWaitReady(prom2)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 0417ca6e299..5ea489298b0 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -291,7 +291,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) { { EndpointsConfig: http_util.EndpointsConfig{ StaticAddresses: func() []string { - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", nil, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", nil, nil, nil, nil, nil, "", "") testutil.Ok(t, err) return []string{q.NetworkHTTPEndpointFor(s.NetworkName())} }(), @@ -302,7 +302,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -383,7 +383,7 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 67d18c2b410..594da162323 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -70,6 +70,7 @@ func TestRulesAPI_Fanout(t *testing.T) { nil, []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, + nil, "", "", ) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 1cf4d61341f..d25c9ca19c9 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -62,7 +62,7 @@ func TestStoreGateway(t *testing.T) { // Ensure bucket UI. ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(s1.HTTPEndpoint(), "loaded")) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}, nil, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/targets_api_test.go b/test/e2e/targets_api_test.go new file mode 100644 index 00000000000..87494f0b710 --- /dev/null +++ b/test/e2e/targets_api_test.go @@ -0,0 +1,140 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package e2e_test + +import ( + "context" + "fmt" + "reflect" + "sort" + "testing" + "time" + + "github.com/cortexproject/cortex/integration/e2e" + "github.com/pkg/errors" + + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" +) + +func TestTargetsAPI_Fanout(t *testing.T) { + t.Parallel() + + netName := "e2e_test_targets_fanout" + + s, err := e2e.NewScenario(netName) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + // 2x Prometheus. + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar( + s.SharedDir(), + netName, + "prom1", + defaultPromConfig("ha", 0, "", "", "localhost:9090", "localhost:80"), + e2ethanos.DefaultPrometheusImage(), + ) + testutil.Ok(t, err) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar( + s.SharedDir(), + netName, + "prom2", + defaultPromConfig("ha", 1, "", "", "localhost:9090", "localhost:80"), + e2ethanos.DefaultPrometheusImage(), + ) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) + + q, err := e2ethanos.NewQuerier( + s.SharedDir(), + "query", + []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()}, + nil, + nil, + []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()}, + nil, + "", + "", + ) + + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + targetAndAssert(t, ctx, q.HTTPEndpoint(), "", &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + }, + }, + DroppedTargets: []*targetspb.DroppedTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:80"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "ha"}, + }}, + }, + }, + }) +} + +func targetAndAssert(t *testing.T, ctx context.Context, addr string, state string, want *targetspb.TargetDiscovery) { + t.Helper() + + fmt.Println("targetAndAssert: Waiting for results for targets state", state) + + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + res, err := promclient.NewDefaultClient().TargetsInGRPC(ctx, mustURLParse(t, "http://"+addr), state) + if err != nil { + return err + } + + if len(res.ActiveTargets) != len(want.ActiveTargets) { + return errors.Errorf("unexpected result.ActiveTargets size, want %d; got: %d result: %v", len(want.ActiveTargets), len(res.ActiveTargets), res) + } + + if len(res.DroppedTargets) != len(want.DroppedTargets) { + return errors.Errorf("unexpected result.DroppedTargets size, want %d; got: %d result: %v", len(want.DroppedTargets), len(res.DroppedTargets), res) + } + + for it := range res.ActiveTargets { + res.ActiveTargets[it].LastScrape = time.Time{} + res.ActiveTargets[it].LastScrapeDuration = 0 + } + + sort.Slice(res.ActiveTargets, func(i, j int) bool { return res.ActiveTargets[i].Compare(res.ActiveTargets[j]) < 0 }) + sort.Slice(res.DroppedTargets, func(i, j int) bool { return res.DroppedTargets[i].Compare(res.DroppedTargets[j]) < 0 }) + + if !reflect.DeepEqual(want, res) { + return errors.Errorf("unexpected result\nwant %v\ngot: %v", want, res) + } + + return nil + })) +}