From 1ba63839f218011069568e42acbd92882d49dd63 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 3 Sep 2020 16:13:43 +0200 Subject: [PATCH 1/2] pkg/reloader: improve detection of directory changes When watching for changes in directories, the reloader used to rely only on the watch interval and not the inotify events. This commit implements a more efficient detection of changes for watched directories. The change also adds a new `DelayInterval` option that allows delaying the config reload until no additional events are received. Finally, a new metric, `thanos_sidecar_reloader_config_apply_operations_total`, is added and `thanos_sidecar_reloader_config_apply_errors_total` has been renamed to `thanos_sidecar_reloader_config_apply_operations_failed_total` for consistency. Signed-off-by: Simon Pasquier --- CHANGELOG.md | 4 + pkg/reloader/example_test.go | 1 + pkg/reloader/reloader.go | 274 +++++++++++++++++++++++++--------- pkg/reloader/reloader_test.go | 259 ++++++++++++++++++++++++++++---- 4 files changed, 443 insertions(+), 95 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63ec3e6b97..35ae806629 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased +### Changed + +- [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`. 
+ ## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07 Highlights: diff --git a/pkg/reloader/example_test.go b/pkg/reloader/example_test.go index 0ee46e8b30..3ec805b38c 100644 --- a/pkg/reloader/example_test.go +++ b/pkg/reloader/example_test.go @@ -27,6 +27,7 @@ func ExampleReloader() { WatchedDirs: []string{"/path/to/dirs"}, WatchInterval: 3 * time.Minute, RetryInterval: 5 * time.Second, + DelayInterval: 1 * time.Second, }) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 997cd6b3a1..472cf6a67c 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -67,6 +67,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" "github.com/fsnotify/fsnotify" @@ -86,19 +87,18 @@ type Reloader struct { reloadURL *url.URL cfgFile string cfgOutputFile string - watchedDirs []string watchInterval time.Duration retryInterval time.Duration + watchedDirs []string + watcher *watcher lastCfgHash []byte lastWatchedDirsHash []byte - reloads prometheus.Counter - reloadErrors prometheus.Counter - watches prometheus.Gauge - watchEvents prometheus.Counter - watchErrors prometheus.Counter - configErrors prometheus.Counter + reloads prometheus.Counter + reloadErrors prometheus.Counter + configApplyErrors prometheus.Counter + configApply prometheus.Counter } // Options bundles options for the Reloader. @@ -112,11 +112,15 @@ type Options struct { // will be substituted and the output written into the given path. Prometheus should then use // cfgOutputFile as its config file path. CfgOutputFile string - // WatchedDirs is a collection of paths for this reloader to watch over. + // WatchedDirs is a collection of paths for the reloader to watch over. WatchedDirs []string + // DelayInterval controls how long the reloader will wait without receiving + // new file-system events before it applies the reload. 
+ DelayInterval time.Duration // WatchInterval controls how often reloader re-reads config and directories. WatchInterval time.Duration - // RetryInterval controls how often reloader retries config reload in case of error. + // RetryInterval controls how often the reloader retries a reloading of the + // configuration in case the endpoint returned an error. RetryInterval time.Duration } @@ -133,6 +137,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader { reloadURL: o.ReloadURL, cfgFile: o.CfgFile, cfgOutputFile: o.CfgOutputFile, + watcher: newWatcher(logger, reg, o.DelayInterval), watchedDirs: o.WatchedDirs, watchInterval: o.WatchInterval, retryInterval: o.RetryInterval, @@ -149,56 +154,45 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader { Help: "Total number of reload requests that failed.", }, ), - configErrors: promauto.With(reg).NewCounter( + configApply: promauto.With(reg).NewCounter( prometheus.CounterOpts{ - Name: "reloader_config_apply_errors_total", - Help: "Total number of config applies that failed.", + Name: "reloader_config_apply_operations_total", + Help: "Total number of config apply operations.", }, ), - watches: promauto.With(reg).NewGauge( - prometheus.GaugeOpts{ - Name: "reloader_watches", - Help: "Number of resources watched by the reloader.", - }, - ), - watchEvents: promauto.With(reg).NewCounter( + configApplyErrors: promauto.With(reg).NewCounter( prometheus.CounterOpts{ - Name: "reloader_watch_events_total", - Help: "Total number of events received by the reloader from the watcher.", - }, - ), - watchErrors: promauto.With(reg).NewCounter( - prometheus.CounterOpts{ - Name: "reloader_watch_errors_total", - Help: "Total number of errors received by the reloader from the watcher.", + Name: "reloader_config_apply_operations_failed_total", + Help: "Total number of config apply operations that failed.", }, ), } return r } -// We cannot detect everything via watch. 
Watch interval controls how often we re-read given dirs non-recursively. -func (r *Reloader) WithWatchInterval(duration time.Duration) { - r.watchInterval = duration -} - -// Watch starts to watch periodically the config file and directories and process them until the context -// gets canceled. Config file gets env expanded if cfgOutputFile is specified and reload is trigger if -// config or directories changed. -// Watch watchers periodically based on r.watchInterval. -// For config file it watches it directly as well via fsnotify. -// It watches directories as well, but lot's of edge cases are missing, so rely on interval mostly. +// Watch detects any change made to the watched config file and directories. It +// returns when the context is canceled. +// Whenever a filesystem change is detected or the watch interval has elapsed, +// the reloader expands the config file (if cfgOutputFile is specified) and +// triggers a reload if the configuration file or files in the watched +// directories have changed. +// Because some edge cases might be missing, the reloader also relies on the +// watch interval. func (r *Reloader) Watch(ctx context.Context) error { + if r.cfgFile == "" && len(r.watchedDirs) == 0 { + level.Info(r.logger).Log("msg", "nothing to be watched") + <-ctx.Done() + return nil + } + watcher, err := fsnotify.NewWatcher() if err != nil { return errors.Wrap(err, "create watcher") } defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close") - watchables := map[string]struct{}{} if r.cfgFile != "" { - watchables[filepath.Dir(r.cfgFile)] = struct{}{} - if err := watcher.Add(r.cfgFile); err != nil { + if err := r.watcher.addFile(r.cfgFile); err != nil { return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile) } @@ -207,42 +201,46 @@ func (r *Reloader) Watch(ctx context.Context) error { } } - // Watch directories in best effort manner. 
for _, dir := range r.watchedDirs { - watchables[filepath.Dir(dir)] = struct{}{} - if err := watcher.Add(dir); err != nil { - return errors.Wrapf(err, "add dir %s to watcher", dir) + if err := r.watcher.addDirectory(dir); err != nil { + return errors.Wrapf(err, "add directory %s to watcher", dir) } } - tick := time.NewTicker(r.watchInterval) - defer tick.Stop() + // Start watching the file-system. + var wg sync.WaitGroup + wg.Add(1) + go func() { + r.watcher.run(ctx) + wg.Done() + }() - r.watches.Set(float64(len(watchables))) level.Info(r.logger).Log( "msg", "started watching config file and directories for changes", "cfg", r.cfgFile, "out", r.cfgOutputFile, "dirs", strings.Join(r.watchedDirs, ",")) + applyCtx, cancel := context.WithTimeout(ctx, r.watchInterval) + for { select { - case <-ctx.Done(): - return nil - case <-tick.C: - case event := <-watcher.Events: - r.watchEvents.Inc() - if _, ok := watchables[filepath.Dir(event.Name)]; !ok { - continue + case <-applyCtx.Done(): + if ctx.Err() != nil { + cancel() + wg.Wait() + return nil } - case err := <-watcher.Errors: - r.watchErrors.Inc() - level.Error(r.logger).Log("msg", "watch error", "err", err) - continue + case <-r.watcher.notify: } - if err := r.apply(ctx); err != nil { - r.configErrors.Inc() + // Reset the watch timeout. + cancel() + applyCtx, cancel = context.WithTimeout(ctx, r.watchInterval) + + r.configApply.Inc() + if err := r.apply(applyCtx); err != nil { + r.configApplyErrors.Inc() level.Error(r.logger).Log("msg", "apply error", "err", err) } } @@ -341,11 +339,7 @@ func (r *Reloader) apply(ctx context.Context) error { return nil } - // Retry trigger reload until it succeeded or next tick is near. 
- retryCtx, cancel := context.WithTimeout(ctx, r.watchInterval) - defer cancel() - - if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error { + if err := runutil.RetryWithLog(r.logger, r.retryInterval, ctx.Done(), func() error { r.reloads.Inc() if err := r.triggerReload(ctx); err != nil { r.reloadErrors.Inc() @@ -355,7 +349,7 @@ func (r *Reloader) apply(ctx context.Context) error { r.lastCfgHash = cfgHash r.lastWatchedDirsHash = watchedDirsHash level.Info(r.logger).Log( - "msg", "Prometheus reload triggered", + "msg", "Reload triggered", "cfg_in", r.cfgFile, "cfg_out", r.cfgOutputFile, "watched_dirs", strings.Join(r.watchedDirs, ", ")) @@ -434,3 +428,149 @@ func expandEnv(b []byte) (r []byte, err error) { }) return r, err } + +type watcher struct { + notify chan struct{} + + w *fsnotify.Watcher + watchedDirs map[string]struct{} + delayInterval time.Duration + + logger log.Logger + watchedItems prometheus.Gauge + watchEvents prometheus.Counter + watchErrors prometheus.Counter +} + +func newWatcher(logger log.Logger, reg prometheus.Registerer, delayInterval time.Duration) *watcher { + return &watcher{ + logger: logger, + delayInterval: delayInterval, + notify: make(chan struct{}), + watchedDirs: make(map[string]struct{}), + + watchedItems: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Name: "reloader_watches", + Help: "Number of resources watched by the reloader.", + }, + ), + watchEvents: promauto.With(reg).NewCounter( + prometheus.CounterOpts{ + Name: "reloader_watch_events_total", + Help: "Total number of events received by the reloader from the watcher.", + }, + ), + watchErrors: promauto.With(reg).NewCounter( + prometheus.CounterOpts{ + Name: "reloader_watch_errors_total", + Help: "Total number of errors received by the reloader from the watcher.", + }, + ), + } +} + +func (w *watcher) addPath(name string) error { + if w.w == nil { + fsWatcher, err := fsnotify.NewWatcher() + if err != nil { + return errors.Wrap(err, 
"create watcher") + } + w.w = fsWatcher + } + + if err := w.w.Add(name); err != nil { + return err + } + + w.watchedDirs[name] = struct{}{} + w.watchedItems.Set(float64(len(w.watchedDirs))) + + return nil +} + +func (w *watcher) addDirectory(name string) error { + w.watchedDirs[name] = struct{}{} + return w.addPath(name) +} + +func (w *watcher) addFile(name string) error { + w.watchedDirs[filepath.Dir(name)] = struct{}{} + return w.addPath(name) +} + +func (w *watcher) run(ctx context.Context) { + defer runutil.CloseWithLogOnErr(w.logger, w.w, "config watcher close") + + var wg sync.WaitGroup + notify := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + + var ( + delayCtx context.Context + cancel context.CancelFunc + ) + + for { + select { + case <-ctx.Done(): + if cancel != nil { + cancel() + } + return + + case <-notify: + if cancel != nil { + cancel() + } + + delayCtx, cancel = context.WithCancel(ctx) + + wg.Add(1) + go func(ctx context.Context) { + defer wg.Done() + + if w.delayInterval > 0 { + t := time.NewTicker(w.delayInterval) + defer t.Stop() + + select { + case <-ctx.Done(): + return + case <-t.C: + } + } + + select { + case w.notify <- struct{}{}: + case <-ctx.Done(): + } + }(delayCtx) + } + } + }() + + for { + select { + case <-ctx.Done(): + wg.Wait() + return + + case event := <-w.w.Events: + w.watchEvents.Inc() + if _, ok := w.watchedDirs[filepath.Dir(event.Name)]; ok { + select { + case notify <- struct{}{}: + default: + } + } + + case err := <-w.w.Errors: + w.watchErrors.Inc() + level.Error(w.logger).Log("msg", "watch error", "err", err) + } + } +} diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 624f40757f..e8cbdade64 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "go.uber.org/atomic" "go.uber.org/goleak" @@ -75,6 +76,7 @@ func TestReloader_ConfigApply(t *testing.T) { WatchedDirs: nil, 
WatchInterval: 9999 * time.Hour, // Disable interval to test watch logic only. RetryInterval: 100 * time.Millisecond, + DelayInterval: 1 * time.Millisecond, }) // Fail without config. @@ -148,7 +150,7 @@ config: // Change the mode so reloader can't read the file. testutil.Ok(t, os.Chmod(input, os.ModeDir)) - attemptsCnt += 1 + attemptsCnt++ // That was the second attempt to reload config. All good, break. if attemptsCnt == 2 { break @@ -162,7 +164,7 @@ config: testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2")) } -func TestReloader_RuleApply(t *testing.T) { +func TestReloader_DirectoriesApply(t *testing.T) { l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err) @@ -173,7 +175,7 @@ func TestReloader_RuleApply(t *testing.T) { srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) { i++ if i%2 == 0 { - // Every second request, fail to ensure that retry works. + // Fail every second request to ensure that retry works. resp.WriteHeader(http.StatusServiceUnavailable) return } @@ -189,30 +191,41 @@ func TestReloader_RuleApply(t *testing.T) { reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String())) testutil.Ok(t, err) - dir, err := ioutil.TempDir("", "reloader-rules-test") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - - dir2, err := ioutil.TempDir("", "reload-rules-test2") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(dir2)) }() + dir := t.TempDir() + dir2 := t.TempDir() - // Symlinked directory. 
+ // dir + // └─ rule-dir -> dir2/rule-dir + // dir2 + // └─ rule-dir testutil.Ok(t, os.Mkdir(path.Join(dir2, "rule-dir"), os.ModePerm)) testutil.Ok(t, os.Symlink(path.Join(dir2, "rule-dir"), path.Join(dir, "rule-dir"))) - reloader := New(nil, nil, &Options{ - ReloadURL: reloadURL, - CfgFile: "", - CfgOutputFile: "", - WatchedDirs: []string{dir, path.Join(dir, "rule-dir")}, - WatchInterval: 100 * time.Millisecond, - RetryInterval: 100 * time.Millisecond, - }) - - // Some initial state. + logger := log.NewNopLogger() + reloader := New( + logger, + nil, + &Options{ + ReloadURL: reloadURL, + CfgFile: "", + CfgOutputFile: "", + WatchedDirs: []string{dir, path.Join(dir, "rule-dir")}, + WatchInterval: 9999 * time.Hour, // Disable interval to test watch logic only. + RetryInterval: 100 * time.Millisecond, + }) + + // dir + // ├─ rule-dir -> dir2/rule-dir + // └─ rule1.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + // The reloader watches 2 directories: dir and dir/rule-dir. testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule1.yaml"), []byte("rule"), os.ModePerm)) testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml"))) testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -241,25 +254,81 @@ func TestReloader_RuleApply(t *testing.T) { reloadsSeen = rel t.Log("Performing step number", rel) - switch rel { case 0: - // Add new rule file. + // Create rule2.yaml. 
+ // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // └─ rule2.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule2.yaml"), []byte("rule2"), os.ModePerm)) case 1: - // Change rule 1 in place. + // Update rule1.yaml. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml (*) + // └─ rule2.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule1.yaml"), []byte("rule1-changed"), os.ModePerm)) case 2: - // Add new rule as symlink. - testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3.yaml"))) + // Create dir/rule3.yaml (symlink to rule3-001.yaml). + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) case 3: - // Change rule in symlink. + // Update the symlinked file and replace the symlink file to trigger fsnotify. 
+ // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-002.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-002.yaml -> rule3-source.yaml (*) + // └─ rule3-source.yaml (*) testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3-changed"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-002.yaml"))) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-002.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + testutil.Ok(t, os.Remove(path.Join(dir2, "rule3-001.yaml"))) case 4: - // Change rule in symlinked directory.. + // Update rule4.yaml in the symlinked directory. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> rule3-source.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml (*) + // └─ rule3-source.yaml testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4-changed"), os.ModePerm)) } + if rel > 4 { // All good. return @@ -273,3 +342,137 @@ func TestReloader_RuleApply(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 5, reloads.Load().(int)) } + +func TestReloaderDirectoriesApplyBasedOnWatchInterval(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + testutil.Ok(t, err) + + reloads := &atomic.Value{} + reloads.Store(0) + srv := &http.Server{} + srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) { + reloads.Store(reloads.Load().(int) + 1) // The only writer. 
+ resp.WriteHeader(http.StatusOK) + }) + go func() { + _ = srv.Serve(l) + }() + defer func() { testutil.Ok(t, srv.Close()) }() + + reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String())) + testutil.Ok(t, err) + + dir := t.TempDir() + dir2 := t.TempDir() + + // dir + // └─ rule-dir -> dir2/rule-dir + // dir2 + // └─ rule-dir + testutil.Ok(t, os.Mkdir(path.Join(dir2, "rule-dir"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule-dir"), path.Join(dir, "rule-dir"))) + + logger := log.NewNopLogger() + reloader := New( + logger, + nil, + &Options{ + ReloadURL: reloadURL, + CfgFile: "", + CfgOutputFile: "", + WatchedDirs: []string{dir, path.Join(dir, "rule-dir")}, + WatchInterval: 1 * time.Second, // use a small watch interval. + RetryInterval: 9999 * time.Hour, + }, + ) + + // dir + // ├─ rule-dir -> dir2/rule-dir + // └─ rule1.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + // + // The reloader watches 2 directories: dir and dir/rule-dir. + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "rule1.yaml"), []byte("rule"), os.ModePerm)) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3"), os.ModePerm)) + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-source.yaml"), path.Join(dir2, "rule3-001.yaml"))) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule-dir", "rule4.yaml"), []byte("rule4"), os.ModePerm)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + g := sync.WaitGroup{} + g.Add(1) + go func() { + defer g.Done() + defer cancel() + + reloadsSeen := 0 + init := false + for { + runtime.Gosched() // Ensure during testing on small machine, other go routines have chance to continue. 
+ + select { + case <-ctx.Done(): + return + case <-time.After(500 * time.Millisecond): + } + + rel := reloads.Load().(int) + if init && rel <= reloadsSeen { + continue + } + init = true + reloadsSeen = rel + + t.Log("Performing step number", rel) + switch rel { + case 0: + // Create rule3.yaml (symlink to rule3-001.yaml). + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml (*) + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml + testutil.Ok(t, os.Symlink(path.Join(dir2, "rule3-001.yaml"), path.Join(dir2, "rule3.yaml"))) + testutil.Ok(t, os.Rename(path.Join(dir2, "rule3.yaml"), path.Join(dir, "rule3.yaml"))) + case 1: + // Update the symlinked file but do not replace the symlink in dir. + // + // fsnotify shouldn't send any event because the change happens + // in a directory that isn't watched but the reloader should detect + // the update thanks to the watch interval. + // + // dir + // ├─ rule-dir -> dir2/rule-dir + // ├─ rule1.yaml + // ├─ rule2.yaml + // └─ rule3.yaml -> dir2/rule3-001.yaml + // dir2 + // ├─ rule-dir + // │ └─ rule4.yaml + // ├─ rule3-001.yaml -> rule3-source.yaml + // └─ rule3-source.yaml (*) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir2, "rule3-source.yaml"), []byte("rule3-changed"), os.ModePerm)) + } + + if rel > 1 { + // All good. 
+ return + } + } + }() + err = reloader.Watch(ctx) + cancel() + g.Wait() + + testutil.Ok(t, err) + testutil.Equals(t, 2, reloads.Load().(int)) +} From cf0a8685ec9efeccb895cc087ebede74671e12a5 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Wed, 9 Sep 2020 17:47:36 +0200 Subject: [PATCH 2/2] Updates after Kemal's review Signed-off-by: Simon Pasquier --- pkg/reloader/reloader.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 472cf6a67c..ccb862b747 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -185,11 +185,7 @@ func (r *Reloader) Watch(ctx context.Context) error { return nil } - watcher, err := fsnotify.NewWatcher() - if err != nil { - return errors.Wrap(err, "create watcher") - } - defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close") + defer runutil.CloseWithLogOnErr(r.logger, r.watcher, "config watcher close") if r.cfgFile != "" { if err := r.watcher.addFile(r.cfgFile); err != nil { @@ -221,13 +217,13 @@ func (r *Reloader) Watch(ctx context.Context) error { "out", r.cfgOutputFile, "dirs", strings.Join(r.watchedDirs, ",")) - applyCtx, cancel := context.WithTimeout(ctx, r.watchInterval) + applyCtx, applyCancel := context.WithTimeout(ctx, r.watchInterval) for { select { case <-applyCtx.Done(): if ctx.Err() != nil { - cancel() + applyCancel() wg.Wait() return nil } @@ -235,8 +231,8 @@ func (r *Reloader) Watch(ctx context.Context) error { } // Reset the watch timeout. - cancel() - applyCtx, cancel = context.WithTimeout(ctx, r.watchInterval) + applyCancel() + applyCtx, applyCancel = context.WithTimeout(ctx, r.watchInterval) r.configApply.Inc() if err := r.apply(applyCtx); err != nil { @@ -470,6 +466,16 @@ func newWatcher(logger log.Logger, reg prometheus.Registerer, delayInterval time } } +// Close implements the io.Closer interface. 
+func (w *watcher) Close() error { + if w.w == nil { + return nil + } + watcher := w.w + w.w = nil + return watcher.Close() +} + func (w *watcher) addPath(name string) error { if w.w == nil { fsWatcher, err := fsnotify.NewWatcher() @@ -502,8 +508,10 @@ func (w *watcher) addFile(name string) error { func (w *watcher) run(ctx context.Context) { defer runutil.CloseWithLogOnErr(w.logger, w.w, "config watcher close") - var wg sync.WaitGroup - notify := make(chan struct{}) + var ( + wg sync.WaitGroup + notify = make(chan struct{}) + ) wg.Add(1) go func() {