Skip to content

Commit

Permalink
refactor flow probe with new interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jzwlqx committed Sep 26, 2023
1 parent 0703c04 commit d98f12a
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 118 deletions.
231 changes: 129 additions & 102 deletions pkg/exporter/probe/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package flow

import (
"context"
"encoding/binary"
"fmt"
"strings"

probe2 "github.com/alibaba/kubeskoop/pkg/exporter/probe"
"github.com/prometheus/client_golang/prometheus"

"github.com/alibaba/kubeskoop/pkg/exporter/probe"

"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
"github.com/cilium/ebpf"
Expand All @@ -18,109 +21,134 @@ import (

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -cflags $BPF_CFLAGS bpf ../../../../bpf/flow.c -- -I../../../../bpf/headers -D__TARGET_ARCH_x86

type Probe struct {
enable bool
}

type Direction int
type direction int

const (
ModuleName = "flow"
Ingress Direction = 0
Egress Direction = 1
ingress direction = 0
egress direction = 1

metricsBytes = "bytes"
metricsPackets = "packets"
)

var (
dev = "eth0"
bpfObjs = bpfObjects{}
dev = "eth0"
probeName = "flow"
)

func (f *Probe) Start(_ context.Context, _ probe2.Type) {
log.Info("flow probe starting...")
func init() {
probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator)
}

eth0, err := netlink.LinkByName(dev)
if err != nil {
log.Error("fail get link eth0", err)
return
func metricsProbeCreator(_ map[string]interface{}) (probe.MetricsProbe, error) {
p := &metricsProbe{}
opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
VariableLabels: []string{"protocol", "src", "dst", "sport", "dport"},
SingleMetricsOpts: []probe.SingleMetricsOpts{
{Name: metricsBytes, ValueType: prometheus.CounterValue},
{Name: metricsPackets, ValueType: prometheus.CounterValue},
},
}
batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce)
return probe.NewMetricsProbe(probeName, p, batchMetrics), nil
}

type metricsProbe struct {
bpfObjs bpfObjects
}

if err := load(); err != nil {
func (p *metricsProbe) Start(_ context.Context) error {
//TODO watch every netns create/destroy
if err := p.loadAndAttachBPF(); err != nil {
var verifierError *ebpf.VerifierError
log.Error("failed load ebpf program", err)
if errors.As(err, &verifierError) {
log.Warn("detail", strings.Join(verifierError.Log, "\n"))
}
return
}

if err := setupTCFilter(eth0); err != nil {
log.Error("failed replace eth0 qdisc with clsact", err)
return
}

log.Info("finish setup flow ebpf")

//below is just for testing
//toip := func(addr uint32) string {
// var bytes [4]byte
// bytes[0] = byte(addr & 0xff)
// bytes[1] = byte(addr >> 8 & 0xff)
// bytes[2] = byte(addr >> 16 & 0xff)
// bytes[3] = byte(addr >> 24 & 0xff)
// return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3])
//}
//htons := func(port uint16) uint16 {
// data := make([]byte, 2)
// binary.BigEndian.PutUint16(data, port)
// return binary.LittleEndian.Uint16(data)
//}
//go func() {
// for {
// var values []bpfFlowMetrics
// var key bpfFlowTuple4
// iterator := bpfObjs.bpfMaps.InspFlow4Metrics.Iterate()
// for {
// if !iterator.Next(&key, &values) {
// break
// }
//
// if err := iterator.Err(); err != nil {
// log.Error("failed read map", err)
// break
// }
//
// var val bpfFlowMetrics
// for i := 0; i < len(values); i++ {
// val.Bytes += values[i].Bytes
// val.Packets += values[i].Packets
// }
//
// fmt.Printf("proto: %d %s:%d->%s:%d pkts: %d, bytes: %d\n", key.Proto, toip(key.Src), htons(key.Sport), toip(key.Dst), htons(key.Dport), val.Packets, val.Bytes)
// }
// }
//}()

f.enable = true

return err
}

return nil
}

func (p *metricsProbe) Stop(_ context.Context) error {
return p.cleanup()
}

func setupTCFilter(link netlink.Link) error {
func (p *metricsProbe) cleanup() error {
//TODO only clean qdisc after replace qdisc successfully
link, err := netlink.LinkByName(dev)
if err == nil {
_ = cleanQdisc(link)
}
return p.bpfObjs.Close()
}

func toIPString(addr uint32) string {
var bytes [4]byte
bytes[0] = byte(addr & 0xff)
bytes[1] = byte(addr >> 8 & 0xff)
bytes[2] = byte(addr >> 16 & 0xff)
bytes[3] = byte(addr >> 24 & 0xff)
return fmt.Sprintf("%d.%d.%d.%d", bytes[0], bytes[1], bytes[2], bytes[3])
}

func (p *metricsProbe) collectOnce(emit probe.Emit) error {
htons := func(port uint16) uint16 {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, port)
return binary.LittleEndian.Uint16(data)
}
var values []bpfFlowMetrics
var key bpfFlowTuple4
iterator := p.bpfObjs.bpfMaps.InspFlow4Metrics.Iterate()

for iterator.Next(&key, &values) {
if err := iterator.Err(); err != nil {
return fmt.Errorf("failed read bpfmap, err: %w", err)
}

var val bpfFlowMetrics
for i := 0; i < len(values); i++ {
val.Bytes += values[i].Bytes
val.Packets += values[i].Packets
}
emit("bytes", []string{}, float64(val.Bytes))
emit("packets", []string{}, float64(val.Packets))

fmt.Printf("proto: %d %s:%d->%s:%d pkts: %d, bytes: %d\n",
key.Proto,
toIPString(key.Src),
htons(key.Sport),
toIPString(key.Dst),
htons(key.Dport),
val.Packets,
val.Bytes)
}
return nil
}

func (p *metricsProbe) setupTCFilter(link netlink.Link) error {
if err := replaceQdisc(link); err != nil {
return errors.Wrapf(err, "failed replace qdics clsact for dev %s", link.Attrs().Name)
}

replaceFilter := func(direction Direction) error {
replaceFilter := func(direction direction) error {
directionName := ""
var filterParent uint32
var prog *ebpf.Program
switch direction {
case Ingress:
case ingress:
directionName = "ingress"
filterParent = netlink.HANDLE_MIN_INGRESS
prog = bpfObjs.bpfPrograms.TcIngress
case Egress:
prog = p.bpfObjs.bpfPrograms.TcIngress
case egress:
directionName = "egress"
filterParent = netlink.HANDLE_MIN_EGRESS
prog = bpfObjs.bpfPrograms.TcEgress
prog = p.bpfObjs.bpfPrograms.TcEgress
default:
return fmt.Errorf("invalid direction value: %d", direction)
}
Expand All @@ -144,16 +172,16 @@ func setupTCFilter(link netlink.Link) error {
return nil
}

if err := replaceFilter(Ingress); err != nil {
if err := replaceFilter(ingress); err != nil {
return errors.Wrapf(err, "cannot set ingress filter for dev %s", link.Attrs().Name)
}
if err := replaceFilter(Egress); err != nil {
if err := replaceFilter(egress); err != nil {
return errors.Wrapf(err, "cannot set egress filter for dev %s", link.Attrs().Name)
}
return nil
}

func load() error {
func (p *metricsProbe) loadBPF() error {
if err := rlimit.RemoveMemlock(); err != nil {
return fmt.Errorf("remove limit failed: %s", err.Error())
}
Expand All @@ -172,46 +200,45 @@ func load() error {
}

// Load pre-compiled programs and maps into the kernel.
if err := loadBpfObjects(&bpfObjs, &opts); err != nil {
if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil {
return fmt.Errorf("failed loading objects: %w", err)
}
return nil
}

func replaceQdisc(link netlink.Link) error {
attrs := netlink.QdiscAttrs{
LinkIndex: link.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0),
Parent: netlink.HANDLE_CLSACT,
func (p *metricsProbe) loadAndAttachBPF() error {
eth0, err := netlink.LinkByName(dev)
if err != nil {
return fmt.Errorf("fail get link %s, err: %w", dev, err)
}

qdisc := &netlink.GenericQdisc{
QdiscAttrs: attrs,
QdiscType: "clsact",
if err := p.loadBPF(); err != nil {
return err
}

return netlink.QdiscReplace(qdisc)
}

func (f *Probe) Close(_ probe2.Type) error {
if f.enable {
return bpfObjs.Close()
if err := p.setupTCFilter(eth0); err != nil {
return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", dev, err)
}
return nil
}

func (f *Probe) Ready() bool {
return f.enable
func cleanQdisc(link netlink.Link) error {
return netlink.QdiscDel(clsact(link))
}

func (f *Probe) Name() string {
return ModuleName
}
func clsact(link netlink.Link) netlink.Qdisc {
attrs := netlink.QdiscAttrs{
LinkIndex: link.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0),
Parent: netlink.HANDLE_CLSACT,
}

func (f *Probe) GetMetricNames() []string {
return []string{"net_flow"}
return &netlink.GenericQdisc{
QdiscAttrs: attrs,
QdiscType: "clsact",
}
}

func (f *Probe) Collect(_ context.Context) (map[string]map[uint32]uint64, error) {
return map[string]map[uint32]uint64{}, nil
func replaceQdisc(link netlink.Link) error {
return netlink.QdiscReplace(clsact(link))
}
6 changes: 3 additions & 3 deletions pkg/exporter/probe/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ type legacyBatchMetrics struct {

func legacyMetricsName(module string, name string, underscore bool) string {
if underscore {
return fmt.Sprintf("%s_pod_%s_%s", defaultNamespace, module, name)
return fmt.Sprintf("%s_pod_%s_%s", LegacyMetricsNamespace, module, name)
}
return fmt.Sprintf("%s_pod_%s%s", defaultNamespace, module, name)
return fmt.Sprintf("%s_pod_%s%s", LegacyMetricsNamespace, module, name)
}
func newMetricsName(module, name string) string {
return prometheus.BuildFQName(newNamespace, module, name)
return prometheus.BuildFQName(MetricsNamespace, module, name)
}

type LegacyCollector func() (map[string]map[uint32]uint64, error)
Expand Down
Loading

0 comments on commit d98f12a

Please sign in to comment.