Skip to content

Commit

Permalink
add newline on event stderr&file
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwlqx committed Sep 26, 2023
1 parent 5d35e9a commit 72b30da
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/exporter/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (f *FileSink) Write(event *probe.Event) error {
return fmt.Errorf("failed marshal event, err: %w", err)
}
_, err = f.file.Write(data)
f.file.Write([]byte{0x0a})

if err != nil {
return fmt.Errorf("failed sink event to file %s, err: %w", f.file.Name(), err)
Expand Down
36 changes: 34 additions & 2 deletions pkg/exporter/sink/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,26 @@ package sink
import (
"encoding/json"
"fmt"
"net/url"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/afiskon/promtail-client/promtail"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
)

func NewLokiSink(addr string, node string) (*LokiSink, error) {
labels := `{instance = "%s",job = "inspector"}`
url, err := buildURL(addr)
if err != nil {
return nil, fmt.Errorf("failed parse addr, not a valild url, err: %w", err)
}
log.Infof("create loki client with url %s", url)

labels := `{instance = "%s",job = "kubeskoop"}`
conf := promtail.ClientConfig{
PushURL: addr,
PushURL: url,
Labels: fmt.Sprintf(labels, node),
BatchWait: 5 * time.Second,
BatchEntriesNumber: 10000,
Expand All @@ -28,15 +38,37 @@ func NewLokiSink(addr string, node string) (*LokiSink, error) {
}, nil
}

func buildURL(addr string) (string, error) {
if !strings.HasPrefix(addr, "http://") || !strings.HasPrefix(addr, "https://") {
addr = "http://" + addr
}
u, err := url.Parse(addr)
if err != nil {
return "", err
}

if u.Path == "" {
u.Path = "/api/prom/push"
}

if u.Port() == "" {
u.Host = fmt.Sprintf("%s:%s", u.Hostname(), "3100")
}

return u.String(), nil
}

type LokiSink struct {
client promtail.Client
}

func (l *LokiSink) Write(event *probe.Event) error {
log.Error("try sink event in loki")
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed marshal event, err: %w", err)
}
log.Error("after sink event in loki, data: %v", string(data))

l.client.Infof(string(data))
return nil
Expand Down
12 changes: 9 additions & 3 deletions pkg/exporter/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import (
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
)

const (
Stderr = "stderr"
File = "file"
Loki = "loki"
)

type Sink interface {
Write(event *probe.Event) error
}
Expand All @@ -16,12 +22,12 @@ func CreateSink(name string, args interface{}) (Sink, error) {
argsMap, _ := args.(map[string]interface{})

switch name {
case "stderr":
case Stderr:
return NewStderrSink(), nil
case "loki":
case Loki:
addr := argsMap["addr"].(string)
return NewLokiSink(addr, nettop.GetNodeName())
case "file":
case File:
path := argsMap["path"].(string)
return NewFileSink(path)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/exporter/sink/stderr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
)

type Stderr struct {
type StderrSink struct {
}

func NewStderrSink() *Stderr {
return &Stderr{}
func NewStderrSink() *StderrSink {
return &StderrSink{}
}

func (s Stderr) Write(event *probe.Event) error {
func (s StderrSink) Write(event *probe.Event) error {
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed marshal event, err: %w", err)
}

fmt.Fprintf(os.Stderr, "event: %s", string(data))
fmt.Fprintf(os.Stderr, "event: %s\n", string(data))
return nil
}

var _ Sink = &Stderr{}
var _ Sink = &StderrSink{}

0 comments on commit 72b30da

Please sign in to comment.