From e0a0529f6418e1146b90dc2e399f6c48261ae19a Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 25 Apr 2023 09:32:35 +0200 Subject: [PATCH] Resort store response set on internal label dedup When deduplicating on labels which are stored internally in TSDB, the store response set needs to be resorted after replica labels are removed. In order to detect when deduplication by internal labels happens, this PR adds a bloom filter with all label names to the Info response. When a replica label is present in this bloom filter for an individual store, the proxy heap would resort a response set from that store before merging in the result with the rest of the set. Signed-off-by: Filip Petkovski --- cmd/thanos/sidecar.go | 60 +++++- go.mod | 6 +- go.sum | 4 + pkg/bloom/bloom.go | 67 ++++++ pkg/info/infopb/custom.go | 11 + pkg/info/infopb/rpc.pb.go | 304 +++++++++++++++++++++++---- pkg/info/infopb/rpc.proto | 6 + pkg/promclient/promclient.go | 4 + pkg/query/endpointset.go | 12 ++ pkg/receive/multitsdb.go | 36 ++-- pkg/store/prometheus.go | 18 +- pkg/store/prometheus_test.go | 36 +++- pkg/store/proxy.go | 4 + pkg/store/proxy_heap.go | 18 +- pkg/store/proxy_test.go | 39 +++- pkg/store/storepb/testutil/client.go | 4 + 16 files changed, 548 insertions(+), 81 deletions(-) create mode 100644 pkg/bloom/bloom.go create mode 100644 pkg/info/infopb/custom.go diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 8fd399232e3..57f0ee0d46a 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/objstore/client" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/bloom" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars" "github.com/thanos-io/thanos/pkg/extkingpin" @@ -109,8 +110,9 @@ func runSidecar( mint: conf.limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, - limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), + limitMinTime: conf.limitMinTime, + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), + labelNamesBloom: bloom.NewAlwaysTrueFilter(), } confContentYaml, err := conf.objStore.Content() @@ -234,6 +236,19 @@ func runSidecar( }, func(error) { cancel() }) + + g.Add(func() error { + return runutil.Repeat(10*time.Second, ctx.Done(), func() error { + level.Debug(logger).Log("msg", "Starting label names bloom filter update") + + m.UpdateLabelNamesBloom(context.Background()) + + level.Debug(logger).Log("msg", "Finished label names bloom filter update") + return nil + }) + }, func(err error) { + cancel() + }) } { ctx, cancel := context.WithCancel(context.Background()) @@ -246,7 +261,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesBloom, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -267,11 +282,13 @@ func runSidecar( info.WithStoreInfoFunc(func() *infopb.StoreInfo { if httpProbe.IsReady() { mint, maxt := promStore.Timestamps() + labelNamesBloom := promStore.LabelNamesBloom() return &infopb.StoreInfo{ MinTime: mint, MaxTime: maxt, SupportsSharding: true, SupportsWithoutReplicaLabels: true, + LabelNamesBloom: infopb.NewBloomFilter(labelNamesBloom), } } return nil @@ -405,15 +422,16 @@ func validatePrometheus(ctx context.Context, client *promclient.Client, logger l type promMetadata struct { promURL *url.URL - mtx sync.Mutex - mint int64 - maxt int64 - labels labels.Labels - promVersion string - + mtx sync.Mutex + mint int64 + maxt int64 + labels labels.Labels + promVersion string limitMinTime thanosmodel.TimeOrDurationValue client *promclient.Client + + labelNamesBloom bloom.Filter } func (s *promMetadata) UpdateLabels(ctx context.Context) error { @@ -441,6 +459,30 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { s.maxt = maxt } +func (s *promMetadata) UpdateLabelNamesBloom(ctx context.Context) { + mint, _ := s.Timestamps() + labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli()) + if err != nil { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.labelNamesBloom = bloom.NewAlwaysTrueFilter() + return + } + + filter := bloom.NewFilterForStrings(labelNames...) + s.mtx.Lock() + s.labelNamesBloom = filter + s.mtx.Unlock() +} + +func (s *promMetadata) LabelNamesBloom() bloom.Filter { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.labelNamesBloom +} + func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/go.mod b/go.mod index 8587f5efa11..8c6206407a5 100644 --- a/go.mod +++ b/go.mod @@ -125,7 +125,11 @@ require ( golang.org/x/exp v0.0.0-20230307190834-24139beb5833 ) -require go4.org/unsafe/assume-no-moving-gc v0.0.0-20230209150437-ee73d164e760 // indirect +require ( + github.com/bits-and-blooms/bloom v2.0.3+incompatible // indirect + github.com/willf/bitset v1.1.11 // indirect + go4.org/unsafe/assume-no-moving-gc v0.0.0-20230209150437-ee73d164e760 // indirect +) require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect diff --git a/go.sum b/go.sum index 4eccd1d027f..723f2448ba7 100644 --- a/go.sum +++ b/go.sum @@ -175,6 +175,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bits-and-blooms/bloom v2.0.3+incompatible h1:3ONZFjJoMyfHDil5iCcNkcPJ//PNNo+55RHvPrfUGnY= +github.com/bits-and-blooms/bloom v2.0.3+incompatible/go.mod h1:nEmPH2pqJb3sCXfd7cyDSKC4iPfCAt312JHgNrtnnDE= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= @@ -920,6 +922,8 @@ github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d h1:9Z/HiqeGN+LOn github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d/go.mod h1:Fnq3+U51tMkPRMC6Wr7zKGUeFFYX4YjNrNK50iU0fcE= github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M= github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA= +github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= +github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= diff --git a/pkg/bloom/bloom.go b/pkg/bloom/bloom.go new file mode 100644 index 00000000000..c6a2b087d9a --- /dev/null +++ b/pkg/bloom/bloom.go @@ -0,0 +1,67 @@ +package bloom + +import ( + "bytes" + + "github.com/bits-and-blooms/bloom" +) + +const FilterErrorRate = 0.01 + +type Filter interface { + Test(string) bool + Bytes() []byte +} + +type filter struct { + bloom *bloom.BloomFilter +} + +func NewFromBytes(bloomBytes []byte) Filter { + if bloomBytes == nil { + return NewAlwaysTrueFilter() + } + + bloomFilter := &bloom.BloomFilter{} + byteReader := bytes.NewReader(bloomBytes) + if _, err := bloomFilter.ReadFrom(byteReader); err != nil { + return NewAlwaysTrueFilter() + } + + return &filter{bloom: bloomFilter} +} + +func NewFilterForStrings(items ...string) Filter { + bloomFilter := bloom.NewWithEstimates(uint(len(items)), FilterErrorRate) + for _, label := range items { + bloomFilter.AddString(label) + } + + return &filter{bloom: bloomFilter} +} + +func (f filter) Bytes() []byte { + var buf bytes.Buffer + if _, err := f.bloom.WriteTo(&buf); err != nil { + return nil + } + return buf.Bytes() +} + +func (f filter) Test(s string) bool { + return f.bloom.TestString(s) +} + +type alwaysTrueFilter struct{} + +func NewAlwaysTrueFilter() *alwaysTrueFilter { + return &alwaysTrueFilter{} +} + +func (e alwaysTrueFilter) Test(s string) bool { + return true +} + +func (e alwaysTrueFilter) Bytes() []byte { + return nil +} diff --git a/pkg/info/infopb/custom.go b/pkg/info/infopb/custom.go new file mode 100644 index 00000000000..a4659dc93b6 --- /dev/null +++ b/pkg/info/infopb/custom.go @@ -0,0 +1,11 @@ +package infopb + +import ( + "github.com/thanos-io/thanos/pkg/bloom" +) + +func NewBloomFilter(filter bloom.Filter) *BloomFilter { + return &BloomFilter{ + BloomFilterData: filter.Bytes(), + } +} diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index 6153dbb4cf5..3d572539499 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -121,7 +121,8 @@ type StoreInfo struct { MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` SupportsSharding bool `protobuf:"varint,3,opt,name=supports_sharding,json=supportsSharding,proto3" json:"supports_sharding,omitempty"` // replica_aware means this store supports without_replica_labels of StoreAPI.Series. - SupportsWithoutReplicaLabels bool `protobuf:"varint,5,opt,name=supports_without_replica_labels,json=supportsWithoutReplicaLabels,proto3" json:"supports_without_replica_labels,omitempty"` + SupportsWithoutReplicaLabels bool `protobuf:"varint,5,opt,name=supports_without_replica_labels,json=supportsWithoutReplicaLabels,proto3" json:"supports_without_replica_labels,omitempty"` + LabelNamesBloom *BloomFilter `protobuf:"bytes,6,opt,name=label_names_bloom,json=labelNamesBloom,proto3" json:"label_names_bloom,omitempty"` } func (m *StoreInfo) Reset() { *m = StoreInfo{} } @@ -157,6 +158,43 @@ func (m *StoreInfo) XXX_DiscardUnknown() { var xxx_messageInfo_StoreInfo proto.InternalMessageInfo +type BloomFilter struct { + BloomFilterData []byte `protobuf:"bytes,1,opt,name=bloom_filter_data,json=bloomFilterData,proto3" json:"bloom_filter_data,omitempty"` +} + +func (m *BloomFilter) Reset() { *m = BloomFilter{} } +func (m *BloomFilter) String() string { return proto.CompactTextString(m) } +func (*BloomFilter) ProtoMessage() {} +func (*BloomFilter) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{3} +} +func (m *BloomFilter) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BloomFilter) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_BloomFilter.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *BloomFilter) XXX_Merge(src proto.Message) { + xxx_messageInfo_BloomFilter.Merge(m, src) +} +func (m *BloomFilter) XXX_Size() int { + return m.Size() +} +func (m *BloomFilter) XXX_DiscardUnknown() { + xxx_messageInfo_BloomFilter.DiscardUnknown(m) +} + +var xxx_messageInfo_BloomFilter proto.InternalMessageInfo + // RulesInfo holds the metadata related to Rules API exposed by the component. type RulesInfo struct { } @@ -165,7 +203,7 @@ func (m *RulesInfo) Reset() { *m = RulesInfo{} } func (m *RulesInfo) String() string { return proto.CompactTextString(m) } func (*RulesInfo) ProtoMessage() {} func (*RulesInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{3} + return fileDescriptor_a1214ec45d2bf952, []int{4} } func (m *RulesInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -202,7 +240,7 @@ func (m *MetricMetadataInfo) Reset() { *m = MetricMetadataInfo{} } func (m *MetricMetadataInfo) String() string { return proto.CompactTextString(m) } func (*MetricMetadataInfo) ProtoMessage() {} func (*MetricMetadataInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{4} + return fileDescriptor_a1214ec45d2bf952, []int{5} } func (m *MetricMetadataInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -239,7 +277,7 @@ func (m *TargetsInfo) Reset() { *m = TargetsInfo{} } func (m *TargetsInfo) String() string { return proto.CompactTextString(m) } func (*TargetsInfo) ProtoMessage() {} func (*TargetsInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{5} + return fileDescriptor_a1214ec45d2bf952, []int{6} } func (m *TargetsInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -278,7 +316,7 @@ func (m *ExemplarsInfo) Reset() { *m = ExemplarsInfo{} } func (m *ExemplarsInfo) String() string { return proto.CompactTextString(m) } func (*ExemplarsInfo) ProtoMessage() {} func (*ExemplarsInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{6} + return fileDescriptor_a1214ec45d2bf952, []int{7} } func (m *ExemplarsInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -315,7 +353,7 @@ func (m *QueryAPIInfo) Reset() { *m = QueryAPIInfo{} } func (m *QueryAPIInfo) String() string { return proto.CompactTextString(m) } func (*QueryAPIInfo) ProtoMessage() {} func (*QueryAPIInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_a1214ec45d2bf952, []int{7} + return fileDescriptor_a1214ec45d2bf952, []int{8} } func (m *QueryAPIInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -348,6 +386,7 @@ func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.info.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.info.InfoResponse") proto.RegisterType((*StoreInfo)(nil), "thanos.info.StoreInfo") + proto.RegisterType((*BloomFilter)(nil), "thanos.info.BloomFilter") proto.RegisterType((*RulesInfo)(nil), "thanos.info.RulesInfo") proto.RegisterType((*MetricMetadataInfo)(nil), "thanos.info.MetricMetadataInfo") proto.RegisterType((*TargetsInfo)(nil), "thanos.info.TargetsInfo") @@ -358,41 +397,45 @@ func init() { func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 533 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xdf, 0x6a, 0xdb, 0x30, - 0x14, 0xc6, 0xe3, 0xe6, 0xbf, 0xd2, 0x74, 0xad, 0xe8, 0x86, 0x13, 0x86, 0x13, 0x4c, 0x2f, 0x02, - 0x1b, 0x31, 0x64, 0x30, 0x06, 0xbb, 0x5a, 0x4b, 0x60, 0x1d, 0x2b, 0x6c, 0x4e, 0x60, 0xd0, 0x1b, - 0xa3, 0xa4, 0x6a, 0x62, 0xb0, 0x2d, 0x55, 0x92, 0x59, 0xf2, 0x16, 0x7b, 0x95, 0x5d, 0xef, 0x05, - 0x72, 0xd9, 0xcb, 0x5d, 0x8d, 0x2d, 0x79, 0x91, 0xa1, 0x23, 0x27, 0x8b, 0x59, 0xaf, 0x7a, 0x93, - 0x48, 0xe7, 0xfb, 0x7d, 0xc7, 0xd2, 0x39, 0x3a, 0xe8, 0x69, 0x98, 0xdc, 0x32, 0x4f, 0xff, 0xf0, - 0x89, 0x27, 0xf8, 0xb4, 0xcf, 0x05, 0x53, 0x0c, 0x37, 0xd4, 0x9c, 0x24, 0x4c, 0xf6, 0xb5, 0xd0, - 0x6e, 0x49, 0xc5, 0x04, 0xf5, 0x22, 0x32, 0xa1, 0x11, 0x9f, 0x78, 0x6a, 0xc9, 0xa9, 0x34, 0x5c, - 0xfb, 0x74, 0xc6, 0x66, 0x0c, 0x96, 0x9e, 0x5e, 0x99, 0xa8, 0xdb, 0x44, 0x8d, 0xcb, 0xe4, 0x96, - 0xf9, 0xf4, 0x2e, 0xa5, 0x52, 0xb9, 0xdf, 0x8b, 0xe8, 0xd0, 0xec, 0x25, 0x67, 0x89, 0xa4, 0xf8, - 0x35, 0x42, 0x90, 0x2c, 0x90, 0x54, 0x49, 0xdb, 0xea, 0x16, 0x7b, 0x8d, 0xc1, 0x49, 0x3f, 0xfb, - 0xe4, 0xf5, 0x47, 0x2d, 0x8d, 0xa8, 0x3a, 0x2f, 0xad, 0x7e, 0x75, 0x0a, 0x7e, 0x3d, 0xca, 0xf6, - 0x12, 0x9f, 0xa1, 0xe6, 0x05, 0x8b, 0x39, 0x4b, 0x68, 0xa2, 0xc6, 0x4b, 0x4e, 0xed, 0x83, 0xae, - 0xd5, 0xab, 0xfb, 0xf9, 0x20, 0x7e, 0x89, 0xca, 0x70, 0x60, 0xbb, 0xd8, 0xb5, 0x7a, 0x8d, 0xc1, - 0xb3, 0xfe, 0xde, 0x5d, 0xfa, 0x23, 0xad, 0xc0, 0x61, 0x0c, 0xa4, 0x69, 0x91, 0x46, 0x54, 0xda, - 0xa5, 0x07, 0x68, 0x5f, 0x2b, 0x86, 0x06, 0x08, 0xbf, 0x47, 0x4f, 0x62, 0xaa, 0x44, 0x38, 0x0d, - 0x62, 0xaa, 0xc8, 0x0d, 0x51, 0xc4, 0x2e, 0x83, 0xaf, 0x93, 0xf3, 0x5d, 0x01, 0x73, 0x95, 0x21, - 0x90, 0xe0, 0x28, 0xce, 0xc5, 0xf0, 0x00, 0x55, 0x15, 0x11, 0x33, 0x5d, 0x80, 0x0a, 0x64, 0xb0, - 0x73, 0x19, 0xc6, 0x46, 0x03, 0xeb, 0x16, 0xc4, 0x6f, 0x50, 0x9d, 0x2e, 0x68, 0xcc, 0x23, 0x22, - 0xa4, 0x5d, 0x05, 0x57, 0x3b, 0xe7, 0x1a, 0x6e, 0x55, 0xf0, 0xfd, 0x83, 0xb1, 0x87, 0xca, 0x77, - 0x29, 0x15, 0x4b, 0xbb, 0x06, 0xae, 0x56, 0xce, 0xf5, 0x59, 0x2b, 0xef, 0x3e, 0x5d, 0x9a, 0x8b, - 0x02, 0xe7, 0xfe, 0xb0, 0x50, 0x7d, 0x57, 0x2b, 0xdc, 0x42, 0xb5, 0x38, 0x4c, 0x02, 0x15, 0xc6, - 0xd4, 0xb6, 0xba, 0x56, 0xaf, 0xe8, 0x57, 0xe3, 0x30, 0x19, 0x87, 0x31, 0x05, 0x89, 0x2c, 0x8c, - 0x74, 0x90, 0x49, 0x64, 0x01, 0xd2, 0x0b, 0x74, 0x22, 0x53, 0xce, 0x99, 0x50, 0x32, 0x90, 0x73, - 0x22, 0x6e, 0xc2, 0x64, 0x06, 0x4d, 0xa9, 0xf9, 0xc7, 0x5b, 0x61, 0x94, 0xc5, 0xf1, 0x10, 0x75, - 0x76, 0xf0, 0xd7, 0x50, 0xcd, 0x59, 0xaa, 0x02, 0x41, 0x79, 0x14, 0x4e, 0x49, 0x00, 0x2f, 0x40, - 0x42, 0xa5, 0x6b, 0xfe, 0xf3, 0x2d, 0xf6, 0xc5, 0x50, 0xbe, 0x81, 0xe0, 0xd5, 0xc8, 0x0f, 0xa5, - 0x5a, 0xe9, 0xb8, 0xec, 0x36, 0x50, 0x7d, 0xd7, 0x3a, 0xf7, 0x14, 0xe1, 0xff, 0xfb, 0xa1, 0xdf, - 0xe8, 0x5e, 0x8d, 0xdd, 0x21, 0x6a, 0xe6, 0x8a, 0xf7, 0xb8, 0x2b, 0xbb, 0x47, 0xe8, 0x70, 0xbf, - 0x9a, 0x83, 0x0b, 0x54, 0x82, 0x6c, 0x6f, 0xb3, 0xff, 0x7c, 0x93, 0xf7, 0x86, 0xa4, 0xdd, 0x7a, - 0x40, 0x31, 0xe3, 0x72, 0x7e, 0xb6, 0xfa, 0xe3, 0x14, 0x56, 0x6b, 0xc7, 0xba, 0x5f, 0x3b, 0xd6, - 0xef, 0xb5, 0x63, 0x7d, 0xdb, 0x38, 0x85, 0xfb, 0x8d, 0x53, 0xf8, 0xb9, 0x71, 0x0a, 0xd7, 0x15, - 0x33, 0xbc, 0x93, 0x0a, 0xcc, 0xde, 0xab, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x1d, 0x6e, - 0xa7, 0xd2, 0x03, 0x00, 0x00, + // 593 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcb, 0x6e, 0xda, 0x4c, + 0x14, 0xc7, 0x71, 0x80, 0x04, 0xc6, 0xb9, 0x31, 0xca, 0xf7, 0xc9, 0xa0, 0xca, 0x41, 0x56, 0x16, + 0xa8, 0xad, 0xb0, 0x44, 0xa5, 0xaa, 0x55, 0x57, 0xcd, 0xa5, 0x6a, 0xaa, 0xa6, 0x6a, 0x9d, 0x48, + 0x95, 0xb2, 0xb1, 0xc6, 0xc9, 0x00, 0x96, 0x6c, 0xcf, 0x64, 0x66, 0x50, 0x61, 0xd9, 0x37, 0xe8, + 0xab, 0xf4, 0x2d, 0x58, 0x66, 0xd9, 0x55, 0xd5, 0xc2, 0x8b, 0x54, 0x73, 0xc6, 0x10, 0xac, 0xb2, + 0xea, 0x06, 0x3c, 0xe7, 0xff, 0xfb, 0x1f, 0x3c, 0xe7, 0x02, 0xfa, 0x2f, 0xce, 0xfa, 0xcc, 0xd7, + 0x1f, 0x3c, 0xf2, 0x05, 0xbf, 0xe9, 0x72, 0xc1, 0x14, 0xc3, 0xb6, 0x1a, 0x92, 0x8c, 0xc9, 0xae, + 0x16, 0x5a, 0x4d, 0xa9, 0x98, 0xa0, 0x7e, 0x42, 0x22, 0x9a, 0xf0, 0xc8, 0x57, 0x13, 0x4e, 0xa5, + 0xe1, 0x5a, 0x07, 0x03, 0x36, 0x60, 0xf0, 0xe8, 0xeb, 0x27, 0x13, 0xf5, 0x76, 0x90, 0x7d, 0x9e, + 0xf5, 0x59, 0x40, 0xef, 0x46, 0x54, 0x2a, 0xef, 0x7b, 0x19, 0x6d, 0x9b, 0xb3, 0xe4, 0x2c, 0x93, + 0x14, 0x3f, 0x47, 0x08, 0x92, 0x85, 0x92, 0x2a, 0xe9, 0x58, 0xed, 0x72, 0xc7, 0xee, 0x35, 0xba, + 0xf9, 0x4f, 0x5e, 0xbf, 0xd7, 0xd2, 0x25, 0x55, 0xc7, 0x95, 0xe9, 0xcf, 0xc3, 0x52, 0x50, 0x4f, + 0xf2, 0xb3, 0xc4, 0x47, 0x68, 0xe7, 0x84, 0xa5, 0x9c, 0x65, 0x34, 0x53, 0x57, 0x13, 0x4e, 0x9d, + 0x8d, 0xb6, 0xd5, 0xa9, 0x07, 0xc5, 0x20, 0x7e, 0x8a, 0xaa, 0xf0, 0xc2, 0x4e, 0xb9, 0x6d, 0x75, + 0xec, 0xde, 0xff, 0xdd, 0x95, 0xbb, 0x74, 0x2f, 0xb5, 0x02, 0x2f, 0x63, 0x20, 0x4d, 0x8b, 0x51, + 0x42, 0xa5, 0x53, 0x59, 0x43, 0x07, 0x5a, 0x31, 0x34, 0x40, 0xf8, 0x2d, 0xda, 0x4b, 0xa9, 0x12, + 0xf1, 0x4d, 0x98, 0x52, 0x45, 0x6e, 0x89, 0x22, 0x4e, 0x15, 0x7c, 0x87, 0x05, 0xdf, 0x05, 0x30, + 0x17, 0x39, 0x02, 0x09, 0x76, 0xd3, 0x42, 0x0c, 0xf7, 0xd0, 0x96, 0x22, 0x62, 0xa0, 0x0b, 0xb0, + 0x09, 0x19, 0x9c, 0x42, 0x86, 0x2b, 0xa3, 0x81, 0x75, 0x01, 0xe2, 0x17, 0xa8, 0x4e, 0xc7, 0x34, + 0xe5, 0x09, 0x11, 0xd2, 0xd9, 0x02, 0x57, 0xab, 0xe0, 0x3a, 0x5b, 0xa8, 0xe0, 0x7b, 0x80, 0xb1, + 0x8f, 0xaa, 0x77, 0x23, 0x2a, 0x26, 0x4e, 0x0d, 0x5c, 0xcd, 0x82, 0xeb, 0x93, 0x56, 0x5e, 0x7f, + 0x3c, 0x37, 0x17, 0x05, 0xce, 0xfb, 0xba, 0x81, 0xea, 0xcb, 0x5a, 0xe1, 0x26, 0xaa, 0xa5, 0x71, + 0x16, 0xaa, 0x38, 0xa5, 0x8e, 0xd5, 0xb6, 0x3a, 0xe5, 0x60, 0x2b, 0x8d, 0xb3, 0xab, 0x38, 0xa5, + 0x20, 0x91, 0xb1, 0x91, 0x36, 0x72, 0x89, 0x8c, 0x41, 0x7a, 0x82, 0x1a, 0x72, 0xc4, 0x39, 0x13, + 0x4a, 0x86, 0x72, 0x48, 0xc4, 0x6d, 0x9c, 0x0d, 0xa0, 0x29, 0xb5, 0x60, 0x7f, 0x21, 0x5c, 0xe6, + 0x71, 0x7c, 0x86, 0x0e, 0x97, 0xf0, 0x97, 0x58, 0x0d, 0xd9, 0x48, 0x85, 0x82, 0xf2, 0x24, 0xbe, + 0x21, 0x21, 0x4c, 0x80, 0x84, 0x4a, 0xd7, 0x82, 0x47, 0x0b, 0xec, 0xb3, 0xa1, 0x02, 0x03, 0xc1, + 0xd4, 0x48, 0x7c, 0x8a, 0x1a, 0x66, 0xb4, 0x32, 0x92, 0x52, 0x19, 0x46, 0x09, 0x63, 0xe9, 0xda, + 0x02, 0x1f, 0x6b, 0xe5, 0x4d, 0x9c, 0x28, 0x2a, 0x82, 0x3d, 0xb0, 0x7c, 0xd0, 0x0e, 0x08, 0xbf, + 0xab, 0xd4, 0x2a, 0xfb, 0x55, 0xef, 0x25, 0xb2, 0x57, 0x28, 0xfc, 0x18, 0x35, 0x20, 0x5d, 0xd8, + 0x87, 0x73, 0x08, 0xdd, 0xd7, 0xd5, 0xd8, 0x0e, 0xf6, 0xa2, 0x07, 0xee, 0x94, 0x28, 0xe2, 0xd9, + 0xa8, 0xbe, 0x9c, 0x1d, 0xef, 0x00, 0xe1, 0xbf, 0x07, 0x42, 0x2f, 0xc9, 0x4a, 0x93, 0xbd, 0x33, + 0xb4, 0x53, 0xe8, 0xde, 0xbf, 0xd5, 0xdc, 0xdb, 0x45, 0xdb, 0xab, 0xed, 0xec, 0x9d, 0xa0, 0x0a, + 0x64, 0x7b, 0x95, 0x7f, 0x17, 0x8b, 0xb0, 0xb2, 0xa5, 0xad, 0xe6, 0x1a, 0xc5, 0xec, 0xeb, 0xf1, + 0xd1, 0xf4, 0xb7, 0x5b, 0x9a, 0xce, 0x5c, 0xeb, 0x7e, 0xe6, 0x5a, 0xbf, 0x66, 0xae, 0xf5, 0x6d, + 0xee, 0x96, 0xee, 0xe7, 0x6e, 0xe9, 0xc7, 0xdc, 0x2d, 0x5d, 0x6f, 0x9a, 0x7f, 0x8f, 0x68, 0x13, + 0x96, 0xff, 0xd9, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x2f, 0x81, 0x35, 0x13, 0x53, 0x04, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -636,6 +679,18 @@ func (m *StoreInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.LabelNamesBloom != nil { + { + size, err := m.LabelNamesBloom.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x32 + } if m.SupportsWithoutReplicaLabels { i-- if m.SupportsWithoutReplicaLabels { @@ -669,6 +724,36 @@ func (m *StoreInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *BloomFilter) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BloomFilter) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *BloomFilter) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.BloomFilterData) > 0 { + i -= len(m.BloomFilterData) + copy(dAtA[i:], m.BloomFilterData) + i = encodeVarintRpc(dAtA, i, uint64(len(m.BloomFilterData))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *RulesInfo) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -875,6 +960,23 @@ func (m *StoreInfo) Size() (n int) { if m.SupportsWithoutReplicaLabels { n += 2 } + if m.LabelNamesBloom != nil { + l = m.LabelNamesBloom.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + +func (m *BloomFilter) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.BloomFilterData) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } return n } @@ -1424,6 +1526,126 @@ func (m *StoreInfo) Unmarshal(dAtA []byte) error { } } m.SupportsWithoutReplicaLabels = bool(v != 0) + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LabelNamesBloom", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LabelNamesBloom == nil { + m.LabelNamesBloom = &BloomFilter{} + } + if err := m.LabelNamesBloom.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *BloomFilter) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BloomFilter: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BloomFilter: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BloomFilterData", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.BloomFilterData = append(m.BloomFilterData[:0], dAtA[iNdEx:postIndex]...) + if m.BloomFilterData == nil { + m.BloomFilterData = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/info/infopb/rpc.proto b/pkg/info/infopb/rpc.proto index 90cecd58905..51acbaf1d22 100644 --- a/pkg/info/infopb/rpc.proto +++ b/pkg/info/infopb/rpc.proto @@ -61,6 +61,12 @@ message StoreInfo { // replica_aware means this store supports without_replica_labels of StoreAPI.Series. bool supports_without_replica_labels = 5; + + BloomFilter label_names_bloom = 6; +} + +message BloomFilter { + bytes bloom_filter_data = 1; } // RulesInfo holds the metadata related to Rules API exposed by the component. diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 430264471dc..b75a1e28276 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -676,6 +676,8 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string Error string `json:"error"` } + fmt.Println(string(body)) + if err = json.Unmarshal(body, &m); err != nil { return status.Error(codes.Internal, err.Error()) } @@ -728,6 +730,8 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [ q.Add("end", formatTime(timestamp.Time(endTime))) u.RawQuery = q.Encode() + fmt.Println(u.String()) + var m struct { Data []string `json:"data"` } diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 826d3e3cfb9..b015e4f4b26 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -14,6 +14,7 @@ import ( "unicode/utf8" "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/thanos-io/thanos/pkg/bloom" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -805,6 +806,17 @@ func (er *endpointRef) timeRange() (int64, int64) { return er.metadata.Store.MinTime, er.metadata.Store.MaxTime } +func (er *endpointRef) LabelNamesBloom() bloom.Filter { + er.mtx.RLock() + defer er.mtx.RUnlock() + + if er.metadata == nil || er.metadata.Store == nil || er.metadata.Store.LabelNamesBloom == nil { + return bloom.NewAlwaysTrueFilter() + } + + return bloom.NewFromBytes(er.metadata.Store.LabelNamesBloom.BloomFilterData) +} + func (er *endpointRef) SupportsSharding() bool { er.mtx.RLock() defer er.mtx.RUnlock() diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 63357d85609..2755b02153d 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -23,10 +23,11 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" - "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/bloom" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/errutil" @@ -93,28 +94,35 @@ func NewMultiTSDB( type localClient struct { storepb.StoreClient - labelSetFunc func() []labelpb.ZLabelSet - timeRangeFunc func() (int64, int64) + store *store.TSDBStore +} + +func newLocalClient(c storepb.StoreClient, store *store.TSDBStore) *localClient { + return &localClient{ + StoreClient: c, + store: store, + } } -func newLocalClient( - c storepb.StoreClient, - labelSetFunc func() []labelpb.ZLabelSet, - timeRangeFunc func() (int64, int64), -) *localClient { - return &localClient{c, labelSetFunc, timeRangeFunc} +func (l *localClient) LabelNamesBloom() bloom.Filter { + labelNames, err := l.store.LabelNames(context.Background(), &storepb.LabelNamesRequest{}) + if err != nil { + return bloom.NewAlwaysTrueFilter() + } + + return bloom.NewFilterForStrings(labelNames.Names...) } func (l *localClient) LabelSets() []labels.Labels { - return labelpb.ZLabelSetsToPromLabelSets(l.labelSetFunc()...) + return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...) } func (l *localClient) TimeRange() (mint int64, maxt int64) { - return l.timeRangeFunc() + return l.store.TimeRange() } func (l *localClient) String() string { - mint, maxt := l.timeRangeFunc() + mint, maxt := l.store.TimeRange() return fmt.Sprintf( "LabelSets: %v MinTime: %d MaxTime: %d", labelpb.PromLabelSetsToString(l.LabelSets()), mint, maxt, @@ -168,7 +176,7 @@ func (t *tenant) client(logger log.Logger) store.Client { return nil } client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0) - return newLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange) + return newLocalClient(client, tsdbStore) } func (t *tenant) exemplars() *exemplars.TSDB { diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e4444c718ff..50ffcd973a0 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -30,6 +30,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/thanos-io/thanos/pkg/bloom" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/httpconfig" @@ -39,8 +43,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/tracing" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // PrometheusStore implements the store node API on top of the Prometheus remote read API. @@ -51,8 +53,10 @@ type PrometheusStore struct { buffers sync.Pool component component.StoreAPI externalLabelsFn func() labels.Labels - promVersion func() string - timestamps func() (mint int64, maxt int64) + labelNamesBloom func() bloom.Filter + + promVersion func() string + timestamps func() (mint int64, maxt int64) remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType @@ -76,6 +80,7 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), + labelNamesBloom func() bloom.Filter, promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { @@ -89,6 +94,7 @@ func NewPrometheusStore( externalLabelsFn: externalLabelsFn, promVersion: promVersion, timestamps: timestamps, + labelNamesBloom: labelNamesBloom, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -758,3 +764,7 @@ func (p *PrometheusStore) LabelSet() []labelpb.ZLabelSet { func (p *PrometheusStore) Timestamps() (mint int64, maxt int64) { return p.timestamps() } + +func (p *PrometheusStore) LabelNamesBloom() bloom.Filter { + return p.labelNamesBloom() +} diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index cb3c5005791..406a34f336d 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -20,6 +20,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/efficientgo/core/testutil" + + "github.com/thanos-io/thanos/pkg/bloom" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -67,7 +69,10 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { limitMinT := int64(0) proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return limitMinT, -1 }, nil) // MaxTime does not matter. + func() (int64, int64) { return limitMinT, -1 }, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + nil, + ) // MaxTime does not matter. testutil.Ok(t, err) // Query all three samples except for the first one. Since we round up queried data @@ -194,7 +199,10 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil) + func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + nil, + ) testutil.Ok(t, err) for _, tcase := range []struct { @@ -362,9 +370,11 @@ func TestPrometheusStore_LabelAPIs(t *testing.T) { version, err := promclient.NewDefaultClient().BuildVersion(context.Background(), u) testutil.Ok(t, err) - promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { - return extLset - }, nil, func() string { return version }) + promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + func() labels.Labels { return extLset }, + nil, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + func() string { return version }) testutil.Ok(t, err) return promStore @@ -399,7 +409,9 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, nil) + func() (int64, int64) { return 0, math.MaxInt64 }, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -461,7 +473,9 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, nil) + func() (int64, int64) { return 0, math.MaxInt64 }, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -490,7 +504,9 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 123, 456 }, nil) + func() (int64, int64) { return 123, 456 }, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + nil) testutil.Ok(t, err) resp, err := proxy.Info(ctx, &storepb.InfoRequest{}) @@ -568,7 +584,9 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, - func() (int64, int64) { return 0, math.MaxInt64 }, nil) + func() (int64, int64) { return 0, math.MaxInt64 }, + func() bloom.Filter { return bloom.NewAlwaysTrueFilter() }, + nil) testutil.Ok(t, err) // We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only. diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index d6056b524b6..d3191d2aace 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/thanos-io/thanos/pkg/bloom" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -47,6 +48,9 @@ type Client interface { // LabelSets that each apply to some data exposed by the backing store. LabelSets() []labels.Labels + // LabelNamesBloom returns a bloom filter for all label names. + LabelNamesBloom() bloom.Filter + // TimeRange returns minimum and maximum time range of data in the store. TimeRange() (mint int64, maxt int64) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 028ac81a7ec..7a16d126c36 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -540,8 +540,9 @@ func newAsyncRespSet( return nil, err } - var labelsToRemove map[string]struct{} - if !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 { + labelsToRemove := make(map[string]struct{}) + dedupByInternalLabel := hasInternalReplicaLabels(st, req) + if dedupByInternalLabel || !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 { level.Warn(logger).Log("msg", "detecting store that does not support without replica label setting. "+ "Falling back to eager retrieval with additional sort. Make sure your storeAPI supports it to speed up your queries", "store", st.String()) retrievalStrategy = EagerRetrieval @@ -819,3 +820,16 @@ type respSet interface { Labelset() string Empty() bool } + +// hasInternalReplicaLabels returns true if any replica label in the series request is not an +// external label for the given Client. +func hasInternalReplicaLabels(st Client, req *storepb.SeriesRequest) bool { + bloom := st.LabelNamesBloom() + for _, labelName := range req.WithoutReplicaLabels { + if bloom.Test(labelName) { + return true + } + } + + return false +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 61d9bc1965d..0c4a0e46773 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -24,11 +24,14 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/efficientgo/core/testutil" + + "github.com/thanos-io/thanos/pkg/bloom" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -592,6 +595,40 @@ func TestProxyStore_Series(t *testing.T) { }, }, }, + { + title: "deduplicate by TSDB internal label", + storeAPIs: []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("env", "1", "id", "centos", "instance", "1"), []sample{{0, 0}}), + storeSeriesResponse(t, labels.FromStrings("env", "1", "id", "centos", "instance", "2"), []sample{{0, 0}}), + storeSeriesResponse(t, labels.FromStrings("env", "2", "id", "centos", "instance", "1"), []sample{{0, 0}}), + storeSeriesResponse(t, labels.FromStrings("env", "2", "id", "centos", "instance", "2"), []sample{{0, 0}}), + }, + }, + MinTime: 1, + MaxTime: 300, + LabelNamesBloomFilter: bloom.NewFilterForStrings("env", "id", "instance"), + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "id", Value: "centos", Type: storepb.LabelMatcher_RE}}, + WithoutReplicaLabels: []string{"env"}, + }, + expectedSeries: []rawSeries{ + { + lset: labels.FromStrings("id", "centos", "instance", "1"), + chunks: [][]sample{{{0, 0}}}, + }, + { + lset: labels.FromStrings("id", "centos", "instance", "2"), + chunks: [][]sample{{{0, 0}}}, + }, + }, + }, } { t.Run(tc.title, func(t *testing.T) { for _, replicaLabelSupport := range []bool{false, true} { @@ -1561,7 +1598,7 @@ func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { } for i := range ret { - testutil.Equals(t, expected[i], ret[i]) + require.Equal(t, expected[i], ret[i]) } } diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index b6916005a38..f6d2302356f 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -5,6 +5,8 @@ package storetestutil import ( "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/bloom" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -18,6 +20,7 @@ type TestClient struct { Shardable bool WithoutReplicaLabelsEnabled bool IsLocalStore bool + LabelNamesBloomFilter bloom.Filter } func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } @@ -26,3 +29,4 @@ func (c TestClient) SupportsSharding() bool { return c.Shardable } func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } func (c TestClient) String() string { return c.Name } func (c TestClient) Addr() (string, bool) { return c.Name, c.IsLocalStore } +func (c TestClient) LabelNamesBloom() bloom.Filter { return c.LabelNamesBloomFilter }