Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add loki.source.api component #3648

Merged
merged 43 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
b47e950
Work in progress
thampiotr Apr 19, 2023
1ef9c37
basics working - wired things together
thampiotr Apr 20, 2023
9f7b4de
cleanin gup
thampiotr Apr 20, 2023
8a38eb5
actually sending to targets
thampiotr Apr 20, 2023
04734ba
WIP update
thampiotr Apr 21, 2023
6a78a85
update
thampiotr Apr 21, 2023
91d47c6
clean up
thampiotr Apr 21, 2023
c8a92cd
remove debut code
thampiotr Apr 21, 2023
717b4b7
Fix annoying log line
thampiotr Apr 21, 2023
e37da54
WIP tests
thampiotr Apr 25, 2023
b72dd12
make loki client visible to loki package
thampiotr Apr 25, 2023
de61636
tests for update
thampiotr Apr 26, 2023
d78b8f1
Address feedback part 1
thampiotr Apr 26, 2023
18bdbe8
Address feedback part 2 - moving loki clients
thampiotr Apr 26, 2023
ff68deb
Address feedback part 2.1 - fixing lint
thampiotr Apr 26, 2023
4fe36e3
Address feedback part 3 - rename component
thampiotr Apr 26, 2023
32f1dad
Merge branch 'main' into thampiotr/loki_source_http
thampiotr Apr 26, 2023
474895d
Add first draft of docs
thampiotr Apr 27, 2023
cce7a68
update changelog
thampiotr Apr 27, 2023
57b56f8
add to changelog & tweak example
thampiotr Apr 27, 2023
3f6669c
Merge remote-tracking branch 'origin/main' into thampiotr/loki_source…
thampiotr May 2, 2023
942edd5
Address PR comments
thampiotr May 2, 2023
63024ab
PR comments
thampiotr May 2, 2023
b942ee3
Merge branch 'main' into thampiotr/loki_source_http
thampiotr May 2, 2023
f0d38e0
reformat
thampiotr May 2, 2023
21fcc26
Merge branch 'main' into thampiotr/loki_source_http
thampiotr May 2, 2023
8f96158
improve the docs and change metrics namespace
thampiotr May 3, 2023
70f754a
better markdown formatting
thampiotr May 3, 2023
56d8f12
Merge remote-tracking branch 'origin/main' into thampiotr/loki_source…
thampiotr May 3, 2023
5dce94a
move to use the new shared server
thampiotr May 4, 2023
f6ade4a
minor fix to heroku docs
thampiotr May 4, 2023
c80432e
fix the metrics issue
thampiotr May 4, 2023
b3547ca
fix nil config issue
thampiotr May 4, 2023
cf2cf7f
add test for default endpoint
thampiotr May 4, 2023
011d2f4
docs updates & minor fix
thampiotr May 4, 2023
ef904e8
Merge remote-tracking branch 'origin/main' into thampiotr/loki_source…
thampiotr May 4, 2023
da2a3c9
fixlint
thampiotr May 4, 2023
d18bcc6
fixlint
thampiotr May 4, 2023
9704d05
Apply suggestions from code review
thampiotr May 5, 2023
2cec6b0
PR comments
thampiotr May 5, 2023
505368c
fix imports
thampiotr May 5, 2023
1470cef
docs on raw endpoint
thampiotr May 5, 2023
68cd869
timestamps doc
thampiotr May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Main (unreleased)
- `prometheus.operator.servicemonitors` discovers ServiceMonitor resources in your Kubernetes cluster and scrape
the targets they reference. (@captncraig, @marctc, @jcreixell)

- Added new Grafana Agent Flow components:
- `loki.source.api` - receive Loki log entries over HTTP (e.g. from other agents). (@thampiotr)
- Added coalesce function to river stdlib. (@jkroepke)

### Enhancements
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/grafana/agent/component/loki/echo" // Import loki.echo
_ "github.com/grafana/agent/component/loki/process" // Import loki.process
_ "github.com/grafana/agent/component/loki/relabel" // Import loki.relabel
_ "github.com/grafana/agent/component/loki/source/api" // Import loki.source.api
_ "github.com/grafana/agent/component/loki/source/azure_event_hubs" // Import loki.source.azure_event_hubs
_ "github.com/grafana/agent/component/loki/source/cloudflare" // Import loki.source.cloudflare
_ "github.com/grafana/agent/component/loki/source/docker" // Import loki.source.docker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func (c *Client) Chan() chan<- loki.Entry {
return c.entries
}

// LogsReceiver returns this client as a LogsReceiver, which is useful in testing.
func (c *Client) LogsReceiver() loki.LogsReceiver {
return c.entries
}

