diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ceefc193a..e28c8a1242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races. - [#4754](https://github.com/thanos-io/thanos/pull/4754) Query: Fix possible panic on stores endpoint. - [#4753](https://github.com/thanos-io/thanos/pull/4753) Store: validate block sync concurrency parameter +- [#4792](https://github.com/thanos-io/thanos/pull/4792) Store: Fix data race in BucketedBytes pool. ## [v0.23.1](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.10.1 diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index cbd034e9e7..a7eb98c854 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -107,8 +107,9 @@ func (p *BucketedBytes) Put(b *[]byte) { return } + sz := cap(*b) for i, bktSize := range p.sizes { - if cap(*b) > bktSize { + if sz > bktSize { continue } *b = (*b)[:0] @@ -118,13 +119,11 @@ func (p *BucketedBytes) Put(b *[]byte) { p.mtx.Lock() defer p.mtx.Unlock() - // We could assume here that our users will not make the slices larger // but lets be on the safe side to avoid an underflow of p.usedTotal. - sz := uint64(cap(*b)) - if sz >= p.usedTotal { + if uint64(sz) >= p.usedTotal { p.usedTotal = 0 } else { - p.usedTotal -= sz + p.usedTotal -= uint64(sz) } } diff --git a/pkg/pool/pool_test.go b/pkg/pool/pool_test.go index a4140361d2..14c8350acb 100644 --- a/pkg/pool/pool_test.go +++ b/pkg/pool/pool_test.go @@ -4,8 +4,7 @@ package pool import ( - "bytes" - "fmt" + "strings" "sync" "testing" "time" @@ -71,52 +70,57 @@ func TestRacePutGet(t *testing.T) { s := sync.WaitGroup{} - // Start two goroutines: they always Get and Put two byte slices - // to which they write 'foo' / 'barbazbaz' and check if the data is still + const goroutines = 100 + + // Start multiple goroutines: they always Get and Put two byte slices + // to which they write their contents and check if the data is still // there after writing it, before putting it back. - errs := make(chan error, 2) - stop := make(chan bool, 2) + errs := make(chan error, goroutines) + stop := make(chan struct{}) - f := func(txt string) { + f := func(txt string, grow bool) { defer s.Done() for { select { case <-stop: return default: - c, err := chunkPool.Get(3) - if err != nil { - errs <- errors.Wrapf(err, "goroutine %s", txt) - return - } - - buf := bytes.NewBuffer(*c) - - _, err = fmt.Fprintf(buf, "%s", txt) + c, err := chunkPool.Get(len(txt)) if err != nil { errs <- errors.Wrapf(err, "goroutine %s", txt) return } - if buf.String() != txt { + *c = append(*c, txt...) + if string(*c) != txt { errs <- errors.New("expected to get the data just written") return } + if grow { + *c = append(*c, txt...) + *c = append(*c, txt...) + if string(*c) != txt+txt+txt { + errs <- errors.New("expected to get the data just written") + return + } + } - b := buf.Bytes() - chunkPool.Put(&b) + chunkPool.Put(c) } } } - s.Add(2) - go f("foo") - go f("barbazbaz") - - time.Sleep(5 * time.Second) - stop <- true - stop <- true + for i := 0; i < goroutines; i++ { + s.Add(1) + // make sure we start multiple goroutines with same len buf requirements, to hit same pools + s := strings.Repeat(string(byte(i)), i%10) + // some of the goroutines will append more elements to the provided slice + grow := i%2 == 0 + go f(s, grow) + } + time.Sleep(1 * time.Second) + close(stop) s.Wait() select { case err := <-errs: