From 10c400264032010eb9dc28e844e7fc731d2aacca Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Thu, 8 Feb 2024 11:48:54 +0400 Subject: [PATCH] Implement Appender.AppendIdentifyingLabels Signed-off-by: Arve Knudsen --- cmd/prometheus/main.go | 4 ++++ scrape/helpers_test.go | 15 +++++++++++++++ storage/fanout.go | 14 ++++++++++++++ storage/interface.go | 2 ++ storage/remote/write.go | 8 ++++++++ storage/remote/write_handler.go | 11 +++++++++-- storage/remote/write_handler_test.go | 25 ++++++++++++++++++------- tsdb/agent/db.go | 5 +++++ tsdb/head_append.go | 20 ++++++++++++++++++++ 9 files changed, 95 insertions(+), 9 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e36665857b..e586415958 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1534,6 +1534,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendIdentifyingLabels(storage.SeriesRef, []string, int64) error { + return tsdb.ErrNotReady +} + func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { return 0, tsdb.ErrNotReady } diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 43ee0fcecf..48e7956936 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -54,6 +54,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h return 0, nil } +func (a nopAppender) AppendIdentifyingLabels(storage.SeriesRef, []string, int64) error { + return nil +} + func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { return 0, nil } @@ -148,6 +152,17 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels. return a.next.AppendHistogram(ref, l, t, h, fh) } +func (a *collectResultAppender) AppendIdentifyingLabels(ref storage.SeriesRef, identifyingLabels []string, t int64) error { + a.mtx.Lock() + defer a.mtx.Unlock() + // TODO: Add to pendingDataLabels + if a.next == nil { + return nil + } + + return a.next.AppendIdentifyingLabels(ref, identifyingLabels, t) +} + func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { a.mtx.Lock() defer a.mtx.Unlock() diff --git a/storage/fanout.go b/storage/fanout.go index e52342bc7e..1b9c29c070 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64 return ref, nil } +func (f *fanoutAppender) AppendIdentifyingLabels(ref SeriesRef, identifyingLabels []string, t int64) error { + if err := f.primary.AppendIdentifyingLabels(ref, identifyingLabels, t); err != nil { + return err + } + + for _, appender := range f.secondaries { + if err := appender.AppendIdentifyingLabels(ref, identifyingLabels, t); err != nil { + return err + } + } + + return nil +} + func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) { ref, err := f.primary.UpdateMetadata(ref, l, m) if err != nil { diff --git a/storage/interface.go b/storage/interface.go index 892897e51e..3e23fd3593 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -245,6 +245,8 @@ type Appender interface { // If the reference is 0 it must not be used for caching. Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) + AppendIdentifyingLabels(ref SeriesRef, identifyingLabels []string, t int64) error + // Commit submits the collected samples and purges the batch. If Commit // returns a non-nil error, it also rolls back all modifications made in // the appender so far, as Rollback would do. In any case, an Appender diff --git a/storage/remote/write.go b/storage/remote/write.go index 66455cb4dd..c47ef23c0f 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -297,6 +297,14 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, return 0, nil } +// AppendIdentifyingLabels implements storage.Appender. +func (t *timestampTracker) AppendIdentifyingLabels(_ storage.SeriesRef, _ []string, ts int64) error { + if ts > t.highestTimestamp { + t.highestTimestamp = ts + } + return nil +} + func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { // TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write. // UpadteMetadata is no-op for remote write (where timestampTracker is being used) for now. diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index b592002fa7..e1b8c3d903 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "net/http" + "slices" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -169,8 +170,14 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err } if len(ts.DataLabels) > 0 { - // Info type metric with metadata represented as data labels - if err := app.AppendDataLabels(ts.DataLabels, ts.Samples[0].Timestamp); err != nil { + // Info type metric with metadata represented as data labels. + identifyingLabels := make([]string, 0, len(ts.Labels)-len(dataLabels)) + for _, l := range ts.Labels { + if !slices.Contains(dataLabels, l.Name) { + identifyingLabels = append(identifyingLabels, l.Name) + } + } + if err := app.AppendIdentifyingLabels(ref, identifyingLabels, ts.Samples[0].Timestamp); err != nil { return err } } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index df92dc6bcc..b258fb9a87 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -264,13 +264,14 @@ func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries { } type mockAppendable struct { - latestSample int64 - samples []mockSample - latestExemplar int64 - exemplars []mockExemplar - latestHistogram int64 - histograms []mockHistogram - commitErr error + latestSample int64 + samples []mockSample + latestExemplar int64 + exemplars []mockExemplar + latestHistogram int64 + latestDataLabels int64 + histograms []mockHistogram + commitErr error } type mockSample struct { @@ -335,6 +336,16 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, nil } +func (m *mockAppendable) AppendIdentifyingLabels(_ storage.SeriesRef, identifyingLabels []string, t int64) error { + if t < m.latestDataLabels { + return storage.ErrOutOfOrderSample + } + + m.latestDataLabels = t + // TODO: Record data labels sample + return nil +} + func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { // TODO: Wire metadata in a mockAppendable field when we get around to handling metadata in remote_write. // UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now. diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index d399897133..2f8d3d2c41 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -957,6 +957,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int return storage.SeriesRef(series.ref), nil } +func (a *appender) AppendIdentifyingLabels(storage.SeriesRef, []string, int64) error { + // TODO: Implement + return nil +} + func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { // TODO: Wire metadata in the Agent's appender. return 0, nil diff --git a/tsdb/head_append.go b/tsdb/head_append.go index bd5ef53463..628583bfda 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -78,6 +78,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t return a.app.AppendHistogram(ref, l, t, h, fh) } +func (a *initAppender) AppendIdentifyingLabels(ref storage.SeriesRef, identifyingLabels []string, t int64) error { + if a.app != nil { + return a.app.AppendIdentifyingLabels(ref, identifyingLabels, t) + } + + a.head.initTime(t) + a.app = a.head.appender() + return a.app.AppendIdentifyingLabels(ref, identifyingLabels, t) +} + func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { if a.app != nil { return a.app.UpdateMetadata(ref, l, m) @@ -677,6 +687,16 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels return storage.SeriesRef(s.ref), nil } +func (a *headAppender) AppendIdentifyingLabels(ref storage.SeriesRef, identifyingLabels []string, t int64) error { + s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + return fmt.Errorf("series not found: %d", ref) + } + + // TODO: Implement persisting of identifying labels index + return nil +} + // UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't // use getOrCreate or make any of the lset sanity checks that Append does. func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {