Skip to content

Commit

Permalink
fix: non-blocking peerlog logging
Browse files Browse the repository at this point in the history
Avoid ever blocking new connections in the peer logger. Instead:

1. Send all new peers to a highly buffered channel.
2. Emit "dropped event" errors whenever we detect that we're dropping events and falling behind.
3. Don't log protocols, they're too large.
4. Don't log disconnects, we don't need them.
  • Loading branch information
Stebalien committed Apr 28, 2020
1 parent 314e83c commit bdbb79d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ require (
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-circuit v0.2.2
github.com/libp2p/go-libp2p-connmgr v0.2.1
Expand Down Expand Up @@ -101,6 +100,7 @@ require (
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c
go.uber.org/fx v1.12.0
go.uber.org/zap v1.14.1
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4
Expand Down
125 changes: 88 additions & 37 deletions plugin/plugins/peerlog/peerlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,44 @@ package peerlog

import (
"fmt"
"sync/atomic"

core "github.com/ipfs/go-ipfs/core"
plugin "github.com/ipfs/go-ipfs/plugin"
logging "github.com/ipfs/go-log"
eventbus "github.com/libp2p/go-eventbus"
event "github.com/libp2p/go-libp2p-core/event"
network "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"go.uber.org/zap"
)

var log = logging.Logger("plugin/peerlog")

type eventType int

const (
eventConnect eventType = iota
eventIdentify
)

type plEvent struct {
kind eventType
peer peer.ID
}

// Log all the PeerIDs we see
//
// Usage:
// GOLOG_FILE=~/peer.log IPFS_LOGGING_FMT=json ipfs daemon
// Output:
// {"level":"info","ts":"2020-02-10T13:54:26.639Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:51","msg":"connected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"}
// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"disconnected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"}
// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"identified","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt","agent":"go-ipfs/0.5.0/"}
//
type peerLogPlugin struct{}
type peerLogPlugin struct {
droppedCount uint64
events chan plEvent
}

var _ plugin.PluginDaemonInternal = (*peerLogPlugin)(nil)

Expand All @@ -41,60 +59,93 @@ func (*peerLogPlugin) Version() string {
}

// Init initializes plugin
func (*peerLogPlugin) Init(*plugin.Environment) error {
func (pl *peerLogPlugin) Init(*plugin.Environment) error {
pl.events = make(chan plEvent, 64*1024)
return nil
}

func (*peerLogPlugin) Start(node *core.IpfsNode) error {
func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) {
go func() {
ctx := node.Context()

dlog := log.Desugar()
for {
dropped := atomic.SwapUint64(&pl.droppedCount, 0)
if dropped > 0 {
dlog.Error("dropped events", zap.Uint64("count", dropped))
}

var e plEvent
select {
case <-ctx.Done():
return
case e = <-pl.events:
}

peerID := zap.String("peer", e.peer.Pretty())

switch e.kind {
case eventConnect:
dlog.Info("connected", peerID)
case eventIdentify:
agent, err := node.Peerstore.Get(e.peer, "AgentVersion")
switch err {
case nil:
case peerstore.ErrNotFound:
continue
default:
dlog.Error("failed to get agent version", zap.Error(err))
continue
}

agentS, ok := agent.(string)
if !ok {
continue
}
dlog.Info("identified", peerID, zap.String("agent", agentS))
}
}
}()

}

func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) {
select {
case pl.events <- plEvent{kind: evt, peer: p}:
default:
atomic.AddUint64(&pl.droppedCount, 1)
}
}

func (pl *peerLogPlugin) Start(node *core.IpfsNode) error {
// Ensure logs from this plugin get printed regardless of global IPFS_LOGGING value
if err := logging.SetLogLevel("plugin/peerlog", "info"); err != nil {
return fmt.Errorf("failed to set log level: %w", err)
}

sub, err := node.PeerHost.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
if err != nil {
return fmt.Errorf("failed to subscribe to identify notifications")
}

var notifee network.NotifyBundle
notifee.ConnectedF = func(net network.Network, conn network.Conn) {
// TODO: Log transport, country, etc?
log.Infow("connected",
"peer", conn.RemotePeer().Pretty(),
)
}
notifee.DisconnectedF = func(net network.Network, conn network.Conn) {
log.Infow("disconnected",
"peer", conn.RemotePeer().Pretty(),
)
pl.emit(eventConnect, conn.RemotePeer())
}
node.PeerHost.Network().Notify(&notifee)

sub, err := node.PeerHost.EventBus().Subscribe(
new(event.EvtPeerIdentificationCompleted),
eventbus.BufSize(1024),
)
if err != nil {
return fmt.Errorf("failed to subscribe to identify notifications")
}
go func() {
defer sub.Close()
for e := range sub.Out() {
switch e := e.(type) {
case event.EvtPeerIdentificationCompleted:
protocols, err := node.Peerstore.GetProtocols(e.Peer)
if err != nil {
log.Errorw("failed to get protocols", "error", err)
continue
}
agent, err := node.Peerstore.Get(e.Peer, "AgentVersion")
if err != nil {
log.Errorw("failed to get agent version", "error", err)
continue
}
log.Infow(
"identified",
"peer", e.Peer.Pretty(),
"agent", agent,
"protocols", protocols,
)
pl.emit(eventIdentify, e.Peer)
}
}
}()

go pl.collectEvents(node)

return nil
}

Expand Down

0 comments on commit bdbb79d

Please sign in to comment.