Skip to content

Commit

Permalink
use Prometheus directly for rcmgr metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored and lidel committed Apr 5, 2022
1 parent b029308 commit dfd9e91
Showing 1 changed file with 153 additions and 101 deletions.
254 changes: 153 additions & 101 deletions core/node/libp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

config "github.com/ipfs/go-ipfs/config"
Expand All @@ -17,8 +18,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -379,138 +379,190 @@ func NetSetLimit(mgr network.ResourceManager, scope string, limit config.Resourc
}

var (
ServiceID, _ = tag.NewKey("svc")
ProtocolID, _ = tag.NewKey("proto")
Direction, _ = tag.NewKey("direction")
UseFD, _ = tag.NewKey("use_fd")
PeerID, _ = tag.NewKey("peer_id")
rcmgrConnAllowed *prometheus.CounterVec
rcmgrConnBlocked *prometheus.CounterVec
rcmgrStreamAllowed *prometheus.CounterVec
rcmgrStreamBlocked *prometheus.CounterVec
rcmgrPeerAllowed prometheus.Counter
rcmgrPeerBlocked prometheus.Counter
rcmgrProtocolAllowed prometheus.Counter
rcmgrProtocolBlocked prometheus.Counter
rcmgrProtocolPeerBlocked prometheus.Counter
rcmgrServiceAllowed prometheus.Counter
rcmgrServiceBlocked prometheus.Counter
rcmgrServicePeerBlocked prometheus.Counter
rcmgrMemoryAllowed prometheus.Counter
rcmgrMemoryBlocked prometheus.Counter
)

var (
RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless)
RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless)
RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless)
RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless)
RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless)
RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless)
RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless)
RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol", stats.UnitDimensionless)
RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless)
RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless)
RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service", stats.UnitDimensionless)
RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service for a specific peer", stats.UnitDimensionless)
RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless)
RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless)
)
func init() {
const (
direction = "direction"
usesFD = "usesFD"
)

rcmgrConnAllowed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "libp2p_rcmgr_conns_allowed_total",
Help: "allowed connections",
},
[]string{direction, usesFD},
)
prometheus.MustRegister(rcmgrConnAllowed)

rcmgrConnBlocked = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "libp2p_rcmgr_conns_blocked_total",
Help: "blocked connections",
},
[]string{direction, usesFD},
)
prometheus.MustRegister(rcmgrConnBlocked)

rcmgrStreamAllowed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "libp2p_rcmgr_streams_allowed_total",
Help: "allowed streams",
},
[]string{direction},
)
prometheus.MustRegister(rcmgrStreamAllowed)

rcmgrStreamBlocked = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "libp2p_rcmgr_streams_blocked_total",
Help: "blocked streams",
},
[]string{direction},
)
prometheus.MustRegister(rcmgrStreamBlocked)

rcmgrPeerAllowed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_peers_allowed_total",
Help: "allowed peers",
})
prometheus.MustRegister(rcmgrPeerAllowed)

rcmgrPeerBlocked = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_peer_blocked_total",
Help: "blocked peers",
})
prometheus.MustRegister(rcmgrPeerBlocked)

rcmgrProtocolAllowed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_protocols_allowed_total",
Help: "allowed streams attached to a protocol",
})
prometheus.MustRegister(rcmgrProtocolAllowed)

rcmgrProtocolBlocked = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_protocols_blocked_total",
Help: "blocked streams attached to a protocol",
})
prometheus.MustRegister(rcmgrProtocolBlocked)

rcmgrProtocolPeerBlocked = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_protocols_for_peer_blocked_total",
Help: "blocked streams attached to a protocol for a specific peer",
})
prometheus.MustRegister(rcmgrProtocolPeerBlocked)

rcmgrServiceAllowed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_services_allowed_total",
Help: "allowed streams attached to a service",
})
prometheus.MustRegister(rcmgrServiceAllowed)

rcmgrServiceBlocked = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_services_blocked_total",
Help: "blocked streams attached to a service",
})
prometheus.MustRegister(rcmgrServiceBlocked)

rcmgrServicePeerBlocked = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_service_for_peer_blocked_total",
Help: "blocked streams attached to a service for a specific peer",
})
prometheus.MustRegister(rcmgrServicePeerBlocked)

rcmgrMemoryAllowed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_memory_allocations_allowed_total",
Help: "allowed memory allocations",
})
prometheus.MustRegister(rcmgrMemoryAllowed)

