Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with unnecessary config reload in static mode #6977

Merged
merged 4 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix issue with unnecessary config reload
  • Loading branch information
ptodev committed Jul 9, 2024
commit 8a26cb4c9c0b4763a8df1f80cc4a93fe7f1b5b8a
13 changes: 11 additions & 2 deletions static/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/wal"
"github.com/grafana/loki/pkg/tracing"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
)

func init() {
Expand Down Expand Up @@ -121,6 +122,8 @@ type Instance struct {
log log.Logger
reg *util.Unregisterer

previousConfig string

promtail *promtail.Promtail
}

Expand Down Expand Up @@ -155,14 +158,20 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e
defer i.mut.Unlock()

// No-op if the configs haven't changed.
if util.CompareYAML(c, i.cfg) {
newConfigByteArr, err := yaml.Marshal(c)
if err != nil {
return fmt.Errorf("failed to marshal new logs instance config: %w", err)
}
newConfig := string(newConfigByteArr[:])
if newConfig == i.previousConfig {
level.Debug(i.log).Log("msg", "instance config hasn't changed, not recreating Promtail")
return nil
}
i.previousConfig = newConfig
i.cfg = c

positionsDir := filepath.Dir(c.PositionsConfig.PositionsFile)
err := os.MkdirAll(positionsDir, 0775)
err = os.MkdirAll(positionsDir, 0775)
if err != nil {
level.Warn(i.log).Log("msg", "failed to create the positions directory. logs may be unable to save their position", "path", positionsDir, "err", err)
}
Expand Down
31 changes: 30 additions & 1 deletion static/logs/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package logs

import (
"bytes"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -30,6 +31,12 @@ func TestLogs_NilConfig(t *testing.T) {
defer l.Stop()
}

func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
logLine := `level=debug component=logs logs_config=default msg="instance config hasn't changed, not recreating Promtail"`
actualOccurances := strings.Count(logs, logLine)
require.Equal(t, expectedOccurances, actualOccurances)
}

func TestLogs(t *testing.T) {
//
// Create a temporary file to tail
Expand Down Expand Up @@ -87,7 +94,8 @@ configs:
dec.SetStrict(true)
require.NoError(t, dec.Decode(&cfg))
require.NoError(t, cfg.ApplyDefaults())
logger := log.NewSyncLogger(log.NewNopLogger())
logBuffer := bytes.Buffer{}
logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer))
l, err := New(prometheus.NewRegistry(), &cfg, logger, false)
require.NoError(t, err)
defer l.Stop()
Expand All @@ -103,6 +111,23 @@ configs:
require.Equal(t, "Hello, world!", req.Streams[0].Entries[0].Line)
}

// The config did change.
// We expect the config reload log line to not be printed.
checkConfigReloadLog(t, logBuffer.String(), 0)

//
// Apply the same config and try reloading.
// Recreate the config struct to make sure it's clean.
//
var sameCfg Config
dec = yaml.NewDecoder(strings.NewReader(cfgText))
dec.SetStrict(true)
require.NoError(t, dec.Decode(&sameCfg))
require.NoError(t, sameCfg.ApplyDefaults())
require.NoError(t, l.ApplyConfig(&sameCfg, false))

checkConfigReloadLog(t, logBuffer.String(), 1)

//
// Apply a new config and write a new line.
//
Expand Down Expand Up @@ -138,6 +163,10 @@ configs:
require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line)
}

// The config did change this time.
// We expect the config reload log line to not be printed again.
checkConfigReloadLog(t, logBuffer.String(), 1)

