Commit

store: Compose indexCache properly allowing injection for testing purposes. (#1098)

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka authored May 1, 2019
1 parent 8498d91 commit 3b6a73b
Showing 4 changed files with 107 additions and 40 deletions.
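The core of the change is a textbook Go dependency-injection move: BucketStore stops constructing its own storecache.IndexCache and instead depends on a small, unexported indexCache interface that callers satisfy. Production code passes the real cache; tests pass a no-op or swappable fake. Below is a minimal, self-contained sketch of that pattern (the interface mirrors the one added in pkg/store/bucket.go, reduced to the series methods; fakeCache and newStore are hypothetical names for illustration):

package main

import "fmt"

// cache is the narrow, consumer-owned interface, analogous to the
// unexported indexCache interface added in pkg/store/bucket.go.
type cache interface {
	SetSeries(id uint64, v []byte)
	Series(id uint64) ([]byte, bool)
}

// store depends only on the interface, so any implementation can be injected.
type store struct{ c cache }

func newStore(c cache) *store { return &store{c: c} }

// fakeCache is a hypothetical map-backed test double.
type fakeCache map[uint64][]byte

func (f fakeCache) SetSeries(id uint64, v []byte)   { f[id] = v }
func (f fakeCache) Series(id uint64) ([]byte, bool) { v, ok := f[id]; return v, ok }

func main() {
	s := newStore(fakeCache{}) // a test would inject the fake here
	s.c.SetSeries(1, []byte("series-1"))
	v, ok := s.c.Series(1)
	fmt.Println(string(v), ok) // prints: series-1 true
}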
18 changes: 15 additions & 3 deletions cmd/thanos/store.go
@@ -12,13 +12,14 @@ import (
 	"github.com/improbable-eng/thanos/pkg/objstore/client"
 	"github.com/improbable-eng/thanos/pkg/runutil"
 	"github.com/improbable-eng/thanos/pkg/store"
+	storecache "github.com/improbable-eng/thanos/pkg/store/cache"
 	"github.com/improbable-eng/thanos/pkg/store/storepb"
 	"github.com/oklog/run"
-	"github.com/opentracing/opentracing-go"
+	opentracing "github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"google.golang.org/grpc"
-	"gopkg.in/alecthomas/kingpin.v2"
+	kingpin "gopkg.in/alecthomas/kingpin.v2"
 )

// registerStore registers a store command.
@@ -120,12 +121,23 @@ func runStore(
 		}
 	}()
 
+	// TODO(bwplotka): Add as a flag?
+	maxItemSizeBytes := indexCacheSizeBytes / 2
+
+	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
+		MaxSizeBytes:     indexCacheSizeBytes,
+		MaxItemSizeBytes: maxItemSizeBytes,
+	})
+	if err != nil {
+		return errors.Wrap(err, "create index cache")
+	}
+
 	bs, err := store.NewBucketStore(
 		logger,
 		reg,
 		bkt,
 		dataDir,
-		indexCacheSizeBytes,
+		indexCache,
 		chunkPoolSizeBytes,
 		maxSampleCount,
 		maxConcurrent,
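Construction now happens once at the call site, so the cache can be sized up front and registered against the Prometheus registry exactly once before being handed to the store. A hedged sketch of creating the cache in isolation, assuming the constructor and Opts fields exactly as they appear in the diff (the 250 MiB figure is illustrative, not a Thanos default):

package main

import (
	"github.com/go-kit/kit/log"
	storecache "github.com/improbable-eng/thanos/pkg/store/cache"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()

	// Illustrative size; runStore derives MaxItemSizeBytes as half of the
	// total cache size (see the TODO above about making it a flag).
	const indexCacheSizeBytes = uint64(250 * 1024 * 1024)

	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
		MaxSizeBytes:     indexCacheSizeBytes,
		MaxItemSizeBytes: indexCacheSizeBytes / 2,
	})
	if err != nil {
		panic(err) // runStore wraps this as errors.Wrap(err, "create index cache")
	}
	_ = indexCache // passed to store.NewBucketStore in runStore
}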
43 changes: 19 additions & 24 deletions pkg/store/bucket.go
@@ -26,7 +26,6 @@ import (
 	"github.com/improbable-eng/thanos/pkg/objstore"
 	"github.com/improbable-eng/thanos/pkg/pool"
 	"github.com/improbable-eng/thanos/pkg/runutil"
-	storecache "github.com/improbable-eng/thanos/pkg/store/cache"
 	"github.com/improbable-eng/thanos/pkg/store/storepb"
 	"github.com/improbable-eng/thanos/pkg/strutil"
 	"github.com/improbable-eng/thanos/pkg/tracing"
@@ -176,14 +175,21 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 	return &m
 }
 
+type indexCache interface {
+	SetPostings(b ulid.ULID, l labels.Label, v []byte)
+	Postings(b ulid.ULID, l labels.Label) ([]byte, bool)
+	SetSeries(b ulid.ULID, id uint64, v []byte)
+	Series(b ulid.ULID, id uint64) ([]byte, bool)
+}
+
 // BucketStore implements the store API backed by a bucket. It loads all index
 // files to local disk.
 type BucketStore struct {
 	logger     log.Logger
 	metrics    *bucketStoreMetrics
 	bucket     objstore.BucketReader
 	dir        string
-	indexCache *storecache.IndexCache
+	indexCache indexCache
 	chunkPool  *pool.BytesPool
 
 	// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
@@ -211,7 +217,7 @@ func NewBucketStore(
 	reg prometheus.Registerer,
 	bucket objstore.BucketReader,
 	dir string,
-	indexCacheSizeBytes uint64,
+	indexCache indexCache,
 	maxChunkPoolBytes uint64,
 	maxSampleCount uint64,
 	maxConcurrent int,
@@ -226,17 +232,6 @@
 		return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
 	}
 
-	// TODO(bwplotka): Add as a flag?
-	maxItemSizeBytes := indexCacheSizeBytes / 2
-
-	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
-		MaxSizeBytes:     indexCacheSizeBytes,
-		MaxItemSizeBytes: maxItemSizeBytes,
-	})
-	if err != nil {
-		return nil, errors.Wrap(err, "create index cache")
-	}
-
 	chunkPool, err := pool.NewBytesPool(2e5, 50e6, 2, maxChunkPoolBytes)
 	if err != nil {
 		return nil, errors.Wrap(err, "create chunk pool")
@@ -1066,7 +1061,7 @@ type bucketBlock struct {
 	bucket     objstore.BucketReader
 	meta       *metadata.Meta
 	dir        string
-	indexCache *storecache.IndexCache
+	indexCache indexCache
 	chunkPool  *pool.BytesPool
 
 	indexVersion int
@@ -1089,7 +1084,7 @@ func newBucketBlock(
 	bkt objstore.BucketReader,
 	id ulid.ULID,
 	dir string,
-	indexCache *storecache.IndexCache,
+	indexCache indexCache,
 	chunkPool *pool.BytesPool,
 	p partitioner,
 ) (b *bucketBlock, err error) {
@@ -1105,7 +1100,7 @@
 	if err = b.loadMeta(ctx, id); err != nil {
 		return nil, errors.Wrap(err, "load meta")
 	}
-	if err = b.loadIndexCache(ctx); err != nil {
+	if err = b.loadIndexCacheFile(ctx); err != nil {
 		return nil, errors.Wrap(err, "load index cache")
 	}
 	// Get object handles for all chunk files.
@@ -1149,9 +1144,9 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
 	return nil
 }
 
-func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
+func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) {
 	cachefn := filepath.Join(b.dir, block.IndexCacheFilename)
-	if err = b.loadIndexCacheFromFile(ctx, cachefn); err == nil {
+	if err = b.loadIndexCacheFileFromFile(ctx, cachefn); err == nil {
 		return nil
 	}
 	if !os.IsNotExist(errors.Cause(err)) {
@@ -1160,7 +1155,7 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
 
 	// Try to download index cache file from object store.
 	if err = objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexCacheFilename(), cachefn); err == nil {
-		return b.loadIndexCacheFromFile(ctx, cachefn)
+		return b.loadIndexCacheFileFromFile(ctx, cachefn)
 	}
 
 	if !b.bucket.IsObjNotFoundErr(errors.Cause(err)) {
@@ -1184,10 +1179,10 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
 		return errors.Wrap(err, "write index cache")
 	}
 
-	return errors.Wrap(b.loadIndexCacheFromFile(ctx, cachefn), "read index cache")
+	return errors.Wrap(b.loadIndexCacheFileFromFile(ctx, cachefn), "read index cache")
}
 
-func (b *bucketBlock) loadIndexCacheFromFile(ctx context.Context, cache string) (err error) {
+func (b *bucketBlock) loadIndexCacheFileFromFile(ctx context.Context, cache string) (err error) {
 	b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cache)
 	return err
 }
@@ -1249,13 +1244,13 @@ type bucketIndexReader struct {
 	block  *bucketBlock
 	dec    *index.Decoder
 	stats  *queryStats
-	cache  *storecache.IndexCache
+	cache  indexCache
 
 	mtx          sync.Mutex
 	loadedSeries map[uint64][]byte
 }
 
-func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *storecache.IndexCache) *bucketIndexReader {
+func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache indexCache) *bucketIndexReader {
 	r := &bucketIndexReader{
 		logger: logger,
 		ctx:    ctx,
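Every former *storecache.IndexCache use site now goes through the interface, which is all a cache-aside reader needs: try the cache, fall back to the object store, populate on miss. A hypothetical, self-contained illustration of that access pattern (fetchSeries and mapCache are invented for this sketch; they are not Thanos code):

package main

import (
	"fmt"

	"github.com/oklog/ulid"
)

// indexCache mirrors the unexported interface from pkg/store/bucket.go,
// reduced to the series methods for brevity.
type indexCache interface {
	SetSeries(b ulid.ULID, id uint64, v []byte)
	Series(b ulid.ULID, id uint64) ([]byte, bool)
}

// mapCache is a toy implementation used only in this sketch.
type mapCache map[uint64][]byte

func (m mapCache) SetSeries(_ ulid.ULID, id uint64, v []byte) { m[id] = v }
func (m mapCache) Series(_ ulid.ULID, id uint64) ([]byte, bool) {
	v, ok := m[id]
	return v, ok
}

// fetchSeries shows the cache-aside flow the interface supports:
// serve from cache on a hit, otherwise simulate a bucket read and populate.
func fetchSeries(c indexCache, blockID ulid.ULID, id uint64) []byte {
	if v, ok := c.Series(blockID, id); ok {
		return v // cache hit
	}
	v := []byte(fmt.Sprintf("series-%d-from-bucket", id)) // stand-in for an object store read
	c.SetSeries(blockID, id, v)
	return v
}

func main() {
	var blockID ulid.ULID // zero value stands in for a real block ULID
	c := mapCache{}
	fmt.Println(string(fetchSeries(c, blockID, 42))) // miss: "reads" the bucket
	fmt.Println(string(fetchSeries(c, blockID, 42))) // hit: served from cache
}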
84 changes: 72 additions & 12 deletions pkg/store/bucket_e2e_test.go
@@ -9,6 +9,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/oklog/ulid"
+
 	"github.com/go-kit/kit/log"
 	"github.com/improbable-eng/thanos/pkg/block"
 	"github.com/improbable-eng/thanos/pkg/block/metadata"
@@ -23,12 +25,46 @@
 	"github.com/prometheus/tsdb/labels"
 )
 
+type noopCache struct{}
+
+func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte)   {}
+func (noopCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) { return nil, false }
+func (noopCache) SetSeries(b ulid.ULID, id uint64, v []byte)          {}
+func (noopCache) Series(b ulid.ULID, id uint64) ([]byte, bool)        { return nil, false }
+
+type swappableCache struct {
+	ptr indexCache
+}
+
+func (c *swappableCache) SwapWith(ptr2 indexCache) {
+	c.ptr = ptr2
+}
+
+func (c *swappableCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
+	c.ptr.SetPostings(b, l, v)
+}
+
+func (c *swappableCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
+	return c.ptr.Postings(b, l)
+}
+
+func (c *swappableCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
+	c.ptr.SetSeries(b, id, v)
+}
+
+func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
+	return c.ptr.Series(b, id)
+}
+
 type storeSuite struct {
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
 
 	store            *BucketStore
 	minTime, maxTime int64
+	cache            *swappableCache
+
+	logger log.Logger
 }
 
 func (s *storeSuite) Close() {
@@ -53,7 +89,11 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	now := start
 
 	ctx, cancel := context.WithCancel(context.Background())
-	s := &storeSuite{cancel: cancel}
+	s := &storeSuite{
+		cancel: cancel,
+		logger: log.NewLogfmtLogger(os.Stderr),
+		cache:  &swappableCache{},
+	}
 	blocks := 0
 	for i := 0; i < 3; i++ {
 		mint := timestamp.FromTime(now)
@@ -78,17 +118,17 @@
 		meta, err := metadata.Read(dir2)
 		testutil.Ok(t, err)
 		meta.Thanos.Labels = map[string]string{"ext2": "value2"}
-		testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta))
+		testutil.Ok(t, metadata.Write(s.logger, dir2, meta))
 
-		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
-		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
+		testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir1))
+		testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir2))
 		blocks += 2
 
 		testutil.Ok(t, os.RemoveAll(dir1))
 		testutil.Ok(t, os.RemoveAll(dir2))
 	}
 
-	store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
+	store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20)
 	testutil.Ok(t, err)
 
 	s.store = store
@@ -310,13 +350,6 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
 	} {
 		t.Log("Run ", i)
 
-		// Always clean cache before each test.
-		s.store.indexCache, err = storecache.NewIndexCache(log.NewNopLogger(), nil, storecache.Opts{
-			MaxSizeBytes:     100,
-			MaxItemSizeBytes: 100,
-		})
-		testutil.Ok(t, err)
-
 		srv := newStoreSeriesServer(ctx)
 
 		testutil.Ok(t, s.store.Series(tcase.req, srv))
@@ -341,6 +374,26 @@ func TestBucketStore_e2e(t *testing.T) {
 		s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
 		defer s.Close()
 
+		t.Log("Test with no index cache")
+		s.cache.SwapWith(noopCache{})
+		testBucketStore_e2e(t, ctx, s)
+
+		t.Log("Test with large, sufficient index cache")
+		indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
+			MaxItemSizeBytes: 1e5,
+			MaxSizeBytes:     2e5,
+		})
+		testutil.Ok(t, err)
+		s.cache.SwapWith(indexCache)
+		testBucketStore_e2e(t, ctx, s)
+
+		t.Log("Test with small index cache")
+		indexCache2, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
+			MaxItemSizeBytes: 50,
+			MaxSizeBytes:     100,
+		})
+		testutil.Ok(t, err)
+		s.cache.SwapWith(indexCache2)
+		testBucketStore_e2e(t, ctx, s)
 	})
 }
@@ -370,6 +423,13 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 		s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
 		defer s.Close()
 
+		indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
+			MaxItemSizeBytes: 1e5,
+			MaxSizeBytes:     2e5,
+		})
+		testutil.Ok(t, err)
+		s.cache.SwapWith(indexCache)
+
 		testBucketStore_e2e(t, ctx, s)
 	})
 }
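swappableCache exists because prepareStoreWithTestBlocks builds the BucketStore once, while TestBucketStore_e2e wants to rerun the same scenario against several cache configurations; swapping the inner pointer avoids rebuilding the whole suite. The same wrapper idea extends to instrumented doubles. A hypothetical hit-counting wrapper that would compile alongside these tests (not part of this commit):

// countingCache wraps any indexCache and records series lookups,
// which a test could use to assert hit/miss behavior. Hypothetical.
type countingCache struct {
	wrapped        indexCache
	seriesRequests int
	seriesHits     int
}

func (c *countingCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
	c.wrapped.SetPostings(b, l, v)
}

func (c *countingCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
	return c.wrapped.Postings(b, l)
}

func (c *countingCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
	c.wrapped.SetSeries(b, id, v)
}

func (c *countingCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
	v, ok := c.wrapped.Series(b, id)
	c.seriesRequests++
	if ok {
		c.seriesHits++
	}
	return v, ok
}

A test could then call s.cache.SwapWith(&countingCache{wrapped: indexCache}) and assert on the counters after running a query.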
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
@@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
 	dir, err := ioutil.TempDir("", "prometheus-test")
 	testutil.Ok(t, err)
 
-	bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20)
+	bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20)
 	testutil.Ok(t, err)
 
 	resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
