Skip to content

Commit

Permalink
flow metrics prober: support dynamic interfaces
Browse files Browse the repository at this point in the history
example:
```
metrics:
  probes:
  - name: flow
    args:
       interface-name: eth*
```
  • Loading branch information
jzwlqx committed Nov 15, 2023
1 parent da1811d commit 80452b1
Show file tree
Hide file tree
Showing 2 changed files with 270 additions and 62 deletions.
293 changes: 231 additions & 62 deletions pkg/exporter/probe/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/binary"
"fmt"
"strings"
"sync"
"syscall"

"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -40,17 +42,177 @@ func init() {
}

type flowArgs struct {
Dev string
Dev string `mapstructure:"interface-name"`
}

// getDefaultRouteDevice returns the link referenced by the single IPv4 route
// that has no destination set.
//
// It fails when the route lookup fails, when no such route exists, when more
// than one exists, or when the link cannot be resolved from the route's link
// index.
func getDefaultRouteDevice() (netlink.Link, error) {
	// Filtering on a nil Dst is presumably how netlink selects default
	// routes — confirm against the netlink RouteListFiltered documentation.
	defaults, err := netlink.RouteListFiltered(
		syscall.AF_INET,
		&netlink.Route{Dst: nil},
		netlink.RT_FILTER_DST,
	)
	if err != nil {
		return nil, err
	}

	switch len(defaults) {
	case 0:
		return nil, fmt.Errorf("no default route found")
	case 1:
		// Exactly one candidate: resolve it below.
	default:
		return nil, fmt.Errorf("multi default route found")
	}

	dev, err := netlink.LinkByIndex(defaults[0].LinkIndex)
	if err != nil {
		return nil, err
	}
	return dev, nil
}

// linkFlowHelper is the start/stop contract the metrics probe drives through
// its helper field. In this file it is satisfied by ebpfFlow (one fixed link)
// and dynamicLinkFlowHelper (a changing set of links).
type linkFlowHelper interface {
	// start begins flow collection and reports why it could not.
	start() error
	// stop ends flow collection and reports any cleanup failure.
	stop() error
}

// dynamicLinkFlowHelper manages one ebpfFlow per network link whose name
// matches pattern, starting and stopping flows as links appear and disappear.
type dynamicLinkFlowHelper struct {
	// bpfObjs is handed to every ebpfFlow this helper creates.
	bpfObjs *bpfObjects
	// pattern is the name fragment link names are matched against.
	pattern string
	// done is recreated by start, passed to the netlink subscription, and
	// closed to end the watcher goroutine.
	done chan struct{}
	// flows holds the running flows keyed by link index.
	flows map[int]*ebpfFlow
	// lock is held by the watcher goroutine for its whole lifetime and taken
	// by stop before it walks flows.
	lock sync.Mutex
}

// tryStartLinkFlow starts a flow on link and records it in h.flows under the
// link's index.
//
// A link whose index is already tracked is skipped with a warning. When the
// flow fails to start, the failure is logged together with the underlying
// error and h.flows is left unchanged. The method does not take h.lock itself.
func (h *dynamicLinkFlowHelper) tryStartLinkFlow(link netlink.Link) {
	name, index := link.Attrs().Name, link.Attrs().Index

	log.Infof("flow: try start flow on nic %s, index %d", name, index)
	if _, ok := h.flows[index]; ok {
		log.Warnf("new interface(%s) index %d already exists, skip process", name, index)
		return
	}

	flow := &ebpfFlow{
		dev:     link,
		bpfObjs: h.bpfObjs,
	}
	if err := flow.start(); err != nil {
		// Include the cause: nothing else on this path reports it.
		log.Errorf("failed start flow on dev %s, err: %v", name, err)
		return
	}

	h.flows[index] = flow
}

