Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[agent-smith] account for egress traffic #4677

Merged
merged 6 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[agent-smith] account for egress traffic
  • Loading branch information
fntlnz committed Jul 5, 2021
commit 495528bd96d5398cf5f454d27c1bd078d494e2be
153 changes: 131 additions & 22 deletions components/ee/agent-smith/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import (
"sort"
"strconv"
"strings"
"syscall"
"time"
"unsafe"

"github.com/cilium/ebpf/perf"
"github.com/gitpod-io/gitpod/agent-smith/pkg/bpf"
"github.com/gitpod-io/gitpod/agent-smith/pkg/network"
"github.com/gitpod-io/gitpod/agent-smith/pkg/signature"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/common-go/util"
Expand All @@ -43,10 +46,12 @@ type Smith struct {
Config Config
GitpodAPI gitpod.APIInterface
EnforcementRules map[string]EnforcementRules
EgressTraffic *EgressTraffic
metrics *metrics

notifiedInfringements *lru.Cache
perfHandler chan perfHandlerFunc
pids map[int]time.Time
}

// EgressTraffic configures an upper limit of allowed egress traffic over time
Expand Down Expand Up @@ -303,7 +308,7 @@ func (er EnforcementRules) Validate() error {
}

// Start gets a stream of Infringements from Run and executes a callback on them to apply a Penalty
func (agent *Smith) Start(callback func(InfringingWorkspace, []PenaltyKind)) {
func (agent *Smith) Start(ctx context.Context, callback func(InfringingWorkspace, []PenaltyKind)) {
// todo(fntlnz): do the bpf loading here before running Run so that we have everything sorted out
abpf, err := bpf.LoadAndAttach(agent.Config.ProbePath)

Expand All @@ -313,37 +318,68 @@ func (agent *Smith) Start(callback func(InfringingWorkspace, []PenaltyKind)) {

defer abpf.Close()

agent.cleanupDeadPIDS(ctx)

egressTicker := time.NewTicker(30 * time.Second)
fntlnz marked this conversation as resolved.
Show resolved Hide resolved

for i := 0; i < 10; i++ {
go func(i int) {
for h := range agent.perfHandler {
if h == nil {
continue
}
for {
select {
case <-egressTicker.C:
for p, t := range agent.pids {
infr, err := agent.checkEgressTrafficCallback(strconv.Itoa(p), t)
if err != nil {
log.WithError(err).Warn("error checking egress for pid: %d", p)
continue
}
v, err := getWorkspaceFromProcess(p)
if err != nil {
log.WithError(err).Warn("error getting workspace from process with pid: %d", p)
continue
}
v.Infringements = append(v.Infringements, *infr)
ps, err := agent.Penalize(*v)
if err != nil {
log.WithError(err).WithField("infringement", v).Warn("error while reacting to infringement")
}
alreadyNotified, _ := agent.notifiedInfringements.ContainsOrAdd(v.VID(), nil)
if alreadyNotified {
continue
}
callback(*v, ps)
}
case h := <-agent.perfHandler:
if h == nil {
continue
}

v, err := h()
if err != nil {
log.WithError(err).Warn("error while running perf handler")
}
v, err := h()
if err != nil {
log.WithError(err).Warn("error while running perf handler")
}

// event did not generate an infringement
if v == nil {
continue
}
ps, err := agent.Penalize(*v)
if err != nil {
log.WithError(err).WithField("infringement", v).Warn("error while reacting to infringement")
}
// event did not generate an infringement
if v == nil {
continue
}
ps, err := agent.Penalize(*v)
if err != nil {
log.WithError(err).WithField("infringement", v).Warn("error while reacting to infringement")
}

alreadyNotified, _ := agent.notifiedInfringements.ContainsOrAdd(v.VID(), nil)
if alreadyNotified {
continue
alreadyNotified, _ := agent.notifiedInfringements.ContainsOrAdd(v.VID(), nil)
if alreadyNotified {
continue
}
callback(*v, ps)
case <-ctx.Done():
return
}
callback(*v, ps)
}
}(i)
}

// todo(fntlnz): use a channel to cancel this execution
for {
rec, err := abpf.Read()
if err != nil {
Expand All @@ -357,6 +393,36 @@ func (agent *Smith) Start(callback func(InfringingWorkspace, []PenaltyKind)) {
}
}

func (agent *Smith) cleanupDeadPIDS(ctx context.Context) {
fntlnz marked this conversation as resolved.
Show resolved Hide resolved
ticker := time.NewTicker(30 * time.Second)
fntlnz marked this conversation as resolved.
Show resolved Hide resolved
go func() {
for {
select {
case <-ticker.C:
agent.cleanupDeadPidsCallback()
case <-ctx.Done():
ticker.Stop()
return
}
}
}()
}

func (agent *Smith) cleanupDeadPidsCallback() {
for p, _ := range agent.pids {
process, _ := os.FindProcess(p)
if process == nil {
delete(agent.pids, p)
continue
}

err := process.Signal(syscall.Signal(0))
if err != nil {
delete(agent.pids, p)
}
}
}

// Penalize acts on infringements and e.g. stops pods
func (agent *Smith) Penalize(ws InfringingWorkspace) ([]PenaltyKind, error) {
var remoteURL string
Expand Down Expand Up @@ -526,6 +592,10 @@ func (agent *Smith) processPerfRecord(rec perf.Record) {

// handles an execve event checks if it's infringing
func (agent *Smith) handleExecveEvent(execve Execve) func() (*InfringingWorkspace, error) {
// this is not the exact process startup time
// but for the type of comparison we need to do is enough
agent.pids[execve.TID] = time.Now()

return func() (*InfringingWorkspace, error) {
if agent.Config.Blacklists == nil {
return nil, nil
Expand Down Expand Up @@ -720,3 +790,42 @@ func mergeInfringingWorkspaces(vws []InfringingWorkspace) (vw InfringingWorkspac
func (agent *Smith) RegisterMetrics(reg prometheus.Registerer) error {
return agent.metrics.Register(reg)
}

func (agent *Smith) checkEgressTrafficCallback(pid string, pidCreationTime time.Time) (*Infringement, error) {
if agent.EgressTraffic == nil {
return nil, nil
}
podLifetime := time.Since(pidCreationTime)
resp, err := network.GetEgressTraffic(pid)
if err != nil {
return nil, err
}
if resp <= 0 {
log.WithField("total egress bytes", resp).Warn("GetEgressTraffic returned <= 0 value")
return nil, nil
}

type level struct {
V GradedInfringementKind
T *PerLevelEgressTraffic
}
levels := make([]level, 0, 2)
if agent.EgressTraffic.VeryExcessiveLevel != nil {
levels = append(levels, level{V: GradeKind(InfringementExcessiveEgress, InfringementSeverityVery), T: agent.EgressTraffic.VeryExcessiveLevel})
}
if agent.EgressTraffic.ExcessiveLevel != nil {
levels = append(levels, level{V: GradeKind(InfringementExcessiveEgress, InfringementSeverityAudit), T: agent.EgressTraffic.ExcessiveLevel})
}

dt := int64(podLifetime / time.Duration(agent.EgressTraffic.WindowDuration))
for _, lvl := range levels {
allowance := dt*lvl.T.Threshold.Value() + lvl.T.BaseBudget.Value()
excess := resp - allowance

if excess > 0 {
return &Infringement{Description: fmt.Sprintf("egress traffic is %.3f megabytes over limit", float64(excess)/(1024.0*1024.0)), Kind: lvl.V}, nil
}
}

return nil, nil
}
59 changes: 59 additions & 0 deletions components/ee/agent-smith/pkg/network/egress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
// Licensed under the Gitpod Enterprise Source Code License,
// See License.enterprise.txt in the project root folder.

package network
fntlnz marked this conversation as resolved.
Show resolved Hide resolved

import (
"encoding/csv"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"
)

func readDeviceEgress(inpt io.Reader, dev string) (total int64, err error) {
csvreader := csv.NewReader(inpt)
csvreader.Comma = ' '
csvreader.FieldsPerRecord = 17
csvreader.TrimLeadingSpace = true

var totalEgress int64 = -1
//nolint:errcheck,staticcheck
for rec, err := csvreader.Read(); rec != nil; rec, err = csvreader.Read() {
if len(rec) < 9 {
continue
}
if !strings.HasPrefix(rec[0], dev) {
continue
}

totalEgress, err = strconv.ParseInt(rec[9], 10, 64)
if err != nil {
return 0, err
}
break
}
if totalEgress < 0 {
return 0, fmt.Errorf("did not find interface")
}

return totalEgress, nil
}

func GetEgressTraffic(pid string) (int64, error) {
file, err := os.OpenFile(path.Join("/proc", pid, "/net/dev"), os.O_RDONLY, 0600)
if err != nil {
return 0, err
}
defer file.Close()

totalEgress, err := readDeviceEgress(file, "eth0")
if err != nil {
return 0, err
}

return totalEgress, nil
}