Skip to content

Commit

Permalink
promtail config conversions part 3 (grafana#4638)
Browse files Browse the repository at this point in the history
* Add windows events and kafka

* Add azure event hubs

* Add gelf

* Add heroku
  • Loading branch information
thampiotr committed Jul 28, 2023
1 parent 3b999a0 commit c6f9be5
Show file tree
Hide file tree
Showing 22 changed files with 600 additions and 12 deletions.
1 change: 1 addition & 0 deletions component/loki/source/windowsevent/component_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (c *Component) Update(args component.Arguments) error {
_ = f.Close()
}

// TODO: Add support for labels - see https://github.com/grafana/agent/issues/4634 for more details
winTarget, err := windows.New(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package build

import (
"github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/azure_event_hubs"
"github.com/grafana/agent/converter/internal/common"
)

func (s *ScrapeConfigBuilder) AppendAzureEventHubs() {
if s.cfg.AzureEventHubsConfig == nil {
return
}
aCfg := s.cfg.AzureEventHubsConfig
args := azure_event_hubs.Arguments{
FullyQualifiedNamespace: aCfg.FullyQualifiedNamespace,
EventHubs: aCfg.EventHubs,
Authentication: azure_event_hubs.AzureEventHubsAuthentication{
ConnectionString: aCfg.ConnectionString,
},
GroupID: aCfg.GroupID,
UseIncomingTimestamp: aCfg.UseIncomingTimestamp,
DisallowCustomMessages: aCfg.DisallowCustomMessages,
RelabelRules: relabel.Rules{},
Labels: convertPromLabels(aCfg.Labels),
ForwardTo: s.getOrNewProcessStageReceivers(),
}
override := func(val interface{}) interface{} {
switch val.(type) {
case relabel.Rules:
return common.CustomTokenizer{Expr: s.getOrNewDiscoveryRelabelRules()}
default:
return val
}
}
compLabel := common.LabelForParts(s.globalCtx.LabelPrefix, s.cfg.JobName)
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "azure_event_hubs"},
compLabel,
args,
override,
))
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package build

import (
"fmt"
"time"

"github.com/grafana/agent/component/common/loki"
Expand All @@ -26,7 +25,7 @@ func (s *ScrapeConfigBuilder) AppendCloudFlareConfig() {
override := func(val interface{}) interface{} {
switch conv := val.(type) {
case []loki.LogsReceiver:
return common.CustomTokenizer{Expr: fmt.Sprintf("[%s]", s.getOrNewLokiRelabel())}
return common.CustomTokenizer{Expr: s.getOrNewLokiRelabel()}
case rivertypes.Secret:
return string(conv)
default:
Expand Down
35 changes: 35 additions & 0 deletions converter/internal/promtailconvert/internal/build/gelf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package build

import (
"github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/gelf"
"github.com/grafana/agent/converter/internal/common"
)

func (s *ScrapeConfigBuilder) AppendGelfConfig() {
if s.cfg.GelfConfig == nil {
return
}
gCfg := s.cfg.GelfConfig
args := gelf.Arguments{
ListenAddress: gCfg.ListenAddress,
UseIncomingTimestamp: gCfg.UseIncomingTimestamp,
RelabelRules: relabel.Rules{},
Receivers: s.getOrNewProcessStageReceivers(),
}
override := func(val interface{}) interface{} {
switch val.(type) {
case relabel.Rules:
return common.CustomTokenizer{Expr: s.getOrNewDiscoveryRelabelRules()}
default:
return val
}
}
compLabel := common.LabelForParts(s.globalCtx.LabelPrefix, s.cfg.JobName)
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "gelf"},
compLabel,
args,
override,
))
}
36 changes: 36 additions & 0 deletions converter/internal/promtailconvert/internal/build/herokudrain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package build

import (
"github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/heroku"
"github.com/grafana/agent/converter/internal/common"
)

func (s *ScrapeConfigBuilder) AppendHerokuDrainConfig() {
if s.cfg.HerokuDrainConfig == nil {
return
}
hCfg := s.cfg.HerokuDrainConfig
args := heroku.Arguments{
Server: common.WeaveWorksServerToFlowServer(hCfg.Server),
Labels: convertPromLabels(hCfg.Labels),
UseIncomingTimestamp: hCfg.UseIncomingTimestamp,
ForwardTo: s.getOrNewProcessStageReceivers(),
RelabelRules: relabel.Rules{},
}
override := func(val interface{}) interface{} {
switch val.(type) {
case relabel.Rules:
return common.CustomTokenizer{Expr: s.getOrNewDiscoveryRelabelRules()}
default:
return val
}
}
compLabel := common.LabelForParts(s.globalCtx.LabelPrefix, s.cfg.JobName)
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "heroku"},
compLabel,
args,
override,
))
}
60 changes: 60 additions & 0 deletions converter/internal/promtailconvert/internal/build/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package build