// tryStopLinkFlow stops the flow tracked for the link with the given index and
// forgets it. name is used only for logging. An index that is not tracked is
// reported with a warning and otherwise ignored.
func (h *dynamicLinkFlowHelper) tryStopLinkFlow(name string, index int) {
	log.Infof("flow: try stop flow on nic %s, index %d", name, index)

	if tracked, found := h.flows[index]; found {
		// The stop error is deliberately dropped; ebpfFlow.stop logs it.
		_ = tracked.stop()
		delete(h.flows, index)
		return
	}

	log.Warnf("deleted interface index %d not exists, skip process", index)
}

// start begins flow collection on every existing link whose name begins with
// h.pattern, then watches netlink for link changes so that flows follow
// matching links as they are created and deleted.
//
// It returns an error only when the initial link listing fails. A failure to
// subscribe to link updates is logged and start still returns nil: flows on
// the links that already exist keep running, but later link changes are not
// tracked.
func (h *dynamicLinkFlowHelper) start() error {
	h.done = make(chan struct{})
	ch := make(chan netlink.LinkUpdate)

	links, err := netlink.LinkList()
	if err != nil {
		return fmt.Errorf("%s error list link, err: %w", probeName, err)
	}
	for _, link := range links {
		// pattern is the configured name with its trailing "*" removed, so it
		// is a name prefix ("eth" for "eth*"), not a suffix.
		if !strings.HasPrefix(link.Attrs().Name, h.pattern) {
			continue
		}
		h.tryStartLinkFlow(link)
	}

	// Subscribe synchronously (LinkSubscribe is expected to return once the
	// subscription is set up — see the netlink documentation). On failure the
	// watcher is simply not started; h.done is left open so that stop() remains
	// the only place that closes it.
	if err := netlink.LinkSubscribe(ch, h.done); err != nil {
		log.Errorf("%s error watch link change, err: %v", probeName, err)
		return nil
	}

	go func() {
		// The lock is held for the goroutine's whole lifetime; stop() takes
		// the same lock after closing h.done, which makes it wait for this
		// goroutine to exit before touching h.flows.
		h.lock.Lock()
		defer h.lock.Unlock()
		for {
			select {
			case change, ok := <-ch:
				if !ok {
					// The subscription closed the channel. A zero LinkUpdate
					// carries a nil Link, so it must not be inspected.
					return
				}
				if !strings.HasPrefix(change.Attrs().Name, h.pattern) {
					continue
				}
				switch change.Header.Type {
				case syscall.RTM_NEWLINK:
					link, err := netlink.LinkByIndex(int(change.Index))
					if err != nil {
						log.Errorf("failed get new created link by index %d, name %s, err: %v", change.Index, change.Attrs().Name, err)
						continue
					}
					h.tryStartLinkFlow(link)
				case syscall.RTM_DELLINK:
					h.tryStopLinkFlow(change.Attrs().Name, int(change.Index))
				}
			case <-h.done:
				return
			}
		}
	}()
	return nil
}

// stop signals the link watcher to exit, then stops every tracked flow and
// forgets it. It returns the first error reported by a flow, or nil.
//
// Calling stop more than once, or after the watcher side has already closed
// h.done, does not panic.
func (h *dynamicLinkFlowHelper) stop() error {
	// Closing an already-closed channel panics, so only close when it is still
	// open. The check is not atomic with the close; it guards repeated and
	// sequential closes, not two goroutines racing to close.
	select {
	case <-h.done:
	default:
		close(h.done)
	}

	// The watcher goroutine holds the lock while it runs, so this waits for it
	// to observe h.done and return.
	h.lock.Lock()
	defer h.lock.Unlock()

	var first error
	for index, flow := range h.flows {
		if err := flow.stop(); err != nil && first == nil {
			first = err
		}
		// Drop the entry so a later start() re-attaches this link instead of
		// skipping it as already tracked.
		delete(h.flows, index)
	}
	return first
}