t.Run("update to nil", func(t *testing.T) {
// Applying a nil config should remove all instances.
err := l.ApplyConfig(nil, false)
Expand Down
12 changes: 11 additions & 1 deletion static/metrics/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

"github.com/grafana/agent/internal/util"
"github.com/grafana/agent/static/metrics/cluster"
Expand Down Expand Up @@ -150,6 +151,8 @@ type Agent struct {
actor chan func()

initialBootDone atomic.Bool

previousConfig string
}

// New creates and starts a new Agent.
Expand Down Expand Up @@ -227,9 +230,16 @@ func (a *Agent) ApplyConfig(cfg Config) error {
a.mut.Lock()
defer a.mut.Unlock()

if util.CompareYAML(a.cfg, cfg) {
newConfigByteArr, err := yaml.Marshal(cfg)
if err != nil {
return fmt.Errorf("failed to marshal new config: %w", err)
}
newConfig := string(newConfigByteArr[:])
ptodev marked this conversation as resolved.
Show resolved Hide resolved
if newConfig == a.previousConfig {
level.Debug(a.logger).Log("msg", "not recreating metrics instance because config hasn't changed")
return nil
}
a.previousConfig = newConfig
ptodev marked this conversation as resolved.
Show resolved Hide resolved

if a.stopped {
return fmt.Errorf("agent stopped")
Expand Down
93 changes: 93 additions & 0 deletions static/metrics/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package metrics

import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -113,6 +115,97 @@ configs:
require.Greater(t, int64(scrapeConfig.ScrapeInterval), int64(0))
}

func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
logLine := `level=debug agent=prometheus msg="not recreating metrics instance because config hasn't changed"`
actualOccurances := strings.Count(logs, logLine)
require.Equal(t, expectedOccurances, actualOccurances)
}

func TestConfigReload(t *testing.T) {
cfgText := `
wal_directory: /tmp/wal
configs:
- name: instance_a
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
`
var cfg Config

err := yaml.Unmarshal([]byte(cfgText), &cfg)
require.NoError(t, err)
err = cfg.ApplyDefaults()
require.NoError(t, err)

fact := newFakeInstanceFactory()

logBuffer := bytes.Buffer{}
logger := log.NewLogfmtLogger(&logBuffer)

a, err := newAgent(prometheus.NewRegistry(), cfg, logger, fact.factory)
require.NoError(t, err)

util.Eventually(t, func(t require.TestingT) {
require.NotNil(t, fact.created)
require.Equal(t, 1, int(fact.created.Load()))
require.Equal(t, 1, len(a.mm.ListInstances()))
})

t.Run("instances should be running", func(t *testing.T) {
for _, mi := range fact.Mocks() {
// Each instance should have wait called on it
util.Eventually(t, func(t require.TestingT) {
require.True(t, mi.running.Load())
})
}
})

//
// The config has changed (it used to be ""). The log line won't be printed.
//
checkConfigReloadLog(t, logBuffer.String(), 0)

var sameCfg Config

//
// Try the same config.
//
err = yaml.Unmarshal([]byte(cfgText), &sameCfg)
require.NoError(t, err)
err = sameCfg.ApplyDefaults()
require.NoError(t, err)

a.ApplyConfig(sameCfg)

// The config did not change. The log line should be printed.
checkConfigReloadLog(t, logBuffer.String(), 1)

//
// Try a different config.
//
cfgText = `
wal_directory: /tmp/wal
configs:
- name: instance_b
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['localhost:9100']
`
var differentCfg Config

err = yaml.Unmarshal([]byte(cfgText), &differentCfg)
require.NoError(t, err)
err = differentCfg.ApplyDefaults()
require.NoError(t, err)

a.ApplyConfig(differentCfg)

// The config has changed. The log line won't be printed.
checkConfigReloadLog(t, logBuffer.String(), 1)
}

func TestAgent(t *testing.T) {
// Launch two instances
cfg := Config{
Expand Down
1 change: 1 addition & 0 deletions static/traces/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (i *Instance) ApplyConfig(logsSubsystem *logs.Logs, promInstanceManager ins

if util.CompareYAML(cfg, i.cfg) {
// No config change
i.logger.Debug("tracing config won't be recreated because it hasn't changed")
return nil
}
i.cfg = cfg
Expand Down
90 changes: 86 additions & 4 deletions static/traces/traces_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package traces

import (
"bytes"
"fmt"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/internal/util"
"github.com/grafana/agent/static/server"
"github.com/grafana/agent/static/traces/traceutils"
"github.com/grafana/dskit/log"
dskitlog "github.com/grafana/dskit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -45,7 +47,7 @@ configs:
err := dec.Decode(&cfg)
require.NoError(t, err)

var loggingLevel log.Level
var loggingLevel dskitlog.Level
require.NoError(t, loggingLevel.Set("debug"))

traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, &server.HookLogger{})
Expand Down Expand Up @@ -90,7 +92,7 @@ configs:
err := dec.Decode(&cfg)
require.NoError(t, err)

var loggingLevel log.Level
var loggingLevel dskitlog.Level
require.NoError(t, loggingLevel.Set("debug"))

traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, &server.HookLogger{})
Expand Down Expand Up @@ -127,7 +129,10 @@ configs:
err := dec.Decode(&cfg)
require.NoError(t, err)

traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, &server.HookLogger{})
logBuffer := bytes.Buffer{}
logger := log.NewLogfmtLogger(&logBuffer)

traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, logger)
require.NoError(t, err)
t.Cleanup(traces.Stop)

Expand Down Expand Up @@ -191,3 +196,80 @@ func testJaegerTracer(t *testing.T) opentracing.Tracer {

return tr
}

func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) {
logLine := `level=debug component=traces traces_config=default msg="tracing config won't be recreated because it hasn't changed"`
actualOccurances := strings.Count(logs, logLine)
require.Equal(t, expectedOccurances, actualOccurances)
}

func Test_ReapplyConfig(t *testing.T) {
tracesCfgText := util.Untab(`
configs:
- name: default
receivers:
jaeger:
protocols:
thrift_compact:
remote_write:
- endpoint: tempo:4317
insecure: true
batch:
timeout: 100ms
send_batch_size: 1
`)

var cfg Config
dec := yaml.NewDecoder(strings.NewReader(tracesCfgText))
dec.SetStrict(true)
err := dec.Decode(&cfg)
require.NoError(t, err)

logBuffer := bytes.Buffer{}
logger := log.NewLogfmtLogger(&logBuffer)

traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, logger)
require.NoError(t, err)

checkConfigReloadLog(t, logBuffer.String(), 0)

// Try applying the same config again
var sameFixedConfig Config
dec = yaml.NewDecoder(strings.NewReader(tracesCfgText))
dec.SetStrict(true)
err = dec.Decode(&sameFixedConfig)
require.NoError(t, err)

err = traces.ApplyConfig(nil, nil, sameFixedConfig)
require.NoError(t, err)

checkConfigReloadLog(t, logBuffer.String(), 1)

// Change the configuration slightly
tracesCfgText = util.Untab(`
configs:
- name: default
receivers:
jaeger:
protocols:
thrift_compact:
remote_write:
- endpoint: tempo:4318
insecure: true
batch:
timeout: 100ms
send_batch_size: 1
`)

// Try applying a different config
var differentConfig Config
dec = yaml.NewDecoder(strings.NewReader(tracesCfgText))
dec.SetStrict(true)
err = dec.Decode(&differentConfig)
require.NoError(t, err)

err = traces.ApplyConfig(nil, nil, differentConfig)
require.NoError(t, err)

checkConfigReloadLog(t, logBuffer.String(), 1)
}