Skip to content

Commit

Permalink
loki.source.heroku: use an unchecked collector for server metrics (gr…
Browse files Browse the repository at this point in the history
…afana#3711)

* loki.source.heroku: use an unchecked collector for server metrics

Every time an HTTP server for `loki.source.heroku` is created, it
creates a new weaveworks/common server, and registers the same set of
metrics to the provided collector.

By default, these are checked metrics, which causes the agent-wide
`/metrics` endpoint to stop working with an error:

    An error has occurred while serving metrics:

    4 error(s) occurred:
    * collected metric "loki_source_heroku_drain_target_tcp_connections_limit" { label:<name:"component_id" value:"loki.source.heroku.repro" > label:<name:"protocol" value:"http" > gauge:<value:0 > } was collected before with the same name and label values
    * collected metric "loki_source_heroku_drain_target_tcp_connections_limit" { label:<name:"component_id" value:"loki.source.heroku.repro" > label:<name:"protocol" value:"grpc" > gauge:<value:0 > } was collected before with the same name and label values
    * collected metric "loki_source_heroku_drain_target_tcp_connections" { label:<name:"component_id" value:"loki.source.heroku.repro" > label:<name:"protocol" value:"http" > gauge:<value:0 > } was collected before with the same name and label values
    * collected metric "loki_source_heroku_drain_target_tcp_connections" { label:<name:"component_id" value:"loki.source.heroku.repro" > label:<name:"protocol" value:"grpc" > gauge:<value:0 > } was collected before with the same name and label values

This commit adds a new utility method to create an unchecked
prometheus.Collector implementation, and then creates a new
prometheus.Registry to store server metrics any time a new server is
created. The prometheus.Registry instance for the server is then passed
to the unchecked prometheus.Collector for handling.

Closes grafana#3646.

* loki.source.gcplog: apply same server metrics fix

Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com>
  • Loading branch information
rfratto and thampiotr committed Apr 28, 2023
1 parent 91bb86d commit 266484c
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 25 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Main (unreleased)
- Fix version information not displaying correctly when passing the `--version`
flag or in the `agent_build_info` metric. (@rfratto)

- Fix issue in `loki.source.heroku` and `loki.source.gcplog` where updating the
component would cause Grafana Agent Flow's Prometheus metrics endpoint to
return an error until the process is restarted. (@rfratto)

### Other changes

- Add metrics when clustering mode is enabled. (@rfratto)
Expand Down
27 changes: 20 additions & 7 deletions component/loki/source/gcplog/gcplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"
)