import (
"github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/kafka"
"github.com/grafana/agent/converter/internal/common"
"github.com/grafana/agent/converter/internal/prometheusconvert"
"github.com/grafana/agent/pkg/river/rivertypes"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
)

func (s *ScrapeConfigBuilder) AppendKafka() {
if s.cfg.KafkaConfig == nil {
return
}
kafkaCfg := s.cfg.KafkaConfig
args := kafka.Arguments{
Brokers: kafkaCfg.Brokers,
Topics: kafkaCfg.Topics,
GroupID: kafkaCfg.GroupID,
Assignor: kafkaCfg.Assignor,
Version: kafkaCfg.Version,
Authentication: convertKafkaAuthConfig(kafkaCfg),
UseIncomingTimestamp: kafkaCfg.UseIncomingTimestamp,
Labels: convertPromLabels(kafkaCfg.Labels),
ForwardTo: s.getOrNewProcessStageReceivers(),
RelabelRules: relabel.Rules{},
}
override := func(val interface{}) interface{} {
switch value := val.(type) {
case relabel.Rules:
return common.CustomTokenizer{Expr: s.getOrNewDiscoveryRelabelRules()}
case rivertypes.Secret:
return string(value)
default:
return val
}
}
compLabel := common.LabelForParts(s.globalCtx.LabelPrefix, s.cfg.JobName)
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "kafka"},
compLabel,
args,
override,
))
}

