diff --git a/CHANGELOG.md b/CHANGELOG.md index 877e3a9c1a81..3a36b09ee813 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -174,6 +174,8 @@ Main (unreleased) - Fix internal metrics reported as invalid by promtool's linter. (@tpaschalis) +- Fix an issue where the cri stage treated partial lines coming from different streams as belonging to the same stream. (@kavirajk @aglees) + - Operator: fix for running multiple operators with different `--agent-selector` flags. (@captncraig) - Operator: respect FilterRunning on PodMonitor and ServiceMonitor resources to only scrape running pods. (@captncraig) diff --git a/component/loki/process/internal/stages/extensions.go b/component/loki/process/internal/stages/extensions.go index 6db6fb3541e5..30480333612f 100644 --- a/component/loki/process/internal/stages/extensions.go +++ b/component/loki/process/internal/stages/extensions.go @@ -10,11 +10,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" ) -// MaxPartialLinesSize is the max buffer size to hold partial lines when -// parsing the CRI stage format. -const MaxPartialLinesSize = 100 +const ( + RFC3339Nano = "RFC3339Nano" + MaxPartialLinesSize = 100 // MaxPartialLinesSize is the max buffer size to hold partial lines when parsing the CRI stage format. +) // DockerConfig is an empty struct that is used to enable a pre-defined // pipeline for decoding entries that are using the Docker logs format. 
@@ -45,7 +47,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro { TimestampConfig: &TimestampConfig{ Source: "timestamp", - Format: "RFC3339Nano", + Format: RFC3339Nano, }, }, { @@ -59,7 +61,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro type cri struct { // bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`) - partialLines []string + partialLines map[model.Fingerprint]Entry maxPartialLines int base *Pipeline } @@ -69,30 +71,48 @@ func (c *cri) Name() string { return "cri" } -// Run implements the Stage interface. +// Run implements the Stage interface. Partial lines (flag "P") are buffered per label-set fingerprint and prepended to the next entry of the same stream. func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) - in := RunWithSkip(entry, func(e Entry) (Entry, bool) { + in := RunWithSkipOrSendMany(entry, func(e Entry) ([]Entry, bool) { + fingerprint := e.Labels.Fingerprint() + + // We received a partial line (tag: "P"). if e.Extracted["flags"] == "P" { - if len(c.partialLines) >= c.maxPartialLines { + if len(c.partialLines) > c.maxPartialLines { // Merge existing partialLines - newPartialLine := e.Line - e.Line = strings.Join(c.partialLines, "\n") + entries := make([]Entry, 0, len(c.partialLines)) + for _, v := range c.partialLines { + entries = append(entries, v) + } + level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize) // NOTE(review): logs the MaxPartialLinesSize constant, while the check above uses c.maxPartialLines - confirm this is intended. - c.partialLines = c.partialLines[:0] - c.partialLines = append(c.partialLines, newPartialLine) - return e, false + + c.partialLines = make(map[model.Fingerprint]Entry) + c.partialLines[fingerprint] = e + + return entries, false } - c.partialLines = append(c.partialLines, e.Line) - return e, true + + prev, ok := c.partialLines[fingerprint] + if ok { + e.Line = strings.Join([]string{prev.Line, e.Line}, "") + } + c.partialLines[fingerprint] = e + + return []Entry{e}, true // partial line: it is now buffered for its stream, so emit nothing yet. 
} - if len(c.partialLines) > 0 { - c.partialLines = append(c.partialLines, e.Line) - e.Line = strings.Join(c.partialLines, "\n") - c.partialLines = c.partialLines[:0] + + // Now we have a full line (tag: "F"). + // 1. If a buffered partial line exists for this stream, prepend it to the full line and drop it from the buffer. + // 2. Otherwise, just return the full line. + prev, ok := c.partialLines[fingerprint] + if ok { + e.Line = strings.Join([]string{prev.Line, e.Line}, "") + delete(c.partialLines, fingerprint) } - return e, false + return []Entry{e}, false }) return in @@ -115,7 +135,7 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { TimestampConfig: &TimestampConfig{ Source: "time", - Format: "RFC3339Nano", + Format: RFC3339Nano, }, }, { @@ -139,6 +159,6 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) maxPartialLines: MaxPartialLinesSize, base: p, } - c.partialLines = make([]string, 0, c.maxPartialLines) + c.partialLines = make(map[model.Fingerprint]Entry) return &c, nil } diff --git a/component/loki/process/internal/stages/extensions_test.go b/component/loki/process/internal/stages/extensions_test.go index a0000c331e9a..9eaa61717489 100644 --- a/component/loki/process/internal/stages/extensions_test.go +++ b/component/loki/process/internal/stages/extensions_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -25,7 +26,9 @@ var ( func TestNewDocker(t *testing.T) { loc, err := time.LoadLocation("UTC") - require.NoError(t, err, "could not parse timezone") + if err != nil { + t.Fatal("could not parse timezone", err) + } tests := map[string]struct { entry string @@ -90,67 +93,78 @@ var ( criTestTime2 = time.Now() ) -func TestCriTags(t *testing.T) { +type testEntry struct { + labels model.LabelSet + line string +} + +func TestCRI_tags(t *testing.T) { cases := []struct { name string lines 
[]string expected []string maxPartialLines int + entries []testEntry err error }{ { name: "tag F", - lines: []string{ - "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", - "2019-05-07T18:57:55.904275087+00:00 stdout F log", + entries: []testEntry{ + {line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}}, }, expected: []string{"some full line", "log"}, }, { - name: "tag P", - lines: []string{ - "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1", - "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2", - "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", - "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", + name: "tag P multi-stream", + entries: []testEntry{ + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar2"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"foo": "bar2"}}, }, expected: []string{ - "partial line 1\npartial line 2\nlog finished", - "another full log", + "partial line 1 log finished", // belongs to stream `{foo="bar"}` + "partial line 2 another full log", // belongs to stream `{foo="bar2"}` }, }, { - name: "tag P exceeding MaxPartialLinesSize lines", - lines: []string{ - "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1", - "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2", - "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3", - "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4", // this exceeds the `MaxPartialLinesSize` of 3 - "2019-05-07T18:57:55.904275087+00:00 stdout F log 
finished", - "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", + name: "tag P multi-stream with maxPartialLines exceeded", + entries: []testEntry{ + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"label1": "val1", "label2": "val2"}}, + + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"label1": "val1"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3 ", labels: model.LabelSet{"label1": "val1", "label2": "val2"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", labels: model.LabelSet{"label1": "val3"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 5 ", labels: model.LabelSet{"label1": "val4"}}, // exceeds maxPartialLines (2): 3 streams already have buffered partial lines, so those are flushed first. + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"label1": "val1", "label2": "val2"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"label1": "val3"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F yet an another full log", labels: model.LabelSet{"label1": "val4"}}, }, - maxPartialLines: 3, + maxPartialLines: 2, expected: []string{ - "partial line 1\npartial line 2\npartial line 3", - "partial line 4\nlog finished", + "partial line 1 partial line 3 ", + "partial line 2 ", + "partial line 4 ", + "log finished", "another full log", + "partial line 5 yet an another full log", }, }, { - name: "panic", - lines: []string{ - "2019-05-07T18:57:50.904275087+00:00 stdout P panic: I'm pannicing", - "2019-05-07T18:57:50.904275087+00:00 stdout P ", - "2019-05-07T18:57:50.904275087+00:00 stdout P goroutine 1 [running]:", - "2019-05-07T18:57:55.904275087+00:00 stdout P main.main()", - "2019-05-07T18:57:55.904275087+00:00 stdout F /home/kavirajk/src/go-play/main.go:11 +0x27", + name: "tag P single stream", + entries: []testEntry{ + {line: 
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3 ", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", labels: model.LabelSet{"foo": "bar"}}, // fourth partial line of the same stream: only one stream is buffered, so the limit of 3 is not exceeded + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}}, + {line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"foo": "bar"}}, }, + maxPartialLines: 3, expected: []string{ - `panic: I'm pannicing - -goroutine 1 [running]: -main.main() - /home/kavirajk/src/go-play/main.go:11 +0x27`, + "partial line 1 partial line 2 partial line 3 partial line 4 log finished", + "another full log", }, }, } @@ -167,15 +181,26 @@ main.main() p.(*cri).maxPartialLines = tt.maxPartialLines } - for _, line := range tt.lines { - out := processEntries(p, newEntry(nil, nil, line, time.Now())) + for _, entry := range tt.entries { + out := processEntries(p, newEntry(nil, entry.labels, entry.line, time.Now())) if len(out) > 0 { for _, en := range out { got = append(got, en.Line) } } } - assert.Equal(t, tt.expected, got) + + expectedMap := make(map[string]bool) + for _, v := range tt.expected { + expectedMap[v] = true + } + + gotMap := make(map[string]bool) + for _, v := range got { + gotMap[v] = true + } + + assert.Equal(t, expectedMap, gotMap) }) } } diff --git a/component/loki/process/internal/stages/pipeline.go b/component/loki/process/internal/stages/pipeline.go index 672796ba720c..3e8224a81fdd 100644 --- a/component/loki/process/internal/stages/pipeline.go +++ b/component/loki/process/internal/stages/pipeline.go @@ -105,6 +105,25 @@ func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Ent return 
out } +// RunWithSkipOrSendMany is the same as RunWithSkip, except that `process` returns a slice of entries: when `process` returns `skip` as true nothing is sent to the output channel, otherwise every returned entry is sent in order. +func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range input { + results, skip := process(e) + if skip { + continue + } + for _, result := range results { + out <- result + } + } + }() + + return out +} + // Run implements Stage func (p *Pipeline) Run(in chan Entry) chan Entry { in = RunWith(in, func(e Entry) Entry {