diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 69d7066e009..e8991912102 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -262,10 +262,28 @@ func NewBucketStore( // RemoveBlocksAndClose remove all blocks from local disk and releases all resources associated with the BucketStore. func (s *BucketStore) RemoveBlocksAndClose() error { + return s.removeBlocksAndClose(context.Background(), false) +} + +// RemoveBlocksCloseAndWait remove all blocks from local disk, releases all resources associated with the BucketStore +// and waits until all dependencies have been stopped. +func (s *BucketStore) RemoveBlocksCloseAndWait(ctx context.Context) error { + return s.removeBlocksAndClose(ctx, true) +} + +func (s *BucketStore) removeBlocksAndClose(ctx context.Context, wait bool) error { err := s.removeAllBlocks() // Release other resources even if it failed to close some blocks. - s.snapshotter.Stop() + if wait { + if stopErr := s.snapshotter.Stop(ctx); stopErr != nil && err == nil { + // Do not return a multi error so that we're sure the unwrapping works fine. + err = stopErr + } + } else { + s.snapshotter.StopAsync() + } + s.indexReaderPool.Close() return err diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 35bcc40f112..e3a125980cd 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -207,7 +207,9 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS ) assert.NoError(t, err) t.Cleanup(func() { - assert.NoError(t, s.store.RemoveBlocksAndClose()) + // Make sure to wait until all store dependencies have been stopped / closed + // in order to avoid flaky tests. + assert.NoError(t, s.store.RemoveBlocksCloseAndWait(context.Background())) }) s.store = store diff --git a/pkg/storegateway/indexheader/snapshotter.go b/pkg/storegateway/indexheader/snapshotter.go index 08e9d03bd82..e5482551d8b 100644 --- a/pkg/storegateway/indexheader/snapshotter.go +++ b/pkg/storegateway/indexheader/snapshotter.go @@ -30,14 +30,16 @@ type Snapshotter struct { logger log.Logger conf SnapshotterConfig - stop chan struct{} + stop chan struct{} // Closed to signal the Snapshotter should be stopped. + stopped chan struct{} // Closed one the Snapshotter has been successfully stopped. } func NewSnapshotter(logger log.Logger, conf SnapshotterConfig) *Snapshotter { return &Snapshotter{ - logger: logger, - conf: conf, - stop: make(chan struct{}), + logger: logger, + conf: conf, + stop: make(chan struct{}), + stopped: make(chan struct{}), } } @@ -48,6 +50,8 @@ type blocksLoader interface { // Start spawns a background job that periodically persists the list of lazy-loaded index headers. func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) { go func() { + defer close(s.stopped) + err := s.PersistLoadedBlocks(bl) if err != nil { // Note, the decision here is to only log the error but not failing the job. We may reconsider that later. @@ -72,10 +76,24 @@ func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) { }() } -func (s *Snapshotter) Stop() { +// StopAsync stops the Snapshotter but doesn't wait until stopped. +func (s *Snapshotter) StopAsync() { close(s.stop) } +// Stop stops the Snapshotter and waits until stopped. +func (s *Snapshotter) Stop(ctx context.Context) error { + s.StopAsync() + + select { + case <-ctx.Done(): + return ctx.Err() + + case <-s.stopped: + return nil + } +} + func (s *Snapshotter) PersistLoadedBlocks(bl blocksLoader) error { snapshot := &indexHeadersSnapshot{ IndexHeaderLastUsedTime: bl.LoadedBlocks(),