From 486c0338d8f5f66aeeeba074a1e78563e5c37a59 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Fri, 7 Jun 2024 14:46:46 +0200 Subject: [PATCH] indexheader: move lazy-loaded snapshotting into a separate component Signed-off-by: Vladimir Varankin --- pkg/storegateway/bucket.go | 37 +++-- pkg/storegateway/indexheader/reader_pool.go | 146 ++------------------ pkg/storegateway/indexheader/snapshotter.go | 145 +++++++++++++++++++ 3 files changed, 180 insertions(+), 148 deletions(-) create mode 100644 pkg/storegateway/indexheader/snapshotter.go diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index c5e1118df26..eaacf1546d0 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -81,15 +81,16 @@ type BucketStoreStats struct { // This makes them smaller, but takes extra CPU and memory. // When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. type BucketStore struct { - userID string - logger log.Logger - metrics *BucketStoreMetrics - bkt objstore.InstrumentedBucketReader - fetcher block.MetadataFetcher - dir string - indexCache indexcache.IndexCache - indexReaderPool *indexheader.ReaderPool - seriesHashCache *hashcache.SeriesHashCache + userID string + logger log.Logger + metrics *BucketStoreMetrics + bkt objstore.InstrumentedBucketReader + fetcher block.MetadataFetcher + dir string + indexCache indexcache.IndexCache + indexReaderPool *indexheader.ReaderPool + indexHeadersSnapshotter *indexheader.Snapshotter + seriesHashCache *hashcache.SeriesHashCache // Sets of blocks that have the same labels. They are indexed by a hash over their label set. blocksMx sync.RWMutex @@ -238,12 +239,15 @@ func NewBucketStore( option(s) } - lazyLoadedSnapshotConfig := indexheader.LazyLoadedHeadersSnapshotConfig{ - Path: dir, - UserID: userID, + snapConfig := indexheader.SnapshotterConfig{ + Enabled: bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled, + Path: dir, + UserID: userID, } - // Depend on the options - s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedSnapshotConfig) + s.indexHeadersSnapshotter = indexheader.NewSnapshotter(s.logger, snapConfig) + + lazyLoadedBlocks := s.indexHeadersSnapshotter.RestoreLoadedBlocks() + s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedBlocks) if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") @@ -257,6 +261,7 @@ func (s *BucketStore) RemoveBlocksAndClose() error { err := s.removeAllBlocks() // Release other resources even if it failed to close some blocks. + s.indexHeadersSnapshotter.Stop() s.indexReaderPool.Close() return err @@ -361,6 +366,10 @@ func (s *BucketStore) InitialSync(ctx context.Context) error { return errors.Wrap(err, "sync block") } + if err := s.indexHeadersSnapshotter.Start(ctx, s.indexReaderPool); err != nil { + return errors.Wrap(err, "start index headers snapshotter") + } + fis, err := os.ReadDir(s.dir) if err != nil { return errors.Wrap(err, "read dir") diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go index 83ee1c3ae40..693c40d761f 100644 --- a/pkg/storegateway/indexheader/reader_pool.go +++ b/pkg/storegateway/indexheader/reader_pool.go @@ -6,11 +6,7 @@ package indexheader import ( - "bytes" "context" - "encoding/json" - "os" - "path/filepath" "sync" "time" @@ -21,12 +17,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" - - "github.com/grafana/mimir/pkg/util/atomicfs" ) -const lazyLoadedHeadersListFileName = "lazy-loaded.json" - // ReaderPoolMetrics holds metrics tracked by ReaderPool. type ReaderPoolMetrics struct { lazyReader *LazyBinaryReaderMetrics @@ -58,52 +50,15 @@ type ReaderPool struct { close chan struct{} // Keep track of all readers managed by the pool. - lazyReadersMx sync.Mutex - lazyReaders map[*LazyBinaryReader]struct{} - preShutdownLoadedBlocks *lazyLoadedHeadersSnapshot -} - -// LazyLoadedHeadersSnapshotConfig stores information needed to track lazy loaded index headers. -type LazyLoadedHeadersSnapshotConfig struct { - // Path stores where lazy loaded blocks will be tracked in a single file per tenant - Path string - UserID string -} - -type lazyLoadedHeadersSnapshot struct { - // IndexHeaderLastUsedTime is map of index header ulid.ULID to timestamp in millisecond. - IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"` - UserID string `json:"user_id"` + lazyReadersMx sync.Mutex + lazyReaders map[*LazyBinaryReader]struct{} + // Snapshot of blocks loaded by the pool before the last shutdown. + preShutdownLoadedBlocks map[ulid.ULID]int64 } -// persist atomically writes this snapshot to persistDir. -func (l lazyLoadedHeadersSnapshot) persist(persistDir string) error { - // Create temporary path for fsync. - // We don't use temporary folder because the process might not have access to the temporary folder. - tmpPath := filepath.Join(persistDir, "tmp-"+lazyLoadedHeadersListFileName) - // the actual path we want to store the file in - finalPath := filepath.Join(persistDir, lazyLoadedHeadersListFileName) - - data, err := json.Marshal(l) - if err != nil { - return err - } - - return atomicfs.CreateFileAndMove(tmpPath, finalPath, bytes.NewReader(data)) -} - -// NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers and persisting a list of loaded Readers to disk. -func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedSnapshotConfig LazyLoadedHeadersSnapshotConfig) *ReaderPool { - var snapshot *lazyLoadedHeadersSnapshot - - // Eager loading can only be enabled if lazy-loading is enabled. - eagerLoadingEnabled := indexHeaderConfig.LazyLoadingEnabled && indexHeaderConfig.EagerLoadingStartupEnabled - - if eagerLoadingEnabled { - snapshot = tryRestoreLazyLoadedHeadersSnapshot(logger, lazyLoadedSnapshotConfig) - } - - p := newReaderPool(logger, indexHeaderConfig, lazyLoadingGate, metrics, snapshot) +// NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers. +func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, loadedBlocks map[ulid.ULID]int64) *ReaderPool { + p := newReaderPool(logger, indexHeaderConfig, lazyLoadingGate, metrics, loadedBlocks) // Start a goroutine to close idle readers (only if required). if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { @@ -112,35 +67,12 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate go func() { tickerIdleReader := time.NewTicker(checkFreq) defer tickerIdleReader.Stop() - - var lazyLoadC <-chan time.Time - - if eagerLoadingEnabled { - tickerLazyLoadPersist := time.NewTicker(time.Minute) - defer tickerLazyLoadPersist.Stop() - - lazyLoadC = tickerLazyLoadPersist.C - } - for { select { case <-p.close: return case <-tickerIdleReader.C: p.closeIdleReaders() - case t := <-lazyLoadC: - // minUsedAt is the threshold for how recently used should the block be to stay in the snapshot; - // we add an extra couple of minutes to make sure the pool closes the idle readers. - dur := p.lazyReaderIdleTimeout + (10 * time.Minute) - minUsedAt := t.Truncate(time.Minute).Add(-dur).UnixMilli() - snapshot := lazyLoadedHeadersSnapshot{ - IndexHeaderLastUsedTime: p.LoadedBlocks(minUsedAt), - UserID: lazyLoadedSnapshotConfig.UserID, - } - - if err := snapshot.persist(lazyLoadedSnapshotConfig.Path); err != nil { - level.Warn(p.logger).Log("msg", "failed to persist list of lazy-loaded index headers", "err", err) - } } } @@ -150,34 +82,8 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate return p } -func tryRestoreLazyLoadedHeadersSnapshot(logger log.Logger, cfg LazyLoadedHeadersSnapshotConfig) *lazyLoadedHeadersSnapshot { - fileName := filepath.Join(cfg.Path, lazyLoadedHeadersListFileName) - snapshot, err := loadLazyLoadedHeadersSnapshot(fileName) - if os.IsNotExist(err) { - // We didn't find the snapshot. Could be because we crashed after restoring it last time - // or because the previous binary didn't support eager loading. - // Either way, we can continue without eagerly loading blocks. - // Since the file wasn't found, we also won't try to remove it. - return nil - } - if err != nil { - level.Warn(logger).Log( - "msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant", - "file", fileName, - "err", err, - ) - } - // We will remove the file regardless whether err is nil or not nil. - // In the case such as snapshot loading causing OOM, we will still - // remove the snapshot and lazy load after server is restarted. - if err := os.Remove(fileName); err != nil { - level.Warn(logger).Log("msg", "removing the lazy-loaded index-header snapshot failed", "file", fileName, "err", err) - } - return snapshot -} - // newReaderPool makes a new ReaderPool. -func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedHeadersSnapshot *lazyLoadedHeadersSnapshot) *ReaderPool { +func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, loadedBlocks map[ulid.ULID]int64) *ReaderPool { return &ReaderPool{ logger: logger, metrics: metrics, @@ -185,24 +91,11 @@ func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout, lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), - preShutdownLoadedBlocks: lazyLoadedHeadersSnapshot, + preShutdownLoadedBlocks: loadedBlocks, lazyLoadingGate: lazyLoadingGate, } } -func loadLazyLoadedHeadersSnapshot(fileName string) (*lazyLoadedHeadersSnapshot, error) { - snapshotBytes, err := os.ReadFile(fileName) - if err != nil { - return nil, err - } - snapshot := &lazyLoadedHeadersSnapshot{} - err = json.Unmarshal(snapshotBytes, snapshot) - if err != nil { - return nil, err - } - return snapshot, nil -} - // NewBinaryReader creates and returns a new binary reader. If the pool has been configured // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. @@ -224,7 +117,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt // we only try to eager load during initialSync if initialSync && p.preShutdownLoadedBlocks != nil { // we only eager load if we have preShutdownLoadedBlocks for the given block id - if p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime[id] > 0 { + if p.preShutdownLoadedBlocks[id] > 0 { lazyBinaryReader.EagerLoad() } } @@ -296,8 +189,7 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) { } // LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds. -// It skips blocks, which weren't in use after minUsedAt. -func (p *ReaderPool) LoadedBlocks(minUsedAt int64) map[ulid.ULID]int64 { +func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 { p.lazyReadersMx.Lock() defer p.lazyReadersMx.Unlock() @@ -305,21 +197,7 @@ func (p *ReaderPool) LoadedBlocks(minUsedAt int64) map[ulid.ULID]int64 { for r := range p.lazyReaders { if r.reader != nil { usedAt := r.usedAt.Load() / int64(time.Millisecond) - if usedAt > minUsedAt { - blocks[r.blockID] = usedAt - } - } - } - - // Add blocks from the pre-shutdown snapshot if those are still "fresh". - if p.lazyReaderEnabled && p.preShutdownLoadedBlocks != nil { - for id, usedAt := range p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime { - if _, ok := blocks[id]; ok { - continue - } - if usedAt > minUsedAt { - blocks[id] = usedAt - } + blocks[r.blockID] = usedAt } } diff --git a/pkg/storegateway/indexheader/snapshotter.go b/pkg/storegateway/indexheader/snapshotter.go new file mode 100644 index 00000000000..2b6f7304366 --- /dev/null +++ b/pkg/storegateway/indexheader/snapshotter.go @@ -0,0 +1,145 @@ +package indexheader + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + + "github.com/grafana/mimir/pkg/util/atomicfs" +) + +const lazyLoadedHeadersListFileName = "lazy-loaded.json" + +type SnapshotterConfig struct { + Enabled bool + + // Path stores where lazy loaded blocks will be tracked in a single file per tenant + Path string + UserID string +} + +// Snapshotter manages the snapshots of lazy loaded blocks. +type Snapshotter struct { + logger log.Logger + conf SnapshotterConfig + + done chan struct{} +} + +func NewSnapshotter(logger log.Logger, conf SnapshotterConfig) *Snapshotter { + return &Snapshotter{ + logger: logger, + conf: conf, + done: make(chan struct{}), + } +} + +type blocksLoader interface { + LoadedBlocks() map[ulid.ULID]int64 +} + +func (s *Snapshotter) Start(ctx context.Context, l blocksLoader) error { + if !s.conf.Enabled { + return nil + } + + err := s.persistLoadedBlocks(l) + if err != nil { + return fmt.Errorf("persist initial list of lazy-loaded index headers: %w", err) + } + + go func() { + tick := time.NewTicker(time.Minute) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-s.done: + return + case <-tick.C: + if err := s.persistLoadedBlocks(l); err != nil { + level.Warn(s.logger).Log("msg", "failed to persist list of lazy-loaded index headers", "err", err) + } + } + } + }() + + return nil +} + +func (s *Snapshotter) persistLoadedBlocks(l blocksLoader) error { + snapshot := &indexHeadersSnapshot{ + IndexHeaderLastUsedTime: l.LoadedBlocks(), + UserID: s.conf.UserID, + } + data, err := json.Marshal(snapshot) + if err != nil { + return err + } + + // Create temporary path for fsync. + // We don't use temporary folder because the process might not have access to the temporary folder. + tmpPath := filepath.Join(s.conf.Path, "tmp-"+lazyLoadedHeadersListFileName) + // the actual path we want to store the file in + finalPath := filepath.Join(s.conf.Path, lazyLoadedHeadersListFileName) + + return atomicfs.CreateFileAndMove(tmpPath, finalPath, bytes.NewReader(data)) +} + +func (s *Snapshotter) Stop() { + close(s.done) +} + +func (s *Snapshotter) RestoreLoadedBlocks() map[ulid.ULID]int64 { + if !s.conf.Enabled { + return nil + } + + var snapshot indexHeadersSnapshot + fileName := filepath.Join(s.conf.Path, lazyLoadedHeadersListFileName) + err := loadIndexHeadersSnapshot(fileName, &snapshot) + if err != nil { + if os.IsNotExist(err) { + // We didn't find the snapshot. Could be because we crashed after restoring it last time + // or because the previous binary didn't support eager loading. + return nil + } + level.Warn(s.logger).Log( + "msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant", + "tenant", s.conf.UserID, + "file", fileName, + "err", err, + ) + // We will remove the file only on error. + // Note, in the case such as snapshot loading causing OOM, we will need to + // remove the snapshot and lazy load after server is restarted. + if err := os.Remove(fileName); err != nil { + level.Warn(s.logger).Log("msg", "removing the lazy-loaded index-header snapshot failed", "file", fileName, "err", err) + } + } + return snapshot.IndexHeaderLastUsedTime +} + +type indexHeadersSnapshot struct { + // IndexHeaderLastUsedTime is map of index header ulid.ULID to timestamp in millisecond. + IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"` + UserID string `json:"user_id"` +} + +func loadIndexHeadersSnapshot(fileName string, snapshot *indexHeadersSnapshot) error { + snapshotBytes, err := os.ReadFile(fileName) + if err != nil { + return err + } + return json.Unmarshal(snapshotBytes, snapshot) +}