Skip to content

Commit

Permalink
query: add endpointset flow (#4421)
Browse files Browse the repository at this point in the history
* Create endpoint flow

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* add unit test for endpointSet

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* lint fixes

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* fix typo

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* remove code smells

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* start using endpointset instead of storeset

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* remove storeset

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* minor nits

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* Fix failing e2e tests

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* improve logging

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>

* fix comment

Signed-off-by: Hitanshu Mehta <hitanshu99amehta@gmail.com>
  • Loading branch information
hitanshu-mehta authored Aug 31, 2021
1 parent fdfc077 commit 8862ad5
Show file tree
Hide file tree
Showing 10 changed files with 3,561 additions and 1,907 deletions.
71 changes: 23 additions & 48 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,60 +375,35 @@ func runQuery(
)

var (
stores = query.NewStoreSet(
endpoints = query.NewEndpointSet(
logger,
reg,
func() (specs []query.StoreSpec) {

func() (specs []query.EndpointSpec) {
// Add strict & static nodes.
for _, addr := range strictStores {
specs = append(specs, query.NewGRPCStoreSpec(addr, true))
}
// Add DNS resolved addresses from static flags and file SD.
for _, addr := range dnsStoreProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}
return removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
},
func() (specs []query.RuleSpec) {
for _, addr := range dnsRuleProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

// NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis.
// hence, any duplicates will be tracked in the store api set.

return specs
},
func() (specs []query.TargetSpec) {
for _, addr := range dnsTargetProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}
for _, dnsProvider := range []*dns.Provider{dnsStoreProvider, dnsRuleProvider, dnsExemplarProvider, dnsMetadataProvider, dnsTargetProvider} {
var tmpSpecs []query.EndpointSpec

return specs
},
func() (specs []query.MetadataSpec) {
for _, addr := range dnsMetadataProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

return specs
},
func() (specs []query.ExemplarSpec) {
for _, addr := range dnsExemplarProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
for _, addr := range dnsProvider.Addresses() {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false))
}
tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs)
specs = append(specs, tmpSpecs...)
}

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
targetsProxy = targets.NewProxy(logger, stores.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, stores.GetExemplarsStores, selectorLset)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset)
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
Expand All @@ -454,12 +429,12 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
stores.Update(ctx)
endpoints.Update(ctx)
return nil
})
}, func(error) {
cancel()
stores.Close()
endpoints.Close()
})
}
// Run File Service Discovery and update the store set when the files are modified.
Expand All @@ -486,7 +461,7 @@ func runQuery(
continue
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
endpoints.Update(ctxUpdate)

if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
Expand Down Expand Up @@ -562,11 +537,11 @@ func runQuery(

ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewQueryAPI(
logger,
stores,
endpoints,
engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta),
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
Expand Down Expand Up @@ -644,8 +619,8 @@ func runQuery(
return nil
}

func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
set := make(map[string]query.StoreSpec)
func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.EndpointSpec) []query.EndpointSpec {
set := make(map[string]query.EndpointSpec)
for _, spec := range specs {
addr := spec.Addr()
if _, ok := set[addr]; ok {
Expand All @@ -654,7 +629,7 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
set[addr] = spec
}
deduplicated := make([]query.StoreSpec, 0, len(set))
deduplicated := make([]query.EndpointSpec, 0, len(set))
for _, value := range set {
deduplicated = append(deduplicated, value)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type QueryAPI struct {
disableCORS bool

replicaLabels []string
storeSet *query.StoreSet
endpointSet *query.EndpointSet

defaultRangeQueryStep time.Duration
defaultInstantQueryMaxSourceResolution time.Duration
Expand All @@ -106,7 +106,7 @@ type QueryAPI struct {
// NewQueryAPI returns an initialized QueryAPI type.
func NewQueryAPI(
logger log.Logger,
storeSet *query.StoreSet,
endpointSet *query.EndpointSet,
qe func(int64) *promql.Engine,
c query.QueryableCreator,
ruleGroups rules.UnaryClient,
Expand Down Expand Up @@ -144,7 +144,7 @@ func NewQueryAPI(
enableTargetPartialResponse: enableTargetPartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
replicaLabels: replicaLabels,
storeSet: storeSet,
endpointSet: endpointSet,
defaultRangeQueryStep: defaultRangeQueryStep,
defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution,
defaultMetadataTimeRange: defaultMetadataTimeRange,
Expand Down Expand Up @@ -701,9 +701,9 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
}

func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError) {
statuses := make(map[string][]query.StoreStatus)
for _, status := range qapi.storeSet.GetStoreStatus() {
statuses[status.StoreType.String()] = append(statuses[status.StoreType.String()], status)
statuses := make(map[string][]query.EndpointStatus)
for _, status := range qapi.endpointSet.GetEndpointStatus() {
statuses[status.ComponentType.String()] = append(statuses[status.ComponentType.String()], status)
}
return statuses, nil, nil
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ func FromProto(storeType storepb.StoreType) StoreAPI {
}
}

func FromString(storeType string) StoreAPI {
switch storeType {
case "query":
return Query
case "rule":
return Rule
case "sidecar":
return Sidecar
case "store":
return Store
case "receive":
return Receive
case "debug":
return Debug
default:
return UnknownStoreAPI
}
}

var (
Bucket = source{component: component{name: "bucket"}}
Cleanup = source{component: component{name: "cleanup"}}
Expand Down
Loading

0 comments on commit 8862ad5

Please sign in to comment.