func convertKafkaAuthConfig(kafkaCfg *scrapeconfig.KafkaTargetConfig) kafka.KafkaAuthentication {
return kafka.KafkaAuthentication{
Type: string(kafkaCfg.Authentication.Type),
TLSConfig: *prometheusconvert.ToTLSConfig(&kafkaCfg.Authentication.TLSConfig),
SASLConfig: kafka.KafkaSASLConfig{
Mechanism: string(kafkaCfg.Authentication.SASLConfig.Mechanism),
User: kafkaCfg.Authentication.SASLConfig.User,
Password: kafkaCfg.Authentication.SASLConfig.Password.String(),
UseTLS: kafkaCfg.Authentication.SASLConfig.UseTLS,
TLSConfig: *prometheusconvert.ToTLSConfig(&kafkaCfg.Authentication.SASLConfig.TLSConfig),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *ScrapeConfigBuilder) getOrNewLokiRelabel() string {
}
compLabel := common.LabelForParts(s.globalCtx.LabelPrefix, s.cfg.JobName)
s.f.Body().AppendBlock(common.NewBlockWithOverride([]string{"loki", "relabel"}, compLabel, args))
s.lokiRelabelReceiverExpr = "loki.relabel." + compLabel + ".receiver"
s.lokiRelabelReceiverExpr = "[loki.relabel." + compLabel + ".receiver]"
}
return s.lokiRelabelReceiverExpr
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func convertPromLabels(labels model.LabelSet) map[string]string {
func logsReceiversToExpr(r []loki.LogsReceiver) string {
var exprs []string
for _, r := range r {
clr := r.(*common.ConvertLogsReceiver)
clr := r.(common.ConvertLogsReceiver)
exprs = append(exprs, clr.Expr)
}
return "[" + strings.Join(exprs, ", ") + "]"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package build

import (
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/loki/source/windowsevent"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
)

func (s *ScrapeConfigBuilder) AppendWindowsEventsConfig() {
if s.cfg.WindowsConfig == nil {
return
}
winCfg := s.cfg.WindowsConfig
if len(winCfg.Labels) != 0 {
// TODO: Add support for labels - see https://github.com/grafana/agent/issues/4634 for more details
s.diags.Add(diag.SeverityLevelError, "windows_events.labels are currently not supported")
}
args := windowsevent.Arguments{
Locale: int(winCfg.Locale),
EventLogName: winCfg.EventlogName,
XPathQuery: winCfg.Query,
BookmarkPath: winCfg.BookmarkPath,
PollInterval: winCfg.PollInterval,
ExcludeEventData: winCfg.ExcludeEventData,
ExcludeUserdata: winCfg.ExcludeUserData,
UseIncomingTimestamp: winCfg.UseIncomingTimestamp,
ForwardTo: make([]loki.LogsReceiver, 0),
}

override := func(val interface{}) interface{} {
switch val.(type) {
case []loki.LogsReceiver:
return common.CustomTokenizer{Expr: s.getOrNewLokiRelabel()}
default:
return val
}
}
compLabel := common.LabelForParts(s.globalCtx.LabelPrefix, s.cfg.JobName)
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "windowsevent"},
compLabel,
args,
override,
))
}
13 changes: 5 additions & 8 deletions converter/internal/promtailconvert/promtailconvert.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ func appendScrapeConfig(
//Encoding string `mapstructure:"encoding,omitempty" yaml:"encoding,omitempty"`
//DecompressionCfg *DecompressionConfig `yaml:"decompression,omitempty"`

//TODO(thampiotr): support/warn about the following log producing promtail configs:
//GcplogConfig *GcplogTargetConfig `mapstructure:"gcplog,omitempty" yaml:"gcplog,omitempty"`
//WindowsConfig *WindowsEventsTargetConfig `mapstructure:"windows_events,omitempty" yaml:"windows_events,omitempty"`
//KafkaConfig *KafkaTargetConfig `mapstructure:"kafka,omitempty" yaml:"kafka,omitempty"`
//AzureEventHubsConfig *AzureEventHubsTargetConfig `mapstructure:"azure_event_hubs,omitempty" yaml:"azure_event_hubs,omitempty"`
//GelfConfig *GelfTargetConfig `mapstructure:"gelf,omitempty" yaml:"gelf,omitempty"`
//HerokuDrainConfig *HerokuDrainTargetConfig `mapstructure:"heroku_drain,omitempty" yaml:"heroku_drain,omitempty"`

b := build.NewScrapeConfigBuilder(f, diags, cfg, gctx)
b.Validate()

Expand Down Expand Up @@ -168,4 +160,9 @@ func appendScrapeConfig(
b.AppendPushAPI()
b.AppendSyslogConfig()
b.AppendGCPLog()
b.AppendWindowsEventsConfig()
b.AppendKafka()
b.AppendAzureEventHubs()
b.AppendGelfConfig()
b.AppendHerokuDrainConfig()
}
37 changes: 37 additions & 0 deletions converter/internal/promtailconvert/testdata/azure_event_hubs.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
discovery.relabel "fun" {
targets = []

rule {
source_labels = ["__trail__"]
target_label = "__path__"
}
}

loki.source.azure_event_hubs "fun" {
fully_qualified_namespace = "robin.servicebus.windows.net"
event_hubs = ["hub0", "hub1"]

authentication {
mechanism = ""
connection_string = "Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>"
}
group_id = "my_consumer_group"
use_incoming_timestamp = true
disallow_custom_messages = true
relabel_rules = discovery.relabel.fun.rules
labels = {
job = "fun",
quality = "excellent",
}
assignor = ""
forward_to = [loki.write.default.receiver]
}

loki.write "default" {
endpoint {
url = "http://localhost/loki/api/v1/push"
follow_redirects = false
enable_http2 = false
}
external_labels = {}
}
22 changes: 22 additions & 0 deletions converter/internal/promtailconvert/testdata/azure_event_hubs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
clients:
- url: http://localhost/loki/api/v1/push
scrape_configs:
- job_name: fun
relabel_configs:
- source_labels:
- __trail__
target_label: __path__
azure_event_hubs:
labels:
job: fun
quality: excellent
use_incoming_timestamp: true
event_hubs:
- hub0
- hub1
connection_string: Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>
fully_qualified_namespace: robin.servicebus.windows.net
group_id: my_consumer_group
disallow_custom_messages: true
tracing: {enabled: false}
server: {register_instrumentation: false}
24 changes: 24 additions & 0 deletions converter/internal/promtailconvert/testdata/gelf.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
discovery.relabel "fun" {
targets = []

rule {
source_labels = ["__trail__"]
target_label = "__path__"
}
}

loki.source.gelf "fun" {
listen_address = "localhost:12201"
use_incoming_timestamp = true
relabel_rules = discovery.relabel.fun.rules
forward_to = [loki.write.default.receiver]
}

loki.write "default" {
endpoint {
url = "http://localhost/loki/api/v1/push"
follow_redirects = false
enable_http2 = false
}
external_labels = {}
}
16 changes: 16 additions & 0 deletions converter/internal/promtailconvert/testdata/gelf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
clients:
- url: http://localhost/loki/api/v1/push
scrape_configs:
- job_name: fun
relabel_configs:
- source_labels:
- __trail__
target_label: __path__
gelf:
listen_address: localhost:12201
labels:
job: fun
quality: excellent
use_incoming_timestamp: true
tracing: {enabled: false}
server: {register_instrumentation: false}
Loading

0 comments on commit c6f9be5

Please sign in to comment.