Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rcmgr: move StatsTraceReporter to rcmgr package #2388

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
Expand Down Expand Up @@ -301,7 +301,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
}

if !cfg.DisableMetrics {
rcmgrObs.MustRegisterWith(cfg.PrometheusRegisterer)
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

h, err := bhost.NewHost(swrm, &bhost.HostOpts{
Expand Down
3 changes: 1 addition & 2 deletions dashboards/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ option libp2p.PrometheusRegisterer. For example:
import (
// ...
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"

"github.com/prometheus/client_golang/prometheus"
)

func SetupResourceManager() (network.ResourceManager, error) {
str, err := rcmgrObs.NewStatsTraceReporter()
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ limits := cfg.Build(scaledDefaultLimits)
limiter := rcmgr.NewFixedLimiter(limits)

// (Optional if you want metrics)
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgrObs.NewStatsTraceReporter()
rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
//go:build nocover

package obs
package rcmgr

import (
"math/rand"
"sync"
"testing"
"time"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
func randomTraceEvt(rng *rand.Rand) TraceEvt {
// Possibly non-sensical
typs := []rcmgr.TraceEvtTyp{
rcmgr.TraceStartEvt,
rcmgr.TraceCreateScopeEvt,
rcmgr.TraceDestroyScopeEvt,
rcmgr.TraceReserveMemoryEvt,
rcmgr.TraceBlockReserveMemoryEvt,
rcmgr.TraceReleaseMemoryEvt,
rcmgr.TraceAddStreamEvt,
rcmgr.TraceBlockAddStreamEvt,
rcmgr.TraceRemoveStreamEvt,
rcmgr.TraceAddConnEvt,
rcmgr.TraceBlockAddConnEvt,
rcmgr.TraceRemoveConnEvt,
typs := []TraceEvtTyp{
TraceStartEvt,
TraceCreateScopeEvt,
TraceDestroyScopeEvt,
TraceReserveMemoryEvt,
TraceBlockReserveMemoryEvt,
TraceReleaseMemoryEvt,
TraceAddStreamEvt,
TraceBlockAddStreamEvt,
TraceRemoveStreamEvt,
TraceAddConnEvt,
TraceBlockAddConnEvt,
TraceRemoveConnEvt,
}

names := []string{
Expand All @@ -43,7 +42,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
"service:libp2p.autonat.peer:12D3Koo",
}

return rcmgr.TraceEvt{
return TraceEvt{
Type: typs[rng.Intn(len(typs))],
Name: names[rng.Intn(len(names))],
DeltaOut: rng.Intn(5),
Expand All @@ -60,7 +59,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {

}

var registerOnce sync.Once
var regOnce sync.Once

func BenchmarkMetricsRecording(b *testing.B) {
b.ReportAllocs()
Expand All @@ -70,7 +69,7 @@ func BenchmarkMetricsRecording(b *testing.B) {
})

evtCount := 10000
evts := make([]rcmgr.TraceEvt, evtCount)
evts := make([]TraceEvt, evtCount)
rng := rand.New(rand.NewSource(int64(b.N)))
for i := 0; i < evtCount; i++ {
evts[i] = randomTraceEvt(rng)
Expand All @@ -92,7 +91,7 @@ func TestNoAllocsNoCover(t *testing.T) {
require.NoError(t, err)

evtCount := 10_000
evts := make([]rcmgr.TraceEvt, 0, evtCount)
evts := make([]TraceEvt, 0, evtCount)
rng := rand.New(rand.NewSource(1))

for i := 0; i < evtCount; i++ {
Expand Down
18 changes: 18 additions & 0 deletions p2p/host/resource-manager/obs/obs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Package obs implements metrics tracing for resource manager
//
// Deprecated: obs is deprecated and the exported types and methods
// are moved to rcmgr package. Use the corresponding identifier in
// the rcmgr package, for example
// obs.NewStatsTraceReporter => rcmgr.NewStatsTraceReporter
package obs

import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

var MustRegisterWith = rcmgr.MustRegisterWith

// StatsTraceReporter reports stats on the resource manager using its traces.
type StatsTraceReporter = rcmgr.StatsTraceReporter

var NewStatsTraceReporter = rcmgr.NewStatsTraceReporter
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package obs
package rcmgr

import (
"strings"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -74,7 +73,7 @@ var (
previousPeerStreamsOutbound = previousPeerStreams.With(prometheus.Labels{"dir": "outbound"})

// Memory
memory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
memoryTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace,
Name: "memory",
Help: "Amount of memory reserved as reported to the Resource Manager",
Expand Down Expand Up @@ -151,7 +150,7 @@ func MustRegisterWith(reg prometheus.Registerer) {

previousPeerStreams,

memory,
memoryTotal,
peerMemory,
previousPeerMemory,
connMemory,
Expand All @@ -169,18 +168,18 @@ func NewStatsTraceReporter() (StatsTraceReporter, error) {
return StatsTraceReporter{}, nil
}

func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
func (r StatsTraceReporter) ConsumeEvent(evt TraceEvt) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

r.consumeEventWithLabelSlice(evt, tags)
}

// Separate func so that we can test that this function does not allocate. The syncPool may allocate.
func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags *[]string) {
func (r StatsTraceReporter) consumeEventWithLabelSlice(evt TraceEvt, tags *[]string) {
switch evt.Type {
case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
case TraceAddStreamEvt, TraceRemoveStreamEvt:
if p := PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of streams open.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
Expand Down Expand Up @@ -210,11 +209,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
} else {
if evt.DeltaOut != 0 {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
} else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "outbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
Expand All @@ -227,11 +226,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}

if evt.DeltaIn != 0 {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
} else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "inbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
Expand All @@ -244,8 +243,8 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}

case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
case TraceAddConnEvt, TraceRemoveConnEvt:
if p := PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of connections.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
Expand Down Expand Up @@ -274,31 +273,31 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}
} else {
if rcmgr.IsConnScope(evt.Name) {
if IsConnScope(evt.Name) {
// Not measuring this. I don't think it's useful.
break
}

if rcmgr.IsSystemScope(evt.Name) {
if IsSystemScope(evt.Name) {
connsInboundSystem.Set(float64(evt.ConnsIn))
connsOutboundSystem.Set(float64(evt.ConnsOut))
} else if rcmgr.IsTransientScope(evt.Name) {
} else if IsTransientScope(evt.Name) {
connsInboundTransient.Set(float64(evt.ConnsIn))
connsOutboundTransient.Set(float64(evt.ConnsOut))
}

// Represents the delta in fds
if evt.Delta != 0 {
if rcmgr.IsSystemScope(evt.Name) {
if IsSystemScope(evt.Name) {
fdsSystem.Set(float64(evt.FD))
} else if rcmgr.IsTransientScope(evt.Name) {
} else if IsTransientScope(evt.Name) {
fdsTransient.Set(float64(evt.FD))
}
}
}

case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" {
case TraceReserveMemoryEvt, TraceReleaseMemoryEvt:
if p := PeerStrInScopeName(evt.Name); p != "" {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
Expand All @@ -308,7 +307,7 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
peerMemory.Observe(float64(evt.Memory))
}
}
} else if rcmgr.IsConnScope(evt.Name) {
} else if IsConnScope(evt.Name) {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
Expand All @@ -319,14 +318,14 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}
} else {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0]
*tags = append(*tags, evt.Name, "")
memory.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0]
*tags = append(*tags, "protocol", proto)
memory.WithLabelValues(*tags...).Set(float64(evt.Memory))
memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
Expand All @@ -335,11 +334,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
}
}

case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt:
case TraceBlockAddConnEvt, TraceBlockAddStreamEvt, TraceBlockReserveMemoryEvt:
var resource string
if evt.Type == rcmgr.TraceBlockAddConnEvt {
if evt.Type == TraceBlockAddConnEvt {
resource = "connection"
} else if evt.Type == rcmgr.TraceBlockAddStreamEvt {
} else if evt.Type == TraceBlockAddStreamEvt {
resource = "stream"
} else {
resource = "memory"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
package obs_test
package rcmgr

import (
"sync"
"testing"
"time"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus"
)

var registerOnce sync.Once

func TestTraceReporterStartAndClose(t *testing.T) {
rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{}))
rcmgr, err := NewResourceManager(NewFixedLimiter(DefaultLimits.AutoScale()), WithTraceReporter(StatsTraceReporter{}))
if err != nil {
t.Fatal(err)
}
defer rcmgr.Close()
}

func TestConsumeEvent(t *testing.T) {
evt := rcmgr.TraceEvt{
Type: rcmgr.TraceBlockAddStreamEvt,
evt := TraceEvt{
Type: TraceBlockAddStreamEvt,
Name: "conn-1",
DeltaOut: 1,
Time: time.Now().Format(time.RFC3339Nano),
}

registerOnce.Do(func() {
obs.MustRegisterWith(prometheus.DefaultRegisterer)
MustRegisterWith(prometheus.DefaultRegisterer)
})

str, err := obs.NewStatsTraceReporter()
str, err := NewStatsTraceReporter()
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions p2p/test/resource-manager/rcmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -319,7 +318,7 @@ func TestReadmeExample(t *testing.T) {
limiter := rcmgr.NewFixedLimiter(limits)

// (Optional if you want metrics) Construct the OpenCensus metrics reporter.
str, err := rcmgrObs.NewStatsTraceReporter()
str, err := rcmgr.NewStatsTraceReporter()
if err != nil {
panic(err)
}
Expand Down