Skip to content

Commit

Permalink
clustering: enable refreshing the list of peers on an interval (grafa…
Browse files Browse the repository at this point in the history
…na#4608)

Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
  • Loading branch information
tpaschalis committed Aug 16, 2023
1 parent 718622b commit 4bc9bf7
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 82 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Main (unreleased)

- Flow: Users can now define `additional_fields` in `loki.source.cloudflare` (@wildum)

- Clustering: Enable nodes to periodically rediscover and rejoin peers. (@tpaschalis)

- New Grafana Agent Flow components:

- `prometheus.exporter.gcp` - scrape GCP metrics. (@tburgessdev)
Expand Down
10 changes: 8 additions & 2 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/fatih/color"
"github.com/go-kit/log"
Expand Down Expand Up @@ -44,6 +45,8 @@ func runCommand() *cobra.Command {
disableReporting: false,
enablePprof: true,
configFormat: "flow",

clusterRejoinInterval: 60 * time.Second,
}

cmd := &cobra.Command{
Expand Down Expand Up @@ -97,6 +100,8 @@ depending on the nature of the reload error.
StringVar(&r.clusterJoinAddr, "cluster.join-addresses", r.clusterJoinAddr, "Comma-separated list of addresses to join the cluster at")
cmd.Flags().
StringVar(&r.clusterDiscoverPeers, "cluster.discover-peers", r.clusterDiscoverPeers, "List of key-value tuples for discovering peers")
cmd.Flags().
DurationVar(&r.clusterRejoinInterval, "cluster.rejoin-interval", r.clusterRejoinInterval, "How often to rejoin the list of peers")
cmd.Flags().
BoolVar(&r.disableReporting, "disable-reporting", r.disableReporting, "Disable reporting of enabled components to Grafana.")
cmd.Flags().StringVar(&r.configFormat, "config.format", r.configFormat, "The format of the source file. Supported formats: 'flow', 'prometheus'.")
Expand All @@ -116,6 +121,7 @@ type flowRun struct {
clusterAdvAddr string
clusterJoinAddr string
clusterDiscoverPeers string
clusterRejoinInterval time.Duration
configFormat string
configBypassConversionErrors bool
}
Expand Down Expand Up @@ -166,7 +172,7 @@ func (fr *flowRun) Run(configFile string) error {
reg := prometheus.DefaultRegisterer
reg.MustRegister(newResourcesCollector(l))

clusterer, err := cluster.New(l, reg, fr.clusterEnabled, fr.clusterNodeName, fr.httpListenAddr, fr.clusterAdvAddr, fr.clusterJoinAddr, fr.clusterDiscoverPeers)
clusterer, err := cluster.New(l, reg, fr.clusterEnabled, fr.clusterNodeName, fr.httpListenAddr, fr.clusterAdvAddr, fr.clusterJoinAddr, fr.clusterDiscoverPeers, fr.clusterRejoinInterval)
if err != nil {
return fmt.Errorf("building clusterer: %w", err)
}
Expand Down Expand Up @@ -250,7 +256,7 @@ func (fr *flowRun) Run(configFile string) error {
}

// Start the Clusterer's Node implementation.
err = clusterer.Start()
err = clusterer.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start the clusterer: %w", err)
}
Expand Down
13 changes: 11 additions & 2 deletions docs/sources/flow/reference/cli/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The following flags are supported:
* `--cluster.node-name`: The name to use for this node (defaults to the environment's hostname).
* `--cluster.join-addresses`: Comma-separated list of addresses to join the cluster at (default `""`). Mutually exclusive with `--cluster.discover-peers`.
* `--cluster.discover-peers`: List of key-value tuples for discovering peers (default `""`). Mutually exclusive with `--cluster.join-addresses`.
* `--cluster.rejoin-interval`: How often to rediscover and rejoin the list of peers (default `"60s"`). Set to `"0s"` to disable.
* `--cluster.advertise-address`: Address to advertise to other cluster nodes (default `""`).
* `--config.format`: The format of the source file. Supported formats: `flow`, `prometheus`, `promtail` (default `"flow"`).
* `--config.bypass-conversion-errors`: Enable bypassing errors when converting (default `false`).
Expand Down Expand Up @@ -109,18 +110,26 @@ on the chosen provider and the filtering key-values it supports. Clustering
supports the default set of providers available in go-discover and registers
the `k8s` provider on top.

If either the key or the value in a pair contains a space, a backslash, or
If either the key or the value in a tuple pair contains a space, a backslash, or
double quotes, then it must be quoted with double quotes. Within this quoted
string, the backslash can be used to escape double quotes or the backslash
itself.

The `--cluster.rejoin-interval` flag defines how often each node should
rediscover peers based on the contents of the `--cluster.join-addresses` and
`--cluster.discover-peers` flags and try to rejoin them. This operation
is useful for addressing split-brain issues if the initial bootstrap is
unsuccessful and for making clustering easier to manage in dynamic
environments. To disable this behavior, set the `--cluster.rejoin-interval`
flag to `"0s"`.

Discovering peers using the `--cluster.join-addresses` and
`--cluster.discover-peers` flags happens on startup and is then repeated every
`--cluster.rejoin-interval` (or only on startup if the interval is `"0s"`);
between those refreshes, cluster nodes depend on gossiping messages with each
other to converge on the cluster's state.

The first node that is used to bootstrap a new cluster (also known as
the "seed node") can either omit the flag that specifies peers to join or can
the "seed node") can either omit the flags that specify peers to join or can
try to connect to itself.

### Clustering states
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,8 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

require github.com/foxcpp/go-mockdns v1.0.0

require (
github.com/drone/envsubst v1.0.3 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,8 @@ github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9
github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/foxcpp/go-mockdns v1.0.0 h1:7jBqxd3WDWwi/6WhDvacvH1XsN3rOLXyHM1uhvIx6FI=
github.com/foxcpp/go-mockdns v1.0.0/go.mod h1:lgRN6+KxQBawyIghpnl5CezHFGS9VLzvtVlwxvzXTQ4=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goblin v0.0.0-20210519012713-85d372ac71e2/go.mod h1:VzmDKDJVZI3aJmnRI9VjAn9nJ8qPPsN1fqzr9dqInIo=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
Expand Down
56 changes: 36 additions & 20 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Clusterer struct {
Node Node
}

func getJoinAddr(addrs []string, in string) []string {
func appendJoinAddr(addrs []string, in string) []string {
_, _, err := net.SplitHostPort(in)
if err == nil {
addrs = append(addrs, in)
Expand All @@ -117,21 +117,21 @@ func getJoinAddr(addrs []string, in string) []string {
}

// New creates a Clusterer.
func New(log log.Logger, reg prometheus.Registerer, clusterEnabled bool, name, listenAddr, advertiseAddr, joinAddr, discoverPeers string) (*Clusterer, error) {
func New(log log.Logger, reg prometheus.Registerer, clusterEnabled bool, name, listenAddr, advertiseAddr, joinAddr, discoverPeers string, rejoinInterval time.Duration) (*Clusterer, error) {
// Standalone node.
if !clusterEnabled {
return &Clusterer{Node: NewLocalNode(listenAddr)}, nil
}

gossipConfig := DefaultGossipConfig

defaultPort := 80
_, portStr, err := net.SplitHostPort(listenAddr)
if err == nil { // there was a port
defaultPort, err = strconv.Atoi(portStr)
defaultPort, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
gossipConfig.DefaultPort = defaultPort
}

if name != "" {
Expand All @@ -142,19 +142,9 @@ func New(log log.Logger, reg prometheus.Registerer, clusterEnabled bool, name, l
gossipConfig.AdvertiseAddr = advertiseAddr
}

if joinAddr != "" {
gossipConfig.JoinPeers = []string{}
jaddrs := strings.Split(joinAddr, ",")
for _, jaddr := range jaddrs {
gossipConfig.JoinPeers = getJoinAddr(gossipConfig.JoinPeers, jaddr)
}
}
gossipConfig.JoinPeers = strings.Split(joinAddr, ",")
gossipConfig.DiscoverPeers = discoverPeers

err = gossipConfig.ApplyDefaults(defaultPort)
if err != nil {
return nil, err
}
gossipConfig.RejoinInterval = rejoinInterval

cli := &http.Client{
Transport: &http2.Transport{
Expand Down Expand Up @@ -192,21 +182,47 @@ func New(log log.Logger, reg prometheus.Registerer, clusterEnabled bool, name, l
// new cluster of its own.
// The gossipNode will start out as a Viewer; to participate in clustering,
// the node needs to transition to the Participant state using ChangeState.
func (c *Clusterer) Start() error {
func (c *Clusterer) Start(ctx context.Context) error {
switch node := c.Node.(type) {
case *localNode:
return nil // no-op, always ready
case *GossipNode:
err := node.Start() // TODO(@tpaschalis) Should we backoff and retry before moving on to the fallback here?
peers, err := node.GetPeers()
if err != nil {
return err
}
err = node.Start(peers) // TODO(@tpaschalis) Should we backoff and retry before moving on to the fallback here?
if err != nil {
level.Debug(node.log).Log("msg", "failed to connect to peers; bootstrapping a new cluster")
node.cfg.JoinPeers = nil
err = node.Start()
err = node.Start(nil)
if err != nil {
return err
}
}

if node.cfg.RejoinInterval > 0 {
go func() {
t := time.NewTicker(node.cfg.RejoinInterval)
for {
select {
case <-t.C:
peers, err := node.GetPeers()
if err != nil {
level.Error(node.log).Log("msg", "failed to refresh the list of peers", "err", err)
continue // we'll try on the next tick
}
err = node.Start(peers)
if err != nil {
level.Error(node.log).Log("msg", "failed to rejoin the list of peers", "err", err)
}
case <-ctx.Done():
t.Stop()
return
}
}
}()
}

node.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) {
names := make([]string, len(peers))
for i, p := range peers {
Expand Down
Loading

0 comments on commit 4bc9bf7

Please sign in to comment.