Skip to content

Commit

Permalink
query frontend, query UI: Native histogram support (thanos-io#6071)
Browse files Browse the repository at this point in the history
* Implemented native histogram support for qfe and query UI

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Fixed marshalling for histograms in qfe

Started working on native histogram query ui

Copied histogram implementation for graph

Added query range support for native histograms in qfe

Use prom model (un-)marshal for native histograms in qfe

Use prom model (un-)marshal for native histograms in qfe

Fixed sample and sample stream marshal fn

Extended qfe native histogram e2e tests

Added copyright to qfe queryrange compat

Added query range test fo histograms and try to fix ui tests

Fixed DataTable test

Review feedback

Fixed native histogram e2e test

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Add histogram support for ApplyCounterResetsSeriesIterator

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Made assets

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Add chnagelog

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Fixed changelog

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Fixed qfe

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Fixed PrometheusResponse minTime for histograms in qfe

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Updated prometheus common to v0.40.0 and queryrange.Sample fixes

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Updated Readme

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Addressed PR comments

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

trigger tests

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Made assets

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Made assets

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* fixed tsdbutil references

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* fixed imports

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Enabled pushdown for query native hist test and removed ToDo

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Refactored native histogram query UI

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

---------

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>
  • Loading branch information
rabenhorst authored and HC Zhu committed Jun 27, 2023
1 parent 504062f commit 6165b81
Show file tree
Hide file tree
Showing 21 changed files with 2,073 additions and 337 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.
- [#6231](https://github.com/thanos-io/thanos/pull/6231) mixins: Add code/grpc-code dimension to error widgets.
- [#6244](https://github.com/thanos-io/thanos/pull/6244) mixin(Rule): Add rule evaluation failures to the Rule dashboard.
- [#6071](https://github.com/thanos-io/thanos/pull/6071) Query Frontend: *breaking :warning:* Add experimental native histogram support for which we updated and aligned with the [Prometheus common](https://github.com/prometheus/common) model, which is used for caching so a cache reset required.

### Removed

Expand Down
64 changes: 64 additions & 0 deletions internal/cortex/querier/queryrange/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.

package queryrange

import (
"github.com/prometheus/common/model"
)

// The following code will allow us to use JSON marshal and unmarshal functions from the Prometheus common package in
// query_range.go: https://github.com/prometheus/common/blob/846591a166358c7048ef197e84501ca688dda920/model/value.go
// Please see the link above for more details on Sample, SampleStream, HistogramPair and SampleHistogramPair.

func toModelSampleHistogramPair(s SampleHistogramPair) model.SampleHistogramPair {
return model.SampleHistogramPair{
Timestamp: model.Time(s.Timestamp),
Histogram: toModelSampleHistogram(s.Histogram),
}
}

func fromModelSampleHistogramPair(modelSampleHistogram model.SampleHistogramPair) (s SampleHistogramPair) {
return SampleHistogramPair{
Timestamp: int64(modelSampleHistogram.Timestamp),
Histogram: fromModelSampleHistogram(modelSampleHistogram.Histogram),
}
}

func fromModelSampleHistogram(modelSampleHistogram *model.SampleHistogram) (s SampleHistogram) {
buckets := make([]*HistogramBucket, len(modelSampleHistogram.Buckets))

for i, b := range modelSampleHistogram.Buckets {
buckets[i] = &HistogramBucket{
Lower: float64(b.Lower),
Upper: float64(b.Upper),
Count: float64(b.Count),
Boundaries: b.Boundaries,
}
}

return SampleHistogram{
Count: float64(modelSampleHistogram.Count),
Sum: float64(modelSampleHistogram.Sum),
Buckets: buckets,
}
}

func toModelSampleHistogram(s SampleHistogram) *model.SampleHistogram {
modelBuckets := make([]*model.HistogramBucket, len(s.Buckets))

for i, b := range s.Buckets {
modelBuckets[i] = &model.HistogramBucket{
Lower: model.FloatString(b.Lower),
Upper: model.FloatString(b.Upper),
Count: model.FloatString(b.Count),
Boundaries: b.Boundaries,
}
}

return &model.SampleHistogram{
Count: model.FloatString(s.Count),
Sum: model.FloatString(s.Sum),
Buckets: modelBuckets,
}
}
139 changes: 110 additions & 29 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,26 @@ func (resp *PrometheusResponse) minTime() int64 {
if len(result) == 0 {
return -1
}
if len(result[0].Samples) == 0 {
if len(result[0].Samples) == 0 && len(result[0].Histograms) == 0 {
return -1
}
return result[0].Samples[0].TimestampMs

if len(result[0].Samples) == 0 {
return result[0].Histograms[0].Timestamp
}

if len(result[0].Histograms) == 0 {
return result[0].Samples[0].TimestampMs
}

return minInt64(result[0].Samples[0].TimestampMs, result[0].Histograms[0].Timestamp)
}

func minInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}

func (resp *PrometheusResponse) GetStats() *PrometheusResponseStats {
Expand Down Expand Up @@ -397,54 +413,86 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
return &resp, nil
}

// UnmarshalJSON implements json.Unmarshaler.
// UnmarshalJSON implements json.Unmarshaler and is used for unmarshalling
// a Prometheus range query response (matrix).
func (s *SampleStream) UnmarshalJSON(data []byte) error {
var stream struct {
Metric model.Metric `json:"metric"`
Values []cortexpb.Sample `json:"values"`
}
if err := json.Unmarshal(data, &stream); err != nil {
var sampleStream model.SampleStream
if err := json.Unmarshal(data, &sampleStream); err != nil {
return err
}
s.Labels = cortexpb.FromMetricsToLabelAdapters(stream.Metric)
s.Samples = stream.Values

s.Labels = cortexpb.FromMetricsToLabelAdapters(sampleStream.Metric)

if len(sampleStream.Values) > 0 {
s.Samples = make([]cortexpb.Sample, 0, len(sampleStream.Values))
for _, sample := range sampleStream.Values {
s.Samples = append(s.Samples, cortexpb.Sample{
Value: float64(sample.Value),
TimestampMs: int64(sample.Timestamp),
})
}
}

if len(sampleStream.Histograms) > 0 {
s.Histograms = make([]SampleHistogramPair, 0, len(sampleStream.Histograms))
for _, h := range sampleStream.Histograms {
s.Histograms = append(s.Histograms, fromModelSampleHistogramPair(h))
}
}

return nil
}

// MarshalJSON implements json.Marshaler.
func (s *SampleStream) MarshalJSON() ([]byte, error) {
stream := struct {
Metric model.Metric `json:"metric"`
Values []cortexpb.Sample `json:"values"`
}{
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
Values: s.Samples,
var sampleStream model.SampleStream
sampleStream.Metric = cortexpb.FromLabelAdaptersToMetric(s.Labels)

sampleStream.Values = make([]model.SamplePair, 0, len(s.Samples))
for _, sample := range s.Samples {
sampleStream.Values = append(sampleStream.Values, model.SamplePair{
Value: model.SampleValue(sample.Value),
Timestamp: model.Time(sample.TimestampMs),
})
}

sampleStream.Histograms = make([]model.SampleHistogramPair, 0, len(s.Histograms))
for _, h := range s.Histograms {
sampleStream.Histograms = append(sampleStream.Histograms, toModelSampleHistogramPair(h))
}
return json.Marshal(stream)

return json.Marshal(sampleStream)
}

// UnmarshalJSON implements json.Unmarshaler.
// UnmarshalJSON implements json.Unmarshaler and is used for unmarshalling
// a Prometheus instant query response (vector).
func (s *Sample) UnmarshalJSON(data []byte) error {
var sample struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}
var sample model.Sample
if err := json.Unmarshal(data, &sample); err != nil {
return err
}
s.Labels = cortexpb.FromMetricsToLabelAdapters(sample.Metric)
s.Sample = sample.Value
s.SampleValue = float64(sample.Value)
s.Timestamp = int64(sample.Timestamp)

if sample.Histogram != nil {
sh := fromModelSampleHistogram(sample.Histogram)
s.Histogram = &sh
} else {
s.Histogram = nil
}

return nil
}

// MarshalJSON implements json.Marshaler.
func (s *Sample) MarshalJSON() ([]byte, error) {
sample := struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}{
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
Value: s.Sample,
var sample model.Sample
sample.Metric = cortexpb.FromLabelAdaptersToMetric(s.Labels)
sample.Value = model.SampleValue(s.SampleValue)
sample.Timestamp = model.Time(s.Timestamp)
if s.Histogram != nil {
sample.Histogram = toModelSampleHistogram(*s.Histogram)
}
return json.Marshal(sample)
}
Expand Down Expand Up @@ -657,7 +705,20 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
stream.Samples = SliceSamples(stream.Samples, existingEndTs)
} // else there is no overlap, yay!
}
// Same for histograms as for samples above.
if len(existing.Histograms) > 0 && len(stream.Histograms) > 0 {
existingEndTs := existing.Histograms[len(existing.Histograms)-1].GetTimestamp()
if existingEndTs == stream.Histograms[0].GetTimestamp() {
stream.Histograms = stream.Histograms[1:]
} else if existingEndTs > stream.Histograms[0].GetTimestamp() {
stream.Histograms = SliceHistogram(stream.Histograms, existingEndTs)
}
}

existing.Samples = append(existing.Samples, stream.Samples...)

existing.Histograms = append(existing.Histograms, stream.Histograms...)

output[metric] = existing
}
}
Expand Down Expand Up @@ -696,6 +757,26 @@ func SliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
return samples[searchResult:]
}

