Store: Make queryStats log with human-readable format. #4885

Merged
merged 2 commits on Nov 29, 2021
124 changes: 63 additions & 61 deletions pkg/store/bucket.go
@@ -20,6 +20,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/alecthomas/units"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/gogo/protobuf/types"
@@ -1091,27 +1092,28 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 	defer func() {
 		s.metrics.seriesDataTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouched))
 		s.metrics.seriesDataFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetched))
-		s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouchedSizeSum))
-		s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetchedSizeSum))
+		s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.PostingsTouchedSizeSum))
+		s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.PostingsFetchedSizeSum))
 		s.metrics.seriesDataTouched.WithLabelValues("series").Observe(float64(stats.seriesTouched))
 		s.metrics.seriesDataFetched.WithLabelValues("series").Observe(float64(stats.seriesFetched))
-		s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.seriesTouchedSizeSum))
-		s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.seriesFetchedSizeSum))
+		s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.SeriesTouchedSizeSum))
+		s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.SeriesFetchedSizeSum))
 		s.metrics.seriesDataTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouched))
 		s.metrics.seriesDataFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetched))
-		s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum))
-		s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum))
+		s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.ChunksTouchedSizeSum))
+		s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.ChunksFetchedSizeSum))
 		s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount))
 		s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressions))
 		s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
 		s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
 		s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
-		s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.cachedPostingsCompressionTimeSum.Seconds())
-		s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
-		s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
-		s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))
+		s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds())
+		s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds())
+		s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum))
+		s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum))
 
 		level.Debug(s.logger).Log("msg", "stats query processed",
+			"request", req,
 			"stats", fmt.Sprintf("%+v", stats), "err", err)
 	}()
 
@@ -1129,8 +1131,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			return status.Error(code, err.Error())
 		}
 		stats.blocksQueried = len(res)
-		stats.getAllDuration = time.Since(begin)
-		s.metrics.seriesGetAllDuration.Observe(stats.getAllDuration.Seconds())
+		stats.GetAllDuration = time.Since(begin)
+		s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds())
 		s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
 	}
 	// Merge the sub-results from each selected block.
@@ -1164,8 +1166,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 			err = status.Error(codes.Unknown, errors.Wrap(set.Err(), "expand series set").Error())
 			return
 		}
-		stats.mergeDuration = time.Since(begin)
-		s.metrics.seriesMergeDuration.Observe(stats.mergeDuration.Seconds())
+		stats.MergeDuration = time.Since(begin)
+		s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds())
 
 		err = nil
 	})
@@ -1989,7 +1991,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		// Get postings for the given key from cache first.
 		if b, ok := fromCache[key]; ok {
 			r.stats.postingsTouched++
-			r.stats.postingsTouchedSizeSum += len(b)
+			r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))
 
 			// Even if this instance is not using compression, there may be compressed
 			// entries in the cache written by other stores.
@@ -2001,7 +2003,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 				s := time.Now()
 				l, err = diffVarintSnappyDecode(b)
 				r.stats.cachedPostingsDecompressions += 1
-				r.stats.cachedPostingsDecompressionTimeSum += time.Since(s)
+				r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
 				if err != nil {
 					r.stats.cachedPostingsDecompressionErrors += 1
 				}
@@ -2064,8 +2066,8 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		r.mtx.Lock()
 		r.stats.postingsFetchCount++
 		r.stats.postingsFetched += j - i
-		r.stats.postingsFetchDurationSum += fetchTime
-		r.stats.postingsFetchedSizeSum += int(length)
+		r.stats.PostingsFetchDurationSum += fetchTime
+		r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
 		r.mtx.Unlock()
 
 		for _, p := range ptrs[i:j] {
@@ -2105,12 +2107,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 
 			// If we just fetched it we still have to update the stats for touched postings.
 			r.stats.postingsTouched++
-			r.stats.postingsTouchedSizeSum += len(pBytes)
+			r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
 			r.stats.cachedPostingsCompressions += compressions
 			r.stats.cachedPostingsCompressionErrors += compressionErrors
-			r.stats.cachedPostingsOriginalSizeSum += len(pBytes)
-			r.stats.cachedPostingsCompressedSizeSum += compressedSize
-			r.stats.cachedPostingsCompressionTimeSum += compressionTime
+			r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
+			r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
+			r.stats.CachedPostingsCompressionTimeSum += compressionTime
 			r.mtx.Unlock()
 		}
 		return nil
@@ -2226,8 +2228,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
 	r.mtx.Lock()
 	r.stats.seriesFetchCount++
 	r.stats.seriesFetched += len(ids)
-	r.stats.seriesFetchDurationSum += time.Since(begin)
-	r.stats.seriesFetchedSizeSum += int(end - start)
+	r.stats.SeriesFetchDurationSum += time.Since(begin)
+	r.stats.SeriesFetchedSizeSum += units.Base2Bytes(int(end - start))
 	r.mtx.Unlock()
 
 	for i, id := range ids {
@@ -2331,7 +2333,7 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
 	}
 
 	r.stats.seriesTouched++
-	r.stats.seriesTouchedSizeSum += len(b)
+	r.stats.SeriesTouchedSizeSum += units.Base2Bytes(len(b))
 	return decodeSeriesForTime(b, lset, chks, skipChunks, mint, maxt)
 }
 
@@ -2514,8 +2516,8 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 
 	r.stats.chunksFetchCount++
 	r.stats.chunksFetched += len(pIdxs)
-	r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
-	r.stats.chunksFetchedSizeSum += int(part.End - part.Start)
+	r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
+	r.stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))
 
 	var (
 		buf []byte
@@ -2576,7 +2578,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 				return errors.Wrap(err, "populate chunk")
 			}
 			r.stats.chunksTouched++
-			r.stats.chunksTouchedSizeSum += int(chunkDataLen)
+			r.stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))
 			continue
 		}
 
@@ -2600,15 +2602,15 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 		locked = true
 
 		r.stats.chunksFetchCount++
-		r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
-		r.stats.chunksFetchedSizeSum += len(*nb)
+		r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
+		r.stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
 		err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save)
 		if err != nil {
 			r.block.chunkPool.Put(nb)
 			return errors.Wrap(err, "populate chunk")
 		}
 		r.stats.chunksTouched++
-		r.stats.chunksTouchedSizeSum += int(chunkDataLen)
+		r.stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))
 
 		r.block.chunkPool.Put(nb)
 	}
@@ -2662,79 +2664,79 @@ type queryStats struct {
 	blocksQueried int
 
 	postingsTouched int
-	postingsTouchedSizeSum int
+	PostingsTouchedSizeSum units.Base2Bytes
 	postingsToFetch int
 	postingsFetched int
-	postingsFetchedSizeSum int
+	PostingsFetchedSizeSum units.Base2Bytes
 	postingsFetchCount int
-	postingsFetchDurationSum time.Duration
+	PostingsFetchDurationSum time.Duration
 
 	cachedPostingsCompressions int
 	cachedPostingsCompressionErrors int
-	cachedPostingsOriginalSizeSum int
-	cachedPostingsCompressedSizeSum int
-	cachedPostingsCompressionTimeSum time.Duration
+	CachedPostingsOriginalSizeSum units.Base2Bytes
+	CachedPostingsCompressedSizeSum units.Base2Bytes
+	CachedPostingsCompressionTimeSum time.Duration
 	cachedPostingsDecompressions int
 	cachedPostingsDecompressionErrors int
-	cachedPostingsDecompressionTimeSum time.Duration
+	CachedPostingsDecompressionTimeSum time.Duration
 
 	seriesTouched int
-	seriesTouchedSizeSum int
+	SeriesTouchedSizeSum units.Base2Bytes
 	seriesFetched int
-	seriesFetchedSizeSum int
+	SeriesFetchedSizeSum units.Base2Bytes
 	seriesFetchCount int
-	seriesFetchDurationSum time.Duration
+	SeriesFetchDurationSum time.Duration
 
 	chunksTouched int
-	chunksTouchedSizeSum int
+	ChunksTouchedSizeSum units.Base2Bytes
 	chunksFetched int
-	chunksFetchedSizeSum int
+	ChunksFetchedSizeSum units.Base2Bytes
 	chunksFetchCount int
-	chunksFetchDurationSum time.Duration
+	ChunksFetchDurationSum time.Duration
 
-	getAllDuration time.Duration
+	GetAllDuration time.Duration
 	mergedSeriesCount int
 	mergedChunksCount int
-	mergeDuration time.Duration
+	MergeDuration time.Duration
 }
 
 func (s queryStats) merge(o *queryStats) *queryStats {
 	s.blocksQueried += o.blocksQueried
 
 	s.postingsTouched += o.postingsTouched
-	s.postingsTouchedSizeSum += o.postingsTouchedSizeSum
+	s.PostingsTouchedSizeSum += o.PostingsTouchedSizeSum
 	s.postingsFetched += o.postingsFetched
-	s.postingsFetchedSizeSum += o.postingsFetchedSizeSum
+	s.PostingsFetchedSizeSum += o.PostingsFetchedSizeSum
 	s.postingsFetchCount += o.postingsFetchCount
-	s.postingsFetchDurationSum += o.postingsFetchDurationSum
+	s.PostingsFetchDurationSum += o.PostingsFetchDurationSum
 
 	s.cachedPostingsCompressions += o.cachedPostingsCompressions
 	s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
-	s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
-	s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
-	s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum
+	s.CachedPostingsOriginalSizeSum += o.CachedPostingsOriginalSizeSum
+	s.CachedPostingsCompressedSizeSum += o.CachedPostingsCompressedSizeSum
+	s.CachedPostingsCompressionTimeSum += o.CachedPostingsCompressionTimeSum
 	s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
 	s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
-	s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum
+	s.CachedPostingsDecompressionTimeSum += o.CachedPostingsDecompressionTimeSum
 
 	s.seriesTouched += o.seriesTouched
-	s.seriesTouchedSizeSum += o.seriesTouchedSizeSum
+	s.SeriesTouchedSizeSum += o.SeriesTouchedSizeSum
 	s.seriesFetched += o.seriesFetched
-	s.seriesFetchedSizeSum += o.seriesFetchedSizeSum
+	s.SeriesFetchedSizeSum += o.SeriesFetchedSizeSum
 	s.seriesFetchCount += o.seriesFetchCount
-	s.seriesFetchDurationSum += o.seriesFetchDurationSum
+	s.SeriesFetchDurationSum += o.SeriesFetchDurationSum
 
 	s.chunksTouched += o.chunksTouched
-	s.chunksTouchedSizeSum += o.chunksTouchedSizeSum
+	s.ChunksTouchedSizeSum += o.ChunksTouchedSizeSum
 	s.chunksFetched += o.chunksFetched
-	s.chunksFetchedSizeSum += o.chunksFetchedSizeSum
+	s.ChunksFetchedSizeSum += o.ChunksFetchedSizeSum
 	s.chunksFetchCount += o.chunksFetchCount
-	s.chunksFetchDurationSum += o.chunksFetchDurationSum
+	s.ChunksFetchDurationSum += o.ChunksFetchDurationSum
 
-	s.getAllDuration += o.getAllDuration
+	s.GetAllDuration += o.GetAllDuration
 	s.mergedSeriesCount += o.mergedSeriesCount
 	s.mergedChunksCount += o.mergedChunksCount
-	s.mergeDuration += o.mergeDuration
+	s.MergeDuration += o.MergeDuration
 
 	return &s
 }
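Why the fields are both retyped and exported: `fmt`'s `%+v` verb only invokes a value's `String()` method when the field is exported (reflection cannot call methods on unexported fields), so switching the size sums to `units.Base2Bytes` and capitalizing them — along with the `time.Duration` sums — is what makes the `stats query processed` debug line print sizes like `3MiB` instead of raw byte counts. A minimal standalone sketch of the effect; the `toyStats` type and its values are illustrative, not from this PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/alecthomas/units"
)

// toyStats mirrors the pattern in queryStats: exported fields whose types
// implement fmt.Stringer print human-readably under %+v, while unexported
// fields are printed as raw values.
type toyStats struct {
	PostingsFetchedSizeSum units.Base2Bytes // exported: %+v calls String()
	postingsFetched        int              // unexported counter: plain number
	GetAllDuration         time.Duration    // exported: prints as "1.5s"
}

func main() {
	s := toyStats{
		PostingsFetchedSizeSum: units.Base2Bytes(3 * 1024 * 1024),
		postingsFetched:        42,
		GetAllDuration:         1500 * time.Millisecond,
	}
	// Prints something like:
	// {PostingsFetchedSizeSum:3MiB postingsFetched:42 GetAllDuration:1.5s}
	fmt.Printf("%+v\n", s)
}
```

Because `units.Base2Bytes` is an integer type, the existing `+=` accumulation and `float64(...)` metric conversions keep working unchanged; only the `%+v` rendering differs.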
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
@@ -1387,7 +1387,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, bkt.Close()) }()
 
-	logger := log.NewNopLogger()
+	logger := log.NewLogfmtLogger(os.Stderr)
 	thanosMeta := metadata.Thanos{
 		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
 		Downsample: metadata.ThanosDownsample{Resolution: 0},
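The test-side change swaps a nop logger for a real one: `log.NewNopLogger()` discards every record, so the stats line above was never visible when running the test, while a logfmt logger to stderr surfaces it. A small sketch of the difference, assuming only go-kit/log:

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	nop := log.NewNopLogger()
	level.Debug(nop).Log("msg", "stats query processed") // silently discarded

	// What the test uses now: logfmt output on stderr, e.g.
	//   level=debug msg="stats query processed"
	visible := log.NewLogfmtLogger(os.Stderr)
	level.Debug(visible).Log("msg", "stats query processed")
}
```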