Skip to content

Commit

Permalink
indexheader: move lazy-loaded snapshotting into a separate component
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
  • Loading branch information
narqo committed Jun 7, 2024
1 parent 543725f commit 486c033
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 148 deletions.
37 changes: 23 additions & 14 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,16 @@ type BucketStoreStats struct {
// This makes them smaller, but takes extra CPU and memory.
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
type BucketStore struct {
userID string
logger log.Logger
metrics *BucketStoreMetrics
bkt objstore.InstrumentedBucketReader
fetcher block.MetadataFetcher
dir string
indexCache indexcache.IndexCache
indexReaderPool *indexheader.ReaderPool
seriesHashCache *hashcache.SeriesHashCache
userID string
logger log.Logger
metrics *BucketStoreMetrics
bkt objstore.InstrumentedBucketReader
fetcher block.MetadataFetcher
dir string
indexCache indexcache.IndexCache
indexReaderPool *indexheader.ReaderPool
indexHeadersSnapshotter *indexheader.Snapshotter
seriesHashCache *hashcache.SeriesHashCache

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
blocksMx sync.RWMutex
Expand Down Expand Up @@ -238,12 +239,15 @@ func NewBucketStore(
option(s)
}

lazyLoadedSnapshotConfig := indexheader.LazyLoadedHeadersSnapshotConfig{
Path: dir,
UserID: userID,
snapConfig := indexheader.SnapshotterConfig{
Enabled: bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled,
Path: dir,
UserID: userID,
}
// Depend on the options
s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedSnapshotConfig)
s.indexHeadersSnapshotter = indexheader.NewSnapshotter(s.logger, snapConfig)

lazyLoadedBlocks := s.indexHeadersSnapshotter.RestoreLoadedBlocks()
s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedBlocks)

if err := os.MkdirAll(dir, 0750); err != nil {
return nil, errors.Wrap(err, "create dir")
Expand All @@ -257,6 +261,7 @@ func (s *BucketStore) RemoveBlocksAndClose() error {
err := s.removeAllBlocks()

// Release other resources even if it failed to close some blocks.
s.indexHeadersSnapshotter.Stop()
s.indexReaderPool.Close()

return err
Expand Down Expand Up @@ -361,6 +366,10 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
return errors.Wrap(err, "sync block")
}

if err := s.indexHeadersSnapshotter.Start(ctx, s.indexReaderPool); err != nil {
return errors.Wrap(err, "start index headers snapshotter")
}

fis, err := os.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
Expand Down
146 changes: 12 additions & 134 deletions pkg/storegateway/indexheader/reader_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@
package indexheader

import (
"bytes"
"context"
"encoding/json"
"os"
"path/filepath"
"sync"
"time"

Expand All @@ -21,12 +17,8 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/mimir/pkg/util/atomicfs"
)

const lazyLoadedHeadersListFileName = "lazy-loaded.json"

// ReaderPoolMetrics holds metrics tracked by ReaderPool.
type ReaderPoolMetrics struct {
lazyReader *LazyBinaryReaderMetrics
Expand Down Expand Up @@ -58,52 +50,15 @@ type ReaderPool struct {
close chan struct{}

// Keep track of all readers managed by the pool.
lazyReadersMx sync.Mutex
lazyReaders map[*LazyBinaryReader]struct{}
preShutdownLoadedBlocks *lazyLoadedHeadersSnapshot
}

// LazyLoadedHeadersSnapshotConfig holds the settings used to track
// lazy-loaded index headers.
type LazyLoadedHeadersSnapshotConfig struct {
	// Path is the directory holding the single per-tenant file in which
	// lazy-loaded blocks are tracked.
	Path string
	// UserID identifies the user (tenant) the tracked index headers belong to.
	UserID string
}

type lazyLoadedHeadersSnapshot struct {
// IndexHeaderLastUsedTime is a map of index header ulid.ULID to its last-used timestamp in milliseconds.
IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"`
UserID string `json:"user_id"`
lazyReadersMx sync.Mutex
lazyReaders map[*LazyBinaryReader]struct{}
// Snapshot of blocks loaded by the pool before the last shutdown.
preShutdownLoadedBlocks map[ulid.ULID]int64
}

// persist encodes this snapshot as JSON and atomically writes it to persistDir
// under the lazyLoadedHeadersListFileName name.
//
// It returns the JSON encoding error, if any, or otherwise whatever
// atomicfs.CreateFileAndMove returns.
func (l lazyLoadedHeadersSnapshot) persist(persistDir string) error {
	encoded, err := json.Marshal(l)
	if err != nil {
		return err
	}

	// The staging file is created next to the destination instead of in the
	// system temporary folder, because the process might not have access to
	// the temporary folder.
	var (
		target  = filepath.Join(persistDir, lazyLoadedHeadersListFileName)
		staging = filepath.Join(persistDir, "tmp-"+lazyLoadedHeadersListFileName)
	)

	return atomicfs.CreateFileAndMove(staging, target, bytes.NewReader(encoded))
}

// NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers and persisting a list of loaded Readers to disk.
func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedSnapshotConfig LazyLoadedHeadersSnapshotConfig) *ReaderPool {
var snapshot *lazyLoadedHeadersSnapshot

// Eager loading can only be enabled if lazy-loading is enabled.
eagerLoadingEnabled := indexHeaderConfig.LazyLoadingEnabled && indexHeaderConfig.EagerLoadingStartupEnabled

if eagerLoadingEnabled {
snapshot = tryRestoreLazyLoadedHeadersSnapshot(logger, lazyLoadedSnapshotConfig)
}

p := newReaderPool(logger, indexHeaderConfig, lazyLoadingGate, metrics, snapshot)
// NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers.
func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, loadedBlocks map[ulid.ULID]int64) *ReaderPool {
p := newReaderPool(logger, indexHeaderConfig, lazyLoadingGate, metrics, loadedBlocks)

// Start a goroutine to close idle readers (only if required).
if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 {
Expand All @@ -112,35 +67,12 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate
go func() {
tickerIdleReader := time.NewTicker(checkFreq)
defer tickerIdleReader.Stop()

var lazyLoadC <-chan time.Time

if eagerLoadingEnabled {
tickerLazyLoadPersist := time.NewTicker(time.Minute)
defer tickerLazyLoadPersist.Stop()

lazyLoadC = tickerLazyLoadPersist.C
}

for {
select {
case <-p.close:
return
case <-tickerIdleReader.C:
p.closeIdleReaders()
case t := <-lazyLoadC:
// minUsedAt is the threshold for how recently a block must have been used to stay in the snapshot;
// we add an extra 10 minutes on top of the idle timeout to make sure the pool has closed the idle readers.
dur := p.lazyReaderIdleTimeout + (10 * time.Minute)
minUsedAt := t.Truncate(time.Minute).Add(-dur).UnixMilli()
snapshot := lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: p.LoadedBlocks(minUsedAt),
UserID: lazyLoadedSnapshotConfig.UserID,
}

if err := snapshot.persist(lazyLoadedSnapshotConfig.Path); err != nil {
level.Warn(p.logger).Log("msg", "failed to persist list of lazy-loaded index headers", "err", err)
}
}
}

Expand All @@ -150,59 +82,20 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate
return p
}

// tryRestoreLazyLoadedHeadersSnapshot loads the lazy-loaded headers snapshot stored
// under cfg.Path and then deletes the snapshot file.
//
// It returns nil when the file does not exist. When loading fails for any other
// reason, the failure is logged as a warning and the (nil) result of the loader is
// returned. Failures to delete the file are only logged.
func tryRestoreLazyLoadedHeadersSnapshot(logger log.Logger, cfg LazyLoadedHeadersSnapshotConfig) *lazyLoadedHeadersSnapshot {
	snapshotPath := filepath.Join(cfg.Path, lazyLoadedHeadersListFileName)

	restored, loadErr := loadLazyLoadedHeadersSnapshot(snapshotPath)
	switch {
	case os.IsNotExist(loadErr):
		// No snapshot on disk: either we crashed after restoring it last time,
		// or the previous binary didn't support eager loading. In both cases we
		// carry on without eagerly loading blocks, and there is no file to remove.
		return nil
	case loadErr != nil:
		level.Warn(logger).Log(
			"msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant",
			"file", snapshotPath,
			"err", loadErr,
		)
	}

	// The file is removed whether or not loading succeeded. If loading the
	// snapshot causes e.g. an OOM, the snapshot is still gone after the restart
	// and the index headers are lazy-loaded instead.
	if rmErr := os.Remove(snapshotPath); rmErr != nil {
		level.Warn(logger).Log("msg", "removing the lazy-loaded index-header snapshot failed", "file", snapshotPath, "err", rmErr)
	}

	return restored
}

// newReaderPool makes a new ReaderPool from the given logger, index-header config,
// lazy-loading gate and metrics. It only populates the struct (including a fresh
// lazyReaders map and close channel); it does not start any goroutine itself.
// The last argument is stored as the pool's preShutdownLoadedBlocks.
func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, lazyLoadedHeadersSnapshot *lazyLoadedHeadersSnapshot) *ReaderPool {
func newReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate gate.Gate, metrics *ReaderPoolMetrics, loadedBlocks map[ulid.ULID]int64) *ReaderPool {
return &ReaderPool{
logger: logger,
metrics: metrics,
lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled,
lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout,
lazyReaders: make(map[*LazyBinaryReader]struct{}),
close: make(chan struct{}),
preShutdownLoadedBlocks: lazyLoadedHeadersSnapshot,
preShutdownLoadedBlocks: loadedBlocks,
lazyLoadingGate: lazyLoadingGate,
}
}

// loadLazyLoadedHeadersSnapshot reads the file at fileName and decodes its JSON
// content into a lazyLoadedHeadersSnapshot.
//
// If the file cannot be read or its content cannot be decoded, it returns a nil
// snapshot together with the underlying error.
func loadLazyLoadedHeadersSnapshot(fileName string) (*lazyLoadedHeadersSnapshot, error) {
	raw, err := os.ReadFile(fileName)
	if err != nil {
		return nil, err
	}

	var decoded lazyLoadedHeadersSnapshot
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, err
	}

	return &decoded, nil
}

// NewBinaryReader creates and returns a new binary reader. If the pool has been configured
// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader
// is tracked by the pool and automatically closed once the idle timeout expires.
Expand All @@ -224,7 +117,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt
// we only try to eager load during initialSync
if initialSync && p.preShutdownLoadedBlocks != nil {
// we only eager load if we have preShutdownLoadedBlocks for the given block id
if p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime[id] > 0 {
if p.preShutdownLoadedBlocks[id] > 0 {
lazyBinaryReader.EagerLoad()
}
}
Expand Down Expand Up @@ -296,30 +189,15 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) {
}

// LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds.
// It skips blocks, which weren't in use after minUsedAt.
func (p *ReaderPool) LoadedBlocks(minUsedAt int64) map[ulid.ULID]int64 {
func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 {
p.lazyReadersMx.Lock()
defer p.lazyReadersMx.Unlock()

blocks := make(map[ulid.ULID]int64, len(p.lazyReaders))
for r := range p.lazyReaders {
if r.reader != nil {
usedAt := r.usedAt.Load() / int64(time.Millisecond)
if usedAt > minUsedAt {
blocks[r.blockID] = usedAt
}
}
}

// Add blocks from the pre-shutdown snapshot if those are still "fresh".
if p.lazyReaderEnabled && p.preShutdownLoadedBlocks != nil {
for id, usedAt := range p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime {
if _, ok := blocks[id]; ok {
continue
}
if usedAt > minUsedAt {
blocks[id] = usedAt
}
blocks[r.blockID] = usedAt
}
}

Expand Down
Loading

0 comments on commit 486c033

Please sign in to comment.