Skip to content

Commit

Permalink
feat: add DHT-specific resource manager limits
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann authored and hacdias committed Feb 13, 2024
1 parent 2fdbd1d commit 361f35b
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 12 deletions.
119 changes: 118 additions & 1 deletion rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,52 @@ import (
"log"

"github.com/dustin/go-humanize"
"github.com/pbnjay/memory"

"github.com/ipfs/rainbow/internal/fd"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/pbnjay/memory"
)

// Note: this comes from kubo/core/node/libp2p/rcmgr_defaults.go with minimal
// adaptations.

var infiniteResourceLimits = rcmgr.InfiniteLimits.ToPartialLimitConfig().System

func makeResourceMgrs(maxMemory uint64, maxFD int, connMgrHighWater int, separateDHT bool) (bitswapHost, dhtHost network.ResourceManager, err error) {
if maxMemory == 0 {
maxMemory = uint64((float64(memory.TotalMemory()) * 0.85))
}
if maxFD == 0 {
maxFD = fd.GetNumFDs() / 2
}

if !separateDHT {
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(maxMemory, maxFD, connMgrHighWater)))
if err != nil {
return nil, nil, err
}
return mgr, nil, nil
}

bitswapHostMem := uint64(float64(maxMemory) * 0.85)
bitswapHostFDs := int(float64(maxFD) * 0.75)
bitswapHostRcMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(bitswapHostMem, bitswapHostFDs, connMgrHighWater)))
if err != nil {
return nil, nil, err
}

dhtHostMem := maxMemory - bitswapHostMem
dhtHostFDs := maxFD - bitswapHostFDs
dhtHostRcMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeSeparateDHTClientResourceManagerConfig(dhtHostMem, dhtHostFDs)))
if err != nil {
return nil, nil, err
}

return bitswapHostRcMgr, dhtHostRcMgr, nil
}

func makeResourceManagerConfig(maxMemory uint64, maxFD int, connMgrHighWater int) (limitConfig rcmgr.ConcreteLimitConfig) {
if maxMemory == 0 {
maxMemory = uint64((float64(memory.TotalMemory()) * 0.85))
Expand Down Expand Up @@ -142,3 +177,85 @@ go-libp2p Resource Manager limits based on:
// We already have a complete value thus pass in an empty ConcreteLimitConfig.
return partialLimits.Build(rcmgr.ConcreteLimitConfig{})
}

func makeSeparateDHTClientResourceManagerConfig(maxMemory uint64, maxFD int) (limitConfig rcmgr.ConcreteLimitConfig) {
// Being a DHT client should require very limited inbound connections or streams so we set those very low
systemConnsInbound := 30
systemStreamsPerPeerInbound := 10

// For simplicity we set as much else to unlimited as possible
partialLimits := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory),
FD: rcmgr.LimitVal(maxFD),

Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound),
ConnsOutbound: rcmgr.Unlimited,

Streams: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
},

// Transient connections won't cause any memory to be accounted for by the resource manager/accountant.
// Only established connections do.
// As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened.
// We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope.
Transient: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory / 4),
FD: rcmgr.LimitVal(maxFD / 4),

Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4),
ConnsOutbound: rcmgr.Unlimited,

Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
},

AllowlistedSystem: infiniteResourceLimits,
AllowlistedTransient: infiniteResourceLimits,
ServiceDefault: infiniteResourceLimits,
ServicePeerDefault: infiniteResourceLimits,
ProtocolDefault: infiniteResourceLimits,
ProtocolPeerDefault: infiniteResourceLimits,
Conn: infiniteResourceLimits,
Stream: infiniteResourceLimits,

// Limit the resources consumed by a peer.
// This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers.
// We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic intentionally).
// In that case we want to keep that peer's resource consumption contained.
// To keep this simple, we only constrain inbound connections and streams.
PeerDefault: rcmgr.ResourceLimits{
Memory: rcmgr.Unlimited64,
FD: rcmgr.Unlimited,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.LimitVal(systemStreamsPerPeerInbound),
StreamsOutbound: rcmgr.Unlimited,
},
}

scalingLimitConfig := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&scalingLimitConfig)

// Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden.
// Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits).
partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), maxFD)).ToPartialLimitConfig()

log.Printf(`
go-libp2p Separate DHT Resource Manager limits based on:
- --max-memory: %s
- --max-fd: %d
`, humanize.Bytes(maxMemory), maxFD)

// We already have a complete value thus pass in an empty ConcreteLimitConfig.
return partialLimits.Build(rcmgr.ConcreteLimitConfig{})
}
14 changes: 3 additions & 11 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -118,8 +117,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

limiter := rcmgr.NewFixedLimiter(makeResourceManagerConfig(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi))
mgr, err := rcmgr.NewResourceManager(limiter)
bitswapRcMgr, dhtRcMgr, err := makeResourceMgrs(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi, !cfg.DHTSharedHost)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +130,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
libp2p.BandwidthReporter(bwc),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.ResourceManager(mgr),
libp2p.ResourceManager(bitswapRcMgr),
}

if len(cfg.AnnounceAddrs) > 0 {
Expand Down Expand Up @@ -199,18 +197,12 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
if cfg.DHTSharedHost {
dhtHost = h
} else {
dhtLimiter := rcmgr.NewFixedLimiter(makeResourceManagerConfig(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi))
dhtMgr, err := rcmgr.NewResourceManager(dhtLimiter)
if err != nil {
return nil, err
}

dhtHost, err = libp2p.New(
libp2p.NoListenAddrs,
libp2p.BandwidthReporter(bwc),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.ResourceManager(dhtMgr),
libp2p.ResourceManager(dhtRcMgr),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 361f35b

Please sign in to comment.