Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store-gateway: Move eager loading to BucketStore #8602

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 51 additions & 26 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,13 @@ func NewBucketStore(
option(s)
}

lazyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(dir)
if err != nil {
level.Warn(s.logger).Log(
"msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant",
"dir", dir,
"err", err,
)
// Don't fail initialization. If eager loading doesn't happen, then we will load index-headers lazily.
// Lazy loading which is slower, but not worth failing startup for.
}
s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedBlocks)
s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics)

if bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled {
snapConfig := indexheader.SnapshotterConfig{
Path: dir,
UserID: userID,
PersistInterval: bucketStoreConfig.IndexHeader.LazyLoadingIdleTimeout,
PersistInterval: bucketStoreConfig.IndexHeader.EagerLoadingPersistInterval,
}
s.snapshotter = indexheader.NewSnapshotter(s.logger, snapConfig, s.indexReaderPool)
} else {
Expand Down Expand Up @@ -317,10 +307,10 @@ func (s *BucketStore) Stats() BucketStoreStats {
// SyncBlocks synchronizes the stores state with the Bucket bucket.
// It will reuse disk space as persistent cache based on s.dir param.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
return s.syncBlocks(ctx, false)
return s.syncBlocks(ctx)
}

func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
func (s *BucketStore) syncBlocks(ctx context.Context) error {
metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
// For partial view allow adding new blocks at least.
if metaFetchErr != nil && metas == nil {
Expand All @@ -334,7 +324,7 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
wg.Add(1)
go func() {
for meta := range blockc {
if err := s.addBlock(ctx, meta, initialSync); err != nil {
if err := s.addBlock(ctx, meta); err != nil {
continue
}
}
Expand All @@ -359,10 +349,7 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
return metaFetchErr
}

blockIDs := make([]ulid.ULID, 0)
s.blockSet.forEach(func(b *bucketBlock) {
blockIDs = append(blockIDs, b.meta.ULID)
})
blockIDs := s.blockSet.blockULIDs()
for _, id := range blockIDs {
if _, ok := metas[id]; ok {
continue
Expand All @@ -385,10 +372,44 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
// InitialSync perform blocking sync with extra step at the end to delete locally saved blocks that are no longer
// present in the bucket. The mismatch of these can only happen between restarts, so we can do that only once per startup.
func (s *BucketStore) InitialSync(ctx context.Context) error {
if err := s.syncBlocks(ctx, true); err != nil {
previouslyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(s.dir)
if err != nil {
level.Warn(s.logger).Log(
"msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant",
"dir", s.dir,
"err", err,
)
// Don't fail initialization. If eager loading doesn't happen, then we will load index-headers lazily.
// Lazy loading which is slower, but not worth failing startup for.
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we move this chunk inside s.loadBlocks (and maybe rename this method to s.restorePreviousBlocks)? We don't need to do it if EagerLoadingStartupEnabled is disabled. And also this will make the piece about "not failing the initialization" less awkward (IMHO).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After we run syncBlocks, the snapshotter will be running, so there's a theoretical race between the snapshotter and reading the snapshot from before the shutdown. It's not obvious, so I now added a comment explaining this and moved this whole thing into a function.

But if you have ideas for how to make this better, I can do it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes more sense now, thanks for explaining. I'm ok to stick to the current version. I think, the comments you've added makes it clear 👍

if err := s.syncBlocks(ctx); err != nil {
return errors.Wrap(err, "sync block")
}
if s.indexHeaderCfg.EagerLoadingStartupEnabled {
s.loadBlocks(ctx, previouslyLoadedBlocks)
}

err = s.cleanUpUnownedBlocks()
if err != nil {
return err
}

return nil
}

func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]int64) {
// We ignore the time the block was used because it can only be in the map if it was still loaded before the shutdown
s.blockSet.forEach(func(b *bucketBlock) {
if _, ok := blocks[b.meta.ULID]; !ok {
return
}
if lazyReader, ok := b.indexHeaderReader.(*indexheader.LazyBinaryReader); ok {
lazyReader.EagerLoad(ctx)
}
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
})
}

func (s *BucketStore) cleanUpUnownedBlocks() error {
fis, err := os.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
Expand All @@ -415,7 +436,7 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
return nil
}

func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSync bool) (err error) {
func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error) {
dir := filepath.Join(s.dir, meta.ULID.String())
start := time.Now()

Expand All @@ -441,7 +462,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSyn
meta.ULID,
s.postingOffsetsInMemSampling,
s.indexHeaderCfg,
initialSync,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
Expand Down Expand Up @@ -507,10 +527,7 @@ func (s *BucketStore) removeBlock(id ulid.ULID) (returnErr error) {
}

func (s *BucketStore) removeAllBlocks() error {
blockIDs := make([]ulid.ULID, 0)
s.blockSet.forEach(func(b *bucketBlock) {
blockIDs = append(blockIDs, b.meta.ULID)
})
blockIDs := s.blockSet.blockULIDs()

errs := multierror.New()
for _, id := range blockIDs {
Expand Down Expand Up @@ -1878,6 +1895,14 @@ func (s *bucketBlockSet) forEach(fn func(b *bucketBlock)) {
})
}

// blockULIDs returns the ULIDs of all blocks currently held in the set.
func (s *bucketBlockSet) blockULIDs() []ulid.ULID {
	ids := make([]ulid.ULID, 0, s.len())
	s.forEach(func(blk *bucketBlock) {
		ids = append(ids, blk.meta.ULID)
	})
	return ids
}

// timerange returns the minimum and maximum timestamp available in the set.
func (s *bucketBlockSet) timerange() (mint, maxt int64) {
s.mtx.RLock()
Expand Down
Loading
Loading