Skip to content

Commit

Permalink
receive,ruler: Upgraded TSDB and used ChunkIterator in Series TSDB St…
Browse files Browse the repository at this point in the history
…ore. (#2876)

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored Aug 10, 2020
1 parent dec6c09 commit 626f248
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2902](https://github.com/thanos-io/thanos/pull/2902) ui: React: Separate dedupe and partial response checkboxes per panel.
- [#2931](https://github.com/thanos-io/thanos/pull/2931) Query: Allow passing a `storeMatcher[]` to select matching stores when debugging the querier. See [documentation](https://thanos.io/components/query.md/#store-filtering)
- [#2991](https://github.com/thanos-io/thanos/pull/2991) store: `operation` label value `getrange` changed to `get_range` for `thanos_store_bucket_cache_operation_requests_total` and `thanos_store_bucket_cache_operation_hits_total` to be consistent with bucket operation metrics.
- [#2876](https://github.com/thanos-io/thanos/pull/2876) Receive,Ruler: Updated TSDB and switched to ChunkIterators instead of sample one, which avoids unnecessary decoding / encoding.

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

Expand Down
1 change: 0 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
}

flagsMap := getFlagsMap(cmd.Model().Flags)

return runQuery(
g,
logger,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ require (
// so that we don't get errors about being incompatible with the Go proxies.
// See https://github.com/thanos-io/thanos/issues/1415
replace (
// Make sure Cortex is not forcing us to some other Prometheus version.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159
google.golang.org/grpc => google.golang.org/grpc v1.29.1
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5JsbBv6wSv6i0=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.11.1 h1:0ZISXCMRuCZcxF77aT1BXY5m74mX2vrGYl1dSwBI0Jo=
github.com/prometheus/common v0.11.1/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
Expand All @@ -876,8 +875,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 h1:eQ88y1vfbXuclr6B04jYTmhc6ydXlBUSIaXCjEs0osk=
github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2/go.mod h1:i1KZsZmyDTJRvnR7zE8z/u2v+tkpPjoiPpnWp6nwhr0=
github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 h1:mT66e//l/+QugUat5A42YvIPxlsS6O/yr9UtjlDhPYw=
github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159/go.mod h1:zfAqy/MwhMFajB9E2n12/9gG2fvofIE9uKDtlZCDxqs=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
Expand Down
2 changes: 1 addition & 1 deletion pkg/rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (m *Manager) Stop() {
mgr.Stop()
}
}

func (m *Manager) protoRuleGroups() []*rulespb.RuleGroup {

rg := m.RuleGroups()
res := make([]*rulespb.RuleGroup, 0, len(rg))
for _, g := range rg {
Expand Down
7 changes: 3 additions & 4 deletions pkg/rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (n nopQueryable) Querier(_ context.Context, _, _ int64) (storage.Querier, e
}

// Regression test against https://github.com/thanos-io/thanos/issues/1779.
func TestRun(t *testing.T) {
func TestRun_Subqueries(t *testing.T) {
dir, err := ioutil.TempDir("", "test_rule_run")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
Expand Down Expand Up @@ -84,17 +84,16 @@ groups:
},
labels.FromStrings("replica", "1"),
)
testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")}))
testutil.Ok(t, thanosRuleMgr.Update(1*time.Second, []string{filepath.Join(dir, "rule.yaml")}))

thanosRuleMgr.Run()
defer thanosRuleMgr.Stop()

select {
case <-time.After(2 * time.Minute):
case <-time.After(1 * time.Minute):
t.Fatal("timeout while waiting on rule manager query evaluation")
case <-queryDone:
}

testutil.Equals(t, "rate(some_metric[1h:5m] offset 1d)", query)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/storepb/testutil/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
testutil.Ok(t, err)
}

h, err := tsdb.NewHead(nil, nil, w, 10000000, tsdbDir, nil, tsdb.DefaultStripeSize, nil)
h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil)
testutil.Ok(t, err)

app := h.Appender(context.Background())
Expand Down
124 changes: 60 additions & 64 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -24,18 +23,19 @@ import (
)

type TSDBReader interface {
storage.Queryable
storage.ChunkQueryable
StartTime() (int64, error)
}

// TSDBStore implements the store API against a local TSDB instance.
// It attaches the provided external labels to all results. It only responds with raw data
// and does not support downsampling.
type TSDBStore struct {
logger log.Logger
db TSDBReader
component component.StoreAPI
externalLabels labels.Labels
logger log.Logger
db TSDBReader
component component.StoreAPI
externalLabels labels.Labels
maxBytesPerFrame int
}

// ReadWriteTSDBStore is a TSDBStore that can also be written to.
Expand All @@ -50,10 +50,11 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com
logger = log.NewNopLogger()
}
return &TSDBStore{
logger: logger,
db: db,
component: component,
externalLabels: externalLabels,
logger: logger,
db: db,
component: component,
externalLabels: externalLabels,
maxBytesPerFrame: 1024 * 1024, // 1MB as recommended by gRPC.
}
}

Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Error(codes.InvalidArgument, err.Error())
}

q, err := s.db.Querier(context.Background(), r.MinTime, r.MaxTime)
q, err := s.db.ChunkQuerier(context.Background(), r.MinTime, r.MaxTime)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
Expand All @@ -119,72 +120,67 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
set = q.Select(false, nil, matchers...)
respSeries storepb.Series
)

// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for set.Next() {
series := set.At()

respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels)

