diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go
index 7d2f0d71eb9..14197159759 100644
--- a/pkg/storegateway/bucket.go
+++ b/pkg/storegateway/bucket.go
@@ -239,23 +239,13 @@ func NewBucketStore(
 		option(s)
 	}
 
-	lazyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(dir)
-	if err != nil {
-		level.Warn(s.logger).Log(
-			"msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant",
-			"dir", dir,
-			"err", err,
-		)
-		// Don't fail initialization. If eager loading doesn't happen, then we will load index-headers lazily.
-		// Lazy loading which is slower, but not worth failing startup for.
-	}
-	s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedBlocks)
+	s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics)
 
 	if bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled {
 		snapConfig := indexheader.SnapshotterConfig{
 			Path:            dir,
 			UserID:          userID,
-			PersistInterval: bucketStoreConfig.IndexHeader.LazyLoadingIdleTimeout,
+			PersistInterval: bucketStoreConfig.IndexHeader.EagerLoadingPersistInterval,
 		}
 		s.snapshotter = indexheader.NewSnapshotter(s.logger, snapConfig, s.indexReaderPool)
 	} else {
@@ -317,10 +307,10 @@ func (s *BucketStore) Stats() BucketStoreStats {
 // SyncBlocks synchronizes the stores state with the Bucket bucket.
 // It will reuse disk space as persistent cache based on s.dir param.
 func (s *BucketStore) SyncBlocks(ctx context.Context) error {
-	return s.syncBlocks(ctx, false)
+	return s.syncBlocks(ctx)
 }
 
-func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
+func (s *BucketStore) syncBlocks(ctx context.Context) error {
 	metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
 	// For partial view allow adding new blocks at least.
 	if metaFetchErr != nil && metas == nil {
@@ -334,7 +324,7 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
 		wg.Add(1)
 		go func() {
 			for meta := range blockc {
-				if err := s.addBlock(ctx, meta, initialSync); err != nil {
+				if err := s.addBlock(ctx, meta); err != nil {
 					continue
 				}
 			}
@@ -359,10 +349,7 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
 		return metaFetchErr
 	}
 
-	blockIDs := make([]ulid.ULID, 0)
-	s.blockSet.forEach(func(b *bucketBlock) {
-		blockIDs = append(blockIDs, b.meta.ULID)
-	})
+	blockIDs := s.blockSet.blockULIDs()
 	for _, id := range blockIDs {
 		if _, ok := metas[id]; ok {
 			continue
 		}
@@ -385,10 +372,52 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
 // InitialSync perform blocking sync with extra step at the end to delete locally saved blocks that are no longer
 // present in the bucket. The mismatch of these can only happen between restarts, so we can do that only once per startup.
 func (s *BucketStore) InitialSync(ctx context.Context) error {
-	if err := s.syncBlocks(ctx, true); err != nil {
+	// Read the pre-shutdown snapshot before running the sync: once the sync runs, the snapshotter
+	// starts persisting snapshots again and would overwrite it.
+	previouslyLoadedBlocks := s.tryRestoreLoadedBlocksSet()
+
+	if err := s.syncBlocks(ctx); err != nil {
 		return errors.Wrap(err, "sync block")
 	}
 
+	if s.indexHeaderCfg.EagerLoadingStartupEnabled {
+		s.loadBlocks(ctx, previouslyLoadedBlocks)
+	}
+
+	err := s.cleanUpUnownedBlocks()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
+	previouslyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(s.dir)
+	if err != nil {
+		level.Warn(s.logger).Log(
+			"msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant",
+			"dir", s.dir,
+			"err", err,
+		)
+		// Don't fail initialization. If eager loading doesn't happen, then we will load index-headers lazily.
+		// Lazy loading is slower, but that's not worth failing startup for.
+	}
+	return previouslyLoadedBlocks
+}
+
+func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]int64) {
+	// This is not happening during a request, so we can ignore the stats.
+	ignoredStats := newSafeQueryStats()
+	// We ignore the time the block was last used: a block can only be in the map if it was still loaded before the shutdown.
+	s.blockSet.forEach(func(b *bucketBlock) {
+		if _, ok := blocks[b.meta.ULID]; !ok {
+			return
+		}
+		b.ensureIndexHeaderLoaded(ctx, ignoredStats)
+	})
+}
+func (s *BucketStore) cleanUpUnownedBlocks() error {
 	fis, err := os.ReadDir(s.dir)
 	if err != nil {
 		return errors.Wrap(err, "read dir")
@@ -415,7 +444,7 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
 	return nil
 }
 
-func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSync bool) (err error) {
+func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error) {
 	dir := filepath.Join(s.dir, meta.ULID.String())
 	start := time.Now()
 
@@ -441,7 +470,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *block.Meta, initialSyn
 		meta.ULID,
 		s.postingOffsetsInMemSampling,
 		s.indexHeaderCfg,
-		initialSync,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create index header reader")
@@ -507,10 +535,7 @@ func (s *BucketStore) removeBlock(id ulid.ULID) (returnErr error) {
 }
 
 func (s *BucketStore) removeAllBlocks() error {
-	blockIDs := make([]ulid.ULID, 0)
-	s.blockSet.forEach(func(b *bucketBlock) {
-		blockIDs = append(blockIDs, b.meta.ULID)
-	})
+	blockIDs := s.blockSet.blockULIDs()
 
 	errs := multierror.New()
 	for _, id := range blockIDs {
@@ -1878,6 +1903,14 @@ func (s *bucketBlockSet) forEach(fn func(b *bucketBlock)) {
 	})
 }
 
+func (s *bucketBlockSet) blockULIDs() []ulid.ULID {
+	ulids := make([]ulid.ULID, 0, s.len())
+	s.forEach(func(b *bucketBlock) {
+		ulids = append(ulids, b.meta.ULID)
+	})
+	return ulids
+}
+
 // timerange returns the minimum and maximum timestamp available in the set.
 func (s *bucketBlockSet) timerange() (mint, maxt int64) {
 	s.mtx.RLock()
diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go
index 41ccadc45c6..bd1ddf694a4 100644
--- a/pkg/storegateway/bucket_e2e_test.go
+++ b/pkg/storegateway/bucket_e2e_test.go
@@ -8,9 +8,11 @@ package storegateway
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 	"time"
 
@@ -18,7 +20,9 @@ import (
 	"github.com/grafana/dskit/grpcutil"
 	dskit_metrics "github.com/grafana/dskit/metrics"
 	"github.com/grafana/dskit/services"
+	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/timestamp"
@@ -35,6 +39,7 @@ import (
 	"github.com/grafana/mimir/pkg/storegateway/indexcache"
 	"github.com/grafana/mimir/pkg/storegateway/indexheader"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
+	"github.com/grafana/mimir/pkg/util/test"
 )
 
 var (
@@ -110,17 +115,19 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
 type prepareStoreConfig struct {
 	tempDir              string
 	manyParts            bool
-	maxSeriesPerBatch    int
 	chunksLimiterFactory ChunksLimiterFactory
 	seriesLimiterFactory SeriesLimiterFactory
 	series               []labels.Labels
 	indexCache           indexcache.IndexCache
 	metricsRegistry      *prometheus.Registry
+	logger               log.Logger
 	postingsStrategy     postingsSelectionStrategy
 	// When nonOverlappingBlocks is false, prepare store creates 2 blocks per block range.
 	// When nonOverlappingBlocks is true, it shifts the 2nd block ahead by 2hrs for every block range.
 	// This way the first and the last blocks created have no overlapping blocks.
 	nonOverlappingBlocks bool
+	numBlocks            int
+	bucketStoreConfig    mimir_tsdb.BucketStoreConfig
 }
 
 func (c *prepareStoreConfig) apply(opts ...prepareStoreConfigOption) *prepareStoreConfig {
@@ -133,12 +140,24 @@ func (c *prepareStoreConfig) apply(opts ...prepareStoreConfigOption) *prepareSto
 func defaultPrepareStoreConfig(t testing.TB) *prepareStoreConfig {
 	return &prepareStoreConfig{
 		metricsRegistry: prometheus.NewRegistry(),
+		numBlocks:       6,
+		logger:          log.NewNopLogger(),
 		tempDir:         t.TempDir(),
 		manyParts:       false,
-		// We want to force each Series() call to use more than one batch to catch some edge cases.
-		// This should make the implementation slightly slower, although most tests time
-		// is dominated by the setup.
-		maxSeriesPerBatch: 10,
+		bucketStoreConfig: mimir_tsdb.BucketStoreConfig{
+			// We want to force each Series() call to use more than one batch to catch some edge cases.
+			// This should make the implementation slightly slower, although most of the test time
+			// is dominated by the setup.
+ StreamingBatchSize: 10, + BlockSyncConcurrency: 20, + PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, + IndexHeader: indexheader.Config{ + EagerLoadingStartupEnabled: true, + EagerLoadingPersistInterval: time.Minute, + LazyLoadingEnabled: true, + LazyLoadingIdleTimeout: time.Minute, + }, + }, seriesLimiterFactory: newStaticSeriesLimiterFactory(0), chunksLimiterFactory: newStaticChunksLimiterFactory(0), indexCache: noopCache{}, @@ -166,11 +185,10 @@ func withManyParts() prepareStoreConfigOption { func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareStoreConfig) *storeSuite { extLset := labels.FromStrings("ext1", "value1") - - minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, cfg.tempDir, bkt, cfg.series, extLset, cfg.nonOverlappingBlocks) + minTime, maxTime := prepareTestBlocks(t, time.Now(), cfg.numBlocks/2, cfg.tempDir, bkt, cfg.series, extLset, cfg.nonOverlappingBlocks) s := &storeSuite{ - logger: log.NewNopLogger(), + logger: cfg.logger, metricsRegistry: cfg.metricsRegistry, cache: &swappableCache{IndexCache: cfg.indexCache}, minTime: minTime, @@ -188,17 +206,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS objstore.WithNoopInstr(bkt), metaFetcher, cfg.tempDir, - mimir_tsdb.BucketStoreConfig{ - StreamingBatchSize: cfg.maxSeriesPerBatch, - BlockSyncConcurrency: 20, - PostingOffsetsInMemSampling: mimir_tsdb.DefaultPostingOffsetInMemorySampling, - IndexHeader: indexheader.Config{ - EagerLoadingStartupEnabled: true, - LazyLoadingEnabled: true, - LazyLoadingIdleTimeout: time.Minute, - EagerLoadingPersistInterval: time.Minute, - }, - }, + cfg.bucketStoreConfig, cfg.postingsStrategy, cfg.chunksLimiterFactory, cfg.seriesLimiterFactory, @@ -640,8 +648,6 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { t.Run(testName, func(t *testing.T) { for _, streamingBatchSize := range []int{0, 1, 5} { t.Run(fmt.Sprintf("streamingBatchSize=%d", streamingBatchSize), func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() bkt := objstore.NewInMemBucket() prepConfig := defaultPrepareStoreConfig(t) @@ -649,7 +655,6 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { prepConfig.seriesLimiterFactory = newStaticSeriesLimiterFactory(testData.maxSeriesLimit) s := prepareStoreWithTestBlocks(t, bkt, prepConfig) - assert.NoError(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ Matchers: []storepb.LabelMatcher{ @@ -678,6 +683,147 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { } } +func TestBucketStore_EagerLoading(t *testing.T) { + testCases := map[string]struct { + eagerLoadReaderEnabled bool + expectedEagerLoadedBlocks int + createLoadedBlocksSnapshotFn func([]ulid.ULID) map[ulid.ULID]int64 + }{ + "block is present in pre-shutdown loaded blocks and eager-loading is disabled": { + eagerLoadReaderEnabled: false, + expectedEagerLoadedBlocks: 0, + createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 { + snapshot := make(map[ulid.ULID]int64) + for _, blockID := range blockIDs { + snapshot[blockID] = time.Now().UnixMilli() + } + return snapshot + }, + }, + "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": { + eagerLoadReaderEnabled: true, + expectedEagerLoadedBlocks: 6, + createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 { + snapshot := make(map[ulid.ULID]int64) + for _, blockID := range blockIDs { + 
snapshot[blockID] = time.Now().UnixMilli()
+				}
+				return snapshot
+			},
+		},
+		"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": {
+			eagerLoadReaderEnabled:    true,
+			expectedEagerLoadedBlocks: 6,
+			createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
+				snapshot := make(map[ulid.ULID]int64)
+				for _, blockID := range blockIDs {
+					snapshot[blockID] = time.Now().UnixMilli()
+				}
+				return snapshot
+			},
+		},
+		"block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": {
+			eagerLoadReaderEnabled:    true,
+			expectedEagerLoadedBlocks: 0, // although eager loading is enabled, it is skipped here because the block ID is not in the lazy-loaded blocks file.
+			createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) map[ulid.ULID]int64 {
+				// create a random fake blockID to be stored in the lazy-loaded headers file
+				fakeBlockID := ulid.MustNew(ulid.Now(), nil)
+				// the snapshot refers only to the fake block, so eager loading won't run for the real blocks under test
+				return map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}
+			},
+		},
+		"pre-shutdown loaded blocks snapshot doesn't exist and eager-loading is enabled": {
+			eagerLoadReaderEnabled:    true,
+			expectedEagerLoadedBlocks: 0,
+		},
+	}
+
+	assertLoadedBlocks := func(t *testing.T, cfg *prepareStoreConfig, expectedLoadedBlocks int) {
+		assert.NoError(t, testutil.GatherAndCompare(cfg.metricsRegistry, strings.NewReader(fmt.Sprintf(`
+			# HELP cortex_bucket_store_indexheader_lazy_load_total Total number of index-header lazy load operations.
+			# TYPE cortex_bucket_store_indexheader_lazy_load_total counter
+			cortex_bucket_store_indexheader_lazy_load_total %d
+		`, expectedLoadedBlocks)),
+			"cortex_bucket_store_indexheader_lazy_load_total",
+		))
+	}
+
+	for testName, testData := range testCases {
+		testData := testData
+		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
+			bkt := objstore.NewInMemBucket()
+			cfg := defaultPrepareStoreConfig(t)
+			cfg.logger = test.NewTestingLogger(t)
+			cfg.bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled = testData.eagerLoadReaderEnabled
+			ctx := context.Background()
+
+			// Start the store so we generate some blocks and can use them in the mock snapshot.
+			store := prepareStoreWithTestBlocks(t, bkt, cfg)
+			assertLoadedBlocks(t, cfg, 0)
+
+			if testData.createLoadedBlocksSnapshotFn != nil {
+				// Create the snapshot manually so that we don't rely on the periodic snapshotting.
+				loadedBlocks := store.store.blockSet.blockULIDs()
+				staticLoader := staticLoadedBlocks(testData.createLoadedBlocksSnapshotFn(loadedBlocks))
+				snapshotter := indexheader.NewSnapshotter(cfg.logger, indexheader.SnapshotterConfig{
+					PersistInterval: time.Hour,
+					Path:            cfg.tempDir,
+				}, staticLoader)
+
+				require.NoError(t, snapshotter.PersistLoadedBlocks())
+			}
+			// Stop the store and start a new one using the same directory. It should pick up the snapshotted blocks.
+			require.NoError(t, services.StopAndAwaitTerminated(ctx, store.store))
+			cfg.metricsRegistry = prometheus.NewRegistry() // The store-gateway will reregister its metrics; replace the registry to prevent a panic.
+			cfg.numBlocks = 0                              // don't generate blocks again, to speed up the test
+
+			_ = prepareStoreWithTestBlocks(t, bkt, cfg) // we create and start the store only to trigger eager loading
+			assertLoadedBlocks(t, cfg, testData.expectedEagerLoadedBlocks)
+		})
+	}
+}
+
+func TestBucketStore_PersistsLazyLoadedBlocks(t *testing.T) {
+	t.Parallel()
+
+	const persistInterval = 100 * time.Millisecond
+	bkt := objstore.NewInMemBucket()
+	cfg := defaultPrepareStoreConfig(t)
+	cfg.logger = test.NewTestingLogger(t)
+	cfg.bucketStoreConfig.IndexHeader.EagerLoadingPersistInterval = persistInterval
+	cfg.bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled = true
+	ctx := context.Background()
+
+	// Start the store so we generate some blocks; their index-headers stay unloaded until they are queried.
+	store := prepareStoreWithTestBlocks(t, bkt, cfg)
+	// Wait for the snapshot to be persisted.
+	time.Sleep(persistInterval * 2)
+
+	// The snapshot should be empty because no index-headers have been loaded yet.
+	blocks, err := indexheader.RestoreLoadedBlocks(cfg.tempDir)
+	assert.NoError(t, err)
+	assert.Empty(t, blocks)
+
+	// Run a simple request to trigger loading the blocks.
+	resp, err := store.store.LabelNames(ctx, &storepb.LabelNamesRequest{End: math.MaxInt64})
+	require.NoError(t, err)
+	assert.NotEmpty(t, resp.Names)
+
+	// Wait for the snapshot to be persisted.
+	time.Sleep(persistInterval * 2)
+
+	blocks, err = indexheader.RestoreLoadedBlocks(cfg.tempDir)
+	assert.NoError(t, err)
+	assert.Len(t, blocks, cfg.numBlocks)
+}
+
+type staticLoadedBlocks map[ulid.ULID]int64
+
+func (b staticLoadedBlocks) LoadedBlocks() map[ulid.ULID]int64 {
+	return b
+}
+
 func assertQueryStatsLabelNamesMetricsRecorded(t *testing.T, numLabelNames int, registry *prometheus.Registry) {
 	t.Helper()
 
diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go
index 187cfa1b533..60e878081b7 100644
--- a/pkg/storegateway/bucket_test.go
+++ b/pkg/storegateway/bucket_test.go
@@ -1830,7 +1830,7 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 		indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), indexheader.Config{
 			LazyLoadingEnabled:     false,
 			LazyLoadingIdleTimeout: 0,
-		}, gate.NewNoop(), indexheader.NewReaderPoolMetrics(nil), nil),
+		}, gate.NewNoop(), indexheader.NewReaderPoolMetrics(nil)),
 		blockSet:         newBucketBlockSet(),
 		metrics:          NewBucketStoreMetrics(nil),
 		postingsStrategy: selectAllStrategy{},
diff --git a/pkg/storegateway/indexheader/lazy_binary_reader.go b/pkg/storegateway/indexheader/lazy_binary_reader.go
index 7999f7ad6a8..507bc1ae181 100644
--- a/pkg/storegateway/indexheader/lazy_binary_reader.go
+++ b/pkg/storegateway/indexheader/lazy_binary_reader.go
@@ -247,16 +247,6 @@ func (r *LazyBinaryReader) LabelNames(ctx context.Context) ([]string, error) {
 	return loaded.reader.LabelNames(ctx)
 }
 
-// EagerLoad attempts to eagerly load this index header.
-func (r *LazyBinaryReader) EagerLoad(ctx context.Context) {
-	loaded := r.getOrLoadReader(ctx)
-	if loaded.err != nil {
-		level.Warn(r.logger).Log("msg", "eager loading of lazy loaded index-header failed; skipping", "err", loaded.err)
-		return
-	}
-	loaded.inUse.Done()
-}
-
 // getOrLoadReader ensures the underlying binary index-header reader has been successfully loaded.
 // Returns the reader, wait group that should be used to signal that usage of reader is finished, and an error on failure.
 // Must be called without lock.
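Note on the bucket.go changes above: `tryRestoreLoadedBlocksSet` returns a nil map when the snapshot can't be read, and `loadBlocks` never nil-checks it. That is safe because lookups on a nil Go map always miss, so every block simply falls back to lazy loading. A minimal sketch of that behavior (`fakeID` is illustrative, not from the diff):

```go
package main

import (
	"fmt"

	"github.com/oklog/ulid"
)

func main() {
	// nil map, as returned by tryRestoreLoadedBlocksSet when no snapshot exists.
	var blocks map[ulid.ULID]int64

	fakeID := ulid.MustNew(ulid.Now(), nil) // illustrative block ID
	lastUsed, ok := blocks[fakeID]          // lookups on a nil map never panic; they just miss
	fmt.Println(lastUsed, ok)               // 0 false -> the block is skipped and loaded lazily later
}
```
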
diff --git a/pkg/storegateway/indexheader/lazy_binary_reader_test.go b/pkg/storegateway/indexheader/lazy_binary_reader_test.go index a3eda5fc100..f8911d039a1 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader_test.go +++ b/pkg/storegateway/indexheader/lazy_binary_reader_test.go @@ -222,35 +222,6 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) { }) } -func TestNewLazyBinaryReader_EagerLoadLazyLoadedIndexHeaders(t *testing.T) { - tmpDir, bkt, blockID := initBucketAndBlocksForTest(t) - - testLazyBinaryReader(t, bkt, tmpDir, blockID, func(t *testing.T, r *LazyBinaryReader, err error) { - r.EagerLoad(context.Background()) - - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, r.Close()) - }) - - require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) - require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) - - // The index should already be loaded, the following call will return reader already loaded above - v, err := r.IndexVersion(context.Background()) - require.NoError(t, err) - require.Equal(t, 2, v) - require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) - require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) - - labelNames, err := r.LabelNames(context.Background()) - require.NoError(t, err) - require.Equal(t, []string{"a"}, labelNames) - require.Equal(t, float64(1), promtestutil.ToFloat64(r.metrics.loadCount)) - require.Equal(t, float64(0), promtestutil.ToFloat64(r.metrics.unloadCount)) - }) -} - func initBucketAndBlocksForTest(t testing.TB) (string, *filesystem.Bucket, ulid.ULID) { ctx := context.Background() diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go index ec7082e38f7..c5f65a23950 100644 --- a/pkg/storegateway/indexheader/reader_pool.go +++ b/pkg/storegateway/indexheader/reader_pool.go @@ -52,13 +52,11 @@ type ReaderPool struct { // Keep track of all readers managed by the pool. lazyReadersMx sync.Mutex lazyReaders map[*LazyBinaryReader]struct{} - // Snapshot of blocks loaded by the pool before the last shutdown. - preShutdownLoadedBlocks map[ulid.ULID]int64 } // NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers. -func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, loadedBlocks map[ulid.ULID]int64) *ReaderPool { - p := newReaderPool(logger, indexHeaderConfig, lazyLoadingGate, metrics, loadedBlocks) +func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics) *ReaderPool { + p := newReaderPool(logger, indexHeaderConfig, lazyLoadingGate, metrics) // Start a goroutine to close idle readers (only if required). if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { @@ -83,23 +81,22 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate } // newReaderPool makes a new ReaderPool. 
-func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, loadedBlocks map[ulid.ULID]int64) *ReaderPool { +func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics) *ReaderPool { return &ReaderPool{ - logger: logger, - metrics: metrics, - lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled, - lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout, - lazyReaders: make(map[*LazyBinaryReader]struct{}), - close: make(chan struct{}), - preShutdownLoadedBlocks: loadedBlocks, - lazyLoadingGate: lazyLoadingGate, + logger: logger, + metrics: metrics, + lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled, + lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout, + lazyReaders: make(map[*LazyBinaryReader]struct{}), + close: make(chan struct{}), + lazyLoadingGate: lazyLoadingGate, } } // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. -func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg Config, initialSync bool) (Reader, error) { +func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg Config) (Reader, error) { var readerFactory func() (Reader, error) var reader Reader var err error @@ -109,19 +106,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt } if p.lazyReaderEnabled { - lazyBinaryReader, lazyErr := NewLazyBinaryReader(ctx, readerFactory, logger, bkt, dir, id, p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate) - if lazyErr != nil { - return nil, lazyErr - } - - // we only try to eager load during initialSync - if initialSync && p.preShutdownLoadedBlocks != nil { - // we only eager load if we have preShutdownLoadedBlocks for the given block id - if p.preShutdownLoadedBlocks[id] > 0 { - lazyBinaryReader.EagerLoad(ctx) - } - } - reader, err = lazyBinaryReader, lazyErr + reader, err = NewLazyBinaryReader(ctx, readerFactory, logger, bkt, dir, id, p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate) } else { reader, err = readerFactory() } diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storegateway/indexheader/reader_pool_test.go index b27bf21d2e2..821fe4f0fca 100644 --- a/pkg/storegateway/indexheader/reader_pool_test.go +++ b/pkg/storegateway/indexheader/reader_pool_test.go @@ -29,9 +29,6 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { tests := map[string]struct { lazyReaderEnabled bool lazyReaderIdleTimeout time.Duration - eagerLoadReaderEnabled bool - initialSync bool - createLoadedBlocksSnapshotFn func(blockId ulid.ULID) map[ulid.ULID]int64 expectedLoadCountMetricBeforeLabelNamesCall int expectedLoadCountMetricAfterLabelNamesCall int }{ @@ -49,50 +46,6 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { lazyReaderIdleTimeout: time.Minute, expectedLoadCountMetricAfterLabelNamesCall: 1, }, - "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: time.Minute, - eagerLoadReaderEnabled: true, - initialSync: true, - 
expectedLoadCountMetricBeforeLabelNamesCall: 1, // the index header will be eagerly loaded before the operation - expectedLoadCountMetricAfterLabelNamesCall: 1, - createLoadedBlocksSnapshotFn: func(blockId ulid.ULID) map[ulid.ULID]int64 { - return map[ulid.ULID]int64{blockId: time.Now().UnixMilli()} - }, - }, - "block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: time.Minute, - eagerLoadReaderEnabled: true, - initialSync: false, - expectedLoadCountMetricBeforeLabelNamesCall: 0, // the index header is not eager loaded if not during initial-sync - expectedLoadCountMetricAfterLabelNamesCall: 1, - createLoadedBlocksSnapshotFn: func(blockId ulid.ULID) map[ulid.ULID]int64 { - return map[ulid.ULID]int64{blockId: time.Now().UnixMilli()} - }, - }, - "block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: time.Minute, - eagerLoadReaderEnabled: true, - initialSync: true, - expectedLoadCountMetricBeforeLabelNamesCall: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file. - expectedLoadCountMetricAfterLabelNamesCall: 1, - createLoadedBlocksSnapshotFn: func(_ ulid.ULID) map[ulid.ULID]int64 { - // let's create a random fake blockID to be stored in lazy loaded headers file - fakeBlockID := ulid.MustNew(ulid.Now(), rand.Reader) - // this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test - return map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()} - }, - }, - "pre-shutdown loaded blocks snapshot doesn't exist and eager-loading is enabled": { - lazyReaderEnabled: true, - lazyReaderIdleTimeout: time.Minute, - eagerLoadReaderEnabled: true, - initialSync: true, - expectedLoadCountMetricBeforeLabelNamesCall: 0, // although eager loading is enabled, this test will not do eager loading because there is no snapshot file - expectedLoadCountMetricAfterLabelNamesCall: 1, - }, } ctx := context.Background() @@ -100,22 +53,16 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - var lazyLoadedBlocks map[ulid.ULID]int64 - if testData.createLoadedBlocksSnapshotFn != nil { - lazyLoadedBlocks = testData.createLoadedBlocksSnapshotFn(blockID) - require.NotNil(t, lazyLoadedBlocks) - } metrics := NewReaderPoolMetrics(nil) indexHeaderConfig := Config{ - LazyLoadingEnabled: testData.lazyReaderEnabled, - LazyLoadingIdleTimeout: testData.lazyReaderIdleTimeout, - EagerLoadingStartupEnabled: testData.eagerLoadReaderEnabled, + LazyLoadingEnabled: testData.lazyReaderEnabled, + LazyLoadingIdleTimeout: testData.lazyReaderIdleTimeout, } - pool := NewReaderPool(log.NewNopLogger(), indexHeaderConfig, gate.NewNoop(), metrics, lazyLoadedBlocks) + pool := NewReaderPool(log.NewNopLogger(), indexHeaderConfig, gate.NewNoop(), metrics) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, indexHeaderConfig, testData.initialSync) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, indexHeaderConfig) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() @@ -143,10 +90,10 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { LazyLoadingEnabled: true, LazyLoadingIdleTimeout: idleTimeout, EagerLoadingStartupEnabled: false, - 
}, gate.NewNoop(), metrics, nil) + }, gate.NewNoop(), metrics) defer pool.Close() - r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false) + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}) require.NoError(t, err) defer func() { require.NoError(t, r.Close()) }() diff --git a/pkg/storegateway/indexheader/snapshotter.go b/pkg/storegateway/indexheader/snapshotter.go index 7618e3da58f..d3efc3ffd89 100644 --- a/pkg/storegateway/indexheader/snapshotter.go +++ b/pkg/storegateway/indexheader/snapshotter.go @@ -54,7 +54,7 @@ type BlocksLoader interface { } func (s *Snapshotter) persist(context.Context) error { - err := s.PersistLoadedBlocks(s.bl) + err := s.PersistLoadedBlocks() if err != nil { // Note, the decision here is to only log the error but not failing the job. We may reconsider that later. level.Warn(s.logger).Log("msg", "failed to persist list of lazy-loaded index headers", "err", err) @@ -63,9 +63,9 @@ func (s *Snapshotter) persist(context.Context) error { return nil } -func (s *Snapshotter) PersistLoadedBlocks(bl BlocksLoader) error { +func (s *Snapshotter) PersistLoadedBlocks() error { snapshot := &indexHeadersSnapshot{ - IndexHeaderLastUsedTime: bl.LoadedBlocks(), + IndexHeaderLastUsedTime: s.bl.LoadedBlocks(), UserID: s.conf.UserID, } data, err := json.Marshal(snapshot) diff --git a/pkg/storegateway/indexheader/snapshotter_test.go b/pkg/storegateway/indexheader/snapshotter_test.go index 5d626e6259b..826c48dbbe5 100644 --- a/pkg/storegateway/indexheader/snapshotter_test.go +++ b/pkg/storegateway/indexheader/snapshotter_test.go @@ -35,7 +35,7 @@ func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) { // First instance persists the original snapshot. s1 := NewSnapshotter(log.NewNopLogger(), config, testBlocksLoader) - err := s1.PersistLoadedBlocks(testBlocksLoader) + err := s1.PersistLoadedBlocks() require.NoError(t, err) persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
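
For reference, a minimal sketch of how the reworked `Snapshotter` API fits together after this change: `NewSnapshotter` captures the `BlocksLoader` at construction, so `PersistLoadedBlocks` takes no arguments, and `RestoreLoadedBlocks` reads the file back on the next startup. The wiring below is illustrative only; `staticLoader` mirrors the `staticLoadedBlocks` helper from the e2e test and stands in for the real `ReaderPool`.

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"

	"github.com/grafana/mimir/pkg/storegateway/indexheader"
)

// staticLoader is a fixed BlocksLoader, standing in for the real ReaderPool.
type staticLoader map[ulid.ULID]int64

func (l staticLoader) LoadedBlocks() map[ulid.ULID]int64 { return l }

func main() {
	dir, err := os.MkdirTemp("", "snapshotter-demo") // illustrative tenant directory
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	loader := staticLoader{
		// Block ID -> last-used timestamp in Unix milliseconds.
		ulid.MustNew(ulid.Now(), nil): time.Now().UnixMilli(),
	}

	// The BlocksLoader is now captured once, at construction time...
	snap := indexheader.NewSnapshotter(log.NewNopLogger(), indexheader.SnapshotterConfig{
		Path:            dir,
		UserID:          "user-1",
		PersistInterval: time.Hour,
	}, loader)

	// ...so PersistLoadedBlocks no longer takes the loader as an argument.
	if err := snap.PersistLoadedBlocks(); err != nil {
		panic(err)
	}

	// On the next startup, BucketStore.tryRestoreLoadedBlocksSet reads it back.
	blocks, err := indexheader.RestoreLoadedBlocks(dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(blocks)) // 1
}
```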