From 0fa864b19ef565dcc98198259f4d880b1dd42e3b Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 14 Jul 2022 17:06:09 +0200 Subject: [PATCH 01/11] Add label set validation method Signed-off-by: Matej Gera --- pkg/store/labelpb/label.go | 43 +++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 69a69f029b..d9c1901c06 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -19,7 +19,13 @@ import ( "github.com/prometheus/prometheus/model/labels" ) -var sep = []byte{'\xff'} +var ( + ErrOutOfOrderLabels = errors.New("out of order labels") + ErrEmptyLabels = errors.New("label set contains an empty label") + ErrDuplicateLabels = errors.New("label set contains duplicate label names") + + sep = []byte{'\xff'} +) func noAllocString(buf []byte) string { return *(*string)(unsafe.Pointer(&buf)) @@ -346,6 +352,41 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { return xxhash.Sum64(b) } +// TODO +func ValidateLabels(lbls []ZLabel) error { + if len(lbls) == 0 { + return ErrEmptyLabels + } + + labelNames := map[string]struct{}{} + + // Check first label. + l0 := lbls[0] + if l0.Name == "" { + return ErrEmptyLabels + } + labelNames[l0.Name] = struct{}{} + + // Iterate over the rest, check each for empty / duplicates and check ordering. + for _, l := range lbls[1:] { + if l.Name == "" { + return ErrEmptyLabels + } + + if _, ok := labelNames[l.Name]; ok { + return ErrDuplicateLabels + } + labelNames[l.Name] = struct{}{} + + if l.Name < l0.Name { + return ErrOutOfOrderLabels + } + l0 = l + } + + return nil +} + // ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted. type ZLabelSets []ZLabelSet From 62dc02ad54d04ffd19ca0099b7d27bb25c21239b Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 14 Jul 2022 17:06:32 +0200 Subject: [PATCH 02/11] Add tests for label validation method Signed-off-by: Matej Gera --- pkg/store/labelpb/label.go | 4 +- pkg/store/labelpb/label_test.go | 139 ++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 1 deletion(-) diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index d9c1901c06..765dc4ffea 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -352,7 +352,9 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { return xxhash.Sum64(b) } -// TODO +// ValidateLabels validates label names (checks for empty names, +// out of order labels and duplicate label names). Returns appropriate +// error if validation fails on a label. func ValidateLabels(lbls []ZLabel) error { if len(lbls) == 0 { return ErrEmptyLabels diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index bf57b801ef..706bd0b50c 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -66,6 +66,145 @@ func TestExtendLabels(t *testing.T) { testInjectExtLabels(testutil.NewTB(t)) } +func TestValidateLabels(t *testing.T) { + testCases := []struct { + labelSet []ZLabel + expectedErr error + }{ + { + // No labels at all. + labelSet: []ZLabel{}, + expectedErr: ErrEmptyLabels, + }, + { + // Empty label. + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "", + Value: "baz", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Empty label (first label). + labelSet: []ZLabel{ + { + Name: "", + Value: "bar", + }, + { + Name: "foo", + Value: "baz", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Duplicate label. + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "test", + Value: "baz", + }, + { + Name: "foo", + Value: "bar", + }, + }, + expectedErr: ErrDuplicateLabels, + }, + { + // Empty and duplicate label (empty comes first). + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "", + Value: "baz", + }, + { + Name: "foo", + Value: "bar", + }, + }, + expectedErr: ErrEmptyLabels, + }, + { + // Wrong order. + labelSet: []ZLabel{ + { + Name: "a", + Value: "bar", + }, + { + Name: "b", + Value: "baz", + }, + { + Name: "__name__", + Value: "test", + }, + }, + expectedErr: ErrOutOfOrderLabels, + }, + { + // Wrong order and duplicate (wrong order comes first). + labelSet: []ZLabel{ + { + Name: "a", + Value: "bar", + }, + { + Name: "__name__", + Value: "test", + }, + { + Name: "a", + Value: "bar", + }, + }, + expectedErr: ErrOutOfOrderLabels, + }, + { + // All good. + labelSet: []ZLabel{ + { + Name: "__name__", + Value: "test", + }, + { + Name: "a1", + Value: "bar", + }, + { + Name: "a2", + Value: "baz", + }, + }, + expectedErr: nil, + }, + } + + for i, tc := range testCases { + t.Run(fmt.Sprintf("case %d", i+1), func(t *testing.T) { + err := ValidateLabels(tc.labelSet) + testutil.Equals(t, tc.expectedErr, err) + }) + } + +} + func BenchmarkExtendLabels(b *testing.B) { testInjectExtLabels(testutil.NewTB(b)) } From 246bc3803f980680e6a52d017fad423db60e492a Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Thu, 14 Jul 2022 17:07:14 +0200 Subject: [PATCH 03/11] Validate label set during receiver write Signed-off-by: Matej Gera --- pkg/receive/writer.go | 68 +++++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 90bb2897c4..88f435c5e4 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -43,9 +43,14 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR tLogger := log.With(r.logger, "tenant", tenantID) var ( - numOutOfOrder = 0 - numDuplicates = 0 - numOutOfBounds = 0 + numLabelsOutOfOrder = 0 + numLabelsDuplicates = 0 + numLabelsEmpty = 0 + + numSamplesOutOfOrder = 0 + numSamplesDuplicates = 0 + numSamplesOutOfBounds = 0 + numExemplarsOutOfOrder = 0 numExemplarsDuplicate = 0 numExemplarsLabelLength = 0 @@ -70,6 +75,25 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR errs errutil.MultiError ) for _, t := range wreq.Timeseries { + // Check if time series labels are valid. If not, skip the time series + // and report the error. + err := labelpb.ValidateLabels(t.Labels) + if err != nil { + switch err { + case labelpb.ErrOutOfOrderLabels: + numLabelsOutOfOrder++ + level.Debug(tLogger).Log("msg", "Out of order labels in the label set", "lset", t.Labels) + case labelpb.ErrDuplicateLabels: + numLabelsDuplicates++ + level.Debug(tLogger).Log("msg", "Duplicate labels in the label set", "lset", t.Labels) + case labelpb.ErrEmptyLabels: + numLabelsEmpty++ + level.Debug(tLogger).Log("msg", "Labels with empty name in the label set", "lset", t.Labels) + } + + continue + } + lset := labelpb.ZLabelsToPromLabels(t.Labels) // Check if the TSDB has cached reference for those labels. @@ -86,13 +110,13 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR ref, err = app.Append(ref, lset, s.Timestamp, s.Value) switch err { case storage.ErrOutOfOrderSample: - numOutOfOrder++ + numSamplesOutOfOrder++ level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrDuplicateSampleForTimestamp: - numDuplicates++ + numSamplesDuplicates++ level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) case storage.ErrOutOfBounds: - numOutOfBounds++ + numSamplesOutOfBounds++ level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) } } @@ -129,18 +153,32 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR } } - if numOutOfOrder > 0 { - level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder) - errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder)) + if numLabelsOutOfOrder > 0 { + level.Warn(tLogger).Log("msg", "Error on series with out-of-order labels", "numDropped", numLabelsOutOfOrder) + errs.Add(errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add %d series", numLabelsOutOfOrder)) + } + if numLabelsDuplicates > 0 { + level.Warn(tLogger).Log("msg", "Error on series with duplicate labels", "numDropped", numLabelsDuplicates) + errs.Add(errors.Wrapf(labelpb.ErrDuplicateLabels, "add %d series", numLabelsDuplicates)) + } + if numLabelsEmpty > 0 { + level.Warn(tLogger).Log("msg", "Error on series with empty label(s)", "numDropped", numLabelsEmpty) + errs.Add(errors.Wrapf(labelpb.ErrEmptyLabels, "add %d series", numLabelsEmpty)) + } + + if numSamplesOutOfOrder > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numSamplesOutOfOrder) + errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numSamplesOutOfOrder)) } - if numDuplicates > 0 { - level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates) - errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates)) + if numSamplesDuplicates > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numSamplesDuplicates) + errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numSamplesDuplicates)) } - if numOutOfBounds > 0 { - level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds) - errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds)) + if numSamplesOutOfBounds > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds) + errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds)) } + if numExemplarsOutOfOrder > 0 { level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder) errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder)) From e2700fb89b560cf1d8006067eb46cc276f45add1 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 15 Jul 2022 14:08:49 +0200 Subject: [PATCH 04/11] Handle labels conflict errors Signed-off-by: Matej Gera --- pkg/receive/handler.go | 31 ++++++++++++++++++++++++++----- pkg/receive/handler_test.go | 26 +++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index f5f0270700..3bf7453183 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -812,13 +812,34 @@ func isConflict(err error) bool { return false } return err == errConflict || - err == storage.ErrDuplicateSampleForTimestamp || + isSampleConflictErr(err) || + isExemplarConflictErr(err) || + isLabelsConflictErr(err) || + status.Code(err) == codes.AlreadyExists +} + +// isSampleConflictErr returns whether or not the given error represents +// a sample-related conflict. +func isSampleConflictErr(err error) bool { + return err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || - err == storage.ErrOutOfBounds || - err == storage.ErrDuplicateExemplar || + err == storage.ErrOutOfBounds +} + +// isExemplarConflictErr returns whether or not the given error represents +// a exemplar-related conflict. +func isExemplarConflictErr(err error) bool { + return err == storage.ErrDuplicateExemplar || err == storage.ErrOutOfOrderExemplar || - err == storage.ErrExemplarLabelLength || - status.Code(err) == codes.AlreadyExists + err == storage.ErrExemplarLabelLength +} + +// isLabelsConflictErr returns whether or not the given error represents +// a labels-related conflict. +func isLabelsConflictErr(err error) bool { + return err == labelpb.ErrDuplicateLabels || + err == labelpb.ErrEmptyLabels || + err == labelpb.ErrOutOfOrderLabels } // isNotReady returns whether or not the given error represents a not ready error. diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index dee2600209..7a5c555f80 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -114,6 +114,16 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 1, exp: errConflict, }, + { + name: "matching multierror (labels error)", + err: errutil.NonNilMultiError([]error{ + labelpb.ErrEmptyLabels, + errors.New("foo"), + errors.New("bar"), + }), + threshold: 1, + exp: errConflict, + }, { name: "matching but below threshold multierror", err: errutil.NonNilMultiError([]error{ @@ -163,7 +173,7 @@ func TestDetermineWriteErrorCause(t *testing.T) { exp: errConflict, }, { - name: "matching multierror many, both above threshold, conflict have precedence", + name: "matching multierror many, both above threshold, conflict has precedence", err: errutil.NonNilMultiError([]error{ storage.ErrOutOfOrderSample, errConflict, @@ -176,6 +186,20 @@ func TestDetermineWriteErrorCause(t *testing.T) { threshold: 2, exp: errConflict, }, + { + name: "matching multierror many, both above threshold, conflict has precedence (labels error)", + err: errutil.NonNilMultiError([]error{ + labelpb.ErrDuplicateLabels, + labelpb.ErrDuplicateLabels, + tsdb.ErrNotReady, + tsdb.ErrNotReady, + tsdb.ErrNotReady, + labelpb.ErrDuplicateLabels, + errors.New("foo"), + }), + threshold: 2, + exp: errConflict, + }, { name: "nested matching multierror", err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{ From 6830c6a7e719aafa161f9c220914f8b6d57639af Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 15 Jul 2022 14:31:30 +0200 Subject: [PATCH 05/11] Rename out-of-order error methods Signed-off-by: Matej Gera --- pkg/block/index.go | 6 +++--- pkg/compact/compact.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/block/index.go b/pkg/block/index.go index 830ac89718..4bc13195d4 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -92,10 +92,10 @@ type HealthStats struct { MetricLabelValuesCount int64 } -// PrometheusIssue5372Err returns an error if the HealthStats object indicates +// OutOfOrderLabelsErr returns an error if the HealthStats object indicates // postings with out of order labels. This is corrected by Prometheus Issue // #5372 and affects Prometheus versions 2.8.0 and below. -func (i HealthStats) PrometheusIssue5372Err() error { +func (i HealthStats) OutOfOrderLabelsErr() error { if i.OutOfOrderLabels > 0 { return errors.Errorf("index contains %d postings with out of order labels", i.OutOfOrderLabels) @@ -157,7 +157,7 @@ func (i HealthStats) AnyErr() error { errMsg = append(errMsg, err.Error()) } - if err := i.PrometheusIssue5372Err(); err != nil { + if err := i.OutOfOrderLabelsErr(); err != nil { errMsg = append(errMsg, err.Error()) } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index e9a5dcdd34..3759184a8d 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1053,7 +1053,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) } - if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { + if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil { return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID) } From 93d07d5154abbe81a1ed7229e0f30572cf9c6616 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 15 Jul 2022 15:06:49 +0200 Subject: [PATCH 06/11] Add empty label value validation Signed-off-by: Matej Gera --- pkg/receive/writer.go | 2 +- pkg/store/labelpb/label.go | 12 ++++++------ pkg/store/labelpb/label_test.go | 14 ++++++++++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 88f435c5e4..93c53e87d3 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -162,7 +162,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR errs.Add(errors.Wrapf(labelpb.ErrDuplicateLabels, "add %d series", numLabelsDuplicates)) } if numLabelsEmpty > 0 { - level.Warn(tLogger).Log("msg", "Error on series with empty label(s)", "numDropped", numLabelsEmpty) + level.Warn(tLogger).Log("msg", "Error on series with empty label name or value", "numDropped", numLabelsEmpty) errs.Add(errors.Wrapf(labelpb.ErrEmptyLabels, "add %d series", numLabelsEmpty)) } diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 765dc4ffea..8a4323baaf 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -21,7 +21,7 @@ import ( var ( ErrOutOfOrderLabels = errors.New("out of order labels") - ErrEmptyLabels = errors.New("label set contains an empty label") + ErrEmptyLabels = errors.New("label set contains a label with empty name or value") ErrDuplicateLabels = errors.New("label set contains duplicate label names") sep = []byte{'\xff'} @@ -352,9 +352,9 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { return xxhash.Sum64(b) } -// ValidateLabels validates label names (checks for empty names, -// out of order labels and duplicate label names). Returns appropriate -// error if validation fails on a label. +// ValidateLabels validates label names and values (checks for empty +// names and values, out of order labels and duplicate label names) +// Returns appropriate error if validation fails on a label. func ValidateLabels(lbls []ZLabel) error { if len(lbls) == 0 { return ErrEmptyLabels @@ -364,14 +364,14 @@ func ValidateLabels(lbls []ZLabel) error { // Check first label. l0 := lbls[0] - if l0.Name == "" { + if l0.Name == "" || l0.Value == "" { return ErrEmptyLabels } labelNames[l0.Name] = struct{}{} // Iterate over the rest, check each for empty / duplicates and check ordering. for _, l := range lbls[1:] { - if l.Name == "" { + if l.Name == "" || l.Value == "" { return ErrEmptyLabels } diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 706bd0b50c..1019ef3559 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -104,6 +104,20 @@ func TestValidateLabels(t *testing.T) { }, expectedErr: ErrEmptyLabels, }, + { + // Empty label (empty value). + labelSet: []ZLabel{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "baz", + Value: "", + }, + }, + expectedErr: ErrEmptyLabels, + }, { // Duplicate label. labelSet: []ZLabel{ From 127b606f9b33af598542e244d6c24846552f5c7a Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 15 Jul 2022 16:20:51 +0200 Subject: [PATCH 07/11] Add receive writer tests Signed-off-by: Matej Gera --- pkg/receive/writer_test.go | 86 ++++++++++++++++++++++++++++++++++++++ pkg/store/labelpb/label.go | 3 +- 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 2f17315940..715f788aa0 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -5,6 +5,7 @@ package receive import ( "context" + "fmt" "io/ioutil" "os" "strings" @@ -34,6 +35,81 @@ func TestWriter(t *testing.T) { expectedIngested []prompb.TimeSeries maxExemplars int64 }{ + "should error out on series with no labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + { + Labels: []labelpb.ZLabel{{Name: "__name__", Value: ""}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrEmptyLabels, "add 2 series"), + }, + "should succeed on series with valid labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: nil, + expectedIngested: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + "should error out and skip series with out-of-order labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 1 series"), + }, + "should error out and skip series with out-of-order labels; accept series with valid labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + { + Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + { + Labels: append(lbls, labelpb.ZLabel{Name: "E", Value: "1"}, labelpb.ZLabel{Name: "f", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 2 series"), + expectedIngested: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, "should succeed on valid series with exemplars": { reqs: []*prompb.WriteRequest{{ Timeseries: []prompb.TimeSeries{ @@ -171,6 +247,16 @@ func TestWriter(t *testing.T) { testutil.Equals(t, testData.expectedErr.Error(), err.Error()) } } + + // On each expected series, assert we have a ref available. + a, err := app.Appender(context.Background()) + testutil.Ok(t, err) + gr := a.(storage.GetRef) + + for _, ts := range testData.expectedIngested { + ref, _ := gr.GetRef(labelpb.ZLabelsToPromLabels(ts.Labels)) + testutil.Assert(t, ref != 0, fmt.Sprintf("appender should have reference to series %v", ts)) + } }) } } diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 8a4323baaf..4939883eb9 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -369,7 +369,8 @@ func ValidateLabels(lbls []ZLabel) error { } labelNames[l0.Name] = struct{}{} - // Iterate over the rest, check each for empty / duplicates and check ordering. + // Iterate over the rest, check each for empty / duplicates and + // check lexographical ordering. for _, l := range lbls[1:] { if l.Name == "" || l.Value == "" { return ErrEmptyLabels From 1b870b4de9b18ba684436ae9cf65e51355e89c71 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 22 Jul 2022 12:30:37 +0200 Subject: [PATCH 08/11] Optimize label validation Signed-off-by: Matej Gera --- pkg/store/labelpb/label.go | 6 +----- pkg/store/labelpb/label_test.go | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 4939883eb9..33546e8e3f 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -360,14 +360,11 @@ func ValidateLabels(lbls []ZLabel) error { return ErrEmptyLabels } - labelNames := map[string]struct{}{} - // Check first label. l0 := lbls[0] if l0.Name == "" || l0.Value == "" { return ErrEmptyLabels } - labelNames[l0.Name] = struct{}{} // Iterate over the rest, check each for empty / duplicates and // check lexographical ordering. @@ -376,10 +373,9 @@ func ValidateLabels(lbls []ZLabel) error { return ErrEmptyLabels } - if _, ok := labelNames[l.Name]; ok { + if l.Name == l0.Name { return ErrDuplicateLabels } - labelNames[l.Name] = struct{}{} if l.Name < l0.Name { return ErrOutOfOrderLabels diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index 1019ef3559..7c5f72d71f 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -119,7 +119,7 @@ func TestValidateLabels(t *testing.T) { expectedErr: ErrEmptyLabels, }, { - // Duplicate label. + // Out-of-order and duplicate label (out-of-order comes first). labelSet: []ZLabel{ { Name: "foo", @@ -134,6 +134,28 @@ func TestValidateLabels(t *testing.T) { Value: "bar", }, }, + expectedErr: ErrOutOfOrderLabels, + }, + { + // Out-of-order and duplicate label (out-of-order comes first). + labelSet: []ZLabel{ + { + Name: "__test__", + Value: "baz", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "foo", + Value: "bar", + }, + { + Name: "test", + Value: "baz", + }, + }, expectedErr: ErrDuplicateLabels, }, { From 155c8e3b1a39b00e12c3ce2fbf6bb46c94f57943 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 22 Jul 2022 12:30:54 +0200 Subject: [PATCH 09/11] Adjust writer tests Signed-off-by: Matej Gera --- pkg/receive/writer_test.go | 92 +++++++++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 715f788aa0..8cb1db3b40 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -75,7 +75,7 @@ func TestWriter(t *testing.T) { { Timeseries: []prompb.TimeSeries{ { - Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, }, }, @@ -83,6 +83,19 @@ func TestWriter(t *testing.T) { }, expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 1 series"), }, + "should error out and skip series with duplicate labels": { + reqs: []*prompb.WriteRequest{ + { + Timeseries: []prompb.TimeSeries{ + { + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}, labelpb.ZLabel{Name: "z", Value: "1"}), + Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, + }, + }, + }, + }, + expectedErr: errors.Wrapf(labelpb.ErrDuplicateLabels, "add 1 series"), + }, "should error out and skip series with out-of-order labels; accept series with valid labels": { reqs: []*prompb.WriteRequest{ { @@ -260,3 +273,80 @@ func TestWriter(t *testing.T) { }) } } + +func BenchmarkWriterTimeSeriesWithSingleLabel_10(b *testing.B) { benchmarkWriter(b, 1, 10) } +func BenchmarkWriterTimeSeriesWithSingleLabel_100(b *testing.B) { benchmarkWriter(b, 1, 100) } +func BenchmarkWriterTimeSeriesWithSingleLabel_1000(b *testing.B) { benchmarkWriter(b, 1, 1000) } + +func BenchmarkWriterTimeSeriesWith10Labels_10(b *testing.B) { benchmarkWriter(b, 10, 10) } +func BenchmarkWriterTimeSeriesWith10Labels_100(b *testing.B) { benchmarkWriter(b, 10, 100) } +func BenchmarkWriterTimeSeriesWith10Labels_1000(b *testing.B) { benchmarkWriter(b, 10, 1000) } + +func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int) { + dir, err := ioutil.TempDir("", "test") + testutil.Ok(b, err) + defer func() { testutil.Ok(b, os.RemoveAll(dir)) }() + + logger := log.NewNopLogger() + + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + MaxExemplars: 0, + EnableExemplarStorage: true, + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(b, m.Close()) }() + + testutil.Ok(b, m.Flush()) + testutil.Ok(b, m.Open()) + + app, err := m.TenantAppendable("foo") + testutil.Ok(b, err) + + w := NewWriter(logger, m) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(context.Background()) + return err + })) + + timeSeries := generateLabelsAndSeries(labelsNum, seriesNum) + + wreq := &prompb.WriteRequest{ + Timeseries: timeSeries, + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = w.Write(ctx, "foo", wreq) + } +} + +func generateLabelsAndSeries(labelsNum int, seriesNum int) []prompb.TimeSeries { + // Generate some labels first. + l := make([]labelpb.ZLabel, 0, labelsNum) + l = append(l, labelpb.ZLabel{Name: "__name__", Value: "test"}) + for i := 0; i < labelsNum; i++ { + l = append(l, labelpb.ZLabel{Name: fmt.Sprintf("label_%q", rune('a'-1+i)), Value: fmt.Sprintf("%d", i)}) + } + + ts := make([]prompb.TimeSeries, 0, seriesNum) + for j := 0; j < seriesNum; j++ { + ts = append(ts, prompb.TimeSeries{Labels: l, Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}}) + } + + return ts +} From 56677acc0b922e7835a3e484c81ec7a015d171e0 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 22 Jul 2022 14:04:25 +0200 Subject: [PATCH 10/11] Fix comment in valildation method Signed-off-by: Matej Gera --- pkg/store/labelpb/label.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 33546e8e3f..f25c7125e6 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -367,7 +367,7 @@ func ValidateLabels(lbls []ZLabel) error { } // Iterate over the rest, check each for empty / duplicates and - // check lexographical ordering. + // check lexicographical (alphabetically) ordering. for _, l := range lbls[1:] { if l.Name == "" || l.Value == "" { return ErrEmptyLabels From bb457753736849b402adc6bc61d7bea970cca398 Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Fri, 5 Aug 2022 15:20:37 +0200 Subject: [PATCH 11/11] Improve error handling Signed-off-by: Matej Gera --- pkg/receive/writer.go | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 93c53e87d3..aa02f7e08b 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -77,8 +77,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR for _, t := range wreq.Timeseries { // Check if time series labels are valid. If not, skip the time series // and report the error. - err := labelpb.ValidateLabels(t.Labels) - if err != nil { + if err := labelpb.ValidateLabels(t.Labels); err != nil { switch err { case labelpb.ErrOutOfOrderLabels: numLabelsOutOfOrder++ @@ -89,6 +88,8 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR case labelpb.ErrEmptyLabels: numLabelsEmpty++ level.Debug(tLogger).Log("msg", "Labels with empty name in the label set", "lset", t.Labels) + default: + level.Debug(tLogger).Log("msg", "Error validating labels", "err", err) } continue @@ -118,6 +119,10 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR case storage.ErrOutOfBounds: numSamplesOutOfBounds++ level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) + default: + if err != nil { + level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err) + } } } @@ -126,27 +131,28 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR if ref != 0 && len(t.Exemplars) > 0 { for _, ex := range t.Exemplars { exLset := labelpb.ZLabelsToPromLabels(ex.Labels) - logger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String()) + exLogger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String()) - _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{ + if _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{ Labels: exLset, Value: ex.Value, Ts: ex.Timestamp, HasTs: true, - }) - switch err { - case storage.ErrOutOfOrderExemplar: - numExemplarsOutOfOrder++ - level.Debug(logger).Log("msg", "Out of order exemplar") - case storage.ErrDuplicateExemplar: - numExemplarsDuplicate++ - level.Debug(logger).Log("msg", "Duplicate exemplar") - case storage.ErrExemplarLabelLength: - numExemplarsLabelLength++ - level.Debug(logger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) - default: - if err != nil { - level.Debug(logger).Log("msg", "Error ingesting exemplar", "err", err) + }); err != nil { + switch err { + case storage.ErrOutOfOrderExemplar: + numExemplarsOutOfOrder++ + level.Debug(exLogger).Log("msg", "Out of order exemplar") + case storage.ErrDuplicateExemplar: + numExemplarsDuplicate++ + level.Debug(exLogger).Log("msg", "Duplicate exemplar") + case storage.ErrExemplarLabelLength: + numExemplarsLabelLength++ + level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength) + default: + if err != nil { + level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err) + } } } }