rcmgrMemoryBlocked = prometheus.NewCounter(prometheus.CounterOpts{
Name: "libp2p_rcmgr_memory_allocations_blocked_total",
Help: "blocked memory allocations",
})
prometheus.MustRegister(rcmgrMemoryBlocked)
}

type rcmgrMetrics struct{}

func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "outbound"))
}
if usefd {
ctx, _ = tag.New(ctx, tag.Upsert(UseFD, "true"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(UseFD, "false"))
func getDirection(d network.Direction) string {
switch d {
default:
return ""
case network.DirInbound:
return "inbound"
case network.DirOutbound:
return "outbound"
}
stats.Record(ctx, RcmgrAllowConn.M(1))
}

func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
rcmgrConnAllowed.WithLabelValues(getDirection(dir), strconv.FormatBool(usefd)).Add(1)
}

func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "outbound"))
}
if usefd {
ctx, _ = tag.New(ctx, tag.Upsert(UseFD, "true"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(UseFD, "false"))
}
stats.Record(ctx, RcmgrBlockConn.M(1))
rcmgrConnBlocked.WithLabelValues(getDirection(dir), strconv.FormatBool(usefd)).Add(1)
}

func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "outbound"))
}
ctx, _ = tag.New(ctx, tag.Upsert(PeerID, p.Pretty()))
stats.Record(ctx, RcmgrAllowStream.M(1))
func (r rcmgrMetrics) AllowStream(_ peer.ID, dir network.Direction) {
rcmgrStreamAllowed.WithLabelValues(getDirection(dir)).Add(1)
}

func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(Direction, "outbound"))
}
ctx, _ = tag.New(ctx, tag.Upsert(PeerID, p.Pretty()))
stats.Record(ctx, RcmgrBlockStream.M(1))
func (r rcmgrMetrics) BlockStream(_ peer.ID, dir network.Direction) {
rcmgrStreamBlocked.WithLabelValues(getDirection(dir)).Add(1)
}

func (r rcmgrMetrics) AllowPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(PeerID, p.Pretty()))
stats.Record(ctx, RcmgrAllowPeer.M(1))
func (r rcmgrMetrics) AllowPeer(_ peer.ID) {
rcmgrPeerAllowed.Add(1)
}

func (r rcmgrMetrics) BlockPeer(p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(PeerID, p.Pretty()))
stats.Record(ctx, RcmgrBlockPeer.M(1))
func (r rcmgrMetrics) BlockPeer(_ peer.ID) {
rcmgrPeerBlocked.Add(1)
}

func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(ProtocolID, string(proto)))
stats.Record(ctx, RcmgrAllowProto.M(1))
func (r rcmgrMetrics) AllowProtocol(_ protocol.ID) {
rcmgrProtocolAllowed.Add(1)
}

func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(ProtocolID, string(proto)))
stats.Record(ctx, RcmgrBlockProto.M(1))
func (r rcmgrMetrics) BlockProtocol(_ protocol.ID) {
rcmgrProtocolBlocked.Add(1)
}

func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(ProtocolID, string(proto)))
ctx, _ = tag.New(ctx, tag.Upsert(PeerID, p.Pretty()))
stats.Record(ctx, RcmgrBlockProtoPeer.M(1))
func (r rcmgrMetrics) BlockProtocolPeer(_ protocol.ID, _ peer.ID) {
rcmgrProtocolPeerBlocked.Add(1)
}

func (r rcmgrMetrics) AllowService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(ServiceID, svc))
stats.Record(ctx, RcmgrAllowSvc.M(1))
rcmgrServiceAllowed.Add(1)
}

func (r rcmgrMetrics) BlockService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(ServiceID, svc))
stats.Record(ctx, RcmgrBlockSvc.M(1))
rcmgrServiceBlocked.Add(1)
}

func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(ServiceID, svc))
ctx, _ = tag.New(ctx, tag.Upsert(PeerID, p.Pretty()))
stats.Record(ctx, RcmgrBlockSvcPeer.M(1))
func (r rcmgrMetrics) BlockServicePeer(_ string, _ peer.ID) {
rcmgrServicePeerBlocked.Add(1)
}

func (r rcmgrMetrics) AllowMemory(size int) {
stats.Record(context.Background(), RcmgrAllowMem.M(1))
func (r rcmgrMetrics) AllowMemory(_ int) {
rcmgrMemoryAllowed.Add(1)
}

func (r rcmgrMetrics) BlockMemory(size int) {
stats.Record(context.Background(), RcmgrBlockMem.M(1))
func (r rcmgrMetrics) BlockMemory(_ int) {
rcmgrMemoryBlocked.Add(1)
}

0 comments on commit dfd9e91

Please sign in to comment.