From 36d36512dc559f7de01ce21920fed024458067cb Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 24 Aug 2023 15:44:57 +0200 Subject: [PATCH 1/5] store: fix error handling in decodePostings (#6650) Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 2 ++ pkg/store/bucket.go | 5 ++--- pkg/store/bucket_test.go | 12 ++++++++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75185bddc4..bd51577517 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#6650](https://github.com/thanos-io/thanos/pull/6650) Store: fix error handling in decodePostings + ### Added ### Changed diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index f5b71b434a..780a2013aa 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2864,14 +2864,13 @@ func (r *bucketIndexReader) decodeCachedPostings(b []byte) (index.Postings, []fu ) if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) { s := time.Now() - clPostings, err := decodePostings(b) + l, err = decodePostings(b) r.stats.cachedPostingsDecompressions += 1 r.stats.CachedPostingsDecompressionTimeSum += time.Since(s) if err != nil { r.stats.cachedPostingsDecompressionErrors += 1 } else { - closeFns = append(closeFns, clPostings.close) - l = clPostings + closeFns = append(closeFns, l.(closeablePostings).close) } } else { _, l, err = r.dec.Postings(b) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 0d40f18bc3..3e71d83125 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3330,3 +3330,15 @@ func TestExpandedPostingsRace(t *testing.T) { wg.Wait() } } + +func TestBucketIndexReader_decodeCachedPostingsErrors(t *testing.T) { + bir := bucketIndexReader{stats: &queryStats{}} + t.Run("should return error on broken cached postings without snappy prefix", func(t *testing.T) { + _, _, err := bir.decodeCachedPostings([]byte("foo")) + testutil.NotOk(t, err) + }) + t.Run("should return error on broken cached postings with snappy prefix", func(t *testing.T) { + _, _, err := bir.decodeCachedPostings(append([]byte(codecHeaderSnappy), []byte("foo")...)) + testutil.NotOk(t, err) + }) +} From a1ff1fc4569b071c13b97f72b8edef7f266fd945 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 24 Aug 2023 20:13:28 +0200 Subject: [PATCH 2/5] store: fix ignored error in postings (#6654) Signed-off-by: Michael Hoffmann --- pkg/store/postings.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/store/postings.go b/pkg/store/postings.go index 066f52116c..28fbf0cf01 100644 --- a/pkg/store/postings.go +++ b/pkg/store/postings.go @@ -83,6 +83,7 @@ func (r *postingsReaderBuilder) Next() bool { _, err := r.r.Discard(int(from - r.lastOffset)) if err != nil { + r.e = err return false } r.lastOffset += from - r.lastOffset From 450c4ad78bbc6844cfad8a02e3d29d0b56a78505 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sat, 26 Aug 2023 09:22:04 +0200 Subject: [PATCH 3/5] Store: fix bufio pool handling (#6655) Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 1 + pkg/store/bucket.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd51577517..df4eb65db4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#6650](https://github.com/thanos-io/thanos/pull/6650) Store: fix error handling in decodePostings +- [#6655](https://github.com/thanos-io/thanos/pull/6655) Store: fix bufio pool handling ### Added diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 780a2013aa..3db587f742 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2792,13 +2792,13 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab // We assume index does not have any ptrs that has 0 length. length := int64(part.End) - start - brdr := bufioReaderPool.Get().(*bufio.Reader) - defer bufioReaderPool.Put(brdr) - // Fetch from object storage concurrently and update stats and posting list. g.Go(func() error { begin := time.Now() + brdr := bufioReaderPool.Get().(*bufio.Reader) + defer bufioReaderPool.Put(brdr) + partReader, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), start, length) if err != nil { return errors.Wrap(err, "read postings range") From 7fbc950adc76cbc448d03d300efd634774cf5368 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 28 Aug 2023 12:19:35 +0200 Subject: [PATCH 4/5] Fix mutable stringset memory usage (#6669) This commit fixes the Insert function for the mutable stringset to only insert unique labels instead of adding every label to the set. Signed-off-by: Filip Petkovski --- pkg/store/bucket_e2e_test.go | 1 + pkg/stringset/set.go | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 85b1d597d8..7262dee155 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -799,6 +799,7 @@ func TestBucketStore_LabelNamesSet_e2e(t *testing.T) { for _, n := range []string{"a", "b", "c"} { testutil.Assert(t, filter.Has(n), "expected filter to have %s", n) } + testutil.Equals(t, 3, filter.Count()) }) } diff --git a/pkg/stringset/set.go b/pkg/stringset/set.go index defe699353..080071570f 100644 --- a/pkg/stringset/set.go +++ b/pkg/stringset/set.go @@ -10,6 +10,10 @@ import ( type Set interface { Has(string) bool HasAny([]string) bool + // Count returns the number of elements in the set. + // A value of -1 indicates infinite size and can be returned by a + // set representing all possible string values. + Count() int } type fixedSet struct { @@ -38,6 +42,10 @@ func (f fixedSet) Has(s string) bool { return f.cuckoo.Lookup([]byte(s)) } +func (f fixedSet) Count() int { + return int(f.cuckoo.Count()) +} + type mutableSet struct { cuckoo *cuckoo.ScalableCuckooFilter } @@ -54,7 +62,7 @@ func New() MutableSet { } func (e mutableSet) Insert(s string) { - e.cuckoo.Insert([]byte(s)) + e.cuckoo.InsertUnique([]byte(s)) } func (e mutableSet) Has(s string) bool { @@ -70,6 +78,10 @@ func (e mutableSet) HasAny(strings []string) bool { return false } +func (e mutableSet) Count() int { + return int(e.cuckoo.Count()) +} + type allStringsSet struct{} func (e allStringsSet) HasAny(_ []string) bool { @@ -83,3 +95,7 @@ func AllStrings() *allStringsSet { func (e allStringsSet) Has(_ string) bool { return true } + +func (e allStringsSet) Count() int { + return -1 +} From 5679429e0826d8e55422541682aa483d13257fec Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 28 Aug 2023 16:01:59 +0530 Subject: [PATCH 5/5] Cut patch release v0.32.1 Signed-off-by: Saswata Mukherjee --- CHANGELOG.md | 14 +++++++++++++- VERSION | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df4eb65db4..24f0d4be18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,8 +12,20 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +### Added + +### Changed + +### Removed + +## [v0.32.1](https://github.com/thanos-io/thanos/tree/release-0.32) - 28.08.2023 + +### Fixed + - [#6650](https://github.com/thanos-io/thanos/pull/6650) Store: fix error handling in decodePostings +- [#6654](https://github.com/thanos-io/thanos/pull/6654) Store: fix ignored error in postings - [#6655](https://github.com/thanos-io/thanos/pull/6655) Store: fix bufio pool handling +- [#6669](https://github.com/thanos-io/thanos/pull/6669) Store: Fix mutable stringset memory usage ### Added @@ -21,7 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Removed -## [v0.32.0](https://github.com/thanos-io/thanos/tree/release-v0.32.0) - 23.08.2023 +## [v0.32.0](https://github.com/thanos-io/thanos/tree/release-0.32) - 23.08.2023 ### Added diff --git a/VERSION b/VERSION index 8a0d6d408f..bd03320d42 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.32.0 \ No newline at end of file +0.32.1 \ No newline at end of file