Skip to content

Commit

Permalink
Merge pull request alibaba#115 from Lyt99/feature/custom-args
Browse files Browse the repository at this point in the history
feat(exporter): custom args
  • Loading branch information
jzwlqx committed Oct 10, 2023
2 parents d5bc117 + d7f7764 commit 9d4f976
Show file tree
Hide file tree
Showing 27 changed files with 398 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru/v2 v2.0.6
github.com/mdlayher/netlink v1.7.1
github.com/mitchellh/mapstructure v1.5.0
github.com/moby/ipvs v1.1.0
github.com/onsi/ginkgo/v2 v2.9.1
github.com/onsi/gomega v1.27.4
Expand Down Expand Up @@ -131,7 +132,6 @@ require (
github.com/mazznoer/csscolorparser v0.1.3 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
github.com/miekg/pkcs11 v1.1.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/locker v1.0.1 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ type EventSinkConfig struct {
}

type ProbeConfig struct {
Name string `yaml:"name" mapstructure:"name"`
Args interface{} `yaml:"args" mapstructure:"args"`
Name string `yaml:"name" mapstructure:"name"`
Args map[string]interface{} `yaml:"args" mapstructure:"args"`
}

type inspServer struct {
Expand Down
110 changes: 103 additions & 7 deletions pkg/exporter/probe/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,133 @@ package probe

import (
"fmt"
"reflect"

"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"golang.org/x/exp/slog"
)

var (
availableEventProbe = make(map[string]EventProbeCreator)
availableEventProbe = make(map[string]*eventProbeCreator)
)

type EventProbeCreator func(sink chan<- *Event, args map[string]interface{}) (EventProbe, error)
type eventProbeCreator struct {
f reflect.Value
s *reflect.Type
}

func newEventProbeCreator(creator interface{}) (*eventProbeCreator, error) {
t := reflect.TypeOf(creator)
err := validateProbeCreatorReturnValue[EventProbe](t)
if err != nil {
return nil, err
}

if t.NumIn() != 1 && t.NumIn() != 2 {
return nil, fmt.Errorf("input parameter count of creator should be either 1 or 2")
}

ct := t.In(0)
et := reflect.TypeOf((*Event)(nil))
if ct.Kind() != reflect.Chan || ct.ChanDir() != reflect.SendDir || ct.Elem() != et {
return nil, fmt.Errorf("first input parameter type should be chan<- *Event")
}

ret := &eventProbeCreator{
f: reflect.ValueOf(creator),
}

if t.NumIn() == 2 {
st := t.In(1)
if err := validateParamTypeMapOrStruct(st); err != nil {
return nil, err
}
ret.s = &st
}

func MustRegisterEventProbe(name string, creator EventProbeCreator) {
return ret, nil
}

func (e *eventProbeCreator) Call(sink chan<- *Event, args map[string]interface{}) (EventProbe, error) {
in := []reflect.Value{
reflect.ValueOf(sink),
}
if e.s != nil {
s, err := createStructFromTypeWithArgs(*e.s, args)
if err != nil {
return nil, err
}
in = append(in, s)
}

result := e.f.Call(in)
// return parameter count and type has been checked in newEventProbeCreator
ret := result[0].Interface().(EventProbe)
err := result[1].Interface()
if err == nil {
return ret, nil
}
return ret, err.(error)
}

// MustRegisterEventProbe registers the event probe by given name and creator.
// The creator is a function that creates EventProbe. Return values of the creator
// must be (EventProbe, error). The creator can accept one parameter
// of type chan<- *Event, or struct/map as an extra parameter.
// When the creator specifies the extra parameter, the configuration of the probe in the configuration file
// will be passed to the creator when the probe is created. For example:
//
// The creator accepts no extra args.
//
// func eventProbeCreator(sink chan<- *Event) (EventProbe, error)
//
// The creator accepts struct "probeArgs" as args. Names of struct fields are case-insensitive.
//
// // Config in yaml
// args:
// argA: test
// argB: 20
// argC:
// - a
// // Struct definition
// type probeArgs struct {
// ArgA string
// ArgB int
// ArgC []string
// }
// // The creator function:
// func eventProbeCreator(sink chan<- *Event, args probeArgs) (EventProbe, error)
//
// The creator can also use a map with string keys as parameters.
// However, if you use a type other than interface{} as the value type, errors may occur
// during the configuration parsing process.
//
// func metricsProbeCreator(sink chan<- *Event, args map[string]string) (EventProbe, error)
// func metricsProbeCreator(sink chan<- *Event, args map[string]interface{} (EventProbe, error)
func MustRegisterEventProbe(name string, creator interface{}) {
if _, ok := availableEventProbe[name]; ok {
panic(fmt.Errorf("duplicated event probe %s", name))
}

availableEventProbe[name] = creator
c, err := newEventProbeCreator(creator)
if err != nil {
panic(fmt.Errorf("error register event probe %s: %s", name, err))
}

availableEventProbe[name] = c
}

func NewEventProbe(name string, simpleProbe SimpleProbe) EventProbe {
return NewProbe(name, simpleProbe)
}

func CreateEventProbe(name string, sink chan<- *Event, _ interface{}) (EventProbe, error) {
func CreateEventProbe(name string, sink chan<- *Event, args map[string]interface{}) (EventProbe, error) {
creator, ok := availableEventProbe[name]
if !ok {
return nil, fmt.Errorf("undefined probe %s", name)
}

//TODO reflect creator's arguments
return creator(sink, nil)
return creator.Call(sink, args)
}

func ListEventProbes() []string {
Expand Down
32 changes: 18 additions & 14 deletions pkg/exporter/probe/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,25 @@ const (
)

var (
dev = "eth0"
probeName = "flow"
)

func init() {
probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator)
}

func metricsProbeCreator(_ map[string]interface{}) (probe.MetricsProbe, error) {
p := &metricsProbe{}
type flowArgs struct {
Dev string
}

func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) {
if args.Dev == "" {
args.Dev = "eth0"
}

p := &metricsProbe{
args: args,
}
opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
Expand All @@ -57,6 +66,7 @@ func metricsProbeCreator(_ map[string]interface{}) (probe.MetricsProbe, error) {

type metricsProbe struct {
bpfObjs bpfObjects
args flowArgs
}

func (p *metricsProbe) Start(_ context.Context) error {
Expand All @@ -80,7 +90,7 @@ func (p *metricsProbe) Stop(_ context.Context) error {

func (p *metricsProbe) cleanup() error {
//TODO only clean qdisc after replace qdisc successfully
link, err := netlink.LinkByName(dev)
link, err := netlink.LinkByName(p.args.Dev)
if err == nil {
_ = cleanQdisc(link)
}
Expand Down Expand Up @@ -203,15 +213,9 @@ func (p *metricsProbe) loadBPF() error {

opts := ebpf.CollectionOptions{}

//TODO 优化btf文件的查找方式
btf, err := bpfutil.LoadBTFFromFile("/sys/kernel/btf/vmlinux")
if err != nil {
panic(err)
}

opts.Programs = ebpf.ProgramOptions{
LogLevel: ebpf.LogLevelInstruction | ebpf.LogLevelBranch | ebpf.LogLevelStats,
KernelTypes: btf,
KernelTypes: bpfutil.LoadBTFSpecOrNil(),
}

// Load pre-compiled programs and maps into the kernel.
Expand All @@ -222,17 +226,17 @@ func (p *metricsProbe) loadBPF() error {
}

func (p *metricsProbe) loadAndAttachBPF() error {
eth0, err := netlink.LinkByName(dev)
eth0, err := netlink.LinkByName(p.args.Dev)
if err != nil {
return fmt.Errorf("fail get link %s, err: %w", dev, err)
return fmt.Errorf("fail get link %s, err: %w", p.args.Dev, err)
}

if err := p.loadBPF(); err != nil {
return err
}

if err := p.setupTCFilter(eth0); err != nil {
return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", dev, err)
return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", p.args.Dev, err)
}
return nil
}
Expand Down
103 changes: 95 additions & 8 deletions pkg/exporter/probe/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package probe
import (
"errors"
"fmt"
"reflect"

log "github.com/sirupsen/logrus"

Expand All @@ -14,28 +15,114 @@ const LegacyMetricsNamespace = "inspector"
const MetricsNamespace = "kubeskoop"

var (
availableMetricsProbes = make(map[string]MetricsProbeCreator)
availableMetricsProbes = make(map[string]*metricsProbeCreator)
ErrUndeclaredMetrics = errors.New("undeclared metrics")
)

type MetricsProbeCreator func(args map[string]interface{}) (MetricsProbe, error)
type metricsProbeCreator struct {
f reflect.Value
s *reflect.Type
}

func newMetricProbeCreator(creator interface{}) (*metricsProbeCreator, error) {
t := reflect.TypeOf(creator)
err := validateProbeCreatorReturnValue[MetricsProbe](t)
if err != nil {
return nil, err
}

if t.NumIn() > 1 {
return nil, fmt.Errorf("input parameter count of creator should be either 0 or 1")
}

func MustRegisterMetricsProbe(name string, creator MetricsProbeCreator) {
ret := &metricsProbeCreator{
f: reflect.ValueOf(creator),
}

if t.NumIn() == 1 {
st := t.In(0)
if err := validateParamTypeMapOrStruct(st); err != nil {
return nil, err
}
ret.s = &st
}

return ret, nil
}

func (m *metricsProbeCreator) Call(args map[string]interface{}) (MetricsProbe, error) {
var in []reflect.Value
if m.s != nil {
s, err := createStructFromTypeWithArgs(*m.s, args)
if err != nil {
return nil, err
}
in = append(in, s)
}

result := m.f.Call(in)
// return parameter count and type has been checked in newMetricProbeCreator
ret := result[0].Interface().(MetricsProbe)
err := result[1].Interface()
if err == nil {
return ret, nil
}
return ret, err.(error)
}

// MustRegisterMetricsProbe registers the metrics probe by given name and creator.
// The creator is a function that creates MetricProbe. Return values of the creator
// must be (MetricsProbe, error). The creator can accept no parameter, or struct/map as a parameter.
// When the creator specifies the parameter, the configuration of the probe in the configuration file
// will be passed to the creator when the probe is created. For example:
//
// The creator accepts no extra args.
//
// func metricsProbeCreator() (MetricsProbe, error)
//
// The creator accepts struct "probeArgs" as args. Names of struct fields are case-insensitive.
//
// // Config in yaml
// args:
// argA: test
// argB: 20
// argC:
// - a
// // Struct definition
// type probeArgs struct {
// ArgA string
// ArgB int
// ArgC []string
// }
// // The creator function:
// func metricsProbeCreator(args probeArgs) (MetricsProbe, error)
//
// The creator can also use a map with string keys as parameters.
// However, if you use a type other than interface{} as the value type, errors may occur
// during the configuration parsing process.
//
// func metricsProbeCreator(args map[string]string) (MetricsProbe, error)
// func metricsProbeCreator(args map[string]interface{} (MetricsProbe, error)
func MustRegisterMetricsProbe(name string, creator interface{}) {
if _, ok := availableMetricsProbes[name]; ok {
panic(fmt.Errorf("duplicated event probe %s", name))
panic(fmt.Errorf("duplicated metric probe %s", name))
}

c, err := newMetricProbeCreator(creator)
if err != nil {
panic(fmt.Errorf("error register metric probe %s: %s", name, err))
}

availableMetricsProbes[name] = creator
availableMetricsProbes[name] = c
}

func CreateMetricsProbe(name string, _ interface{}) (MetricsProbe, error) {
func CreateMetricsProbe(name string, args map[string]interface{}) (MetricsProbe, error) {
creator, ok := availableMetricsProbes[name]
if !ok {
return nil, fmt.Errorf("undefined probe %s", name)
}

//TODO reflect creator's arguments
return creator(nil)
return creator.Call(args)
}

func ListMetricsProbes() []string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/probe/nlconntrack/conntrackevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
probeName = "conntrack"
)

func eventProbeCreator(sink chan<- *probe.Event, _ map[string]interface{}) (probe.EventProbe, error) {
func eventProbeCreator(sink chan<- *probe.Event) (probe.EventProbe, error) {
p := &conntrackEventProbe{
sink: sink,
}
Expand Down
Loading

0 comments on commit 9d4f976

Please sign in to comment.