Skip to content

Commit

Permalink
Porting latest Loki CRI stage code (grafana#3548)
Browse files Browse the repository at this point in the history
  • Loading branch information
aglees committed Apr 20, 2023
1 parent 4f86002 commit 7b40be3
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ Main (unreleased)

- Fix internal metrics reported as invalid by promtool's linter. (@tpaschalis)

- Fix an issue where the cri stage treated partial lines coming from different streams as parts of the same line. (@kavirajk @aglees)

- Operator: fix for running multiple operators with different `--agent-selector` flags. (@captncraig)

- Operator: respect FilterRunning on PodMonitor and ServiceMonitor resources to only scrape running pods. (@captncraig)
Expand Down
64 changes: 42 additions & 22 deletions component/loki/process/internal/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

// MaxPartialLinesSize is the max buffer size to hold partial lines when
// parsing the CRI stage format.
const MaxPartialLinesSize = 100
const (
// RFC3339Nano is the timestamp format name assigned to TimestampConfig.Format
// by the Docker and CRI pipelines defined in this file.
RFC3339Nano = "RFC3339Nano"
MaxPartialLinesSize = 100 // MaxPartialLinesSize is the max buffer size to hold partial lines when parsing the CRI stage format.
)

// DockerConfig is an empty struct that is used to enable a pre-defined
// pipeline for decoding entries that are using the Docker logs format.
Expand Down Expand Up @@ -45,7 +47,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
{
TimestampConfig: &TimestampConfig{
Source: "timestamp",
Format: "RFC3339Nano",
Format: RFC3339Nano,
},
},
{
Expand All @@ -59,7 +61,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro

type cri struct {
// bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`)
partialLines []string
partialLines map[model.Fingerprint]Entry
maxPartialLines int
base *Pipeline
}
Expand All @@ -69,30 +71,48 @@ func (c *cri) Name() string {
return "cri"
}

// Run implements the Stage interface.
// Run implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)

in := RunWithSkip(entry, func(e Entry) (Entry, bool) {
in := RunWithSkipOrSendMany(entry, func(e Entry) ([]Entry, bool) {
fingerprint := e.Labels.Fingerprint()

// We received partial-line (tag: "P")
if e.Extracted["flags"] == "P" {
if len(c.partialLines) >= c.maxPartialLines {
if len(c.partialLines) > c.maxPartialLines {
// Merge existing partialLines
newPartialLine := e.Line
e.Line = strings.Join(c.partialLines, "\n")
entries := make([]Entry, 0, len(c.partialLines))
for _, v := range c.partialLines {
entries = append(entries, v)
}

level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize)
c.partialLines = c.partialLines[:0]
c.partialLines = append(c.partialLines, newPartialLine)
return e, false

c.partialLines = make(map[model.Fingerprint]Entry)
c.partialLines[fingerprint] = e

return entries, false
}
c.partialLines = append(c.partialLines, e.Line)
return e, true

prev, ok := c.partialLines[fingerprint]
if ok {
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
}
c.partialLines[fingerprint] = e

return []Entry{e}, true // it's a partial-line so skip it.
}
if len(c.partialLines) > 0 {
c.partialLines = append(c.partialLines, e.Line)
e.Line = strings.Join(c.partialLines, "\n")
c.partialLines = c.partialLines[:0]

// Now we got full-line (tag: "F").
// 1. If any old partialLines matches with this full-line stream, merge it
// 2. Else just return the full line.
prev, ok := c.partialLines[fingerprint]
if ok {
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
delete(c.partialLines, fingerprint)
}
return e, false
return []Entry{e}, false
})

return in
Expand All @@ -115,7 +135,7 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
{
TimestampConfig: &TimestampConfig{
Source: "time",
Format: "RFC3339Nano",
Format: RFC3339Nano,
},
},
{
Expand All @@ -139,6 +159,6 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
maxPartialLines: MaxPartialLinesSize,
base: p,
}
c.partialLines = make([]string, 0, c.maxPartialLines)
c.partialLines = make(map[model.Fingerprint]Entry)
return &c, nil
}
103 changes: 64 additions & 39 deletions component/loki/process/internal/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -25,7 +26,9 @@ var (

func TestNewDocker(t *testing.T) {
loc, err := time.LoadLocation("UTC")
require.NoError(t, err, "could not parse timezone")
if err != nil {
t.Fatal("could not parse timezone", err)
}

tests := map[string]struct {
entry string
Expand Down Expand Up @@ -90,67 +93,78 @@ var (
criTestTime2 = time.Now()
)

func TestCriTags(t *testing.T) {
// testEntry is a single input for the CRI stage tests: a raw log line together
// with the label set attached to the entry that is built from it.
type testEntry struct {
labels model.LabelSet // labels handed to newEntry for this line
line string // raw log line handed to newEntry and run through the stage
}

func TestCRI_tags(t *testing.T) {
cases := []struct {
name string
lines []string
expected []string
maxPartialLines int
entries []testEntry
err error
}{
{
name: "tag F",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout F some full line",
"2019-05-07T18:57:55.904275087+00:00 stdout F log",
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}},
},
expected: []string{"some full line", "log"},
},
{
name: "tag P",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2",
"2019-05-07T18:57:55.904275087+00:00 stdout F log finished",
"2019-05-07T18:57:55.904275087+00:00 stdout F another full log",
name: "tag P multi-stream",
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar2"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"foo": "bar2"}},
},
expected: []string{
"partial line 1\npartial line 2\nlog finished",
"another full log",
"partial line 1 log finished", // belongs to stream `{foo="bar"}`
"partial line 2 another full log", // belongs to stream `{foo="bar2"}
},
},
{
name: "tag P exceeding MaxPartialLinesSize lines",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3",
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4", // this exceeds the `MaxPartialLinesSize` of 3
"2019-05-07T18:57:55.904275087+00:00 stdout F log finished",
"2019-05-07T18:57:55.904275087+00:00 stdout F another full log",
name: "tag P multi-stream with maxPartialLines exceeded",
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"label1": "val1", "label2": "val2"}},

{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"label1": "val1"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3 ", labels: model.LabelSet{"label1": "val1", "label2": "val2"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", labels: model.LabelSet{"label1": "val3"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 5 ", labels: model.LabelSet{"label1": "val4"}}, // exceeded maxPartialLines as already 3 streams in flight.
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"label1": "val1", "label2": "val2"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"label1": "val3"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F yet an another full log", labels: model.LabelSet{"label1": "val4"}},
},
maxPartialLines: 3,
maxPartialLines: 2,
expected: []string{
"partial line 1\npartial line 2\npartial line 3",
"partial line 4\nlog finished",
"partial line 1 partial line 3 ",
"partial line 2 ",
"partial line 4 ",
"log finished",
"another full log",
"partial line 5 yet an another full log",
},
},
{
name: "panic",
lines: []string{
"2019-05-07T18:57:50.904275087+00:00 stdout P panic: I'm pannicing",
"2019-05-07T18:57:50.904275087+00:00 stdout P ",
"2019-05-07T18:57:50.904275087+00:00 stdout P goroutine 1 [running]:",
"2019-05-07T18:57:55.904275087+00:00 stdout P main.main()",
"2019-05-07T18:57:55.904275087+00:00 stdout F /home/kavirajk/src/go-play/main.go:11 +0x27",
name: "tag P single stream",
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", labels: model.LabelSet{"foo": "bar"}}, // this exceeds the `MaxPartialLinesSize` of 3
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"foo": "bar"}},
},
maxPartialLines: 3,
expected: []string{
`panic: I'm pannicing
goroutine 1 [running]:
main.main()
/home/kavirajk/src/go-play/main.go:11 +0x27`,
"partial line 1 partial line 2 partial line 3 partial line 4 log finished",
"another full log",
},
},
}
Expand All @@ -167,15 +181,26 @@ main.main()
p.(*cri).maxPartialLines = tt.maxPartialLines
}

for _, line := range tt.lines {
out := processEntries(p, newEntry(nil, nil, line, time.Now()))
for _, entry := range tt.entries {
out := processEntries(p, newEntry(nil, entry.labels, entry.line, time.Now()))
if len(out) > 0 {
for _, en := range out {
got = append(got, en.Line)
}
}
}
assert.Equal(t, tt.expected, got)

expectedMap := make(map[string]bool)
for _, v := range tt.expected {
expectedMap[v] = true
}

gotMap := make(map[string]bool)
for _, v := range got {
gotMap[v] = true
}

assert.Equal(t, expectedMap, gotMap)
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions component/loki/process/internal/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Ent
return out
}

// RunWithSkipOrSendMany starts a goroutine that reads every entry from input,
// hands it to process, and forwards the outcome on the returned channel.
//
// For each input entry, process returns a slice of entries and a skip flag:
//   - when skip is true, nothing is forwarded for that input and the returned
//     slice is ignored;
//   - otherwise every entry of the slice is sent, in order, on the returned
//     channel (an empty or nil slice forwards nothing).
//
// The returned channel is unbuffered and is closed once input has been closed
// and fully drained.
func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry {
	output := make(chan Entry)

	forward := func() {
		// Closing output signals downstream consumers that no more entries follow.
		defer close(output)

		for received := range input {
			produced, skip := process(received)
			if !skip {
				for i := range produced {
					output <- produced[i]
				}
			}
		}
	}
	go forward()

	return output
}

// Run implements Stage
func (p *Pipeline) Run(in chan Entry) chan Entry {
in = RunWith(in, func(e Entry) Entry {
Expand Down

0 comments on commit 7b40be3

Please sign in to comment.