Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Store the most recent config hash and timestamp as metrics #1378

Merged
merged 1 commit into from
Aug 6, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 54 additions & 11 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package receive

import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"io/ioutil"
"os"
Expand Down Expand Up @@ -33,6 +35,9 @@ type ConfigWatcher struct {
logger log.Logger
watcher *fsnotify.Watcher

hashGauge prometheus.Gauge
successGauge prometheus.Gauge
lastSuccessTimeGauge prometheus.Gauge
changesCounter prometheus.Counter
errorCounter prometheus.Counter
refreshCounter prometheus.Counter
Expand Down Expand Up @@ -62,6 +67,21 @@ func NewConfigWatcher(logger log.Logger, r prometheus.Registerer, path string, i
interval: time.Duration(interval),
logger: logger,
watcher: watcher,
hashGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_hash",
Help: "Hash of the currently loaded hashring configuration file.",
}),
successGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_successful",
Help: "Whether the last hashring configuration file reload attempt was successful.",
}),
lastSuccessTimeGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful hashring configuration file reload.",
}),
changesCounter: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_hashrings_file_changes_total",
Expand Down Expand Up @@ -93,6 +113,9 @@ func NewConfigWatcher(logger log.Logger, r prometheus.Registerer, path string, i

if r != nil {
r.MustRegister(
c.hashGauge,
c.successGauge,
c.lastSuccessTimeGauge,
c.changesCounter,
c.errorCounter,
c.refreshCounter,
Expand Down Expand Up @@ -148,8 +171,13 @@ func (cw *ConfigWatcher) Run(ctx context.Context) {
}
}

// readFile reads the configured file and returns a configuration.
func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
}

// readFile reads the configured file and returns content of configuration file.
func (cw *ConfigWatcher) readFile() ([]byte, error) {
fd, err := os.Open(cw.path)
if err != nil {
return nil, err
Expand All @@ -160,33 +188,43 @@ func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
}
}()

content, err := ioutil.ReadAll(fd)
if err != nil {
return nil, err
}
return ioutil.ReadAll(fd)
}

// loadConfig loads raw configuration content and returns a configuration.
func (cw *ConfigWatcher) loadConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err = json.Unmarshal(content, &config)
err := json.Unmarshal(content, &config)
return config, err
}

// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()
config, err := cw.readFile()
cfgContent, err := cw.readFile()
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to read configuration file", "err", err, "path", cw.path)
return
}

config, err := cw.loadConfig(cfgContent)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
return
}

// If there was no change to the configuration, return early.
if reflect.DeepEqual(cw.last, config) {
return
}
cw.changesCounter.Inc()
// Save the last known configuration.
cw.last = config
cw.successGauge.Set(1)
cw.lastSuccessTimeGauge.Set(float64(time.Now().Unix()))
cw.hashGauge.Set(hashAsMetricValue(cfgContent))

for _, c := range config {
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Endpoints)))
Expand Down Expand Up @@ -228,7 +266,12 @@ func (cw *ConfigWatcher) stop() {
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}

// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
var bytes = make([]byte, 8)
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}