pkg/reloader: improve detection of directory changes #3136

Merged 2 commits on Sep 10, 2020
Changes from 1 commit
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Changed

- [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07

Highlights:
1 change: 1 addition & 0 deletions pkg/reloader/example_test.go
@@ -27,6 +27,7 @@ func ExampleReloader() {
WatchedDirs: []string{"/path/to/dirs"},
WatchInterval: 3 * time.Minute,
RetryInterval: 5 * time.Second,
DelayInterval: 1 * time.Second,
})

ctx, cancel := context.WithCancel(context.Background())
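
The updated example above only shows the options literal with the new DelayInterval field. For context, a self-contained sketch of wiring the reloader up end to end might look like the following; the reload URL, the file paths, the nop logger, and the fresh registry are illustrative choices, not taken from this PR:

package main

import (
	"context"
	"net/url"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/reloader"
)

func main() {
	// Illustrative reload endpoint; Prometheus serves /-/reload when started
	// with --web.enable-lifecycle.
	u, err := url.Parse("http://localhost:9090/-/reload")
	if err != nil {
		panic(err)
	}

	rl := reloader.New(log.NewNopLogger(), prometheus.NewRegistry(), &reloader.Options{
		ReloadURL:     u,
		CfgFile:       "/path/to/cfg",
		CfgOutputFile: "/path/to/cfg.out",
		WatchedDirs:   []string{"/path/to/dirs"},
		WatchInterval: 3 * time.Minute,
		RetryInterval: 5 * time.Second,
		DelayInterval: 1 * time.Second,
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Watch blocks until the context is canceled.
	if err := rl.Watch(ctx); err != nil {
		panic(err)
	}
}
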
274 changes: 207 additions & 67 deletions pkg/reloader/reloader.go
@@ -67,6 +67,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/fsnotify/fsnotify"
@@ -86,19 +87,18 @@ type Reloader struct {
reloadURL *url.URL
cfgFile string
cfgOutputFile string
watchedDirs []string
watchInterval time.Duration
retryInterval time.Duration
watchedDirs []string
watcher *watcher

lastCfgHash []byte
lastWatchedDirsHash []byte

reloads prometheus.Counter
reloadErrors prometheus.Counter
watches prometheus.Gauge
watchEvents prometheus.Counter
watchErrors prometheus.Counter
configErrors prometheus.Counter
reloads prometheus.Counter
reloadErrors prometheus.Counter
configApplyErrors prometheus.Counter
configApply prometheus.Counter
}

// Options bundles options for the Reloader.
@@ -112,11 +112,15 @@ type Options struct {
// will be substituted and the output written into the given path. Prometheus should then use
// cfgOutputFile as its config file path.
CfgOutputFile string
// WatchedDirs is a collection of paths for this reloader to watch over.
// WatchedDirs is a collection of paths for the reloader to watch over.
WatchedDirs []string
// DelayInterval controls how long the reloader will wait without receiving
// new file-system events before it applies the reload.
DelayInterval time.Duration
// WatchInterval controls how often reloader re-reads config and directories.
WatchInterval time.Duration
// RetryInterval controls how often reloader retries config reload in case of error.
// RetryInterval controls how often the reloader retries a reloading of the
// configuration in case the endpoint returned an error.
RetryInterval time.Duration
}

@@ -133,6 +137,7 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
reloadURL: o.ReloadURL,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,
@@ -149,56 +154,45 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
Help: "Total number of reload requests that failed.",
},
),
configErrors: promauto.With(reg).NewCounter(
configApply: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_config_apply_errors_total",
Help: "Total number of config applies that failed.",
Name: "reloader_config_apply_operations_total",
Help: "Total number of config apply operations.",
},
),
watches: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "reloader_watches",
Help: "Number of resources watched by the reloader.",
},
),
watchEvents: promauto.With(reg).NewCounter(
configApplyErrors: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_events_total",
Help: "Total number of events received by the reloader from the watcher.",
},
),
watchErrors: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_errors_total",
Help: "Total number of errors received by the reloader from the watcher.",
Name: "reloader_config_apply_operations_failed_total",
Help: "Total number of config apply operations that failed.",
},
),
}
return r
}

// We cannot detect everything via watch. Watch interval controls how often we re-read given dirs non-recursively.
func (r *Reloader) WithWatchInterval(duration time.Duration) {
r.watchInterval = duration
}

// Watch starts to watch periodically the config file and directories and process them until the context
// gets canceled. Config file gets env expanded if cfgOutputFile is specified and reload is trigger if
// config or directories changed.
// Watch watchers periodically based on r.watchInterval.
// For config file it watches it directly as well via fsnotify.
// It watches directories as well, but lot's of edge cases are missing, so rely on interval mostly.
// Watch detects any change made to the watched config file and directories. It
// returns when the context is canceled.
// Whenever a filesystem change is detected or the watch interval has elapsed,
// the reloader expands the config file (if cfgOutputFile is specified) and
// triggers a reload if the configuration file or files in the watched
// directories have changed.
// Because some edge cases might be missing, the reloader also relies on the
// watch interval.
func (r *Reloader) Watch(ctx context.Context) error {
if r.cfgFile == "" && len(r.watchedDirs) == 0 {
level.Info(r.logger).Log("msg", "nothing to be watched")
<-ctx.Done()
return nil
}

watcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "create watcher")
}
defer runutil.CloseWithLogOnErr(r.logger, watcher, "config watcher close")

watchables := map[string]struct{}{}
if r.cfgFile != "" {
watchables[filepath.Dir(r.cfgFile)] = struct{}{}
if err := watcher.Add(r.cfgFile); err != nil {
if err := r.watcher.addFile(r.cfgFile); err != nil {
return errors.Wrapf(err, "add config file %s to watcher", r.cfgFile)
}

@@ -207,42 +201,46 @@ func (r *Reloader) Watch(ctx context.Context) error {
}
}

// Watch directories in best effort manner.
for _, dir := range r.watchedDirs {
watchables[filepath.Dir(dir)] = struct{}{}
if err := watcher.Add(dir); err != nil {
return errors.Wrapf(err, "add dir %s to watcher", dir)
if err := r.watcher.addDirectory(dir); err != nil {
return errors.Wrapf(err, "add directory %s to watcher", dir)
}
}

tick := time.NewTicker(r.watchInterval)
defer tick.Stop()
// Start watching the file-system.
var wg sync.WaitGroup
wg.Add(1)
go func() {
r.watcher.run(ctx)
wg.Done()
}()

r.watches.Set(float64(len(watchables)))
level.Info(r.logger).Log(
"msg", "started watching config file and directories for changes",
"cfg", r.cfgFile,
"out", r.cfgOutputFile,
"dirs", strings.Join(r.watchedDirs, ","))

applyCtx, cancel := context.WithTimeout(ctx, r.watchInterval)

for {
select {
case <-ctx.Done():
return nil
case <-tick.C:
case event := <-watcher.Events:
r.watchEvents.Inc()
if _, ok := watchables[filepath.Dir(event.Name)]; !ok {
continue
case <-applyCtx.Done():
if ctx.Err() != nil {
cancel()
wg.Wait()
return nil
}
case err := <-watcher.Errors:
r.watchErrors.Inc()
level.Error(r.logger).Log("msg", "watch error", "err", err)
continue
case <-r.watcher.notify:
}

if err := r.apply(ctx); err != nil {
r.configErrors.Inc()
// Reset the watch timeout.
cancel()
applyCtx, cancel = context.WithTimeout(ctx, r.watchInterval)

r.configApply.Inc()
if err := r.apply(applyCtx); err != nil {
r.configApplyErrors.Inc()
level.Error(r.logger).Log("msg", "apply error", "err", err)
}
}
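
As the rewritten doc comment above describes, the loop reacts both to coalesced watcher notifications and to the watch interval elapsing. It does that by deriving applyCtx with a watchInterval timeout and re-deriving it on every pass, so the same context serves as the periodic trigger and as the deadline for each apply attempt. A condensed, self-contained sketch of that pattern follows; runLoop, notify, and apply are illustrative names, not identifiers from the PR:

package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop condenses the select loop from Watch above: applyCtx both wakes the
// loop when the interval elapses and bounds how long a single apply may run.
func runLoop(ctx context.Context, interval time.Duration, notify <-chan struct{}, apply func(context.Context) error) {
	applyCtx, cancel := context.WithTimeout(ctx, interval)
	defer func() { cancel() }()

	for {
		select {
		case <-ctx.Done():
			return
		case <-applyCtx.Done():
			if ctx.Err() != nil { // Parent canceled, not merely the interval elapsing.
				return
			}
		case <-notify:
		}

		// Reset the timeout so the next iteration gets a fresh deadline.
		cancel()
		applyCtx, cancel = context.WithTimeout(ctx, interval)

		if err := apply(applyCtx); err != nil {
			fmt.Println("apply error:", err)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	notify := make(chan struct{})

	// Simulate one watcher notification, then cancel after a few intervals.
	go func() {
		notify <- struct{}{}
		time.Sleep(250 * time.Millisecond)
		cancel()
	}()

	runLoop(ctx, 100*time.Millisecond, notify, func(context.Context) error {
		fmt.Println("apply called")
		return nil
	})
}
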
@@ -341,11 +339,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return nil
}

// Retry trigger reload until it succeeded or next tick is near.
retryCtx, cancel := context.WithTimeout(ctx, r.watchInterval)
defer cancel()

if err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error {
if err := runutil.RetryWithLog(r.logger, r.retryInterval, ctx.Done(), func() error {
r.reloads.Inc()
if err := r.triggerReload(ctx); err != nil {
r.reloadErrors.Inc()
@@ -355,7 +349,7 @@
r.lastCfgHash = cfgHash
r.lastWatchedDirsHash = watchedDirsHash
level.Info(r.logger).Log(
"msg", "Prometheus reload triggered",
"msg", "Reload triggered",
"cfg_in", r.cfgFile,
"cfg_out", r.cfgOutputFile,
"watched_dirs", strings.Join(r.watchedDirs, ", "))
@@ -434,3 +428,149 @@ func expandEnv(b []byte) (r []byte, err error) {
})
return r, err
}

type watcher struct {
notify chan struct{}

w *fsnotify.Watcher
watchedDirs map[string]struct{}
delayInterval time.Duration

logger log.Logger
watchedItems prometheus.Gauge
watchEvents prometheus.Counter
watchErrors prometheus.Counter
}

func newWatcher(logger log.Logger, reg prometheus.Registerer, delayInterval time.Duration) *watcher {
return &watcher{
logger: logger,
delayInterval: delayInterval,
notify: make(chan struct{}),
watchedDirs: make(map[string]struct{}),

watchedItems: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "reloader_watches",
Help: "Number of resources watched by the reloader.",
},
),
watchEvents: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_events_total",
Help: "Total number of events received by the reloader from the watcher.",
},
),
watchErrors: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_watch_errors_total",
Help: "Total number of errors received by the reloader from the watcher.",
},
),
}
}

func (w *watcher) addPath(name string) error {
if w.w == nil {
fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "create watcher")
}
w.w = fsWatcher
}

if err := w.w.Add(name); err != nil {
return err
}

w.watchedDirs[name] = struct{}{}
w.watchedItems.Set(float64(len(w.watchedDirs)))

return nil
}

func (w *watcher) addDirectory(name string) error {
w.watchedDirs[name] = struct{}{}
return w.addPath(name)
}

func (w *watcher) addFile(name string) error {
w.watchedDirs[filepath.Dir(name)] = struct{}{}
return w.addPath(name)
}

func (w *watcher) run(ctx context.Context) {
defer runutil.CloseWithLogOnErr(w.logger, w.w, "config watcher close")

var wg sync.WaitGroup
notify := make(chan struct{})

wg.Add(1)
go func() {
defer wg.Done()

var (
delayCtx context.Context
cancel context.CancelFunc
)

for {
select {
case <-ctx.Done():
if cancel != nil {
cancel()
}
return

case <-notify:
if cancel != nil {
cancel()
}

delayCtx, cancel = context.WithCancel(ctx)

wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()

if w.delayInterval > 0 {
t := time.NewTicker(w.delayInterval)
defer t.Stop()

select {
case <-ctx.Done():
return
case <-t.C:
}
}

select {
case w.notify <- struct{}{}:
case <-ctx.Done():
}
}(delayCtx)
}
}
}()

for {
select {
case <-ctx.Done():
wg.Wait()
return

case event := <-w.w.Events:
w.watchEvents.Inc()
if _, ok := w.watchedDirs[filepath.Dir(event.Name)]; ok {
select {
case notify <- struct{}{}:
default:
}
}

case err := <-w.w.Errors:
w.watchErrors.Inc()
level.Error(w.logger).Log("msg", "watch error", "err", err)
}
}
}
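
The watcher added above coalesces bursts of file-system events: each event restarts a DelayInterval countdown (by canceling the in-flight delay goroutine and starting a new one), and only when no further event arrives within that window is a single notification forwarded to the Watch loop. A minimal standalone sketch of this trailing-edge debounce, using a timer instead of a goroutine per event and with illustrative names, could look like this:

package main

import (
	"context"
	"fmt"
	"time"
)

// debounce forwards at most one signal per quiet period: every value received
// on in restarts the delay timer, and a signal is emitted only once delay has
// elapsed with no further input. This mirrors the trailing-edge behavior the
// watcher above achieves by canceling and restarting its delay goroutine.
func debounce(ctx context.Context, in <-chan struct{}, delay time.Duration) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		var timer *time.Timer
		var fire <-chan time.Time // nil until the first event arrives
		for {
			select {
			case <-ctx.Done():
				return
			case <-in:
				if timer != nil {
					timer.Stop()
				}
				timer = time.NewTimer(delay)
				fire = timer.C
			case <-fire:
				fire = nil
				select {
				case out <- struct{}{}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	events := make(chan struct{})
	notify := debounce(ctx, events, 200*time.Millisecond)

	// A burst of five rapid events collapses into a single notification.
	go func() {
		for i := 0; i < 5; i++ {
			events <- struct{}{}
			time.Sleep(20 * time.Millisecond)
		}
	}()

	<-notify
	fmt.Println("burst coalesced into one reload trigger")
}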