// SliceHistogram assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func SliceHistogram(histograms []SampleHistogramPair, minTs int64) []SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestamp() {
return histograms
}

if len(histograms) > 0 && minTs > histograms[len(histograms)-1].GetTimestamp() {
return histograms[len(histograms):]
}

searchResult := sort.Search(len(histograms), func(i int) bool {
return histograms[i].GetTimestamp() > minTs
})

return histograms[searchResult:]
}

func parseDurationMs(s string) (int64, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second/time.Millisecond)
Expand Down
14 changes: 11 additions & 3 deletions internal/cortex/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,17 @@ func TestRequest(t *testing.T) {
}

func TestResponse(t *testing.T) {
r := *parsedResponse
r.Headers = respHeaders
for i, tc := range []struct {
body string
expected *PrometheusResponse
}{
{
body: responseBody,
expected: &r,
expected: withHeaders(parsedResponse, respHeaders),
},
{
body: histogramResponseBody,
expected: withHeaders(parsedHistogramResponse, respHeaders),
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
Expand Down Expand Up @@ -667,3 +669,9 @@ func mustParse(t *testing.T, response string) Response {
require.NoError(t, json.Unmarshal([]byte(response), &resp))
return &resp
}

func withHeaders(response *PrometheusResponse, headers []*PrometheusResponseHeader) *PrometheusResponse {
r := *response
r.Headers = headers
return &r
}
Loading

0 comments on commit 6165b81

Please sign in to comment.