From 8fa0bb0a2e72b408290569afb07f1f3d7f0ee6c5 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Wed, 1 May 2019 02:02:35 +0100 Subject: [PATCH] store: Compose indexCache properly allowing injection for testing purposes. Signed-off-by: Bartek Plotka --- cmd/thanos/store.go | 18 ++++++-- pkg/store/bucket.go | 43 ++++++++---------- pkg/store/bucket_e2e_test.go | 84 ++++++++++++++++++++++++++++++------ pkg/store/bucket_test.go | 2 +- 4 files changed, 107 insertions(+), 40 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7f77e15412..c135956403 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -12,13 +12,14 @@ import ( "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/store" + storecache "github.com/improbable-eng/thanos/pkg/store/cache" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/oklog/run" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" - "gopkg.in/alecthomas/kingpin.v2" + kingpin "gopkg.in/alecthomas/kingpin.v2" ) // registerStore registers a store command. @@ -120,12 +121,23 @@ func runStore( } }() + // TODO(bwplotka): Add as a flag? + maxItemSizeBytes := indexCacheSizeBytes / 2 + + indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{ + MaxSizeBytes: indexCacheSizeBytes, + MaxItemSizeBytes: maxItemSizeBytes, + }) + if err != nil { + return errors.Wrap(err, "create index cache") + } + bs, err := store.NewBucketStore( logger, reg, bkt, dataDir, - indexCacheSizeBytes, + indexCache, chunkPoolSizeBytes, maxSampleCount, maxConcurrent, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 24b80ec5fa..3823d84225 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -26,7 +26,6 @@ import ( "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/pool" "github.com/improbable-eng/thanos/pkg/runutil" - storecache "github.com/improbable-eng/thanos/pkg/store/cache" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/strutil" "github.com/improbable-eng/thanos/pkg/tracing" @@ -176,6 +175,13 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { return &m } +type indexCache interface { + SetPostings(b ulid.ULID, l labels.Label, v []byte) + Postings(b ulid.ULID, l labels.Label) ([]byte, bool) + SetSeries(b ulid.ULID, id uint64, v []byte) + Series(b ulid.ULID, id uint64) ([]byte, bool) +} + // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. type BucketStore struct { @@ -183,7 +189,7 @@ type BucketStore struct { metrics *bucketStoreMetrics bucket objstore.BucketReader dir string - indexCache *storecache.IndexCache + indexCache indexCache chunkPool *pool.BytesPool // Sets of blocks that have the same labels. They are indexed by a hash over their label set. @@ -211,7 +217,7 @@ func NewBucketStore( reg prometheus.Registerer, bucket objstore.BucketReader, dir string, - indexCacheSizeBytes uint64, + indexCache indexCache, maxChunkPoolBytes uint64, maxSampleCount uint64, maxConcurrent int, @@ -226,17 +232,6 @@ func NewBucketStore( return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent) } - // TODO(bwplotka): Add as a flag? - maxItemSizeBytes := indexCacheSizeBytes / 2 - - indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{ - MaxSizeBytes: indexCacheSizeBytes, - MaxItemSizeBytes: maxItemSizeBytes, - }) - if err != nil { - return nil, errors.Wrap(err, "create index cache") - } - chunkPool, err := pool.NewBytesPool(2e5, 50e6, 2, maxChunkPoolBytes) if err != nil { return nil, errors.Wrap(err, "create chunk pool") @@ -1066,7 +1061,7 @@ type bucketBlock struct { bucket objstore.BucketReader meta *metadata.Meta dir string - indexCache *storecache.IndexCache + indexCache indexCache chunkPool *pool.BytesPool indexVersion int @@ -1089,7 +1084,7 @@ func newBucketBlock( bkt objstore.BucketReader, id ulid.ULID, dir string, - indexCache *storecache.IndexCache, + indexCache indexCache, chunkPool *pool.BytesPool, p partitioner, ) (b *bucketBlock, err error) { @@ -1105,7 +1100,7 @@ func newBucketBlock( if err = b.loadMeta(ctx, id); err != nil { return nil, errors.Wrap(err, "load meta") } - if err = b.loadIndexCache(ctx); err != nil { + if err = b.loadIndexCacheFile(ctx); err != nil { return nil, errors.Wrap(err, "load index cache") } // Get object handles for all chunk files. @@ -1149,9 +1144,9 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { return nil } -func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { +func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) { cachefn := filepath.Join(b.dir, block.IndexCacheFilename) - if err = b.loadIndexCacheFromFile(ctx, cachefn); err == nil { + if err = b.loadIndexCacheFileFromFile(ctx, cachefn); err == nil { return nil } if !os.IsNotExist(errors.Cause(err)) { @@ -1160,7 +1155,7 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { // Try to download index cache file from object store. if err = objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexCacheFilename(), cachefn); err == nil { - return b.loadIndexCacheFromFile(ctx, cachefn) + return b.loadIndexCacheFileFromFile(ctx, cachefn) } if !b.bucket.IsObjNotFoundErr(errors.Cause(err)) { @@ -1184,10 +1179,10 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) { return errors.Wrap(err, "write index cache") } - return errors.Wrap(b.loadIndexCacheFromFile(ctx, cachefn), "read index cache") + return errors.Wrap(b.loadIndexCacheFileFromFile(ctx, cachefn), "read index cache") } -func (b *bucketBlock) loadIndexCacheFromFile(ctx context.Context, cache string) (err error) { +func (b *bucketBlock) loadIndexCacheFileFromFile(ctx context.Context, cache string) (err error) { b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cache) return err } @@ -1249,13 +1244,13 @@ type bucketIndexReader struct { block *bucketBlock dec *index.Decoder stats *queryStats - cache *storecache.IndexCache + cache indexCache mtx sync.Mutex loadedSeries map[uint64][]byte } -func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *storecache.IndexCache) *bucketIndexReader { +func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache indexCache) *bucketIndexReader { r := &bucketIndexReader{ logger: logger, ctx: ctx, diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 02ad79781a..c0fc42b97c 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/oklog/ulid" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" @@ -23,12 +25,46 @@ import ( "github.com/prometheus/tsdb/labels" ) +type noopCache struct{} + +func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {} +func (noopCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) { return nil, false } +func (noopCache) SetSeries(b ulid.ULID, id uint64, v []byte) {} +func (noopCache) Series(b ulid.ULID, id uint64) ([]byte, bool) { return nil, false } + +type swappableCache struct { + ptr indexCache +} + +func (c *swappableCache) SwapWith(ptr2 indexCache) { + c.ptr = ptr2 +} + +func (c *swappableCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) { + c.ptr.SetPostings(b, l, v) +} + +func (c *swappableCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) { + return c.ptr.Postings(b, l) +} + +func (c *swappableCache) SetSeries(b ulid.ULID, id uint64, v []byte) { + c.ptr.SetSeries(b, id, v) +} + +func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) { + return c.ptr.Series(b, id) +} + type storeSuite struct { cancel context.CancelFunc wg sync.WaitGroup store *BucketStore minTime, maxTime int64 + cache *swappableCache + + logger log.Logger } func (s *storeSuite) Close() { @@ -53,7 +89,11 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m now := start ctx, cancel := context.WithCancel(context.Background()) - s := &storeSuite{cancel: cancel} + s := &storeSuite{ + cancel: cancel, + logger: log.NewLogfmtLogger(os.Stderr), + cache: &swappableCache{}, + } blocks := 0 for i := 0; i < 3; i++ { mint := timestamp.FromTime(now) @@ -78,17 +118,17 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m meta, err := metadata.Read(dir2) testutil.Ok(t, err) meta.Thanos.Labels = map[string]string{"ext2": "value2"} - testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta)) + testutil.Ok(t, metadata.Write(s.logger, dir2, meta)) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1)) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2)) + testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir1)) + testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir2)) blocks += 2 testutil.Ok(t, os.RemoveAll(dir1)) testutil.Ok(t, os.RemoveAll(dir2)) } - store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20) + store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20) testutil.Ok(t, err) s.store = store @@ -310,13 +350,6 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { } { t.Log("Run ", i) - // Always clean cache before each test. - s.store.indexCache, err = storecache.NewIndexCache(log.NewNopLogger(), nil, storecache.Opts{ - MaxSizeBytes: 100, - MaxItemSizeBytes: 100, - }) - testutil.Ok(t, err) - srv := newStoreSeriesServer(ctx) testutil.Ok(t, s.store.Series(tcase.req, srv)) @@ -341,6 +374,26 @@ func TestBucketStore_e2e(t *testing.T) { s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0) defer s.Close() + t.Log("Test with no index cache") + s.cache.SwapWith(noopCache{}) + testBucketStore_e2e(t, ctx, s) + + t.Log("Test with large, sufficient index cache") + indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{ + MaxItemSizeBytes: 1e5, + MaxSizeBytes: 2e5, + }) + testutil.Ok(t, err) + s.cache.SwapWith(indexCache) + testBucketStore_e2e(t, ctx, s) + + t.Log("Test with small index cache") + indexCache2, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{ + MaxItemSizeBytes: 50, + MaxSizeBytes: 100, + }) + testutil.Ok(t, err) + s.cache.SwapWith(indexCache2) testBucketStore_e2e(t, ctx, s) }) } @@ -370,6 +423,13 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) { s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0) defer s.Close() + indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{ + MaxItemSizeBytes: 1e5, + MaxSizeBytes: 2e5, + }) + testutil.Ok(t, err) + s.cache.SwapWith(indexCache) + testBucketStore_e2e(t, ctx, s) }) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 18f953c298..4e9e581014 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "prometheus-test") testutil.Ok(t, err) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20) + bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})