Skip to content

Commit

Permalink
Support native histograms in pkg/ruler
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome committed Mar 6, 2023
1 parent 5ad1837 commit b85f452
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 34 deletions.
28 changes: 20 additions & 8 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ type PusherAppender struct {
failedWrites prometheus.Counter
totalWrites prometheus.Counter

ctx context.Context
pusher Pusher
labels []labels.Labels
samples []mimirpb.Sample
userID string
ctx context.Context
pusher Pusher
labels []labels.Labels
samples []mimirpb.Sample
histogramLabels []labels.Labels
histograms []mimirpb.Histogram
userID string
}

func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
Expand All @@ -65,16 +67,26 @@ func (a *PusherAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _
return 0, errors.New("metadata updates are unsupported")
}

func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, errors.New("histograms are unsupported")
func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.histogramLabels = append(a.histogramLabels, l)
var hp mimirpb.Histogram
if h != nil {
hp = mimirpb.FromHistogramToHistogramProto(t, h)
} else {
hp = mimirpb.FromFloatHistogramToHistogramProto(t, fh)
}
a.histograms = append(a.histograms, hp)
return 0, nil
}

func (a *PusherAppender) Commit() error {
a.totalWrites.Inc()

// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), mimirpb.ToWriteRequest(a.labels, a.samples, nil, nil, mimirpb.RULE))
req := mimirpb.ToWriteRequest(a.labels, a.samples, nil, nil, mimirpb.RULE)
req.AddHistogramSeries(a.histogramLabels, a.histograms, nil)
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)

if err != nil {
// Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.)
Expand Down
160 changes: 134 additions & 26 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/notifier"
Expand All @@ -30,6 +32,7 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/ruler/rulespb"
"github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)

Expand All @@ -48,52 +51,157 @@ func TestPusherAppendable(t *testing.T) {
pusher := &fakePusher{}
pa := NewPusherAppendable(pusher, "user-1", nil, promauto.With(nil).NewCounter(prometheus.CounterOpts{}), promauto.With(nil).NewCounter(prometheus.CounterOpts{}))

type sample struct {
series string
value float64
histogram *histogram.Histogram
floatHistogram *histogram.FloatHistogram
ts int64
}

for _, tc := range []struct {
name string
series string
value float64
expectedTS int64
name string
hasNanSample bool // If true, it will be a single float sample with NaN.
samples []sample
}{
{
name: "tenant without delay, normal value",
series: "foo_bar",
value: 1.234,
expectedTS: 120_000,
name: "tenant without delay, normal value",
samples: []sample{
{
series: "foo_bar",
value: 1.234,
ts: 120_000,
},
},
},
{
name: "tenant without delay, stale nan value",
series: "foo_bar",
value: math.Float64frombits(value.StaleNaN),
expectedTS: 120_000,
name: "tenant without delay, stale nan value",
hasNanSample: true,
samples: []sample{
{
series: "foo_bar",
value: math.Float64frombits(value.StaleNaN),
ts: 120_000,
},
},
},
{
name: "ALERTS, normal value",
series: `ALERTS{alertname="boop"}`,
value: 1.234,
expectedTS: 120_000,
name: "ALERTS, normal value",
samples: []sample{
{
series: `ALERTS{alertname="boop"}`,
value: 1.234,
ts: 120_000,
},
},
},
{
name: "ALERTS, stale nan value",
series: `ALERTS{alertname="boop"}`,
value: math.Float64frombits(value.StaleNaN),
expectedTS: 120_000,
name: "ALERTS, stale nan value",
hasNanSample: true,
samples: []sample{
{
series: `ALERTS{alertname="boop"}`,
value: math.Float64frombits(value.StaleNaN),
ts: 120_000,
},
},
},
{
name: "tenant without delay, histogram value",
samples: []sample{
{
series: "foo_bar",
histogram: test.GenerateTestHistogram(10),
ts: 200_000,
},
},
},
{
name: "tenant without delay, float histogram value",
samples: []sample{
{
series: "foo_bar",
floatHistogram: test.GenerateTestFloatHistogram(10),
ts: 230_000,
},
},
},
{
name: "mix of float and float histogram",
samples: []sample{
{
series: "foo_bar1",
value: 999,
ts: 230_000,
},
{
series: "foo_bar3",
value: 888,
ts: 230_000,
},
{
series: "foo_bar2",
floatHistogram: test.GenerateTestFloatHistogram(10),
ts: 230_000,
},
{
series: "foo_bar4",
floatHistogram: test.GenerateTestFloatHistogram(99),
ts: 230_000,
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()

lbls, err := parser.ParseMetric(tc.series)
require.NoError(t, err)
var expReq []mimirpb.PreallocTimeseries

pusher.response = &mimirpb.WriteResponse{}
a := pa.Appender(ctx)
_, err = a.Append(0, lbls, 120_000, tc.value)
require.NoError(t, err)

for _, sm := range tc.samples {
lbls, err := parser.ParseMetric(sm.series)
require.NoError(t, err)
timeseries := mimirpb.PreallocTimeseries{
TimeSeries: &mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(lbls),
Exemplars: []mimirpb.Exemplar{},
Samples: []mimirpb.Sample{},
},
}
expReq = append(expReq, timeseries)

if sm.histogram != nil || sm.floatHistogram != nil {
_, err = a.AppendHistogram(0, lbls, sm.ts, sm.histogram, sm.floatHistogram)
if sm.histogram != nil {
timeseries.Histograms = append(timeseries.Histograms, mimirpb.FromHistogramToHistogramProto(sm.ts, sm.histogram))
} else {
timeseries.Histograms = append(timeseries.Histograms, mimirpb.FromFloatHistogramToHistogramProto(sm.ts, sm.floatHistogram))
}
} else {
_, err = a.Append(0, lbls, sm.ts, sm.value)
timeseries.Samples = append(timeseries.Samples, mimirpb.Sample{
TimestampMs: sm.ts,
Value: sm.value,
})
}
require.NoError(t, err)
}
require.NoError(t, a.Commit())

require.Equal(t, tc.expectedTS, pusher.request.Timeseries[0].Samples[0].TimestampMs)
if !tc.hasNanSample {
require.Equal(t, expReq, pusher.request.Timeseries)
return
}

// For NaN, we cannot use require.Equal.
require.Len(t, pusher.request.Timeseries, 1)
require.Len(t, pusher.request.Timeseries[0].Samples, 1)
lbls, err := parser.ParseMetric(tc.samples[0].series)
require.NoError(t, err)
require.Equal(t, 0, labels.Compare(mimirpb.FromLabelAdaptersToLabels(pusher.request.Timeseries[0].Labels), lbls))
require.Equal(t, tc.samples[0].ts, pusher.request.Timeseries[0].Samples[0].TimestampMs)
require.True(t, math.IsNaN(pusher.request.Timeseries[0].Samples[0].Value))
})
}
}
Expand Down

0 comments on commit b85f452

Please sign in to comment.