func (c *Client) Received() []loki.Entry {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"
"time"

"github.com/grafana/agent/component/loki/internal/fake"
"github.com/grafana/agent/component/common/loki/client/fake"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
Expand Down
2 changes: 1 addition & 1 deletion component/common/loki/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Config struct {
ReadOnly bool `mapstructure:"-" yaml:"-"`
}

// RegisterFlags with prefix registers flags where every name is prefixed by
// RegisterFlagsWithPrefix registers flags where every name is prefixed by
// prefix. If prefix is a non-empty string, prefix should end with a period.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.SyncPeriod, prefix+"positions.sync-period", 10*time.Second, "Period with this to sync the position file.")
Expand Down
2 changes: 1 addition & 1 deletion component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Component interface {
// always match the struct type which the component registers.
//
// Update will be called concurrently with Run. The component must be able to
// gracefully handle updating its config will still running.
// gracefully handle updating its config while still running.
//
// An error may be returned if the provided config is invalid.
Update(args Arguments) error
Expand Down
2 changes: 1 addition & 1 deletion component/loki/process/internal/stages/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
"time"

"github.com/grafana/agent/component/loki/internal/fake"
"github.com/grafana/agent/component/common/loki/client/fake"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down
160 changes: 160 additions & 0 deletions component/loki/source/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package api

import (
"context"
"fmt"
"reflect"
"sync"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
fnet "github.com/grafana/agent/component/common/net"
"github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/api/internal/lokipush"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

func init() {
component.Register(component.Registration{
Name: "loki.source.api",
Args: Arguments{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

type Arguments struct {
Server *fnet.ServerConfig `river:",squash"`
ForwardTo []loki.LogsReceiver `river:"forward_to,attr"`
Labels map[string]string `river:"labels,attr,optional"`
RelabelRules relabel.Rules `river:"relabel_rules,attr,optional"`
UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"`
}

func (a *Arguments) labelSet() model.LabelSet {
labelSet := make(model.LabelSet, len(a.Labels))
for k, v := range a.Labels {
labelSet[model.LabelName(k)] = model.LabelValue(v)
}
return labelSet
}

type Component struct {
opts component.Options
entriesChan chan loki.Entry
uncheckedCollector *util.UncheckedCollector

serverMut sync.Mutex
server *lokipush.PushAPIServer

// Use separate receivers mutex to address potential deadlock when Update drains the current server.
// e.g. https://github.com/grafana/agent/issues/3391
receiversMut sync.RWMutex
receivers []loki.LogsReceiver
}

func New(opts component.Options, args Arguments) (component.Component, error) {
c := &Component{
opts: opts,
entriesChan: make(chan loki.Entry),
receivers: args.ForwardTo,
uncheckedCollector: util.NewUncheckedCollector(nil),
}
opts.Registerer.MustRegister(c.uncheckedCollector)
err := c.Update(args)
if err != nil {
return nil, err
}
return c, nil
}

func (c *Component) Run(ctx context.Context) (err error) {
defer c.stop()

for {
select {
case entry := <-c.entriesChan:
c.receiversMut.RLock()
receivers := c.receivers
c.receiversMut.RUnlock()

for _, receiver := range receivers {
select {
case receiver <- entry:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}

func (c *Component) Update(args component.Arguments) error {
newArgs, ok := args.(Arguments)
if !ok {
return fmt.Errorf("invalid type of arguments: %T", args)
}

// if no server config provided, we'll use defaults
if newArgs.Server == nil {
newArgs.Server = &fnet.ServerConfig{}
}
// to avoid port conflicts, if no GRPC is configured, make sure we use a random port
// also, use localhost IP, so we don't require root to run.
if newArgs.Server.GRPC == nil {
newArgs.Server.GRPC = &fnet.GRPCConfig{
ListenPort: 0,
ListenAddress: "127.0.0.1",
}
}

c.receiversMut.Lock()
c.receivers = newArgs.ForwardTo
c.receiversMut.Unlock()

c.serverMut.Lock()
defer c.serverMut.Unlock()
serverNeedsRestarting := c.server == nil || !reflect.DeepEqual(c.server.ServerConfig(), *newArgs.Server)
if serverNeedsRestarting {
if c.server != nil {
c.server.Shutdown()
}

// [server.Server] registers new metrics every time it is created. To
// avoid issues with re-registering metrics with the same name, we create a
// new registry for the server every time we create one, and pass it to an
// unchecked collector to bypass uniqueness checking.
serverRegistry := prometheus.NewRegistry()
c.uncheckedCollector.SetCollector(serverRegistry)

var err error
c.server, err = lokipush.NewPushAPIServer(c.opts.Logger, newArgs.Server, loki.NewEntryHandler(c.entriesChan, func() {}), serverRegistry)
if err != nil {
return fmt.Errorf("failed to create embedded server: %v", err)
}
err = c.server.Run()
if err != nil {
return fmt.Errorf("failed to run embedded server: %v", err)
}
}

c.server.SetLabels(newArgs.labelSet())
c.server.SetRelabelRules(newArgs.RelabelRules)
c.server.SetKeepTimestamp(newArgs.UseIncomingTimestamp)

return nil
}

func (c *Component) stop() {
c.serverMut.Lock()
defer c.serverMut.Unlock()
if c.server != nil {
c.server.Shutdown()
c.server = nil
}
}
Loading