Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add strict mode flag #2252

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2238](https://github.com/thanos-io/thanos/pull/2238) Ruler: Fixed Issue #2204 bug in alert queue signalling filled up queue and alerts were dropped

### Added

- [#2252](https://github.com/thanos-io/thanos/pull/2252) Query: add new `--store.strict-mode` flag. More information available [here](/docs/proposals/202001_thanos_query_health_handling.md).

## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0-rc.1) - 2020.03.02

### Fixed
Expand Down
22 changes: 16 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
Strings()

strictMode := cmd.Flag("store.strict-mode", "Enable strict mode which makes Thanos Query always keep statically specified StoreAPIs around.").Default("false").Bool()
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
Expand Down Expand Up @@ -162,6 +164,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
*strictMode,
component.Query,
)
}
Expand Down Expand Up @@ -202,6 +205,7 @@ func runQuery(
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
strictMode bool,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
Expand All @@ -222,14 +226,20 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

staticStores, dynamicStores := dns.FilterStaticNodes(storeAddrs...)
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add DNS resolved addresses from static flags and file SD.
// Add DNS resolved addresses.
for _, addr := range dnsProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}
// Add static nodes.
for _, addr := range staticStores {
specs = append(specs, query.NewGRPCStoreSpec(addr, true))
}

specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
Expand Down Expand Up @@ -257,7 +267,7 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
stores.Update(ctx)
stores.Update(ctx, strictMode)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -289,8 +299,8 @@ func runQuery(
continue
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...))
stores.Update(ctxUpdate, strictMode)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), dynamicStores...))
case <-ctxUpdate.Done():
return nil
}
Expand All @@ -305,7 +315,7 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...))
dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), dynamicStores...))
return nil
})
}, func(error) {
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ Flags:
which data is deduplicated. Still you will be
able to query without deduplication using
'dedup=false' parameter.
--store.strict-mode Enable strict mode which makes Thanos Query
always keep statically specified StoreAPIs
around.
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
Expand Down
2 changes: 1 addition & 1 deletion docs/proposals/202001_thanos_query_health_handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: Thanos Query store nodes healthiness handling
type: proposal
menu: proposals
status: accepted
status: complete
owner: GiedriusS
---

Expand Down
36 changes: 30 additions & 6 deletions pkg/discovery/dns/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ func (p *Provider) Clone() *Provider {
}
}

// FilterStaticNodes walks through the whole list of addresses and returns
// two lists of statically and dynamically defined nodes.
func FilterStaticNodes(addrs ...string) (static []string, dynamic []string) {
static, dynamic = []string{}, []string{}

for _, addr := range addrs {
qtype, _ := GetQTypeName(addr)
if qtype != "" {
dynamic = append(dynamic, addr)
} else {
static = append(static, addr)
}
}
return
}

// GetQTypeName splits the provided addr into two parts: the QType (if any)
// and the name.
func GetQTypeName(addr string) (qtype string, name string) {
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
return "", addr
}
return qtypeAndName[0], qtypeAndName[1]
}

// Resolve stores a list of provided addresses or their DNS records if requested.
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// defaultPort is used for non-SRV records when a port is not supplied.
Expand All @@ -100,14 +126,12 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) {
resolvedAddrs := map[string][]string{}
for _, addr := range addrs {
var resolved []string
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
// No lookup specified. Add to results and continue to the next address.
resolvedAddrs[addr] = []string{addr}
p.resolverAddrs.WithLabelValues(addr).Set(1.0)
qtype, name := GetQTypeName(addr)
if qtype == "" {
resolvedAddrs[name] = []string{name}
p.resolverAddrs.WithLabelValues(name).Set(1.0)
continue
}
qtype, name := qtypeAndName[0], qtypeAndName[1]

resolved, err := p.resolver.Resolve(ctx, name, QType(qtype))
p.resolverLookupsCount.Inc()
Expand Down
33 changes: 33 additions & 0 deletions pkg/discovery/dns/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,36 @@ func (d *mockResolver) Resolve(_ context.Context, name string, _ QType) ([]strin
}
return d.res[name], nil
}

// TestFilterStaticNodes tests if the provided nodes are separated correctly
// into static nodes and dynamic ones.
func TestFilterStaticNodes(t *testing.T) {
for _, tcase := range []struct {
nodes []string
expectedStatic []string
expectedDynamic []string
}{
// All valid cases.
{
nodes: []string{"1.2.3.4", "dns+1.2.3.4", "dnssrv+13.3.3.3", "dnssrvnoa+1.1.1.1"},
expectedStatic: []string{"1.2.3.4"},
expectedDynamic: []string{"dns+1.2.3.4", "dnssrv+13.3.3.3", "dnssrvnoa+1.1.1.1"},
},
// Negative test that will be caught later on.
{
nodes: []string{"gibberish+1.1.1.1+noa"},
expectedStatic: []string{},
expectedDynamic: []string{"gibberish+1.1.1.1+noa"},
},
// Negative test with no nodes.
{
nodes: []string{},
expectedStatic: []string{},
expectedDynamic: []string{},
},
} {
gotStatic, gotDynamic := FilterStaticNodes(tcase.nodes...)
testutil.Equals(t, tcase.expectedStatic, gotStatic, "mismatch between static nodes")
testutil.Equals(t, tcase.expectedDynamic, gotDynamic, "mismatch between dynamic nodes")
}
}
59 changes: 39 additions & 20 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type StoreSpec interface {
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
// given store connection.
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error)
// Returns true if the StoreAPI has been statically defined.
Static() bool
}