if !r.SkipChunks {
// TODO(fabxc): An improvement over this trivial approach would be to directly
// use the chunks provided by TSDB in the response.
c, err := s.encodeChunks(series.Iterator(), MaxSamplesPerChunk)
if err != nil {
return status.Errorf(codes.Internal, "encode chunk: %s", err)
respSeries.Chunks = respSeries.Chunks[:0]
if r.SkipChunks {
if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
}

respSeries.Chunks = append(respSeries.Chunks[:0], c...)
continue
}

if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
frameBytesLeft := s.maxBytesPerFrame
for _, lbl := range respSeries.Labels {
frameBytesLeft -= lbl.Size()
}
}
if err := set.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}

func (s *TSDBStore) encodeChunks(it chunkenc.Iterator, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
var (
chkMint int64
chk *chunkenc.XORChunk
app chunkenc.Appender
isNext = it.Next()
)

for isNext {
if chk == nil {
chk = chunkenc.NewXORChunk()
app, err = chk.Appender()
if err != nil {
return nil, err
chIter := series.Iterator()
isNext := chIter.Next()
for isNext {
chk := chIter.At()
if chk.Chunk == nil {
return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}
chkMint, _ = it.At()
}

app.Append(it.At())
chkMaxt, _ := it.At()
respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{
MinTime: chk.MinTime,
MaxTime: chk.MaxTime,
Raw: &storepb.Chunk{
Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one.
Data: chk.Chunk.Bytes(),
},
})
frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size()

// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = chIter.Next()
if frameBytesLeft > 0 && isNext {
continue
}

isNext = it.Next()
if isNext && chk.NumSamples() < maxSamplesPerChunk {
continue
if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
respSeries.Chunks = respSeries.Chunks[:0]
}
if err := chIter.Err(); err != nil {
return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error())
}

// Cut the chunk.
chks = append(chks, storepb.AggrChunk{
MinTime: chkMint,
MaxTime: chkMaxt,
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()},
})
chk = nil
}
if it.Err() != nil {
return nil, errors.Wrap(it.Err(), "read TSDB series")
if err := set.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
}
for _, w := range set.Warnings() {
if err := srv.Send(storepb.NewWarnSeriesResponse(w)); err != nil {
return status.Error(codes.Aborted, err.Error())
}
}

return chks, nil

return nil
}

// translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally
Expand Down Expand Up @@ -217,7 +213,7 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb.
func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
q, err := s.db.Querier(ctx, r.Start, r.End)
q, err := s.db.ChunkQuerier(ctx, r.Start, r.End)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand All @@ -234,7 +230,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest
func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
*storepb.LabelValuesResponse, error,
) {
q, err := s.db.Querier(ctx, r.Start, r.End)
q, err := s.db.ChunkQuerier(ctx, r.Start, r.End)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
1 change: 0 additions & 1 deletion pkg/store/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ func TestTSDBStore_LabelValues(t *testing.T) {
}

tsdbStore := NewTSDBStore(nil, nil, db, component.Rule, labels.FromStrings("region", "eu-west"))

now := time.Now()
head := db.Head()
for _, tc := range []struct {
Expand Down

0 comments on commit 626f248

Please sign in to comment.