Skip to content

Commit

Permalink
Add loki.source.api component (#3648)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed May 5, 2023
1 parent f96fe04 commit af52a4e
Show file tree
Hide file tree
Showing 34 changed files with 1,245 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Main (unreleased)
- `prometheus.operator.servicemonitors` discovers ServiceMonitor resources in your Kubernetes cluster and scrape
the targets they reference. (@captncraig, @marctc, @jcreixell)

- Added new Grafana Agent Flow components:
- `loki.source.api` - receive Loki log entries over HTTP (e.g. from other agents). (@thampiotr)
- Added coalesce function to river stdlib. (@jkroepke)

### Enhancements
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/grafana/agent/component/loki/echo" // Import loki.echo
_ "github.com/grafana/agent/component/loki/process" // Import loki.process
_ "github.com/grafana/agent/component/loki/relabel" // Import loki.relabel
_ "github.com/grafana/agent/component/loki/source/api" // Import loki.source.api
_ "github.com/grafana/agent/component/loki/source/azure_event_hubs" // Import loki.source.azure_event_hubs
_ "github.com/grafana/agent/component/loki/source/cloudflare" // Import loki.source.cloudflare
_ "github.com/grafana/agent/component/loki/source/docker" // Import loki.source.docker
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func (c *Client) Chan() chan<- loki.Entry {
return c.entries
}

// LogsReceiver returns this client as a LogsReceiver, which is useful in testing.
func (c *Client) LogsReceiver() loki.LogsReceiver {
return c.entries
}

func (c *Client) Received() []loki.Entry {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"
"time"

"github.com/grafana/agent/component/loki/internal/fake"
"github.com/grafana/agent/component/common/loki/client/fake"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
Expand Down
2 changes: 1 addition & 1 deletion component/common/loki/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Config struct {
ReadOnly bool `mapstructure:"-" yaml:"-"`
}

// RegisterFlags with prefix registers flags where every name is prefixed by
// RegisterFlagsWithPrefix registers flags where every name is prefixed by
// prefix. If prefix is a non-empty string, prefix should end with a period.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.SyncPeriod, prefix+"positions.sync-period", 10*time.Second, "Period with this to sync the position file.")
Expand Down
2 changes: 1 addition & 1 deletion component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Component interface {
// always match the struct type which the component registers.
//
// Update will be called concurrently with Run. The component must be able to
// gracefully handle updating its config will still running.
// gracefully handle updating its config while still running.
//
// An error may be returned if the provided config is invalid.
Update(args Arguments) error
Expand Down
2 changes: 1 addition & 1 deletion component/loki/process/internal/stages/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
"time"

"github.com/grafana/agent/component/loki/internal/fake"
"github.com/grafana/agent/component/common/loki/client/fake"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down
160 changes: 160 additions & 0 deletions component/loki/source/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package api

import (
"context"
"fmt"
"reflect"
"sync"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
"github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/api/internal/lokipush"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

func init() {
component.Register(component.Registration{
Name: "loki.source.api",
Args: Arguments{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

type Arguments struct {
Server *fnet.ServerConfig `river:",squash"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
Labels map[string]string `river:"labels,attr,optional"`
RelabelRules relabel.Rules `river:"relabel_rules,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
}

func (a *Arguments) labelSet() model.LabelSet {
labelSet := make(model.LabelSet, len(a.Labels))
for k, v := range a.Labels {
labelSet[model.LabelName(k)] = model.LabelValue(v)
}
return labelSet
}

type Component struct {
opts component.Options
entriesChan chan loki.Entry
uncheckedCollector *util.UncheckedCollector

serverMut sync.Mutex
server *lokipush.PushAPIServer

// Use separate receivers mutex to address potential deadlock when Update drains the current server.
// e.g. https://github.com/grafana/agent/issues/3391
receiversMut sync.RWMutex
receivers []loki.LogsReceiver
}

func New(opts component.Options, args Arguments) (component.Component, error) {
c := &Component{
opts: opts,
entriesChan: make(chan loki.Entry),
receivers: args.ForwardTo,
uncheckedCollector: util.NewUncheckedCollector(nil),
}
opts.Registerer.MustRegister(c.uncheckedCollector)
err := c.Update(args)
if err != nil {
return nil, err
}
return c, nil
}

func (c *Component) Run(ctx context.Context) (err error) {
defer c.stop()

for {
select {
case entry := <-c.entriesChan:
c.receiversMut.RLock()
receivers := c.receivers
c.receiversMut.RUnlock()

for _, receiver := range receivers {
select {
case receiver <- entry:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}

func (c *Component) Update(args component.Arguments) error {
newArgs, ok := args.(Arguments)
if !ok {
return fmt.Errorf("invalid type of arguments: %T", args)
}

// if no server config provided, we'll use defaults
if newArgs.Server == nil {
newArgs.Server = &fnet.ServerConfig{}
}
// to avoid port conflicts, if no GRPC is configured, make sure we use a random port
// also, use localhost IP, so we don't require root to run.
if newArgs.Server.GRPC == nil {
newArgs.Server.GRPC = &fnet.GRPCConfig{
ListenPort: 0,
ListenAddress: "127.0.0.1",
}
}

c.receiversMut.Lock()
c.receivers = newArgs.ForwardTo
c.receiversMut.Unlock()

c.serverMut.Lock()
defer c.serverMut.Unlock()
serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server.ServerConfig(), *newArgs.Server)
if serverNeedsRestarting {
if c.server != nil {
c.server.Shutdown()
}

// [server.Server] registers new metrics every time it is created. To
// avoid issues with re-registering metrics with the same name, we create a
// new registry for the server every time we create one, and pass it to an
// unchecked collector to bypass uniqueness checking.
serverRegistry := prometheus.NewRegistry()
c.uncheckedCollector.SetCollector(serverRegistry)

var err error
c.server, err = lokipush.NewPushAPIServer(c.opts.Logger, newArgs.Server, loki.NewEntryHandler(c.entriesChan, func() {}), serverRegistry)
if err != nil {
return fmt.Errorf("failed to create embedded server: %v", err)
}
err = c.server.Run()
if err != nil {
return fmt.Errorf("failed to run embedded server: %v", err)
}
}

c.server.SetLabels(newArgs.labelSet())
c.server.SetRelabelRules(newArgs.RelabelRules)
c.server.SetKeepTimestamp(newArgs.UseIncomingTimestamp)

return nil
}

func (c *Component) stop() {
c.serverMut.Lock()
defer c.serverMut.Unlock()
if c.server != nil {
c.server.Shutdown()
c.server = nil
}
}
Loading

0 comments on commit af52a4e

Please sign in to comment.