Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resort store response set on internal label dedup #6317

Merged
merged 14 commits into from
Aug 10, 2023
15 changes: 15 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,21 @@ func runReceive(
grpcserver.WithTLSConfig(tlsCfg),
)

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

dbs.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

g.Add(
func() error {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress)
Expand Down
58 changes: 49 additions & 9 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
)
Expand Down Expand Up @@ -112,8 +113,9 @@ func runSidecar(
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
labelNamesSet: stringset.AllStrings(),
}

confContentYaml, err := conf.objStore.Content()
Expand Down Expand Up @@ -237,6 +239,19 @@ func runSidecar(
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

m.UpdateLabelNames(context.Background())

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -249,7 +264,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -411,15 +426,16 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
promVersion string
limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client

labelNamesSet stringset.Set
}

func (s *promMetadata) UpdateLabels(ctx context.Context) error {
Expand Down Expand Up @@ -447,6 +463,30 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labelNamesSet = stringset.AllStrings()
return
}

filter := stringset.NewFromStrings(labelNames...)
s.mtx.Lock()
s.labelNamesSet = filter
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesSet() stringset.Set {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labelNamesSet
}

func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,24 @@ func runStore(
s.Shutdown(err)
})
}

{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
level.Debug(logger).Log("msg", "setting up periodic update for label names")
level.Info(logger).Log("msg", "setting up periodic update for label names")

I guess info would make sense here

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")
fpetkovski marked this conversation as resolved.
Show resolved Hide resolved

bs.UpdateLabelNames()

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

}
// Add bucket UI for loaded blocks.
{
ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ require (

require (
github.com/onsi/gomega v1.27.10
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
go.opentelemetry.io/contrib/propagators/autoprop v0.38.0
go4.org/intern v0.0.0-20220617035311-6925f38cc365
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
)

require (
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
go.opentelemetry.io/contrib/propagators/ot v1.13.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
Expand Down Expand Up @@ -846,6 +848,8 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.15 h1:Y7xOFbD+3jaPw+VN7lkakNJ/pa+ZSQVFp1ONtJaBxns=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
Expand Down
46 changes: 25 additions & 21 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,40 +97,31 @@ func NewMultiTSDB(

type localClient struct {
storepb.StoreClient
labelSetFunc func() []labelpb.ZLabelSet
timeRangeFunc func() (int64, int64)
tsdbOpts *tsdb.Options
store *store.TSDBStore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cleanup <3

}

func NewLocalClient(
c storepb.StoreClient,
labelSetFunc func() []labelpb.ZLabelSet,
timeRangeFunc func() (int64, int64),
tsdbOpts *tsdb.Options,
) store.Client {
func newLocalClient(c storepb.StoreClient, store *store.TSDBStore) *localClient {
return &localClient{
StoreClient: c,
labelSetFunc: labelSetFunc,
timeRangeFunc: timeRangeFunc,
tsdbOpts: tsdbOpts,
StoreClient: c,
store: store,
}
}

func (l *localClient) LabelSets() []labels.Labels {
return labelpb.ZLabelSetsToPromLabelSets(l.labelSetFunc()...)
return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...)
}

func (l *localClient) TimeRange() (mint int64, maxt int64) {
return l.timeRangeFunc()
return l.store.TimeRange()
}

func (l *localClient) TSDBInfos() []infopb.TSDBInfo {
labelsets := l.labelSetFunc()
labelsets := l.store.LabelSet()
if len(labelsets) == 0 {
return []infopb.TSDBInfo{}
}

mint, maxt := l.timeRangeFunc()
mint, maxt := l.store.TimeRange()
return []infopb.TSDBInfo{
{
Labels: labelsets[0],
Expand All @@ -141,7 +132,7 @@ func (l *localClient) TSDBInfos() []infopb.TSDBInfo {
}

func (l *localClient) String() string {
mint, maxt := l.timeRangeFunc()
mint, maxt := l.store.TimeRange()
return fmt.Sprintf(
"LabelSets: %v MinTime: %d MaxTime: %d",
labelpb.PromLabelSetsToString(l.LabelSets()), mint, maxt,
Expand Down Expand Up @@ -186,7 +177,7 @@ func (t *tenant) store() *store.TSDBStore {
return t.storeTSDB
}

func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client {
func (t *tenant) client(logger log.Logger) store.Client {
t.mtx.RLock()
defer t.mtx.RUnlock()

Expand All @@ -196,7 +187,7 @@ func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client
}

client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0)
return NewLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange, tsdbOpts)
return newLocalClient(client, tsdbStore)
}

func (t *tenant) exemplars() *exemplars.TSDB {
Expand Down Expand Up @@ -495,7 +486,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
client := tenant.client(t.logger, t.tsdbOpts)
client := tenant.client(t.logger)
if client != nil {
res = append(res, client)
}
Expand Down Expand Up @@ -876,6 +867,19 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab
return initialLset, nil
}

func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) {
t.mtx.RLock()
defer t.mtx.RUnlock()

for _, tenant := range t.tenants {
db := tenant.storeTSDB
if db == nil {
continue
}
db.UpdateLabelNames(ctx)
}
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
Expand Down
Loading
Loading