Enabled index-header under experimental flag.
Also enabled it in all our tests.

Depends on: #1952

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Jan 11, 2020
1 parent 2163a97 commit 48e75fa
Showing 5 changed files with 140 additions and 42 deletions.
9 changes: 9 additions & 0 deletions cmd/thanos/store.go
@@ -75,6 +75,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it is out of the experimental stage.").
Hidden().Default("false").Bool()

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
@@ -109,6 +112,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
},
selectorRelabelConf,
*advertiseCompatibilityLabel,
*enableIndexHeader,
)
}
}
@@ -140,6 +144,7 @@ func runStore(
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
) error {
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
@@ -214,6 +219,9 @@ func runStore(
return errors.Wrap(err, "meta fetcher")
}

if enableIndexHeader {
level.Info(logger).Log("msg", "index-header instead of index-cache.json enabled")
}
bs, err := store.NewBucketStore(
logger,
reg,
@@ -228,6 +236,7 @@
blockSyncConcurrency,
filterConf,
advertiseCompatibilityLabel,
enableIndexHeader,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
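
For readers following the diff above: the experimental flag is registered as a hidden, defaulted kingpin boolean and then passed down into runStore and store.NewBucketStore. A minimal, standalone sketch of that registration pattern, assuming kingpin v2 (the example application and its main function are illustrative only, not part of this commit):

```go
package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("example", "Sketch of registering a hidden experimental boolean flag.")

	// Hidden flags are left out of --help output but can still be set explicitly,
	// which keeps the experimental behaviour opt-in without advertising it.
	enableIndexHeader := app.Flag("experimental.enable-index-header",
		"If true, recreate index-header instead of index-cache.json for each block.").
		Hidden().Default("false").Bool()

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println("index-header enabled:", *enableIndexHeader)
}
```

Because the flag is hidden and defaults to false, existing deployments keep the index-cache.json behaviour unless the flag is passed explicitly, as the e2e spinup change further below does.
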
94 changes: 83 additions & 11 deletions pkg/store/bucket.go
@@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/thanos/pkg/block"
@@ -226,6 +227,7 @@ type BucketStore struct {
filterConfig *FilterConfig
advLabelSets []storepb.LabelSet
enableCompatibilityLabel bool
enableIndexHeader bool
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -244,6 +246,7 @@ func NewBucketStore(
blockSyncConcurrency int,
filterConfig *FilterConfig,
enableCompatibilityLabel bool,
enableIndexHeader bool,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -280,6 +283,7 @@
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
}
s.metrics = metrics

@@ -439,9 +443,17 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
lset := labels.FromMap(meta.Thanos.Labels)
h := lset.Hash()

jr, err := indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
if err != nil {
return errors.Wrap(err, "create index header reader")
var indexHeaderReader indexheader.Reader
if s.enableIndexHeader {
indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
if err != nil {
return errors.Wrap(err, "create index header reader")
}
} else {
indexHeaderReader, err = indexheader.NewJSONReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
if err != nil {
return errors.Wrap(err, "create index cache reader")
}
}

b, err := newBucketBlock(
Expand All @@ -452,7 +464,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
dir,
s.indexCache,
s.chunkPool,
jr,
indexHeaderReader,
s.partitioner,
)
if err != nil {
@@ -1452,6 +1464,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
if err != nil {
return errors.Wrap(err, "decode postings")
}

g.Fill(j, l)
continue
}
@@ -1510,20 +1523,20 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
r.stats.postingsFetchedSizeSum += int(length)

for _, p := range ptrs[i:j] {
c := b[p.ptr.Start-start : p.ptr.End-start]

_, fetchedPostings, err := r.dec.Postings(c)
// index-header can only estimate where postings end, so we need to resize the fetched bytes to the exact postings length.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
if err != nil {
return errors.Wrap(err, "read postings list")
return err
}

// Return postings and fill LRU cache.
groups[p.groupID].Fill(p.keyID, fetchedPostings)
r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
// Truncate the first 4 bytes, which encode the length of the postings.
groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(c)
r.stats.postingsTouchedSizeSum += len(pBytes)
}
return nil
})
@@ -1532,6 +1545,65 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
return g.Wait()
}

func resizePostings(b []byte) ([]byte, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}
// 4 bytes for the posting length, then n * 4 bytes: 4 for each big endian posting.
return b[:4+n*4], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
list []byte
cur uint32
}

// TODO(bwplotka): Expose those inside Prometheus.
func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list}
}

func (it *bigEndianPostings) At() uint64 {
return uint64(it.cur)
}

func (it *bigEndianPostings) Next() bool {
if len(it.list) >= 4 {
it.cur = binary.BigEndian.Uint32(it.list)
it.list = it.list[4:]
return true
}
return false
}

func (it *bigEndianPostings) Seek(x uint64) bool {
if uint64(it.cur) >= x {
return true
}

num := len(it.list) / 4
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {
return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
})
if i < num {
j := i * 4
it.cur = binary.BigEndian.Uint32(it.list[j:])
it.list = it.list[j+4:]
return true
}
it.list = nil
return false
}

func (it *bigEndianPostings) Err() error {
return nil
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
const maxSeriesSize = 64 * 1024

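
For context on resizePostings and bigEndianPostings above: TSDB stores a postings list as a 4-byte big-endian count followed by one 4-byte big-endian series reference per posting, and the index-header only estimates the end offset of each list, so the fetched range may carry trailing bytes that must be trimmed before decoding. A small self-contained sketch of that layout (the encodePostings helper and the sample values are hypothetical, for illustration only):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodePostings writes the TSDB postings layout: a 4-byte big-endian count
// followed by one 4-byte big-endian series reference per posting.
func encodePostings(refs []uint32) []byte {
	b := make([]byte, 4+4*len(refs))
	binary.BigEndian.PutUint32(b, uint32(len(refs)))
	for i, r := range refs {
		binary.BigEndian.PutUint32(b[4+4*i:], r)
	}
	return b
}

func main() {
	// Simulate an over-fetched range: the estimated end offset may include
	// bytes that belong to whatever follows the postings list.
	raw := append(encodePostings([]uint32{3, 7, 42}), 0xCA, 0xFE)

	// What resizePostings does: read the count and trim to exactly 4 + n*4 bytes.
	n := binary.BigEndian.Uint32(raw)
	sized := raw[:4+4*n]

	// Iterate like bigEndianPostings.Next: skip the 4-byte length prefix, then
	// read one big-endian uint32 per posting.
	for list := sized[4:]; len(list) >= 4; list = list[4:] {
		fmt.Println(binary.BigEndian.Uint32(list)) // 3, 7, 42
	}
}
```

Note that the resized bytes are cached with their length prefix intact, so later cache hits can still be decoded by the regular postings decoder.
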
73 changes: 43 additions & 30 deletions pkg/store/bucket_e2e_test.go
@@ -163,6 +163,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
20,
filterConf,
true,
true,
)
testutil.Ok(t, err)
s.store = store
Expand All @@ -178,7 +179,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
return s
}

func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
// TODO(bwplotka): Benchmark Series.
func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite) {
mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)
@@ -392,16 +394,18 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
},
} {
t.Log("Run ", i)

srv := newStoreSeriesServer(ctx)

testutil.Ok(t, s.store.Series(tcase.req, srv))
testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))

for i, s := range srv.SeriesSet {
testutil.Equals(t, tcase.expected[i], s.Labels)
testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
if ok := t.Run(fmt.Sprint(i), func(t *testing.T) {
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, s.store.Series(tcase.req, srv))
testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))

for i, s := range srv.SeriesSet {
testutil.Equals(t, tcase.expected[i], s.Labels)
testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
}
}); !ok {
return
}
}
}
@@ -417,27 +421,36 @@ func TestBucketStore_e2e(t *testing.T) {

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)

t.Log("Test with no index cache")
s.cache.SwapWith(noopCache{})
testBucketStore_e2e(t, ctx, s)
if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
testBucketStore_e2e(t, ctx, s)
}); !ok {
return
}

t.Log("Test with large, sufficient index cache")
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
MaxSize: 2e5,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache)
testBucketStore_e2e(t, ctx, s)
if ok := t.Run("with large, sufficient index cache", func(t *testing.T) {
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
MaxSize: 2e5,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache)
testBucketStore_e2e(t, ctx, s)
}); !ok {
return
}

t.Log("Test with small index cache")
indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 50,
MaxSize: 100,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache2)
testBucketStore_e2e(t, ctx, s)
if ok := t.Run("with small index cache", func(t *testing.T) {
indexCache2, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 50,
MaxSize: 100,
})
testutil.Ok(t, err)
s.cache.SwapWith(indexCache2)
testBucketStore_e2e(t, ctx, s)
}); !ok {
return
}
})
}

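
The test refactor above replaces t.Log markers with named subtests and stops as soon as one case fails, relying on t.Run reporting whether the subtest succeeded. A minimal sketch of that pattern (package, test name, and variant names are placeholders, not the real test):

```go
package store_test

import "testing"

func TestCacheVariants(t *testing.T) {
	for _, name := range []string{"no index cache", "large index cache", "small index cache"} {
		if ok := t.Run(name, func(t *testing.T) {
			// Swap in the cache variant and exercise the store here.
		}); !ok {
			// Stop at the first failing variant; running the rest would only add noise.
			return
		}
	}
}
```

Each variant then shows up as its own named subtest in go test output, which is why the explicit t.Log("Run ", i) and "Test with ..." log lines could be dropped.
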
5 changes: 4 additions & 1 deletion pkg/store/bucket_test.go
@@ -447,6 +447,7 @@ func TestBucketStore_Info(t *testing.T) {
20,
allowAllFilterConf,
true,
true,
)
testutil.Ok(t, err)

@@ -687,7 +688,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
false,
20,
allowAllFilterConf,
true)
true,
true,
)
testutil.Ok(t, err)

testutil.Ok(t, bucketStore.InitialSync(context.Background()))
1 change: 1 addition & 0 deletions test/e2e/spinup_test.go
@@ -269,6 +269,7 @@ func storeGateway(http, grpc address, bucketConfig []byte, relabelConfig []byte)
// Accelerated sync time for quicker test (3m by default).
"--sync-block-duration", "5s",
"--selector.relabel-config", string(relabelConfig),
"--experimental.enable-index-header",
)), nil
},
}