Expand Down Expand Up @@ -54,8 +56,9 @@ func (a *Arguments) UnmarshalRiver(f func(v interface{}) error) error {

// Component implements the loki.source.gcplog component.
type Component struct {
opts component.Options
metrics *gt.Metrics
opts component.Options
metrics *gt.Metrics
serverMetrics *util.UncheckedCollector

mut sync.RWMutex
fanout []loki.LogsReceiver
Expand All @@ -67,12 +70,15 @@ type Component struct {
// New creates a new loki.source.gcplog component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
metrics: gt.NewMetrics(o.Registerer),
handler: make(loki.LogsReceiver),
fanout: args.ForwardTo,
opts: o,
metrics: gt.NewMetrics(o.Registerer),
handler: make(loki.LogsReceiver),
fanout: args.ForwardTo,
serverMetrics: util.NewUncheckedCollector(nil),
}

o.Registerer.MustRegister(c.serverMetrics)

// Call to Update() to start readers and set receivers once at the start.
if err := c.Update(args); err != nil {
return nil, err
Expand Down Expand Up @@ -140,7 +146,14 @@ func (c *Component) Update(args component.Arguments) error {
c.target = t
}
if newArgs.PushTarget != nil {
t, err := gt.NewPushTarget(c.metrics, c.opts.Logger, entryHandler, jobName, newArgs.PushTarget, rcs, c.opts.Registerer)
// [gt.NewPushTarget] registers new metrics every time it is called. To
// avoid issues with re-registering metrics with the same name, we create a
// new registry for the target every time we create one, and pass it to an
// unchecked collector to bypass uniqueness checking.
registry := prometheus.NewRegistry()
c.serverMetrics.SetCollector(registry)

t, err := gt.NewPushTarget(c.metrics, c.opts.Logger, entryHandler, jobName, newArgs.PushTarget, rcs, registry)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create gcplog target with provided config", "err", err)
return err
Expand Down
33 changes: 23 additions & 10 deletions component/loki/source/heroku/heroku.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
ht "github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
sv "github.com/weaveworks/common/server"
Expand Down Expand Up @@ -64,8 +66,9 @@ func (lc *ListenerConfig) UnmarshalRiver(f func(interface{}) error) error {

// Component implements the loki.source.heroku component.
type Component struct {
opts component.Options
metrics *ht.Metrics
opts component.Options
metrics *ht.Metrics // Metrics about Heroku entries.
serverMetrics *util.UncheckedCollector // Metircs about the HTTP server managed by the component.

mut sync.RWMutex
args Arguments
Expand All @@ -78,15 +81,18 @@ type Component struct {
// New creates a new loki.source.heroku component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
metrics: ht.NewMetrics(o.Registerer),
mut: sync.RWMutex{},
args: Arguments{},
fanout: args.ForwardTo,
target: nil,
handler: make(loki.LogsReceiver),
opts: o,
metrics: ht.NewMetrics(o.Registerer),
mut: sync.RWMutex{},
args: Arguments{},
fanout: args.ForwardTo,
target: nil,
handler: make(loki.LogsReceiver),
serverMetrics: util.NewUncheckedCollector(nil),
}

o.Registerer.MustRegister(c.serverMetrics)

// Call to Update() to start readers and set receivers once at the start.
if err := c.Update(args); err != nil {
return nil, err
Expand Down Expand Up @@ -145,8 +151,15 @@ func (c *Component) Update(args component.Arguments) error {
}
}

// [ht.NewHerokuTarget] registers new metrics every time it is called. To
// avoid issues with re-registering metrics with the same name, we create a
// new registry for the target every time we create one, and pass it to an
// unchecked collector to bypass uniqueness checking.
registry := prometheus.NewRegistry()
c.serverMetrics.SetCollector(registry)

entryHandler := loki.NewEntryHandler(c.handler, func() {})
t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), c.opts.Registerer)
t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), registry)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create heroku listener with provided config", "err", err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) {
Line: message.Message,
},
}
h.metrics.herokuEntries.WithLabelValues().Inc()
h.metrics.herokuEntries.Inc()
}
err := herokuScanner.Err()
if err != nil {
h.metrics.herokuErrors.WithLabelValues().Inc()
h.metrics.herokuErrors.Inc()
level.Warn(h.logger).Log("msg", "failed to read incoming heroku request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
12 changes: 6 additions & 6 deletions component/loki/source/heroku/internal/herokutarget/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ package herokutarget
import "github.com/prometheus/client_golang/prometheus"

type Metrics struct {
herokuEntries *prometheus.CounterVec
herokuErrors *prometheus.CounterVec
herokuEntries prometheus.Counter
herokuErrors prometheus.Counter
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
var m Metrics

m.herokuEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
m.herokuEntries = prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_source_heroku_drain_entries_total",
Help: "Number of successful entries received by the Heroku target",
}, []string{})
})

m.herokuErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
m.herokuErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_source_heroku_drain_parsing_errors_total",
Help: "Number of parsing errors while receiving Heroku messages",
}, []string{})
})

reg.MustRegister(m.herokuEntries, m.herokuErrors)
return &m
Expand Down
49 changes: 49 additions & 0 deletions pkg/util/unchecked_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package util

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// UncheckedCollector is a prometheus.Collector which stores a set of unchecked
// metrics.
type UncheckedCollector struct {
mut sync.RWMutex
inner prometheus.Collector
}

var _ prometheus.Collector = (*UncheckedCollector)(nil)

// NewUncheckedCollector creates a new UncheckedCollector. If inner is nil,
// UncheckedCollector returns no metrics.
func NewUncheckedCollector(inner prometheus.Collector) *UncheckedCollector {
return &UncheckedCollector{inner: inner}
}

// SetCollector replaces the inner collector.
func (uc *UncheckedCollector) SetCollector(inner prometheus.Collector) {
uc.mut.Lock()
defer uc.mut.Unlock()

uc.inner = inner
}

// Describe implements [prometheus.Collector], but is a no-op to be considered
// an "unchecked" collector by Prometheus. See [prometheus.Collector] for more
// information.
func (uc *UncheckedCollector) Describe(_ chan<- *prometheus.Desc) {
// No-op: do not send any descriptions of metrics to avoid having them be
// checked.
}

// Collector implements [prometheus.Collector]. If the UncheckedCollector has a
// non-nil inner collector, metrics will be collected from it.
func (uc *UncheckedCollector) Collect(ch chan<- prometheus.Metric) {
uc.mut.RLock()
defer uc.mut.RUnlock()

if uc.inner != nil {
uc.inner.Collect(ch)
}
}

0 comments on commit 266484c

Please sign in to comment.