Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memberlist metrics #327

Merged
merged 4 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [CHANGE] Cache: Remove the `context.Context` argument from the `Cache.Store` method and rename the method to `Cache.StoreAsync`. #273
* [CHANGE] ring: make it harder to leak contexts when using `DoUntilQuorum`. #319
* [CHANGE] ring: `CountTokens()`, used for the calculation of ownership in the ring page has been changed in such a way that when zone-awareness is enabled, it calculates the ownership per-zone. Previously zones were not taken into account. #325
* [CHANGE] memberlist: `MetricsRegisterer` field has been removed from `memberlist.KVConfig` in favor of registrerer passed via into `NewKV` function. #327
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
* [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients.
Expand Down Expand Up @@ -139,3 +140,4 @@
* [BUGFIX] Ring: prevent iterating the whole ring when using `ExcludedZones`. #285
* [BUGFIX] grpcclient: fix missing `.` in flag name for initial connection window size flag. #314
* [BUGFIX] ring.Lifecycler: Handle when previous ring state is leaving and the number of tokens has changed. #79
* [BUGFIX] memberlist metrics: fix registration of `memberlist_client_kv_store_count` metric and disable expiration of metrics exposed by memberlist library. #327
7 changes: 2 additions & 5 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ type KVConfig struct {

TCPTransport TCPTransportConfig `yaml:",inline"`

// Where to put custom metrics. Metrics are not registered, if this is nil.
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`
MetricsNamespace string `yaml:"-"`

// Codecs to register. Codecs need to be registered before joining other members.
Codecs []codec.Codec `yaml:"-"`
Expand Down Expand Up @@ -354,7 +352,6 @@ var (
// trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned
// and service enters Failed state.
func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV {
cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mlkv := &KV{
Expand Down Expand Up @@ -386,7 +383,7 @@ func defaultMemberlistConfig() *memberlist.Config {
}

func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger)
tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger, m.registerer)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %v", err)
}
Expand Down
24 changes: 24 additions & 0 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1478,6 +1479,29 @@ func TestDelegateMethodsDontCrashBeforeKVStarts(t *testing.T) {
assert.Equal(t, msg, val)
}

func TestMetricsRegistration(t *testing.T) {
c := dataCodec{}

cfg := KVConfig{}
cfg.Codecs = append(cfg.Codecs, c)

reg := prometheus.NewPedanticRegistry()
kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg)
err := kv.CAS(context.Background(), "test", c, func(in interface{}) (out interface{}, retry bool, err error) {
return &data{Members: map[string]member{
"member": {},
}}, true, nil
})
require.NoError(t, err)

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP memberlist_client_kv_store_count Number of values in KV Store
# TYPE memberlist_client_kv_store_count gauge
memberlist_client_kv_store_count 1
`), "memberlist_client_kv_store_count"))

}

func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data {
kvp := KeyValuePair{}
require.NoError(t, kvp.Unmarshal(marshalledKVP))
Expand Down
14 changes: 11 additions & 3 deletions kv/memberlist/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,25 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Number of dropped notifications in WatchPrefix function",
}, []string{"prefix"})

if m.cfg.MetricsRegisterer == nil {
if m.registerer == nil {
return
}

m.registerer.MustRegister(m)

// memberlist uses armonmetrics package for internal usage
// here we configure armonmetrics to use prometheus
sink, err := armonprometheus.NewPrometheusSink() // there is no option to pass registrerer, this uses default
// here we configure armonmetrics to use prometheus.
opts := armonprometheus.PrometheusOpts{
Expiration: 0, // Don't expire metrics.
Registerer: m.registerer,
}

sink, err := armonprometheus.NewPrometheusSinkFrom(opts)
if err == nil {
cfg := armonmetrics.DefaultConfig("")
cfg.EnableHostname = false // no need to put hostname into metric
cfg.EnableHostnameLabel = false // no need to put hostname into labels
cfg.EnableServiceLabel = false // Don't put service name into a label. (We don't set service name anyway).
cfg.EnableRuntimeMetrics = false // metrics about Go runtime already provided by prometheus
cfg.EnableTypePrefix = true // to make better sense of internal memberlist metrics
cfg.TimerGranularity = time.Second // timers are in seconds in prometheus world
Expand Down
7 changes: 3 additions & 4 deletions kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ type TCPTransportConfig struct {
TransportDebug bool `yaml:"-" category:"advanced"`

// Where to put custom metrics. nil = don't register.
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`
MetricsNamespace string `yaml:"-"`

TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS dstls.ClientConfig `yaml:",inline"`
Expand Down Expand Up @@ -113,7 +112,7 @@ type TCPTransport struct {

// NewTCPTransport returns a new tcp-based transport with the given configuration. On
// success all the network listeners will be created and listening.
func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTransport, error) {
func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer prometheus.Registerer) (*TCPTransport, error) {
if len(config.BindAddrs) == 0 {
config.BindAddrs = []string{zeroZeroZeroZero}
}
Expand All @@ -135,7 +134,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
}
}

t.registerMetrics(config.MetricsRegisterer)
t.registerMetrics(registerer)

// Clean up listeners if there's an error.
defer func() {
Expand Down
5 changes: 3 additions & 2 deletions kv/memberlist/tcp_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -44,7 +45,7 @@ func TestTCPTransport_WriteTo_ShouldNotLogAsWarningExpectedFailures(t *testing.T
testData.setup(t, &cfg)
}

transport, err := NewTCPTransport(cfg, logger)
transport, err := NewTCPTransport(cfg, logger, nil)
require.NoError(t, err)

_, err = transport.WriteTo([]byte("test"), testData.remoteAddr)
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestFinalAdvertiseAddr(t *testing.T) {
cfg.BindAddrs = testData.bindAddrs
cfg.BindPort = testData.bindPort

transport, err := NewTCPTransport(cfg, logger)
transport, err := NewTCPTransport(cfg, logger, prometheus.NewPedanticRegistry())
require.NoError(t, err)

ip, port, err := transport.FinalAdvertiseAddr(testData.advertiseAddr, testData.bindPort)
Expand Down