func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) {
p := &metricsProbe{}

if args.Dev == "" {
args.Dev = "eth0"
}
log.Infof("flow: auto detect network device with default route")
dev, err := getDefaultRouteDevice()
if err != nil {
return nil, fmt.Errorf("fail detect default route dev, err: %w", err)
}
log.Infof("flow: default network device %s", dev.Attrs().Name)

p := &metricsProbe{
args: args,
p.helper = &ebpfFlow{
dev: dev,
bpfObjs: &p.bpfObjs,
}
} else {
pattern := strings.TrimSuffix(args.Dev, "*")
if pattern != args.Dev {
log.Infof("flow: network device pattern %s", pattern)
p.helper = &dynamicLinkFlowHelper{
bpfObjs: &p.bpfObjs,
pattern: pattern,
done: make(chan struct{}),
flows: make(map[int]*ebpfFlow),
}
} else {
link, err := netlink.LinkByName(pattern)
if err != nil {
return nil, fmt.Errorf("cannot get network interface by name %s, err: %w", pattern, err)
}

log.Infof("flow: network device %s", pattern)
p.helper = &ebpfFlow{
bpfObjs: &p.bpfObjs,
dev: link,
}
}
}

opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
Expand All @@ -66,46 +228,28 @@ func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) {

type metricsProbe struct {
bpfObjs bpfObjects
args flowArgs
helper linkFlowHelper
}

func (p *metricsProbe) Start(_ context.Context) error {
//TODO watch every netns create/destroy
if err := p.loadAndAttachBPF(); err != nil {
if err := p.loadBPF(); err != nil {
var verifierError *ebpf.VerifierError
log.Error("failed load ebpf program", err)
if errors.As(err, &verifierError) {
log.Warn("detail", strings.Join(verifierError.Log, "\n"))
}

return err
}

return nil
return p.helper.start()
}

func (p *metricsProbe) Stop(_ context.Context) error {
return p.cleanup()
}

func (p *metricsProbe) cleanup() error {
//TODO only clean qdisc after replace qdisc successfully
link, err := netlink.LinkByName(p.args.Dev)
if err == nil {
_ = cleanQdisc(link)
if err := p.helper.stop(); err != nil {
return err
}
return p.bpfObjs.Close()
}

// toIPString renders addr as a dotted-quad string, emitting the least
// significant byte of addr first and the most significant byte last.
func toIPString(addr uint32) string {
	var octets [4]byte
	// Little-endian layout puts the low byte of addr at index 0.
	binary.LittleEndian.PutUint32(octets[:], addr)
	return fmt.Sprintf("%d.%d.%d.%d", octets[0], octets[1], octets[2], octets[3])
}

func (p *metricsProbe) collectOnce(emit probe.Emit) error {
htons := func(port uint16) uint16 {
data := make([]byte, 2)
Expand Down Expand Up @@ -156,7 +300,60 @@ func (p *metricsProbe) collectOnce(emit probe.Emit) error {
return nil
}

func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
// loadBPF removes the memlock limit and loads the pre-compiled eBPF programs
// and maps into p.bpfObjs. It does not attach anything to a network device.
//
// Both failure paths wrap the underlying error so callers can inspect it with
// errors.As / errors.Is.
func (p *metricsProbe) loadBPF() error {
	if err := rlimit.RemoveMemlock(); err != nil {
		return fmt.Errorf("remove limit failed: %w", err)
	}

	opts := ebpf.CollectionOptions{}

	opts.Programs = ebpf.ProgramOptions{
		// Judging by the helper's name this may be nil — confirm what the
		// loader does in that case.
		KernelTypes: bpfutil.LoadBTFSpecOrNil(),
	}

	// Load pre-compiled programs and maps into the kernel.
	if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil {
		return fmt.Errorf("failed loading objects: %w", err)
	}
	return nil
}

// ebpfFlow ties the flow eBPF programs to a single network link.
type ebpfFlow struct {
	// dev is the link the TC filters are set up on and whose qdisc is cleaned
	// on cleanup.
	dev netlink.Link
	// bpfObjs supplies the ingress and egress programs used by the TC filters.
	bpfObjs *bpfObjects
}

// start attaches the eBPF programs to f.dev. When attaching fails it logs the
// device name and the cause, runs cleanup, and returns the attach error.
func (f *ebpfFlow) start() error {
	err := f.attachBPF()
	if err != nil {
		// Log the interface name rather than the Link value itself, and keep
		// the cause in the message.
		log.Errorf("%s failed attach ebpf to dev %s, cleanup: %v", probeName, f.dev.Attrs().Name, err)
		// Best-effort rollback; the attach error is the one worth returning.
		_ = f.cleanup()
	}
	return err
}

// stop runs cleanup for f.dev. A cleanup failure is logged with the device
// name and returned to the caller; success returns nil.
func (f *ebpfFlow) stop() error {
	if cleanupErr := f.cleanup(); cleanupErr != nil {
		log.Errorf("failed stop flow on dev %s", f.dev.Attrs().Name)
		return cleanupErr
	}
	return nil
}

// cleanup hands f.dev to cleanQdisc and returns its result unchanged.
func (f *ebpfFlow) cleanup() error {
	target := f.dev
	return cleanQdisc(target)
}

// toIPString formats addr as a dotted-quad string. The lowest byte of addr
// becomes the first octet and the highest byte the last.
func toIPString(addr uint32) string {
	// Converting to byte keeps only the low eight bits of each shifted value.
	first := byte(addr)
	second := byte(addr >> 8)
	third := byte(addr >> 16)
	fourth := byte(addr >> 24)
	return fmt.Sprintf("%d.%d.%d.%d", first, second, third, fourth)
}

func (f *ebpfFlow) setupTCFilter(link netlink.Link) error {
if err := replaceQdisc(link); err != nil {
return errors.Wrapf(err, "failed replace qdics clsact for dev %s", link.Attrs().Name)
}
Expand All @@ -169,11 +366,11 @@ func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
case ingress:
directionName = "ingress"
filterParent = netlink.HANDLE_MIN_INGRESS
prog = p.bpfObjs.bpfPrograms.TcIngress
prog = f.bpfObjs.bpfPrograms.TcIngress
case egress:
directionName = "egress"
filterParent = netlink.HANDLE_MIN_EGRESS
prog = p.bpfObjs.bpfPrograms.TcEgress
prog = f.bpfObjs.bpfPrograms.TcEgress
default:
return fmt.Errorf("invalid direction value: %d", direction)
}
Expand Down Expand Up @@ -206,37 +403,9 @@ func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
return nil
}

// loadBPF removes the memlock limit and loads the pre-compiled eBPF programs
// and maps into p.bpfObjs, with instruction, branch and stats logging
// requested from the loader.
//
// Both failure paths wrap the underlying error so callers can inspect it with
// errors.As / errors.Is.
func (p *metricsProbe) loadBPF() error {
	if err := rlimit.RemoveMemlock(); err != nil {
		return fmt.Errorf("remove limit failed: %w", err)
	}

	opts := ebpf.CollectionOptions{}

	opts.Programs = ebpf.ProgramOptions{
		LogLevel: ebpf.LogLevelInstruction | ebpf.LogLevelBranch | ebpf.LogLevelStats,
		// Judging by the helper's name this may be nil — confirm what the
		// loader does in that case.
		KernelTypes: bpfutil.LoadBTFSpecOrNil(),
	}

	// Load pre-compiled programs and maps into the kernel.
	if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil {
		return fmt.Errorf("failed loading objects: %w", err)
	}
	return nil
}

func (p *metricsProbe) loadAndAttachBPF() error {
eth0, err := netlink.LinkByName(p.args.Dev)
if err != nil {
return fmt.Errorf("fail get link %s, err: %w", p.args.Dev, err)
}

if err := p.loadBPF(); err != nil {
return err
}

if err := p.setupTCFilter(eth0); err != nil {
return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", p.args.Dev, err)
// attachBPF sets up the TC filters for f.dev. On failure the returned error
// names the device and wraps the underlying cause.
func (f *ebpfFlow) attachBPF() error {
	if err := f.setupTCFilter(f.dev); err != nil {
		// Report the interface name, not the Link value, and wrap with %w so
		// the cause stays inspectable.
		return fmt.Errorf("failed replace %s qdisc with clsact, err: %w", f.dev.Attrs().Name, err)
	}
	return nil
}
Expand Down
Loading

0 comments on commit 80452b1

Please sign in to comment.