Skip to content

Commit

Permalink
Store: export queryStats time.Duration field to log with human-readable format.
Browse files Browse the repository at this point in the history

Signed-off-by: Jimmie Han <hanjinming@outlook.com>
  • Loading branch information
hanjm committed Nov 20, 2021
1 parent 243526d commit 057cb50
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 29 deletions.
57 changes: 29 additions & 28 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,12 +1105,13 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.cachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.cachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))

level.Debug(s.logger).Log("msg", "stats query processed",
"request", req,
"stats", fmt.Sprintf("%+v", stats), "err", err)
}()

Expand All @@ -1128,8 +1129,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
return status.Error(code, err.Error())
}
stats.blocksQueried = len(res)
stats.getAllDuration = time.Since(begin)
s.metrics.seriesGetAllDuration.Observe(stats.getAllDuration.Seconds())
stats.GetAllDuration = time.Since(begin)
s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds())
s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
}
// Merge the sub-results from each selected block.
Expand Down Expand Up @@ -1163,8 +1164,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
err = status.Error(codes.Unknown, errors.Wrap(set.Err(), "expand series set").Error())
return
}
stats.mergeDuration = time.Since(begin)
s.metrics.seriesMergeDuration.Observe(stats.mergeDuration.Seconds())
stats.MergeDuration = time.Since(begin)
s.metrics.seriesMergeDuration.Observe(stats.MergeDuration.Seconds())

err = nil
})
Expand Down Expand Up @@ -2000,7 +2001,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
s := time.Now()
l, err = diffVarintSnappyDecode(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.cachedPostingsDecompressionTimeSum += time.Since(s)
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
r.stats.cachedPostingsDecompressionErrors += 1
}
Expand Down Expand Up @@ -2063,7 +2064,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.postingsFetchDurationSum += fetchTime
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.postingsFetchedSizeSum += int(length)
r.mtx.Unlock()

Expand Down Expand Up @@ -2109,7 +2110,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.cachedPostingsOriginalSizeSum += len(pBytes)
r.stats.cachedPostingsCompressedSizeSum += compressedSize
r.stats.cachedPostingsCompressionTimeSum += compressionTime
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
}
return nil
Expand Down Expand Up @@ -2225,7 +2226,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
r.mtx.Lock()
r.stats.seriesFetchCount++
r.stats.seriesFetched += len(ids)
r.stats.seriesFetchDurationSum += time.Since(begin)
r.stats.SeriesFetchDurationSum += time.Since(begin)
r.stats.seriesFetchedSizeSum += int(end - start)
r.mtx.Unlock()

Expand Down Expand Up @@ -2513,7 +2514,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

r.stats.chunksFetchCount++
r.stats.chunksFetched += len(pIdxs)
r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += int(part.End - part.Start)

var (
Expand Down Expand Up @@ -2599,7 +2600,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
locked = true

r.stats.chunksFetchCount++
r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += len(*nb)
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save)
if err != nil {
Expand Down Expand Up @@ -2666,35 +2667,35 @@ type queryStats struct {
postingsFetched int
postingsFetchedSizeSum int
postingsFetchCount int
postingsFetchDurationSum time.Duration
PostingsFetchDurationSum time.Duration

cachedPostingsCompressions int
cachedPostingsCompressionErrors int
cachedPostingsOriginalSizeSum int
cachedPostingsCompressedSizeSum int
cachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsCompressedSizeSum int
CachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsDecompressionErrors int
cachedPostingsDecompressionTimeSum time.Duration
CachedPostingsDecompressionTimeSum time.Duration

seriesTouched int
seriesTouchedSizeSum int
seriesFetched int
seriesFetchedSizeSum int
seriesFetchCount int
seriesFetchDurationSum time.Duration
SeriesFetchDurationSum time.Duration

chunksTouched int
chunksTouchedSizeSum int
chunksFetched int
chunksFetchedSizeSum int
chunksFetchCount int
chunksFetchDurationSum time.Duration
ChunksFetchDurationSum time.Duration

getAllDuration time.Duration
GetAllDuration time.Duration
mergedSeriesCount int
mergedChunksCount int
mergeDuration time.Duration
MergeDuration time.Duration
}

func (s queryStats) merge(o *queryStats) *queryStats {
Expand All @@ -2705,35 +2706,35 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.postingsFetched += o.postingsFetched
s.postingsFetchedSizeSum += o.postingsFetchedSizeSum
s.postingsFetchCount += o.postingsFetchCount
s.postingsFetchDurationSum += o.postingsFetchDurationSum
s.PostingsFetchDurationSum += o.PostingsFetchDurationSum

s.cachedPostingsCompressions += o.cachedPostingsCompressions
s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum
s.CachedPostingsCompressionTimeSum += o.CachedPostingsCompressionTimeSum
s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum
s.CachedPostingsDecompressionTimeSum += o.CachedPostingsDecompressionTimeSum

s.seriesTouched += o.seriesTouched
s.seriesTouchedSizeSum += o.seriesTouchedSizeSum
s.seriesFetched += o.seriesFetched
s.seriesFetchedSizeSum += o.seriesFetchedSizeSum
s.seriesFetchCount += o.seriesFetchCount
s.seriesFetchDurationSum += o.seriesFetchDurationSum
s.SeriesFetchDurationSum += o.SeriesFetchDurationSum

s.chunksTouched += o.chunksTouched
s.chunksTouchedSizeSum += o.chunksTouchedSizeSum
s.chunksFetched += o.chunksFetched
s.chunksFetchedSizeSum += o.chunksFetchedSizeSum
s.chunksFetchCount += o.chunksFetchCount
s.chunksFetchDurationSum += o.chunksFetchDurationSum
s.ChunksFetchDurationSum += o.ChunksFetchDurationSum

s.getAllDuration += o.getAllDuration
s.GetAllDuration += o.GetAllDuration
s.mergedSeriesCount += o.mergedSeriesCount
s.mergedChunksCount += o.mergedChunksCount
s.mergeDuration += o.mergeDuration
s.MergeDuration += o.MergeDuration

return &s
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

logger := log.NewNopLogger()
logger := log.NewLogfmtLogger(os.Stderr)
thanosMeta := metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Expand Down

0 comments on commit 057cb50

Please sign in to comment.