diff --git a/CHANGELOG.md b/CHANGELOG.md index 5806e91cc4bb..c6dad47fe663 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,10 @@ v0.41.1 (2024-06-07) - Updated pyroscope to v0.4.6 introducing `symbols_map_size` and `pid_map_size` configuration. (@simonswine) +### Bugfixes + +- Fix an issue which caused the config to be reloaded if a config reload was triggered but the config hasn't changed. + The bug only affected the "metrics" and "logs" subsystems in Static mode. v0.41.0 (2024-05-31) -------------------- diff --git a/cmd/grafana-agent-service/service_test.go b/cmd/grafana-agent-service/service_test.go index 145879a2434e..79ebc47201c5 100644 --- a/cmd/grafana-agent-service/service_test.go +++ b/cmd/grafana-agent-service/service_test.go @@ -9,7 +9,6 @@ import ( "os/exec" "path/filepath" "runtime" - "sync" "testing" "github.com/go-kit/log" @@ -84,7 +83,7 @@ func Test_serviceManager(t *testing.T) { t.Run("can forward to stdout", func(t *testing.T) { listenHost := getListenHost(t) - var buf syncBuffer + var buf util.SyncBuffer mgr := newServiceManager(l, serviceManagerConfig{ Path: serviceBinary, @@ -112,7 +111,7 @@ func Test_serviceManager(t *testing.T) { t.Run("can forward to stderr", func(t *testing.T) { listenHost := getListenHost(t) - var buf syncBuffer + var buf util.SyncBuffer mgr := newServiceManager(l, serviceManagerConfig{ Path: serviceBinary, @@ -186,24 +185,3 @@ func makeServiceRequest(host string, path string, body []byte) ([]byte, error) { } return io.ReadAll(resp.Body) } - -// syncBuffer wraps around a bytes.Buffer and makes it safe to use from -// multiple goroutines. -type syncBuffer struct { - mut sync.RWMutex - buf bytes.Buffer -} - -func (sb *syncBuffer) Bytes() []byte { - sb.mut.RLock() - defer sb.mut.RUnlock() - - return sb.buf.Bytes() -} - -func (sb *syncBuffer) Write(p []byte) (n int, err error) { - sb.mut.Lock() - defer sb.mut.Unlock() - - return sb.buf.Write(p) -} diff --git a/internal/util/syncbuffer.go b/internal/util/syncbuffer.go new file mode 100644 index 000000000000..bee030dd95f3 --- /dev/null +++ b/internal/util/syncbuffer.go @@ -0,0 +1,34 @@ +package util + +import ( + "bytes" + "sync" +) + +// SyncBuffer wraps around a bytes.Buffer and makes it safe to use from +// multiple goroutines. +type SyncBuffer struct { + mut sync.RWMutex + buf bytes.Buffer +} + +func (sb *SyncBuffer) Bytes() []byte { + sb.mut.RLock() + defer sb.mut.RUnlock() + + return sb.buf.Bytes() +} + +func (sb *SyncBuffer) String() string { + sb.mut.RLock() + defer sb.mut.RUnlock() + + return sb.buf.String() +} + +func (sb *SyncBuffer) Write(p []byte) (n int, err error) { + sb.mut.Lock() + defer sb.mut.Unlock() + + return sb.buf.Write(p) +} diff --git a/static/logs/logs.go b/static/logs/logs.go index 2d6c478fe510..5de9a2c7ade2 100644 --- a/static/logs/logs.go +++ b/static/logs/logs.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/wal" "github.com/grafana/loki/pkg/tracing" "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" ) func init() { @@ -121,6 +122,8 @@ type Instance struct { log log.Logger reg *util.Unregisterer + previousConfig string + promtail *promtail.Promtail } @@ -155,14 +158,20 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e defer i.mut.Unlock() // No-op if the configs haven't changed. - if util.CompareYAML(c, i.cfg) { + newConfigByteArr, err := yaml.Marshal(c) + if err != nil { + return fmt.Errorf("failed to marshal new logs instance config: %w", err) + } + newConfig := string(newConfigByteArr) + if newConfig == i.previousConfig { level.Debug(i.log).Log("msg", "instance config hasn't changed, not recreating Promtail") return nil } + i.previousConfig = newConfig i.cfg = c positionsDir := filepath.Dir(c.PositionsConfig.PositionsFile) - err := os.MkdirAll(positionsDir, 0775) + err = os.MkdirAll(positionsDir, 0775) if err != nil { level.Warn(i.log).Log("msg", "failed to create the positions directory. logs may be unable to save their position", "path", positionsDir, "err", err) } diff --git a/static/logs/logs_test.go b/static/logs/logs_test.go index 255c99b55f59..9ff1f1465bfc 100644 --- a/static/logs/logs_test.go +++ b/static/logs/logs_test.go @@ -3,6 +3,7 @@ package logs import ( + "bytes" "fmt" "net" "net/http" @@ -30,6 +31,12 @@ func TestLogs_NilConfig(t *testing.T) { defer l.Stop() } +func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) { + logLine := `level=debug component=logs logs_config=default msg="instance config hasn't changed, not recreating Promtail"` + actualOccurances := strings.Count(logs, logLine) + require.Equal(t, expectedOccurances, actualOccurances) +} + func TestLogs(t *testing.T) { // // Create a temporary file to tail @@ -87,7 +94,8 @@ configs: dec.SetStrict(true) require.NoError(t, dec.Decode(&cfg)) require.NoError(t, cfg.ApplyDefaults()) - logger := log.NewSyncLogger(log.NewNopLogger()) + logBuffer := bytes.Buffer{} + logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer)) l, err := New(prometheus.NewRegistry(), &cfg, logger, false) require.NoError(t, err) defer l.Stop() @@ -103,6 +111,23 @@ configs: require.Equal(t, "Hello, world!", req.Streams[0].Entries[0].Line) } + // The config did change. + // We expect the config reload log line to not be printed. + checkConfigReloadLog(t, logBuffer.String(), 0) + + // + // Apply the same config and try reloading. + // Recreate the config struct to make sure it's clean. + // + var sameCfg Config + dec = yaml.NewDecoder(strings.NewReader(cfgText)) + dec.SetStrict(true) + require.NoError(t, dec.Decode(&sameCfg)) + require.NoError(t, sameCfg.ApplyDefaults()) + require.NoError(t, l.ApplyConfig(&sameCfg, false)) + + checkConfigReloadLog(t, logBuffer.String(), 1) + // // Apply a new config and write a new line. // @@ -138,6 +163,10 @@ configs: require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line) } + // The config did change this time. + // We expect the config reload log line to not be printed again. + checkConfigReloadLog(t, logBuffer.String(), 1) + t.Run("update to nil", func(t *testing.T) { // Applying a nil config should remove all instances. err := l.ApplyConfig(nil, false) diff --git a/static/metrics/agent.go b/static/metrics/agent.go index 487462d8da2c..a29022c86afe 100644 --- a/static/metrics/agent.go +++ b/static/metrics/agent.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "google.golang.org/grpc" + "gopkg.in/yaml.v2" "github.com/grafana/agent/internal/util" "github.com/grafana/agent/static/metrics/cluster" @@ -150,6 +151,8 @@ type Agent struct { actor chan func() initialBootDone atomic.Bool + + previousConfig string } // New creates and starts a new Agent. @@ -227,9 +230,16 @@ func (a *Agent) ApplyConfig(cfg Config) error { a.mut.Lock() defer a.mut.Unlock() - if util.CompareYAML(a.cfg, cfg) { + newConfigByteArr, err := yaml.Marshal(cfg) + if err != nil { + return fmt.Errorf("failed to marshal new config: %w", err) + } + newConfig := string(newConfigByteArr) + if newConfig == a.previousConfig { + level.Debug(a.logger).Log("msg", "not recreating metrics instance because config hasn't changed") return nil } + a.previousConfig = newConfig if a.stopped { return fmt.Errorf("agent stopped") diff --git a/static/metrics/agent_test.go b/static/metrics/agent_test.go index 6f5f46b293ff..726ee5a1a6d1 100644 --- a/static/metrics/agent_test.go +++ b/static/metrics/agent_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "strings" "sync" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/grafana/agent/internal/util" "github.com/grafana/agent/static/metrics/instance" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" @@ -113,6 +115,142 @@ configs: require.Greater(t, int64(scrapeConfig.ScrapeInterval), int64(0)) } +func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) { + logLine := `level=debug agent=prometheus msg="not recreating metrics instance because config hasn't changed"` + actualOccurances := strings.Count(logs, logLine) + require.Equal(t, expectedOccurances, actualOccurances) +} + +func TestConfigReload(t *testing.T) { + cfgText := ` +wal_directory: /tmp/wal +configs: + - name: instance_a + scrape_configs: + - job_name: 'node' + static_configs: + - targets: ['localhost:9100'] +` + var cfg Config + + err := yaml.Unmarshal([]byte(cfgText), &cfg) + require.NoError(t, err) + err = cfg.ApplyDefaults() + require.NoError(t, err) + + fact := newFakeInstanceFactory() + + logBuffer := util.SyncBuffer{} + logger := log.NewSyncLogger(log.NewLogfmtLogger(&logBuffer)) + + reg := prometheus.NewRegistry() + + a, err := newAgent(reg, cfg, logger, fact.factory) + require.NoError(t, err) + + util.Eventually(t, func(t require.TestingT) { + require.NotNil(t, fact.created) + require.Equal(t, 1, int(fact.created.Load())) + require.Equal(t, 1, len(a.mm.ListInstances())) + }) + + t.Run("instances should be running", func(t *testing.T) { + for _, mi := range fact.Mocks() { + // Each instance should have wait called on it + util.Eventually(t, func(t require.TestingT) { + require.True(t, mi.running.Load()) + }) + } + }) + + util.Eventually(t, func(t require.TestingT) { + if err := testutil.GatherAndCompare(reg, + strings.NewReader(` +# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs +# TYPE agent_metrics_configs_changed_total counter +agent_metrics_configs_changed_total{event="created"} 1 +`), "agent_metrics_configs_changed_total"); err != nil { + t.Errorf("mismatch metrics: %v", err) + t.FailNow() + } + }) + + // The config has changed (it used to be ""). The log line won't be printed. + checkConfigReloadLog(t, logBuffer.String(), 0) + + // + // Try the same config. + // + var sameCfg Config + + err = yaml.Unmarshal([]byte(cfgText), &sameCfg) + require.NoError(t, err) + err = sameCfg.ApplyDefaults() + require.NoError(t, err) + + a.ApplyConfig(sameCfg) + + util.Eventually(t, func(t require.TestingT) { + if err := testutil.GatherAndCompare(reg, + strings.NewReader(` +# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs +# TYPE agent_metrics_configs_changed_total counter +agent_metrics_configs_changed_total{event="created"} 1 +`), "agent_metrics_configs_changed_total"); err != nil { + t.Errorf("mismatch metrics: %v", err) + t.FailNow() + } + }) + + // The config did not change. The log line should be printed. + checkConfigReloadLog(t, logBuffer.String(), 1) + + // + // Try a different config. + // + cfgText = ` +wal_directory: /tmp/wal +configs: + - name: instance_b + scrape_configs: + - job_name: 'node' + static_configs: + - targets: ['localhost:9100'] +` + var differentCfg Config + + err = yaml.Unmarshal([]byte(cfgText), &differentCfg) + require.NoError(t, err) + err = differentCfg.ApplyDefaults() + require.NoError(t, err) + + a.ApplyConfig(differentCfg) + + util.Eventually(t, func(t require.TestingT) { + if err := testutil.GatherAndCompare(reg, + strings.NewReader(` +# HELP agent_metrics_configs_changed_total Total number of dynamically updated configs +# TYPE agent_metrics_configs_changed_total counter +agent_metrics_configs_changed_total{event="created"} 2 +agent_metrics_configs_changed_total{event="deleted"} 1 +`), "agent_metrics_configs_changed_total"); err != nil { + t.Errorf("mismatch metrics: %v", err) + t.FailNow() + } + }) + + // The config has changed. The log line won't be printed. + checkConfigReloadLog(t, logBuffer.String(), 1) + + for _, mi := range fact.Mocks() { + util.Eventually(t, func(t require.TestingT) { + require.Equal(t, 1, int(mi.startedCount.Load())) + }) + } + + a.Stop() +} + func TestAgent(t *testing.T) { // Launch two instances cfg := Config{ diff --git a/static/traces/instance.go b/static/traces/instance.go index ebb922c95264..5c3b4ffd2037 100644 --- a/static/traces/instance.go +++ b/static/traces/instance.go @@ -57,6 +57,7 @@ func (i *Instance) ApplyConfig(logsSubsystem *logs.Logs, promInstanceManager ins if util.CompareYAML(cfg, i.cfg) { // No config change + i.logger.Debug("tracing config won't be recreated because it hasn't changed") return nil } i.cfg = cfg diff --git a/static/traces/traces_test.go b/static/traces/traces_test.go index 645457bf008c..2727a5f32f70 100644 --- a/static/traces/traces_test.go +++ b/static/traces/traces_test.go @@ -1,15 +1,17 @@ package traces import ( + "bytes" "fmt" "strings" "testing" "time" + "github.com/go-kit/log" "github.com/grafana/agent/internal/util" "github.com/grafana/agent/static/server" "github.com/grafana/agent/static/traces/traceutils" - "github.com/grafana/dskit/log" + dskitlog "github.com/grafana/dskit/log" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -45,7 +47,7 @@ configs: err := dec.Decode(&cfg) require.NoError(t, err) - var loggingLevel log.Level + var loggingLevel dskitlog.Level require.NoError(t, loggingLevel.Set("debug")) traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, &server.HookLogger{}) @@ -90,7 +92,7 @@ configs: err := dec.Decode(&cfg) require.NoError(t, err) - var loggingLevel log.Level + var loggingLevel dskitlog.Level require.NoError(t, loggingLevel.Set("debug")) traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, &server.HookLogger{}) @@ -127,7 +129,10 @@ configs: err := dec.Decode(&cfg) require.NoError(t, err) - traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, &server.HookLogger{}) + logBuffer := bytes.Buffer{} + logger := log.NewLogfmtLogger(&logBuffer) + + traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, logger) require.NoError(t, err) t.Cleanup(traces.Stop) @@ -191,3 +196,80 @@ func testJaegerTracer(t *testing.T) opentracing.Tracer { return tr } + +func checkConfigReloadLog(t *testing.T, logs string, expectedOccurances int) { + logLine := `level=debug component=traces traces_config=default msg="tracing config won't be recreated because it hasn't changed"` + actualOccurances := strings.Count(logs, logLine) + require.Equal(t, expectedOccurances, actualOccurances) +} + +func Test_ReapplyConfig(t *testing.T) { + tracesCfgText := util.Untab(` +configs: +- name: default + receivers: + jaeger: + protocols: + thrift_compact: + remote_write: + - endpoint: tempo:4317 + insecure: true + batch: + timeout: 100ms + send_batch_size: 1 + `) + + var cfg Config + dec := yaml.NewDecoder(strings.NewReader(tracesCfgText)) + dec.SetStrict(true) + err := dec.Decode(&cfg) + require.NoError(t, err) + + logBuffer := bytes.Buffer{} + logger := log.NewLogfmtLogger(&logBuffer) + + traces, err := New(nil, nil, prometheus.NewRegistry(), cfg, logger) + require.NoError(t, err) + + checkConfigReloadLog(t, logBuffer.String(), 0) + + // Try applying the same config again + var sameFixedConfig Config + dec = yaml.NewDecoder(strings.NewReader(tracesCfgText)) + dec.SetStrict(true) + err = dec.Decode(&sameFixedConfig) + require.NoError(t, err) + + err = traces.ApplyConfig(nil, nil, sameFixedConfig) + require.NoError(t, err) + + checkConfigReloadLog(t, logBuffer.String(), 1) + + // Change the configuration slightly + tracesCfgText = util.Untab(` +configs: +- name: default + receivers: + jaeger: + protocols: + thrift_compact: + remote_write: + - endpoint: tempo:4318 + insecure: true + batch: + timeout: 100ms + send_batch_size: 1 + `) + + // Try applying a different config + var differentConfig Config + dec = yaml.NewDecoder(strings.NewReader(tracesCfgText)) + dec.SetStrict(true) + err = dec.Decode(&differentConfig) + require.NoError(t, err) + + err = traces.ApplyConfig(nil, nil, differentConfig) + require.NoError(t, err) + + checkConfigReloadLog(t, logBuffer.String(), 1) +}