Skip to content

Commit

Permalink
OTLP: Categorize target_info labels
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Mar 11, 2024
1 parent eefee5e commit c6d815e
Show file tree
Hide file tree
Showing 21 changed files with 856 additions and 144 deletions.
4 changes: 4 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,10 @@ func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64
return 0, tsdb.ErrNotReady
}

func (n notReadyAppender) AppendInfoSample(storage.SeriesRef, labels.Labels, int64, []int) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}

func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
Expand Down
38 changes: 19 additions & 19 deletions model/labels/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,24 +597,24 @@ func TestBuilder(t *testing.T) {
want: FromStrings("aaa", "111", "ccc", "333"),
},
{
set: []Label{{"aaa", "111"}, {"bbb", "222"}, {"ccc", "333"}},
set: []Label{{Name: "aaa", Value: "111"}, {Name: "bbb", Value: "222"}, {Name: "ccc", Value: "333"}},
del: []string{"bbb"},
want: FromStrings("aaa", "111", "ccc", "333"),
},
{
base: FromStrings("aaa", "111"),
set: []Label{{"bbb", "222"}},
set: []Label{{Name: "bbb", Value: "222"}},
want: FromStrings("aaa", "111", "bbb", "222"),
},
{
base: FromStrings("aaa", "111"),
set: []Label{{"bbb", "222"}, {"bbb", "333"}},
set: []Label{{Name: "bbb", Value: "222"}, {Name: "bbb", Value: "333"}},
want: FromStrings("aaa", "111", "bbb", "333"),
},
{
base: FromStrings("aaa", "111", "bbb", "222", "ccc", "333"),
del: []string{"bbb"},
set: []Label{{"ddd", "444"}},
set: []Label{{Name: "ddd", Value: "444"}},
want: FromStrings("aaa", "111", "ccc", "333", "ddd", "444"),
},
{ // Blank value is interpreted as delete.
Expand All @@ -623,7 +623,7 @@ func TestBuilder(t *testing.T) {
},
{
base: FromStrings("aaa", "111", "bbb", "222", "ccc", "333"),
set: []Label{{"bbb", ""}},
set: []Label{{Name: "bbb", Value: ""}},
want: FromStrings("aaa", "111", "ccc", "333"),
},
{
Expand All @@ -639,7 +639,7 @@ func TestBuilder(t *testing.T) {
{
base: FromStrings("aaa", "111", "bbb", "222", "ccc", "333"),
del: []string{"bbb"},
set: []Label{{"ddd", "444"}},
set: []Label{{Name: "ddd", Value: "444"}},
keep: []string{"aaa", "ddd"},
want: FromStrings("aaa", "111", "ddd", "444"),
},
Expand Down Expand Up @@ -694,19 +694,19 @@ func TestScratchBuilder(t *testing.T) {
want: EmptyLabels(),
},
{
add: []Label{{"aaa", "111"}},
add: []Label{{Name: "aaa", Value: "111"}},
want: FromStrings("aaa", "111"),
},
{
add: []Label{{"aaa", "111"}, {"bbb", "222"}, {"ccc", "333"}},
add: []Label{{Name: "aaa", Value: "111"}, {Name: "bbb", Value: "222"}, {Name: "ccc", Value: "333"}},
want: FromStrings("aaa", "111", "bbb", "222", "ccc", "333"),
},
{
add: []Label{{"bbb", "222"}, {"aaa", "111"}, {"ccc", "333"}},
add: []Label{{Name: "bbb", Value: "222"}, {Name: "aaa", Value: "111"}, {Name: "ccc", Value: "333"}},
want: FromStrings("aaa", "111", "bbb", "222", "ccc", "333"),
},
{
add: []Label{{"ddd", "444"}},
add: []Label{{Name: "ddd", Value: "444"}},
want: FromStrings("ddd", "444"),
},
} {
Expand Down Expand Up @@ -787,15 +787,15 @@ func BenchmarkLabels_Hash(b *testing.B) {

func BenchmarkBuilder(b *testing.B) {
m := []Label{
{"job", "node"},
{"instance", "123.123.1.211:9090"},
{"path", "/api/v1/namespaces/<namespace>/deployments/<name>"},
{"method", "GET"},
{"namespace", "system"},
{"status", "500"},
{"prometheus", "prometheus-core-1"},
{"datacenter", "eu-west-1"},
{"pod_name", "abcdef-99999-defee"},
{Name: "job", Value: "node"},
{Name: "instance", Value: "123.123.1.211:9090"},
{Name: "path", Value: "/api/v1/namespaces/<namespace>/deployments/<name>"},
{Name: "method", Value: "GET"},
{Name: "namespace", Value: "system"},
{Name: "status", Value: "500"},
{Name: "prometheus", Value: "prometheus-core-1"},
{Name: "datacenter", Value: "eu-west-1"},
{Name: "pod_name", Value: "abcdef-99999-defee"},
}

var l Labels
Expand Down
314 changes: 213 additions & 101 deletions prompb/types.pb.go

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions prompb/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ message Sample {
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 2;
// The set of identifying labels for info metrics, as array indices.
repeated int32 identifyingLabels = 3;
}

message Exemplar {
Expand Down Expand Up @@ -123,10 +125,10 @@ message BucketSpan {
message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
}

message Label {
Expand Down
62 changes: 51 additions & 11 deletions scrape/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (s
return 0, nil
}

func (a nopAppender) AppendInfoSample(storage.SeriesRef, labels.Labels, int64, []int) (storage.SeriesRef, error) {
return 0, nil
}

func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, nil
}
Expand Down Expand Up @@ -77,6 +81,12 @@ func equalFloatSamples(a, b floatSample) bool {
return labels.Equal(a.metric, b.metric) && a.t == b.t && math.Float64bits(a.f) == math.Float64bits(b.f)
}

type infoSample struct {
metric labels.Labels
t int64
identifyingLabels []int
}

type histogramSample struct {
t int64
h *histogram.Histogram
Expand All @@ -96,17 +106,20 @@ func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender {
type collectResultAppender struct {
mtx sync.Mutex

next storage.Appender
resultFloats []floatSample
pendingFloats []floatSample
rolledbackFloats []floatSample
resultHistograms []histogramSample
pendingHistograms []histogramSample
rolledbackHistograms []histogramSample
resultExemplars []exemplar.Exemplar
pendingExemplars []exemplar.Exemplar
resultMetadata []metadata.Metadata
pendingMetadata []metadata.Metadata
next storage.Appender
resultFloats []floatSample
pendingFloats []floatSample
rolledbackFloats []floatSample
resultInfoSamples []infoSample
pendingInfoSamples []infoSample
rolledbackInfoSamples []infoSample
resultHistograms []histogramSample
pendingHistograms []histogramSample
rolledbackHistograms []histogramSample
resultExemplars []exemplar.Exemplar
pendingExemplars []exemplar.Exemplar
resultMetadata []metadata.Metadata
pendingMetadata []metadata.Metadata
}

func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
Expand All @@ -132,6 +145,29 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels
return ref, err
}

func (a *collectResultAppender) AppendInfoSample(ref storage.SeriesRef, lset labels.Labels, t int64, identifyingLabels []int) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingInfoSamples = append(a.pendingInfoSamples, infoSample{
metric: lset,
t: t,
identifyingLabels: identifyingLabels,
})

if ref == 0 {
ref = storage.SeriesRef(rand.Uint64())
}
if a.next == nil {
return ref, nil
}

ref, err := a.next.AppendInfoSample(ref, lset, t, identifyingLabels)
if err != nil {
return 0, err
}
return ref, err
}

func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
Expand Down Expand Up @@ -176,10 +212,12 @@ func (a *collectResultAppender) Commit() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.resultFloats = append(a.resultFloats, a.pendingFloats...)
a.resultInfoSamples = append(a.resultInfoSamples, a.pendingInfoSamples...)
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...)
a.pendingFloats = nil
a.pendingInfoSamples = nil
a.pendingExemplars = nil
a.pendingHistograms = nil
a.pendingMetadata = nil
Expand All @@ -193,8 +231,10 @@ func (a *collectResultAppender) Rollback() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.rolledbackFloats = a.pendingFloats
a.rolledbackInfoSamples = a.pendingInfoSamples
a.rolledbackHistograms = a.pendingHistograms
a.pendingFloats = nil
a.pendingInfoSamples = nil
a.pendingHistograms = nil
if a.next == nil {
return nil
Expand Down
14 changes: 14 additions & 0 deletions storage/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,20 @@ func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float
return ref, nil
}

func (f *fanoutAppender) AppendInfoSample(ref SeriesRef, l labels.Labels, t int64, identifyingLabels []int) (SeriesRef, error) {
ref, err := f.primary.AppendInfoSample(ref, l, t, identifyingLabels)
if err != nil {
return ref, err
}

for _, appender := range f.secondaries {
if _, err := appender.AppendInfoSample(ref, l, t, identifyingLabels); err != nil {
return 0, err
}
}
return ref, nil
}

func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
ref, err := f.primary.AppendExemplar(ref, l, e)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ type Appender interface {
// If the reference is 0 it must not be used for caching.
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)

// AppendInfoSample adds an info metric sample for the given series.
AppendInfoSample(ref SeriesRef, l labels.Labels, t int64, identifyingLabels []int) (SeriesRef, error)

// Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
Expand Down
11 changes: 10 additions & 1 deletion storage/remote/otlptranslator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
for k, v := range l {
s = append(s, prompb.Label{Name: k, Value: v})
}
// Ensure consistent label ordering
sort.Sort(ByLabelName(s))

return s
}
Expand Down Expand Up @@ -565,10 +567,17 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name
}
labels := createAttributes(resource, attributes, settings.ExternalLabels, model.MetricNameLabel, name)
identifyingLabels := make([]int32, 0, 2)
for i, l := range labels {
if l.Name == model.InstanceLabel || l.Name == model.JobLabel {
identifyingLabels = append(identifyingLabels, int32(i))
}
}
sample := &prompb.Sample{
Value: float64(1),
// convert ns to ms
Timestamp: convertTimeStamp(timestamp),
Timestamp: convertTimeStamp(timestamp),
IdentifyingLabels: identifyingLabels,
}
addSample(tsMap, sample, labels, infoType)
}
Expand Down
10 changes: 10 additions & 0 deletions storage/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (rws *WriteStorage) Close() error {
type timestampTracker struct {
writeStorage *WriteStorage
samples int64
infoSamples int64
exemplars int64
histograms int64
highestTimestamp int64
Expand All @@ -284,6 +285,15 @@ func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64
return 0, nil
}

// AppendInfoSample implements storage.Appender.
func (t *timestampTracker) AppendInfoSample(_ storage.SeriesRef, _ labels.Labels, ts int64, _ []int) (storage.SeriesRef, error) {
t.infoSamples++
if ts > t.highestTimestamp {
t.highestTimestamp = ts
}
return 0, nil
}

func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) {
t.exemplars++
return 0, nil
Expand Down
12 changes: 10 additions & 2 deletions storage/remote/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,16 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
var ref storage.SeriesRef
for _, s := range ts.Samples {
ref, err = app.Append(ref, labels, s.Timestamp, s.Value)
if len(s.IdentifyingLabels) == 0 {
ref, err = app.Append(ref, labels, s.Timestamp, s.Value)
} else {
// This is an info metric sample
ils := make([]int, 0, len(s.IdentifyingLabels))
for _, idx := range s.IdentifyingLabels {
ils = append(ils, int(idx))
}
ref, err = app.AppendInfoSample(ref, labels, s.Timestamp, ils)
}
if err != nil {
unwrappedErr := errors.Unwrap(err)
if unwrappedErr == nil {
Expand All @@ -135,7 +144,6 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}
return err
}

}

for _, ep := range ts.Exemplars {
Expand Down
17 changes: 17 additions & 0 deletions storage/remote/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries {
type mockAppendable struct {
latestSample int64
samples []mockSample
infoSamples []mockInfoSample
latestExemplar int64
exemplars []mockExemplar
latestHistogram int64
Expand All @@ -282,6 +283,12 @@ type mockSample struct {
v float64
}

type mockInfoSample struct {
l labels.Labels
t int64
identifyingLabels []int
}

type mockExemplar struct {
l labels.Labels
el labels.Labels
Expand Down Expand Up @@ -317,6 +324,16 @@ func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
return 0, nil
}

func (m *mockAppendable) AppendInfoSample(_ storage.SeriesRef, l labels.Labels, t int64, identifyingLabels []int) (storage.SeriesRef, error) {
if t < m.latestSample {
return 0, storage.ErrOutOfOrderSample
}

m.latestSample = t
m.infoSamples = append(m.infoSamples, mockInfoSample{l: l, t: t, identifyingLabels: identifyingLabels})
return 0, nil
}

func (m *mockAppendable) Commit() error {
return m.commitErr
}
Expand Down
Loading

0 comments on commit c6d815e

Please sign in to comment.