From 173fef8a2ead3a8a583b2e94a9f7fbaab8972636 Mon Sep 17 00:00:00 2001 From: Sukun Date: Wed, 28 Jun 2023 11:23:44 +0530 Subject: [PATCH] rcmgr: move StatsTraceReporter to rcmgr package (#2388) --- config/config.go | 4 +- dashboards/resource-manager/README.md | 3 +- p2p/host/resource-manager/README.md | 4 +- .../{obs => }/noalloc_test.go | 39 +++++++------ p2p/host/resource-manager/obs/obs.go | 18 ++++++ p2p/host/resource-manager/{obs => }/stats.go | 57 +++++++++---------- .../resource-manager/{obs => }/stats_test.go | 14 ++--- p2p/test/resource-manager/rcmgr_test.go | 3 +- 8 files changed, 77 insertions(+), 65 deletions(-) rename p2p/host/resource-manager/{obs => }/noalloc_test.go (73%) create mode 100644 p2p/host/resource-manager/obs/obs.go rename p2p/host/resource-manager/{obs => }/stats.go (87%) rename p2p/host/resource-manager/{obs => }/stats_test.go (50%) diff --git a/config/config.go b/config/config.go index fb082fbad9..6bc80c96d4 100644 --- a/config/config.go +++ b/config/config.go @@ -26,7 +26,7 @@ import ( blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" - rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" routed "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/swarm" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" @@ -301,7 +301,7 @@ func (cfg *Config) NewNode() (host.Host, error) { } if !cfg.DisableMetrics { - rcmgrObs.MustRegisterWith(cfg.PrometheusRegisterer) + rcmgr.MustRegisterWith(cfg.PrometheusRegisterer) } h, err := bhost.NewHost(swrm, &bhost.HostOpts{ diff --git a/dashboards/resource-manager/README.md b/dashboards/resource-manager/README.md index cfcc6d3fd7..8697769d3a 100644 --- a/dashboards/resource-manager/README.md +++ b/dashboards/resource-manager/README.md @@ -14,13 +14,12 @@ option libp2p.PrometheusRegisterer. For example: import ( // ... rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" "github.com/prometheus/client_golang/prometheus" ) func SetupResourceManager() (network.ResourceManager, error) { - str, err := rcmgrObs.NewStatsTraceReporter() + str, err := rcmgr.NewStatsTraceReporter() if err != nil { return nil, err } diff --git a/p2p/host/resource-manager/README.md b/p2p/host/resource-manager/README.md index 934058edd1..9533eadb04 100644 --- a/p2p/host/resource-manager/README.md +++ b/p2p/host/resource-manager/README.md @@ -48,8 +48,8 @@ limits := cfg.Build(scaledDefaultLimits) limiter := rcmgr.NewFixedLimiter(limits) // (Optional if you want metrics) -rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer) -str, err := rcmgrObs.NewStatsTraceReporter() +rcmgr.MustRegisterWith(prometheus.DefaultRegisterer) +str, err := rcmgr.NewStatsTraceReporter() if err != nil { panic(err) } diff --git a/p2p/host/resource-manager/obs/noalloc_test.go b/p2p/host/resource-manager/noalloc_test.go similarity index 73% rename from p2p/host/resource-manager/obs/noalloc_test.go rename to p2p/host/resource-manager/noalloc_test.go index 7b409865c6..461a001f9c 100644 --- a/p2p/host/resource-manager/obs/noalloc_test.go +++ b/p2p/host/resource-manager/noalloc_test.go @@ -1,6 +1,6 @@ //go:build nocover -package obs +package rcmgr import ( "math/rand" @@ -8,26 +8,25 @@ import ( "testing" "time" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) -func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt { +func randomTraceEvt(rng *rand.Rand) TraceEvt { // Possibly non-sensical - typs := []rcmgr.TraceEvtTyp{ - rcmgr.TraceStartEvt, - rcmgr.TraceCreateScopeEvt, - rcmgr.TraceDestroyScopeEvt, - rcmgr.TraceReserveMemoryEvt, - rcmgr.TraceBlockReserveMemoryEvt, - rcmgr.TraceReleaseMemoryEvt, - rcmgr.TraceAddStreamEvt, - rcmgr.TraceBlockAddStreamEvt, - rcmgr.TraceRemoveStreamEvt, - rcmgr.TraceAddConnEvt, - rcmgr.TraceBlockAddConnEvt, - rcmgr.TraceRemoveConnEvt, + typs := []TraceEvtTyp{ + TraceStartEvt, + TraceCreateScopeEvt, + TraceDestroyScopeEvt, + TraceReserveMemoryEvt, + TraceBlockReserveMemoryEvt, + TraceReleaseMemoryEvt, + TraceAddStreamEvt, + TraceBlockAddStreamEvt, + TraceRemoveStreamEvt, + TraceAddConnEvt, + TraceBlockAddConnEvt, + TraceRemoveConnEvt, } names := []string{ @@ -43,7 +42,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt { "service:libp2p.autonat.peer:12D3Koo", } - return rcmgr.TraceEvt{ + return TraceEvt{ Type: typs[rng.Intn(len(typs))], Name: names[rng.Intn(len(names))], DeltaOut: rng.Intn(5), @@ -60,7 +59,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt { } -var registerOnce sync.Once +var regOnce sync.Once func BenchmarkMetricsRecording(b *testing.B) { b.ReportAllocs() @@ -70,7 +69,7 @@ func BenchmarkMetricsRecording(b *testing.B) { }) evtCount := 10000 - evts := make([]rcmgr.TraceEvt, evtCount) + evts := make([]TraceEvt, evtCount) rng := rand.New(rand.NewSource(int64(b.N))) for i := 0; i < evtCount; i++ { evts[i] = randomTraceEvt(rng) @@ -92,7 +91,7 @@ func TestNoAllocsNoCover(t *testing.T) { require.NoError(t, err) evtCount := 10_000 - evts := make([]rcmgr.TraceEvt, 0, evtCount) + evts := make([]TraceEvt, 0, evtCount) rng := rand.New(rand.NewSource(1)) for i := 0; i < evtCount; i++ { diff --git a/p2p/host/resource-manager/obs/obs.go b/p2p/host/resource-manager/obs/obs.go new file mode 100644 index 0000000000..3484fae4b2 --- /dev/null +++ b/p2p/host/resource-manager/obs/obs.go @@ -0,0 +1,18 @@ +// Package obs implements metrics tracing for resource manager +// +// Deprecated: obs is deprecated and the exported types and methods +// are moved to rcmgr package. Use the corresponding identifier in +// the rcmgr package, for example +// obs.NewStatsTraceReporter => rcmgr.NewStatsTraceReporter +package obs + +import ( + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" +) + +var MustRegisterWith = rcmgr.MustRegisterWith + +// StatsTraceReporter reports stats on the resource manager using its traces. +type StatsTraceReporter = rcmgr.StatsTraceReporter + +var NewStatsTraceReporter = rcmgr.NewStatsTraceReporter diff --git a/p2p/host/resource-manager/obs/stats.go b/p2p/host/resource-manager/stats.go similarity index 87% rename from p2p/host/resource-manager/obs/stats.go rename to p2p/host/resource-manager/stats.go index 3362d70e31..b9f90b8449 100644 --- a/p2p/host/resource-manager/obs/stats.go +++ b/p2p/host/resource-manager/stats.go @@ -1,9 +1,8 @@ -package obs +package rcmgr import ( "strings" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/metricshelper" "github.com/prometheus/client_golang/prometheus" ) @@ -74,7 +73,7 @@ var ( previousPeerStreamsOutbound = previousPeerStreams.With(prometheus.Labels{"dir": "outbound"}) // Memory - memory = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + memoryTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricNamespace, Name: "memory", Help: "Amount of memory reserved as reported to the Resource Manager", @@ -151,7 +150,7 @@ func MustRegisterWith(reg prometheus.Registerer) { previousPeerStreams, - memory, + memoryTotal, peerMemory, previousPeerMemory, connMemory, @@ -169,7 +168,7 @@ func NewStatsTraceReporter() (StatsTraceReporter, error) { return StatsTraceReporter{}, nil } -func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { +func (r StatsTraceReporter) ConsumeEvent(evt TraceEvt) { tags := metricshelper.GetStringSlice() defer metricshelper.PutStringSlice(tags) @@ -177,10 +176,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { } // Separate func so that we can test that this function does not allocate. The syncPool may allocate. -func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags *[]string) { +func (r StatsTraceReporter) consumeEventWithLabelSlice(evt TraceEvt, tags *[]string) { switch evt.Type { - case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt: - if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { + case TraceAddStreamEvt, TraceRemoveStreamEvt: + if p := PeerStrInScopeName(evt.Name); p != "" { // Aggregated peer stats. Counts how many peers have N number of streams open. // Uses two buckets aggregations. One to count how many streams the // peer has now. The other to count the negative value, or how many @@ -210,11 +209,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags } } else { if evt.DeltaOut != 0 { - if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { + if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) { *tags = (*tags)[:0] *tags = append(*tags, "outbound", evt.Name, "") streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut)) - } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { + } else if proto := ParseProtocolScopeName(evt.Name); proto != "" { *tags = (*tags)[:0] *tags = append(*tags, "outbound", "protocol", proto) streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut)) @@ -227,11 +226,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags } if evt.DeltaIn != 0 { - if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { + if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) { *tags = (*tags)[:0] *tags = append(*tags, "inbound", evt.Name, "") streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn)) - } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { + } else if proto := ParseProtocolScopeName(evt.Name); proto != "" { *tags = (*tags)[:0] *tags = append(*tags, "inbound", "protocol", proto) streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn)) @@ -244,8 +243,8 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags } } - case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt: - if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { + case TraceAddConnEvt, TraceRemoveConnEvt: + if p := PeerStrInScopeName(evt.Name); p != "" { // Aggregated peer stats. Counts how many peers have N number of connections. // Uses two buckets aggregations. One to count how many streams the // peer has now. The other to count the negative value, or how many @@ -274,31 +273,31 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags } } } else { - if rcmgr.IsConnScope(evt.Name) { + if IsConnScope(evt.Name) { // Not measuring this. I don't think it's useful. break } - if rcmgr.IsSystemScope(evt.Name) { + if IsSystemScope(evt.Name) { connsInboundSystem.Set(float64(evt.ConnsIn)) connsOutboundSystem.Set(float64(evt.ConnsOut)) - } else if rcmgr.IsTransientScope(evt.Name) { + } else if IsTransientScope(evt.Name) { connsInboundTransient.Set(float64(evt.ConnsIn)) connsOutboundTransient.Set(float64(evt.ConnsOut)) } // Represents the delta in fds if evt.Delta != 0 { - if rcmgr.IsSystemScope(evt.Name) { + if IsSystemScope(evt.Name) { fdsSystem.Set(float64(evt.FD)) - } else if rcmgr.IsTransientScope(evt.Name) { + } else if IsTransientScope(evt.Name) { fdsTransient.Set(float64(evt.FD)) } } } - case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt: - if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { + case TraceReserveMemoryEvt, TraceReleaseMemoryEvt: + if p := PeerStrInScopeName(evt.Name); p != "" { oldMem := evt.Memory - evt.Delta if oldMem != evt.Memory { if oldMem != 0 { @@ -308,7 +307,7 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags peerMemory.Observe(float64(evt.Memory)) } } - } else if rcmgr.IsConnScope(evt.Name) { + } else if IsConnScope(evt.Name) { oldMem := evt.Memory - evt.Delta if oldMem != evt.Memory { if oldMem != 0 { @@ -319,14 +318,14 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags } } } else { - if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { + if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) { *tags = (*tags)[:0] *tags = append(*tags, evt.Name, "") - memory.WithLabelValues(*tags...).Set(float64(evt.Memory)) - } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { + memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory)) + } else if proto := ParseProtocolScopeName(evt.Name); proto != "" { *tags = (*tags)[:0] *tags = append(*tags, "protocol", proto) - memory.WithLabelValues(*tags...).Set(float64(evt.Memory)) + memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory)) } else { // Not measuring connscope, servicepeer and protocolpeer. Lots of data, and // you can use aggregated peer stats + service stats to infer @@ -335,11 +334,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags } } - case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt: + case TraceBlockAddConnEvt, TraceBlockAddStreamEvt, TraceBlockReserveMemoryEvt: var resource string - if evt.Type == rcmgr.TraceBlockAddConnEvt { + if evt.Type == TraceBlockAddConnEvt { resource = "connection" - } else if evt.Type == rcmgr.TraceBlockAddStreamEvt { + } else if evt.Type == TraceBlockAddStreamEvt { resource = "stream" } else { resource = "memory" diff --git a/p2p/host/resource-manager/obs/stats_test.go b/p2p/host/resource-manager/stats_test.go similarity index 50% rename from p2p/host/resource-manager/obs/stats_test.go rename to p2p/host/resource-manager/stats_test.go index bc98a0eec3..b4f1ec996a 100644 --- a/p2p/host/resource-manager/obs/stats_test.go +++ b/p2p/host/resource-manager/stats_test.go @@ -1,19 +1,17 @@ -package obs_test +package rcmgr import ( "sync" "testing" "time" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" "github.com/prometheus/client_golang/prometheus" ) var registerOnce sync.Once func TestTraceReporterStartAndClose(t *testing.T) { - rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{})) + rcmgr, err := NewResourceManager(NewFixedLimiter(DefaultLimits.AutoScale()), WithTraceReporter(StatsTraceReporter{})) if err != nil { t.Fatal(err) } @@ -21,18 +19,18 @@ func TestTraceReporterStartAndClose(t *testing.T) { } func TestConsumeEvent(t *testing.T) { - evt := rcmgr.TraceEvt{ - Type: rcmgr.TraceBlockAddStreamEvt, + evt := TraceEvt{ + Type: TraceBlockAddStreamEvt, Name: "conn-1", DeltaOut: 1, Time: time.Now().Format(time.RFC3339Nano), } registerOnce.Do(func() { - obs.MustRegisterWith(prometheus.DefaultRegisterer) + MustRegisterWith(prometheus.DefaultRegisterer) }) - str, err := obs.NewStatsTraceReporter() + str, err := NewStatsTraceReporter() if err != nil { t.Fatal(err) } diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index c953c5c7b7..e4a1227685 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" "github.com/stretchr/testify/require" ) @@ -319,7 +318,7 @@ func TestReadmeExample(t *testing.T) { limiter := rcmgr.NewFixedLimiter(limits) // (Optional if you want metrics) Construct the OpenCensus metrics reporter. - str, err := rcmgrObs.NewStatsTraceReporter() + str, err := rcmgr.NewStatsTraceReporter() if err != nil { panic(err) }