type StoreStatus struct {
Expand All @@ -49,13 +51,19 @@ type StoreStatus struct {
}

type grpcStoreSpec struct {
addr string
addr string
static bool
}

// NewGRPCStoreSpec creates store pure gRPC spec.
// It uses Info gRPC call to get Metadata.
func NewGRPCStoreSpec(addr string) StoreSpec {
return &grpcStoreSpec{addr: addr}
func NewGRPCStoreSpec(addr string, static bool) StoreSpec {
return &grpcStoreSpec{addr: addr, static: static}
}

// Static returns true if the StoreAPI node has been statically defined.
func (s *grpcStoreSpec) Static() bool {
return s.static
}

func (s *grpcStoreSpec) Addr() string {
Expand Down Expand Up @@ -320,8 +328,8 @@ func newStoreAPIStats() map[component.StoreAPI]map[string]int {
}

// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata
// from all stores.
func (s *StoreSet) Update(ctx context.Context) {
// from all stores. If strictMode is true then it keeps around statically defined nodes.
func (s *StoreSet) Update(ctx context.Context, strictMode bool) {
s.updateMtx.Lock()
defer s.updateMtx.Unlock()

Expand All @@ -334,14 +342,14 @@ func (s *StoreSet) Update(ctx context.Context) {

level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores))

healthyStores := s.getHealthyStores(ctx, stores)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "healthyStores", len(healthyStores), "cachedStores", len(stores))
activeStores := s.getActiveStores(ctx, stores, strictMode)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores))

stats := newStoreAPIStats()

// Close stores that where not healthy this time (are not in healthy stores map).
// Close stores that where not active this time (are not in active stores map).
for addr, st := range stores {
if _, ok := healthyStores[addr]; ok {
if _, ok := activeStores[addr]; ok {
stats[st.StoreType()][st.LabelSetsString()]++
continue
}
Expand All @@ -353,7 +361,7 @@ func (s *StoreSet) Update(ctx context.Context) {
}

// Add stores that are not yet in stores.
for addr, st := range healthyStores {
for addr, st := range activeStores {
if _, ok := stores[addr]; ok {
continue
}
Expand Down Expand Up @@ -384,15 +392,15 @@ func (s *StoreSet) Update(ctx context.Context) {
s.cleanUpStoreStatuses(stores)
}

func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef {
func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef, strictMode bool) map[string]*storeRef {
var (
unique = make(map[string]struct{})
healthyStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
unique = make(map[string]struct{})
activeStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
)

// Gather healthy stores map concurrently. Build new store if does not exist already.
// Gather active stores map concurrently. Build new store if does not exist already.
for _, storeSpec := range s.storeSpecs() {
if _, ok := unique[storeSpec.Addr()]; ok {
level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr())
Expand All @@ -411,7 +419,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor

st, seenAlready := stores[addr]
if !seenAlready {
// New store or was unhealthy and was removed in the past - create new one.
// New store or was unactive and was removed in the past - create new one.
conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
if err != nil {
s.updateStoreStatus(&storeRef{addr: addr}, err)
Expand All @@ -425,25 +433,36 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor
labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient)
if err != nil {
if !seenAlready {
// Close only if new. Unhealthy `s.stores` will be closed later on.
// Close only if new. Unactive `s.stores` will be closed later on.
st.Close()
}
s.updateStoreStatus(st, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr)

if !(strictMode && spec.Static()) {
return
}

// Still keep it around if static & strict mode enabled.
mtx.Lock()
defer mtx.Unlock()

activeStores[addr] = st
return
}

s.updateStoreStatus(st, nil)
st.Update(labelSets, minTime, maxTime, storeType)

mtx.Lock()
defer mtx.Unlock()

healthyStores[addr] = st
activeStores[addr] = st
}(storeSpec)
}
wg.Wait()

return healthyStores
return activeStores
}

func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
Expand Down
Loading