Fix TestLabelNames_Cancelled flakiness #8434

Closed
20 changes: 19 additions & 1 deletion pkg/storegateway/bucket.go
@@ -262,10 +262,28 @@ func NewBucketStore(

 // RemoveBlocksAndClose remove all blocks from local disk and releases all resources associated with the BucketStore.
 func (s *BucketStore) RemoveBlocksAndClose() error {
+	return s.removeBlocksAndClose(context.Background(), false)
+}
+
+// RemoveBlocksCloseAndWait remove all blocks from local disk, releases all resources associated with the BucketStore
+// and waits until all dependencies have been stopped.
+func (s *BucketStore) RemoveBlocksCloseAndWait(ctx context.Context) error {
+	return s.removeBlocksAndClose(ctx, true)
+}
Comment on lines 264 to +272

This should be synchronous in all cases. If the snapshotter shuts down asynchronously, it can create a file after the tenant directory has been cleaned up. RemoveBlocksAndClose was previously synchronous anyway.

Having said that, it's also OK not to propagate the context all the way to RemoveBlocksCloseAndWait. That will be done soon with #8389.

@narqo narqo Jun 20, 2024

> Having said that, it's also OK not to propagate the context all the way to RemoveBlocksCloseAndWait.

I think we need to, actually. At least in the case where the first syncBlocks fails (or is cancelled), the Snapshotter won't start. In that case, the call to Snapshotter.Stop will block, waiting on the stopped channel, which is never closed.

Given the above, I wonder if it would be simpler to replace the Snapshotter.stopped channel with a WaitGroup:

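func Start() {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// ... background work, returns once stop is closed ...
	}()
}

func Stop() {
	close(stop)
	wg.Wait() // Returns immediately if Start was never called.
}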

Although the above is a nit, given the plans for #8389.


ok, let's keep the current implementation. This is a nightmare to navigate


+func (s *BucketStore) removeBlocksAndClose(ctx context.Context, wait bool) error {
 	err := s.removeAllBlocks()

 	// Release other resources even if it failed to close some blocks.
-	s.snapshotter.Stop()
+	if wait {
+		if stopErr := s.snapshotter.Stop(ctx); stopErr != nil && err == nil {
+			// Do not return a multi error so that we're sure the unwrapping works fine.
+			err = stopErr
+		}
+	} else {
+		s.snapshotter.StopAsync()
+	}
+
 	s.indexReaderPool.Close()

 	return err
4 changes: 3 additions & 1 deletion pkg/storegateway/bucket_e2e_test.go
@@ -207,7 +207,9 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
 	)
 	assert.NoError(t, err)
 	t.Cleanup(func() {
-		assert.NoError(t, s.store.RemoveBlocksAndClose())
+		// Make sure to wait until all store dependencies have been stopped / closed
+		// in order to avoid flaky tests.
+		assert.NoError(t, s.store.RemoveBlocksCloseAndWait(context.Background()))
 	})

 	s.store = store
28 changes: 23 additions & 5 deletions pkg/storegateway/indexheader/snapshotter.go
@@ -30,14 +30,16 @@ type Snapshotter struct {
 	logger log.Logger
 	conf   SnapshotterConfig

-	stop chan struct{}
+	stop    chan struct{} // Closed to signal the Snapshotter should be stopped.
+	stopped chan struct{} // Closed once the Snapshotter has been successfully stopped.
 }

 func NewSnapshotter(logger log.Logger, conf SnapshotterConfig) *Snapshotter {
 	return &Snapshotter{
-		logger: logger,
-		conf:   conf,
-		stop:   make(chan struct{}),
+		logger:  logger,
+		conf:    conf,
+		stop:    make(chan struct{}),
+		stopped: make(chan struct{}),
 	}
 }

@@ -48,6 +50,8 @@ type blocksLoader interface {
 // Start spawns a background job that periodically persists the list of lazy-loaded index headers.
 func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) {
 	go func() {
+		defer close(s.stopped)
+
 		err := s.PersistLoadedBlocks(bl)
 		if err != nil {
 			// Note, the decision here is to only log the error but not failing the job. We may reconsider that later.
@@ -72,10 +76,24 @@ func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) {
 	}()
 }

-func (s *Snapshotter) Stop() {
+// StopAsync stops the Snapshotter but doesn't wait until stopped.
+func (s *Snapshotter) StopAsync() {
 	close(s.stop)
 }
+
+// Stop stops the Snapshotter and waits until stopped.
+func (s *Snapshotter) Stop(ctx context.Context) error {
+	s.StopAsync()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+
+	case <-s.stopped:
+		return nil
+	}
+}

 func (s *Snapshotter) PersistLoadedBlocks(bl blocksLoader) error {
 	snapshot := &indexHeadersSnapshot{
 		IndexHeaderLastUsedTime: bl.LoadedBlocks(),