Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Compose indexCache properly allowing injection for testing purposes. #1098

Merged
merged 1 commit into from
May 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
storecache "github.com/improbable-eng/thanos/pkg/store/cache"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// registerStore registers a store command.
Expand Down Expand Up @@ -120,12 +121,23 @@ func runStore(
}
}()

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCacheSizeBytes,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
Expand Down
43 changes: 19 additions & 24 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/pool"
"github.com/improbable-eng/thanos/pkg/runutil"
storecache "github.com/improbable-eng/thanos/pkg/store/cache"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/strutil"
"github.com/improbable-eng/thanos/pkg/tracing"
Expand Down Expand Up @@ -176,14 +175,21 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
return &m
}

type indexCache interface {
SetPostings(b ulid.ULID, l labels.Label, v []byte)
Postings(b ulid.ULID, l labels.Label) ([]byte, bool)
SetSeries(b ulid.ULID, id uint64, v []byte)
Series(b ulid.ULID, id uint64) ([]byte, bool)
}

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
type BucketStore struct {
logger log.Logger
metrics *bucketStoreMetrics
bucket objstore.BucketReader
dir string
indexCache *storecache.IndexCache
indexCache indexCache
chunkPool *pool.BytesPool

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
Expand Down Expand Up @@ -211,7 +217,7 @@ func NewBucketStore(
reg prometheus.Registerer,
bucket objstore.BucketReader,
dir string,
indexCacheSizeBytes uint64,
indexCache indexCache,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
Expand All @@ -226,17 +232,6 @@ func NewBucketStore(
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return nil, errors.Wrap(err, "create index cache")
}

chunkPool, err := pool.NewBytesPool(2e5, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, errors.Wrap(err, "create chunk pool")
Expand Down Expand Up @@ -1066,7 +1061,7 @@ type bucketBlock struct {
bucket objstore.BucketReader
meta *metadata.Meta
dir string
indexCache *storecache.IndexCache
indexCache indexCache
chunkPool *pool.BytesPool

indexVersion int
Expand All @@ -1089,7 +1084,7 @@ func newBucketBlock(
bkt objstore.BucketReader,
id ulid.ULID,
dir string,
indexCache *storecache.IndexCache,
indexCache indexCache,
chunkPool *pool.BytesPool,
p partitioner,
) (b *bucketBlock, err error) {
Expand All @@ -1105,7 +1100,7 @@ func newBucketBlock(
if err = b.loadMeta(ctx, id); err != nil {
return nil, errors.Wrap(err, "load meta")
}
if err = b.loadIndexCache(ctx); err != nil {
if err = b.loadIndexCacheFile(ctx); err != nil {
return nil, errors.Wrap(err, "load index cache")
}
// Get object handles for all chunk files.
Expand Down Expand Up @@ -1149,9 +1144,9 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
return nil
}

func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) {
cachefn := filepath.Join(b.dir, block.IndexCacheFilename)
if err = b.loadIndexCacheFromFile(ctx, cachefn); err == nil {
if err = b.loadIndexCacheFileFromFile(ctx, cachefn); err == nil {
return nil
}
if !os.IsNotExist(errors.Cause(err)) {
Expand All @@ -1160,7 +1155,7 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {

// Try to download index cache file from object store.
if err = objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexCacheFilename(), cachefn); err == nil {
return b.loadIndexCacheFromFile(ctx, cachefn)
return b.loadIndexCacheFileFromFile(ctx, cachefn)
}

if !b.bucket.IsObjNotFoundErr(errors.Cause(err)) {
Expand All @@ -1184,10 +1179,10 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
return errors.Wrap(err, "write index cache")
}

return errors.Wrap(b.loadIndexCacheFromFile(ctx, cachefn), "read index cache")
return errors.Wrap(b.loadIndexCacheFileFromFile(ctx, cachefn), "read index cache")
}

func (b *bucketBlock) loadIndexCacheFromFile(ctx context.Context, cache string) (err error) {
func (b *bucketBlock) loadIndexCacheFileFromFile(ctx context.Context, cache string) (err error) {
b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cache)
return err
}
Expand Down Expand Up @@ -1249,13 +1244,13 @@ type bucketIndexReader struct {
block *bucketBlock
dec *index.Decoder
stats *queryStats
cache *storecache.IndexCache
cache indexCache

mtx sync.Mutex
loadedSeries map[uint64][]byte
}

func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *storecache.IndexCache) *bucketIndexReader {
func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache indexCache) *bucketIndexReader {
r := &bucketIndexReader{
logger: logger,
ctx: ctx,
Expand Down
84 changes: 72 additions & 12 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/oklog/ulid"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the spacey

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do in next PR.

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
Expand All @@ -23,12 +25,46 @@ import (
"github.com/prometheus/tsdb/labels"
)

type noopCache struct{}

func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {}
func (noopCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) { return nil, false }
func (noopCache) SetSeries(b ulid.ULID, id uint64, v []byte) {}
func (noopCache) Series(b ulid.ULID, id uint64) ([]byte, bool) { return nil, false }

type swappableCache struct {
ptr indexCache
}

func (c *swappableCache) SwapWith(ptr2 indexCache) {
c.ptr = ptr2
}

func (c *swappableCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
c.ptr.SetPostings(b, l, v)
}

func (c *swappableCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
return c.ptr.Postings(b, l)
}

func (c *swappableCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
c.ptr.SetSeries(b, id, v)
}

func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
return c.ptr.Series(b, id)
}

type storeSuite struct {
cancel context.CancelFunc
wg sync.WaitGroup

store *BucketStore
minTime, maxTime int64
cache *swappableCache

logger log.Logger
}

func (s *storeSuite) Close() {
Expand All @@ -53,7 +89,11 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
now := start

ctx, cancel := context.WithCancel(context.Background())
s := &storeSuite{cancel: cancel}
s := &storeSuite{
cancel: cancel,
logger: log.NewLogfmtLogger(os.Stderr),
cache: &swappableCache{},
}
blocks := 0
for i := 0; i < 3; i++ {
mint := timestamp.FromTime(now)
Expand All @@ -78,17 +118,17 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
meta, err := metadata.Read(dir2)
testutil.Ok(t, err)
meta.Thanos.Labels = map[string]string{"ext2": "value2"}
testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta))
testutil.Ok(t, metadata.Write(s.logger, dir2, meta))

testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir1))
testutil.Ok(t, block.Upload(ctx, s.logger, bkt, dir2))
blocks += 2

testutil.Ok(t, os.RemoveAll(dir1))
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20)
testutil.Ok(t, err)

s.store = store
Expand Down Expand Up @@ -310,13 +350,6 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
} {
t.Log("Run ", i)

// Always clean cache before each test.
s.store.indexCache, err = storecache.NewIndexCache(log.NewNopLogger(), nil, storecache.Opts{
MaxSizeBytes: 100,
MaxItemSizeBytes: 100,
})
testutil.Ok(t, err)

srv := newStoreSeriesServer(ctx)

testutil.Ok(t, s.store.Series(tcase.req, srv))
Expand All @@ -341,6 +374,26 @@ func TestBucketStore_e2e(t *testing.T) {
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
defer s.Close()

t.Log("Test with no index cache")
s.cache.SwapWith(noopCache{})
testBucketStore_e2e(t, ctx, s)

t.Log("Test with large, sufficient index cache")
indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
MaxItemSizeBytes: 1e5,
MaxSizeBytes: 2e5,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache)
testBucketStore_e2e(t, ctx, s)

t.Log("Test with small index cache")
indexCache2, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
MaxItemSizeBytes: 50,
MaxSizeBytes: 100,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache2)
testBucketStore_e2e(t, ctx, s)
})
}
Expand Down Expand Up @@ -370,6 +423,13 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
defer s.Close()

indexCache, err := storecache.NewIndexCache(s.logger, nil, storecache.Opts{
MaxItemSizeBytes: 1e5,
MaxSizeBytes: 2e5,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache)

testBucketStore_e2e(t, ctx, s)
})
}
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
Expand Down