fix(promtail): Handle docker logs when a log is split in multiple frames (#12374)
jonaslb committed Apr 26, 2024
1 parent 76ba24e commit c0113db
Showing 13 changed files with 637 additions and 72 deletions.
1 change: 1 addition & 0 deletions LICENSING.md
@@ -10,6 +10,7 @@ The following folders and their subfolders are licensed under Apache-2.0:

```
clients/
pkg/framedstdcopy/
pkg/ingester/wal
pkg/logproto/
pkg/loghttp/
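The new `pkg/framedstdcopy` package (licensed above) is a frame-preserving variant of Docker's `stdcopy` demultiplexer: instead of copying payloads into `io.Writer`s, which erases frame boundaries, it forwards each frame to a channel. As a rough illustration of the stream format involved — a minimal sketch assuming Docker's documented multiplexed-stream framing, with `demuxFrames` as a hypothetical name rather than the committed API:

```go
package framedemo

import (
	"encoding/binary"
	"fmt"
	"io"
)

// demuxFrames reads Docker's multiplexed log stream. Each frame starts
// with an 8-byte header: byte 0 is the stream id (0 stdin, 1 stdout,
// 2 stderr) and bytes 4-7 hold the payload length as a big-endian uint32.
func demuxFrames(src io.Reader, stdout, stderr chan<- []byte) (written int64, err error) {
	var header [8]byte
	for {
		if _, err = io.ReadFull(src, header[:]); err != nil {
			if err == io.EOF {
				err = nil // clean end of stream
			}
			return written, err
		}
		size := binary.BigEndian.Uint32(header[4:8])
		payload := make([]byte, size)
		if _, err = io.ReadFull(src, payload); err != nil {
			return written, err
		}
		written += int64(size)
		// Forward the frame as-is; the consumer decides how frames
		// recombine into log lines.
		switch header[0] {
		case 1:
			stdout <- payload
		case 2:
			stderr <- payload
		default:
			return written, fmt.Errorf("unexpected stream id %d", header[0])
		}
	}
}
```

The channel-based shape mirrors how the `target.go` diff below calls `framedstdcopy.FramedStdCopy(cstdout, cstderr, logs)`.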
2 changes: 1 addition & 1 deletion clients/pkg/promtail/promtail.go
@@ -184,7 +184,7 @@ func (p *Promtail) reloadConfig(cfg *config.Config) error {
entryHandlers = append(entryHandlers, p.client)
p.entriesFanout = utils.NewFanoutEntryHandler(timeoutUntilFanoutHardStop, entryHandlers...)

tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig, cfg.Global.FileWatch)
tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.entriesFanout, cfg.ScrapeConfig, &cfg.TargetConfig, cfg.Global.FileWatch, &cfg.LimitsConfig)
if err != nil {
return err
}
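This one-line change threads the client-side limits configuration (`&cfg.LimitsConfig`) through `NewTargetManagers` so the Docker target can honor `max_line_size`. Assuming promtail's documented `limits_config` block, the relevant setting would look roughly like this (values illustrative):

```yaml
# Promtail client config fragment (illustrative).
limits_config:
  # Upper bound on a single log line; 0 disables the limit, in which case
  # the Docker target falls back to a 256 KiB safety cap (see target.go below).
  max_line_size: 262144
```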
162 changes: 101 additions & 61 deletions clients/pkg/promtail/targets/docker/target.go
@@ -1,18 +1,15 @@
package docker

import (
"bufio"
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
@@ -24,6 +21,7 @@ import (
"github.com/grafana/loki/v3/clients/pkg/promtail/positions"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/v3/pkg/framedstdcopy"
"github.com/grafana/loki/v3/pkg/logproto"
)

@@ -36,6 +34,7 @@ type Target struct {
labels model.LabelSet
relabelConfig []*relabel.Config
metrics *Metrics
maxLineSize int

cancel context.CancelFunc
client client.APIClient
@@ -53,6 +52,7 @@ func NewTarget(
labels model.LabelSet,
relabelConfig []*relabel.Config,
client client.APIClient,
maxLineSize int,
) (*Target, error) {

pos, err := position.Get(positions.CursorKey(containerName))
@@ -73,6 +73,7 @@
labels: labels,
relabelConfig: relabelConfig,
metrics: metrics,
maxLineSize: maxLineSize,

client: client,
running: atomic.NewBool(false),
@@ -109,22 +110,22 @@ func (t *Target) processLoop(ctx context.Context) {
}

// Start transferring
rstdout, wstdout := io.Pipe()
rstderr, wstderr := io.Pipe()
cstdout := make(chan []byte)
cstderr := make(chan []byte)
t.wg.Add(1)
go func() {
defer func() {
t.wg.Done()
wstdout.Close()
wstderr.Close()
close(cstdout)
close(cstderr)
t.Stop()
}()
var written int64
var err error
if inspectInfo.Config.Tty {
written, err = io.Copy(wstdout, logs)
written, err = framedstdcopy.NoHeaderFramedStdCopy(cstdout, logs)
} else {
written, err = stdcopy.StdCopy(wstdout, wstderr, logs)
written, err = framedstdcopy.FramedStdCopy(cstdout, cstderr, logs)
}
if err != nil {
level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
@@ -135,8 +136,8 @@

// Start processing
t.wg.Add(2)
go t.process(rstdout, "stdout")
go t.process(rstderr, "stderr")
go t.process(cstdout, "stdout")
go t.process(cstderr, "stderr")

// Wait until done
<-ctx.Done()
@@ -149,81 +149,120 @@
func extractTs(line string) (time.Time, string, error) {
pair := strings.SplitN(line, " ", 2)
if len(pair) != 2 {
return time.Now(), line, fmt.Errorf("Could not find timestamp in '%s'", line)
return time.Now(), line, fmt.Errorf("could not find timestamp in '%s'", line)
}
ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
if err != nil {
return time.Now(), line, fmt.Errorf("Could not parse timestamp from '%s': %w", pair[0], err)
return time.Now(), line, fmt.Errorf("could not parse timestamp from '%s': %w", pair[0], err)
}
return ts, pair[1], nil
}

// https://devmarkpro.com/working-big-files-golang
func readLine(r *bufio.Reader) (string, error) {
func (t *Target) process(frames chan []byte, logStream string) {
defer func() {
t.wg.Done()
}()

var (
isPrefix = true
err error
line, ln []byte
sizeLimit = t.maxLineSize
discardRemainingLine = false
payloadAcc strings.Builder
curTs = time.Now()
)

for isPrefix && err == nil {
line, isPrefix, err = r.ReadLine()
ln = append(ln, line...)
// If max_line_size is disabled (set to 0), we can in theory have infinite buffer growth.
// We can't guarantee that there's any bound on Docker logs; they could be an infinite stream
// without newlines for all we know. To protect promtail from OOM in that case, we introduce
// this safety limit into the Docker target, inspired by the default Loki max_line_size value:
// https://grafana.com/docs/loki/latest/configure/#limits_config
if sizeLimit == 0 {
sizeLimit = 256 * 1024
}

return string(ln), err
}

func (t *Target) process(r io.Reader, logStream string) {
defer func() {
t.wg.Done()
}()

reader := bufio.NewReader(r)
for {
line, err := readLine(reader)
for frame := range frames {
// Split frame into timestamp and payload
ts, payload, err := extractTs(string(frame))
if err != nil {
if err == io.EOF {
break
if payloadAcc.Len() == 0 {
// If we are currently accumulating a line split over multiple frames, we would still expect
// timestamps in every frame, but since we don't use those secondary ones, we don't log an error in that case.
level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
t.metrics.dockerErrors.Inc()
continue
}
level.Error(t.logger).Log("msg", "error reading docker log line, skipping line", "err", err)
t.metrics.dockerErrors.Inc()
ts = curTs
}

ts, line, err := extractTs(line)
if err != nil {
level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
t.metrics.dockerErrors.Inc()
// If time has changed, we are looking at a new event (although we should have seen a new line..),
// so flush the buffer if we have one.
if ts != curTs {
discardRemainingLine = false
if payloadAcc.Len() > 0 {
t.handleOutput(logStream, curTs, payloadAcc.String())
payloadAcc.Reset()
}
}

// Check if we have the end of the event
var isEol = strings.HasSuffix(payload, "\n")

// If we are currently discarding a line (due to size limits), skip ahead, but don't skip the next
// frame if we saw the end of the line.
if discardRemainingLine {
discardRemainingLine = !isEol
continue
}

// Add all labels from the config, relabel and filter them.
lb := labels.NewBuilder(nil)
for k, v := range t.labels {
lb.Set(string(k), string(v))
// Strip newline ending if we have it
payload = strings.TrimRight(payload, "\r\n")

// Fast path: Most log lines are a single frame. If we have a full line in frame and buffer is empty,
// then don't use the buffer at all.
if payloadAcc.Len() == 0 && isEol {
t.handleOutput(logStream, ts, payload)
continue
}
lb.Set(dockerLabelLogStream, logStream)
processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)

filtered := make(model.LabelSet)
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
// Add to buffer
payloadAcc.WriteString(payload)
curTs = ts

// Send immediately if line ended or we built a very large event
if isEol || payloadAcc.Len() > sizeLimit {
discardRemainingLine = !isEol
t.handleOutput(logStream, curTs, payloadAcc.String())
payloadAcc.Reset()
}
}
}

func (t *Target) handleOutput(logStream string, ts time.Time, payload string) {
// Add all labels from the config, relabel and filter them.
lb := labels.NewBuilder(nil)
for k, v := range t.labels {
lb.Set(string(k), string(v))
}
lb.Set(dockerLabelLogStream, logStream)
processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)

t.handler.Chan() <- api.Entry{
Labels: filtered,
Entry: logproto.Entry{
Timestamp: ts,
Line: line,
},
filtered := make(model.LabelSet)
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
t.metrics.dockerEntries.Inc()
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
t.since = ts.Unix()
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

t.handler.Chan() <- api.Entry{
Labels: filtered,
Entry: logproto.Entry{
Timestamp: ts,
Line: payload,
},
}
t.metrics.dockerEntries.Inc()
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
t.since = ts.Unix()
}

// startIfNotRunning starts processing container logs. The operation is idempotent, i.e. the processing cannot be started twice.
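To make the new accumulation logic concrete: Docker timestamps every frame, and a long event can arrive split across several frames (the daemon typically splits around 16 KiB), with the terminating newline only in the last one. The standalone demo below — a hypothetical sketch reusing `extractTs` from the diff above, not code from this commit — shows two frames recombining into one entry that keeps the first frame's timestamp:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// extractTs mirrors the helper in the diff above: Docker prefixes each
// frame with an RFC3339Nano timestamp followed by a space.
func extractTs(line string) (time.Time, string, error) {
	pair := strings.SplitN(line, " ", 2)
	if len(pair) != 2 {
		return time.Now(), line, fmt.Errorf("could not find timestamp in '%s'", line)
	}
	ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
	if err != nil {
		return time.Now(), line, fmt.Errorf("could not parse timestamp from '%s': %w", pair[0], err)
	}
	return ts, pair[1], nil
}

func main() {
	// One long event split across two frames: each frame repeats the
	// timestamp, but only the first one is attached to the entry.
	frames := []string{
		"2024-04-26T10:00:00.000000000Z first half of a long line, ",
		"2024-04-26T10:00:00.000000000Z second half\n",
	}
	var payloadAcc strings.Builder
	var entryTs time.Time
	for i, frame := range frames {
		ts, payload, err := extractTs(frame)
		if err != nil {
			continue // the real code logs and counts these errors
		}
		if i == 0 {
			entryTs = ts
		}
		payloadAcc.WriteString(strings.TrimRight(payload, "\r\n"))
		if strings.HasSuffix(payload, "\n") {
			fmt.Printf("entry ts=%s line=%q\n", entryTs.Format(time.RFC3339Nano), payloadAcc.String())
			payloadAcc.Reset()
		}
	}
}
```

This demo covers only the newline-terminated path; the committed code also flushes when the frame timestamp changes or when the accumulated payload exceeds the size limit, discarding the remainder of an over-long line.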
2 changes: 2 additions & 0 deletions clients/pkg/promtail/targets/docker/target_group.go
@@ -36,6 +36,7 @@ type targetGroup struct {
httpClientConfig config.HTTPClientConfig
client client.APIClient
refreshInterval model.Duration
maxLineSize int

mtx sync.Mutex
targets map[string]*Target
@@ -120,6 +121,7 @@ func (tg *targetGroup) addTarget(id string, discoveredLabels model.LabelSet) err
discoveredLabels.Merge(tg.defaultLabels),
tg.relabelConfig,
tg.client,
tg.maxLineSize,
)
if err != nil {
return err