diff --git a/config/config.go b/config/config.go index 0871e584f4..501819caa6 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ import ( bhost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/host/relay" routed "github.com/libp2p/go-libp2p/p2p/host/routed" + holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" autonat "github.com/libp2p/go-libp2p-autonat" blankhost "github.com/libp2p/go-libp2p-blankhost" @@ -92,6 +93,9 @@ type Config struct { EnableAutoRelay bool AutoNATConfig StaticRelays []peer.AddrInfo + + EnableHolePunching bool + HolePunchingOptions []holepunch.Option } func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) { @@ -186,11 +190,13 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { } h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{ - ConnManager: cfg.ConnManager, - AddrsFactory: cfg.AddrsFactory, - NATManager: cfg.NATManager, - EnablePing: !cfg.DisablePing, - UserAgent: cfg.UserAgent, + ConnManager: cfg.ConnManager, + AddrsFactory: cfg.AddrsFactory, + NATManager: cfg.NATManager, + EnablePing: !cfg.DisablePing, + UserAgent: cfg.UserAgent, + EnableHolePunching: cfg.EnableHolePunching, + HolePunchingOptions: cfg.HolePunchingOptions, }) if err != nil { diff --git a/go.mod b/go.mod index aca28cf957..d94e0a3ff5 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/libp2p/go-libp2p-autonat v0.4.0 github.com/libp2p/go-libp2p-blankhost v0.2.0 github.com/libp2p/go-libp2p-circuit v0.4.0 - github.com/libp2p/go-libp2p-core v0.8.4 + github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-mplex v0.4.1 @@ -25,7 +25,7 @@ require ( github.com/libp2p/go-libp2p-netutil v0.1.0 github.com/libp2p/go-libp2p-noise v0.1.1 github.com/libp2p/go-libp2p-peerstore v0.2.6 - github.com/libp2p/go-libp2p-swarm v0.4.2 + github.com/libp2p/go-libp2p-swarm v0.4.3 github.com/libp2p/go-libp2p-testing v0.4.0 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 diff --git a/go.sum b/go.sum index b69a2be694..a9f612b015 100644 --- a/go.sum +++ b/go.sum @@ -275,8 +275,9 @@ github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8i github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.4 h1:BL0noEpCJm0FIexkHlGI3nYEW3mGpc7zy7dxRmvXpwg= -github.com/libp2p/go-libp2p-core v0.8.4/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw= +github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= @@ -324,8 +325,8 @@ github.com/libp2p/go-libp2p-swarm v0.2.8 h1:cIUUvytBzNQmGSjnXFlI6UpoBGsaud82mJPI github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0mdNSwckW7OYziM= github.com/libp2p/go-libp2p-swarm v0.3.0 h1:w18ZLMccbvwgyR+dODEeA3r1zbFZj+YVq6PClXo77lY= github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk= -github.com/libp2p/go-libp2p-swarm v0.4.2 h1:rwZUsls+8dImHCcO2LZEa9+QGVF2tBSkywnA0PIYEJg= -github.com/libp2p/go-libp2p-swarm v0.4.2/go.mod h1:gNfZcYwyQJPNKEz3iyPfgwIJu7flqyBQOfCW+8paNCM= +github.com/libp2p/go-libp2p-swarm v0.4.3 h1:tAdkIj9gxMernQ6FTDPALnb8zAiw8xmcYz85FfA4oME= +github.com/libp2p/go-libp2p-swarm v0.4.3/go.mod h1:mmxP1pGBSc1Arw4F5DIjcpjFAmsRzA1KADuMtMuCT4g= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= diff --git a/options.go b/options.go index 4c638f2d5d..6bbbbf760b 100644 --- a/options.go +++ b/options.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/config" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" autorelay "github.com/libp2p/go-libp2p/p2p/host/relay" + holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" ma "github.com/multiformats/go-multiaddr" ) @@ -462,3 +463,42 @@ func UserAgent(userAgent string) Option { return nil } } + +// Experimental +// EnableHolePunching enables NAT traversal by enabling NATT'd peers to both initiate and respond to hole punching attempts +// to create direct/NAT-traversed connections with other peers. (default: disabled) +// +// Dependencies: +// * Relay (enabled by default) +// +// This subsystem performs two functions: +// +// 1. On receiving an inbound Relay connection, it attempts to create a direct connection with the remote peer +// by initiating and co-ordinating a hole punch over the Relayed connection. +// 2. If a peer sees a request to co-ordinate a hole punch on an outbound Relay connection, +// it will participate in the hole-punch to create a direct connection with the remote peer. +// +// If the hole punch is successful, all new streams will thereafter be created on the hole-punched connection. +// The Relayed connection will eventually be closed after a grace period. +// +// All existing indefinite long-lived streams on the Relayed connection will have to re-opened on the hole-punched connection by the user. +// Users can make use of the `Connected`/`Disconnected` notifications emitted by the Network for this purpose. +// +// It is not mandatory but nice to also enable the `AutoRelay` option (See `EnableAutoRelay`) +// so the peer can discover and connect to Relay servers if it discovers that it is NATT'd and has private reachability via AutoNAT. +// This will then enable it to advertise Relay addresses which can be used to accept inbound Relay connections to then co-ordinate +// a hole punch. +// +// If `EnableAutoRelay` is configured and the user is confident that the peer has private reachability/is NATT'd, +// the `ForceReachabilityPrivate` option can be configured to short-circuit reachability discovery via AutoNAT +// so the peer can immediately start connecting to Relay servers. +// +// If `EnableAutoRelay` is configured, the `StaticRelays` option can be used to configure a static set of Relay servers +// for `AutoRelay` to connect to so that it does not need to discover Relay servers via Routing. +func EnableHolePunching(opts ...holepunch.Option) Option { + return func(cfg *Config) error { + cfg.EnableHolePunching = true + cfg.HolePunchingOptions = opts + return nil + } +} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index de5b7dfe43..8537456502 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -19,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/record" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" addrutil "github.com/libp2p/go-addr-util" "github.com/libp2p/go-eventbus" @@ -87,6 +88,7 @@ type BasicHost struct { network network.Network mux *msmux.MultistreamMuxer ids *identify.IDService + hps *holepunch.HolePunchService pings *ping.PingService natmgr NATManager maResolver *madns.Resolver @@ -151,6 +153,11 @@ type HostOpts struct { // DisableSignedPeerRecord disables the generation of Signed Peer Records on this host. DisableSignedPeerRecord bool + + // EnableHolePunching enables the peer to initiate/respond to hole punching attempts for NAT traversal. + EnableHolePunching bool + // HolePunchingOptions are options for the hole punching service + HolePunchingOptions []holepunch.Option } // NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network. @@ -217,6 +224,13 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost return nil, fmt.Errorf("failed to create Identify service: %s", err) } + if opts.EnableHolePunching { + h.hps, err = holepunch.NewHolePunchService(h, h.ids, opts.HolePunchingOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create hole punch service: %w", err) + } + } + if uint64(opts.NegotiationTimeout) != 0 { h.negtimeout = opts.NegotiationTimeout } @@ -649,7 +663,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I } case <-ctx.Done(): s.Reset() - // wait for the negotiation to cancel. + // wait for `SelectOneOf` to error out because of resetting the stream. <-errCh return nil, ctx.Err() } @@ -682,8 +696,11 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // absorb addresses into peerstore h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) - if h.Network().Connectedness(pi.ID) == network.Connected { - return nil + forceDirect, _ := network.GetForceDirectDial(ctx) + if !forceDirect { + if h.Network().Connectedness(pi.ID) == network.Connected { + return nil + } } resolved, err := h.resolveAddrs(ctx, h.Peerstore().PeerInfo(pi.ID)) @@ -1006,6 +1023,10 @@ func (h *BasicHost) Close() error { h.ids.Close() } + if h.hps != nil { + h.hps.Close() + } + _ = h.emitters.evtLocalProtocolsUpdated.Close() _ = h.emitters.evtLocalAddrsUpdated.Close() h.Network().Close() diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index a8d2284822..6f800ea634 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -33,8 +33,11 @@ var ( // These are the known PL-operated relays var DefaultRelays = []string{ "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", + "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", "/ip4/147.75.195.153/tcp/4001/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", + "/ip4/147.75.195.153/udp/4001/quic/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", "/ip4/147.75.70.221/tcp/4001/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", + "/ip4/147.75.70.221/udp/4001/quic/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", } // AutoRelay is a Host that uses relays for connectivity when a NAT is detected. diff --git a/p2p/protocol/holepunch/coordination.go b/p2p/protocol/holepunch/coordination.go new file mode 100644 index 0000000000..b2aab4179f --- /dev/null +++ b/p2p/protocol/holepunch/coordination.go @@ -0,0 +1,433 @@ +package holepunch + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + + logging "github.com/ipfs/go-log" + protoio "github.com/libp2p/go-msgio/protoio" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +// TODO Should we have options for these ? +var ( + // Protocol is the libp2p protocol for Hole Punching. + Protocol protocol.ID = "/libp2p/holepunch/1.0.0" + // HolePunchTimeout is the timeout for the hole punch protocol stream. + HolePunchTimeout = 1 * time.Minute + + maxMsgSize = 4 * 1024 // 4K + dialTimeout = 5 * time.Second + maxRetries = 5 + retryWait = 2 * time.Second +) + +var ( + log = logging.Logger("p2p-holepunch") +) + +// TODO Find a better name for this protocol. +// HolePunchService is used to make direct connections with a peer via hole-punching. +type HolePunchService struct { + ctx context.Context + ctxCancel context.CancelFunc + + ids *identify.IDService + host host.Host + + tracer *Tracer + + // ensure we shutdown ONLY once + closeSync sync.Once + refCount sync.WaitGroup + + // active hole punches for deduplicating + activeMx sync.Mutex + active map[peer.ID]struct{} + + isTest bool + handlerErrsMu sync.Mutex + handlerErrs []error +} + +type Option func(*HolePunchService) error + +// NewHolePunchService creates a new service that can be used for hole punching +// The `isTest` should ONLY be turned ON for testing. +func NewHolePunchService(h host.Host, ids *identify.IDService, opts ...Option) (*HolePunchService, error) { + if ids == nil { + return nil, errors.New("Identify service can't be nil") + } + + ctx, cancel := context.WithCancel(context.Background()) + hs := &HolePunchService{ + ctx: ctx, + ctxCancel: cancel, + host: h, + ids: ids, + active: make(map[peer.ID]struct{}), + } + + for _, opt := range opts { + err := opt(hs) + if err != nil { + cancel() + return nil, err + } + } + + h.SetStreamHandler(Protocol, hs.handleNewStream) + h.Network().Notify((*netNotifiee)(hs)) + return hs, nil +} + +// Close closes the Hole Punch Service. +func (hs *HolePunchService) Close() error { + hs.closeSync.Do(func() { + hs.ctxCancel() + hs.refCount.Wait() + }) + + return nil +} + +// attempts to make a direct connection with the remote peer of `relayConn` by co-ordinating a hole punch over +// the given relay connection `relayConn`. +func (hs *HolePunchService) HolePunch(rp peer.ID) error { + // short-circuit hole punching if a direct dial works. + // attempt a direct connection ONLY if we have a public address for the remote peer + for _, a := range hs.host.Peerstore().Addrs(rp) { + if manet.IsPublicAddr(a) && !isRelayAddress(a) { + forceDirectConnCtx := network.WithForceDirectDial(hs.ctx, "hole-punching") + dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) + + tstart := time.Now() + err := hs.host.Connect(dialCtx, peer.AddrInfo{ID: rp}) + dt := time.Since(tstart) + cancel() + + if err == nil { + hs.tracer.DirectDialSuccessful(rp, dt) + log.Debugf("direct connection to peer %s successful, no need for a hole punch", rp.Pretty()) + return nil + } + hs.tracer.DirectDialFailed(rp, dt, err) + break + } + } + + // hole punch + hpCtx := network.WithUseTransient(hs.ctx, "hole-punch") + sCtx := network.WithNoDial(hpCtx, "hole-punch") + s, err := hs.host.NewStream(sCtx, rp, Protocol) + if err != nil { + err = fmt.Errorf("failed to open hole-punching stream with peer %s: %w", rp, err) + hs.tracer.ProtocolError(rp, err) + return err + } + log.Infof("will attempt hole punch with peer %s", rp.Pretty()) + _ = s.SetDeadline(time.Now().Add(HolePunchTimeout)) + w := protoio.NewDelimitedWriter(s) + + // send a CONNECT and start RTT measurement. + msg := new(pb.HolePunch) + msg.Type = pb.HolePunch_CONNECT.Enum() + msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + + tstart := time.Now() + if err := w.WriteMsg(msg); err != nil { + s.Reset() + err = fmt.Errorf("failed to send hole punch CONNECT: %w", err) + hs.tracer.ProtocolError(rp, err) + return err + } + + // wait for a CONNECT message from the remote peer + rd := protoio.NewDelimitedReader(s, maxMsgSize) + msg.Reset() + if err := rd.ReadMsg(msg); err != nil { + s.Reset() + err = fmt.Errorf("failed to read CONNECT message from remote peer: %w", err) + hs.tracer.ProtocolError(rp, err) + return err + } + rtt := time.Since(tstart) + + if t := msg.GetType(); t != pb.HolePunch_CONNECT { + s.Reset() + err = fmt.Errorf("expected CONNECT message but got %d", t) + hs.tracer.ProtocolError(rp, err) + return err + } + + obsRemote := addrsFromBytes(msg.ObsAddrs) + + // send a SYNC message and attempt a direct connect after half the RTT + msg.Reset() + msg.Type = pb.HolePunch_SYNC.Enum() + if err := w.WriteMsg(msg); err != nil { + s.Reset() + err = fmt.Errorf("failed to send SYNC message for hole punching: %w", err) + hs.tracer.ProtocolError(rp, err) + return err + } + defer s.Close() + + synTime := rtt / 2 + log.Debugf("peer RTT is %s; starting hole punch in %s", rtt, synTime) + + // wait for sync to reach the other peer and then punch a hole for it in our NAT + // by attempting a connect to it. + select { + case <-time.After(synTime): + pi := peer.AddrInfo{ + ID: rp, + Addrs: obsRemote, + } + tstart = time.Now() + hs.tracer.StartHolePunch(rp, obsRemote, rtt) + err = hs.holePunchConnectWithRetry(pi) + dt := time.Since(tstart) + hs.tracer.EndHolePunch(rp, dt, err) + if err == nil { + log.Infof("hole punching with %s successful after %s", rp, dt) + } + return err + + case <-hs.ctx.Done(): + return hs.ctx.Err() + } +} + +// HandlerErrors returns the errors accumulated by the Stream Handler. +// This is ONLY for testing. +func (hs *HolePunchService) HandlerErrors() []error { + hs.handlerErrsMu.Lock() + defer hs.handlerErrsMu.Unlock() + return hs.handlerErrs +} + +func (hs *HolePunchService) handlerError(p peer.ID, err error) { + if !hs.isTest { + hs.tracer.ProtocolError(p, err) + log.Warn(err) + return + } + + hs.handlerErrsMu.Lock() + defer hs.handlerErrsMu.Unlock() + + hs.handlerErrs = append(hs.handlerErrs, err) +} + +func (hs *HolePunchService) handleNewStream(s network.Stream) { + log.Infof("got hole punch request from peer %s", s.Conn().RemotePeer().Pretty()) + _ = s.SetDeadline(time.Now().Add(HolePunchTimeout)) + rp := s.Conn().RemotePeer() + wr := protoio.NewDelimitedWriter(s) + rd := protoio.NewDelimitedReader(s, maxMsgSize) + + // Read Connect message + msg := new(pb.HolePunch) + if err := rd.ReadMsg(msg); err != nil { + s.Reset() + hs.handlerError(rp, fmt.Errorf("failed to read message from initator: %w", err)) + return + } + if t := msg.GetType(); t != pb.HolePunch_CONNECT { + s.Reset() + hs.handlerError(rp, fmt.Errorf("expected CONNECT message from initiator but got %d", t)) + return + } + obsDial := addrsFromBytes(msg.ObsAddrs) + + // Write CONNECT message + msg.Reset() + msg.Type = pb.HolePunch_CONNECT.Enum() + msg.ObsAddrs = addrsToBytes(hs.ids.OwnObservedAddrs()) + tstart := time.Now() + if err := wr.WriteMsg(msg); err != nil { + s.Reset() + hs.handlerError(rp, fmt.Errorf("failed to write CONNECT message to initator:: %w", err)) + return + } + + // Read SYNC message + msg.Reset() + if err := rd.ReadMsg(msg); err != nil { + s.Reset() + hs.handlerError(rp, fmt.Errorf("failed to read message from initator: %w", err)) + return + } + rtt := time.Since(tstart) + + if t := msg.GetType(); t != pb.HolePunch_SYNC { + s.Reset() + hs.handlerError(rp, fmt.Errorf("expected SYNC message from initiator but got %d", t)) + return + } + defer s.Close() + + // Hole punch now by forcing a connect + pi := peer.AddrInfo{ + ID: rp, + Addrs: obsDial, + } + + hs.tracer.StartHolePunch(rp, obsDial, rtt) + tstart = time.Now() + err := hs.holePunchConnectWithRetry(pi) + dt := time.Since(tstart) + hs.tracer.EndHolePunch(rp, dt, err) + if err != nil { + log.Warnf("hole punching with %s failed after %s: %s", rp, dt, err) + } else { + log.Infof("hole punching with %s successful after %s", rp, dt) + } +} + +func (hs *HolePunchService) holePunchConnectWithRetry(pi peer.AddrInfo) error { + log.Debugf("starting hole punch with %s", pi.ID) + holePunchCtx := network.WithSimultaneousConnect(hs.ctx, "hole-punching") + forceDirectConnCtx := network.WithForceDirectDial(holePunchCtx, "hole-punching") + + doConnect := func(attempt int) error { + dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout) + defer cancel() + + hs.tracer.HolePunchAttempt(pi.ID, attempt) + err := hs.host.Connect(dialCtx, pi) + if err == nil { + log.Infof("hole punch with peer %s successful after %d retries; direct conns to peer are:", pi.ID, attempt) + for _, c := range hs.host.Network().ConnsToPeer(pi.ID) { + if !isRelayAddress(c.RemoteMultiaddr()) { + log.Info(c) + } + } + } + + return err + } + + err := doConnect(0) + if err == nil { + return nil + } + + log.Infof("first hole punch attempt with peer %s failed: %s; will retry in %s...", pi.ID, err, retryWait) + + for i := 1; i <= maxRetries; i++ { + time.Sleep(retryWait) + + err = doConnect(i) + if err == nil { + return nil + } + } + + return fmt.Errorf("all retries for hole punch with peer %s failed: %w", pi.ID, err) +} + +func isRelayAddress(a ma.Multiaddr) bool { + _, err := a.ValueForProtocol(ma.P_CIRCUIT) + + return err == nil +} + +func addrsToBytes(as []ma.Multiaddr) [][]byte { + bzs := make([][]byte, 0, len(as)) + for _, a := range as { + bzs = append(bzs, a.Bytes()) + } + + return bzs +} + +func addrsFromBytes(bzs [][]byte) []ma.Multiaddr { + addrs := make([]ma.Multiaddr, 0, len(bzs)) + for _, bz := range bzs { + a, err := ma.NewMultiaddrBytes(bz) + if err == nil { + addrs = append(addrs, a) + } + } + + return addrs +} + +type netNotifiee HolePunchService + +func (nn *netNotifiee) HolePunchService() *HolePunchService { + return (*HolePunchService)(nn) +} + +func (nn *netNotifiee) Connected(_ network.Network, v network.Conn) { + hs := nn.HolePunchService() + dir := v.Stat().Direction + + // Hole punch if it's an inbound proxy connection. + // If we already have a direct connection with the remote peer, this will be a no-op. + if dir == network.DirInbound && isRelayAddress(v.RemoteMultiaddr()) { + // short-circuit check to see if we already have a direct connection + for _, c := range hs.host.Network().ConnsToPeer(v.RemotePeer()) { + if !isRelayAddress(c.RemoteMultiaddr()) { + return + } + } + + p := v.RemotePeer() + hs.activeMx.Lock() + _, active := hs.active[p] + if !active { + hs.active[p] = struct{}{} + } + hs.activeMx.Unlock() + + if active { + return + } + + log.Debugf("got inbound proxy conn from peer %s, connectionID is %s", v.RemotePeer().String(), v.ID()) + hs.refCount.Add(1) + go func() { + defer hs.refCount.Done() + defer func() { + hs.activeMx.Lock() + delete(hs.active, p) + hs.activeMx.Unlock() + }() + select { + // waiting for Identify here will allow us to access the peer's public and observed addresses + // that we can dial to for a hole punch. + case <-hs.ids.IdentifyWait(v): + case <-hs.ctx.Done(): + return + } + + err := hs.HolePunch(v.RemotePeer()) + if err != nil { + log.Warnf("hole punching attempt with %s failed: %s", v.RemotePeer(), err) + } + }() + return + } +} + +// NO-OPS +func (nn *netNotifiee) Disconnected(_ network.Network, v network.Conn) {} +func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {} +func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {} +func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} diff --git a/p2p/protocol/holepunch/coordination_test.go b/p2p/protocol/holepunch/coordination_test.go new file mode 100644 index 0000000000..ea1c7ffc92 --- /dev/null +++ b/p2p/protocol/holepunch/coordination_test.go @@ -0,0 +1,407 @@ +package holepunch_test + +import ( + "context" + "net" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + circuit "github.com/libp2p/go-libp2p-circuit" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + identify_pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" + "github.com/libp2p/go-msgio/protoio" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/stretchr/testify/require" +) + +func TestDirectDialWorks(t *testing.T) { + // all addrs should be marked as public + cpy := manet.Private4 + manet.Private4 = []*net.IPNet{} + defer func() { + manet.Private4 = cpy + }() + + ctx := context.Background() + + // try to hole punch without any connection and streams, if it works -> it's a direct connection + h1, h1ps := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithHolePunchSvc(t, ctx) + h2.RemoveStreamHandler(holepunch.Protocol) + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.ConnectedAddrTTL) + + require.NoError(t, h1ps.HolePunch(h2.ID())) + + cs := h1.Network().ConnsToPeer(h2.ID()) + require.Len(t, cs, 1) + + cs = h2.Network().ConnsToPeer(h1.ID()) + require.Len(t, cs, 1) +} + +func TestEndToEndSimConnect(t *testing.T) { + // all addrs should be marked as public + cpy := manet.Private4 + manet.Private4 = []*net.IPNet{} + defer func() { + manet.Private4 = cpy + }() + ctx := context.Background() + r := mkRelay(t, ctx) + + h1, _ := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithStaticAutoRelay(t, ctx, r) + + // h1 has a relay addr + // h2 should connect to the relay addr + var raddr ma.Multiaddr + for _, a := range h2.Addrs() { + if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil { + raddr = a + break + } + } + require.NotEmpty(t, raddr) + + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ + ID: h2.ID(), + Addrs: []ma.Multiaddr{raddr}, + })) + + // wait till a direct connection is complete + ensureDirectConn(t, h1, h2) + // ensure no hole-punching streams are open on either side + ensureNoHolePunchingStream(t, h1, h2) +} + +func TestFailuresOnInitiator(t *testing.T) { + t.Skip("broken test") + + ctx := context.Background() + + tcs := map[string]struct { + rhandler func(s network.Stream) + errMsg string + holePunchTimeout time.Duration + }{ + "responder does NOT send a CONNECT message": { + rhandler: func(s network.Stream) { + wr := protoio.NewDelimitedWriter(s) + msg := new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_SYNC.Enum() + wr.WriteMsg(msg) + }, + errMsg: "expected CONNECT message", + }, + "responder does NOT support protocol": { + rhandler: nil, + errMsg: "protocol not supported", + }, + "unable to READ CONNECT message from responder": { + rhandler: func(s network.Stream) { + s.Reset() + }, + errMsg: "failed to read CONNECT message", + }, + "responder does NOT reply within hole punch deadline": { + holePunchTimeout: 10 * time.Millisecond, + rhandler: func(s network.Stream) { + for { + + } + }, + errMsg: "i/o deadline reached", + }, + } + + for name, tc := range tcs { + t.Run(name, func(t *testing.T) { + if tc.holePunchTimeout != 0 { + cpy := holepunch.HolePunchTimeout + holepunch.HolePunchTimeout = tc.holePunchTimeout + defer func() { + holepunch.HolePunchTimeout = cpy + }() + } + + h1, h1ps := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithHolePunchSvc(t, ctx) + + if tc.rhandler != nil { + h2.SetStreamHandler(holepunch.Protocol, tc.rhandler) + } else { + h2.RemoveStreamHandler(holepunch.Protocol) + } + + connect(t, ctx, h1, h2) + err := h1ps.HolePunch(h2.ID()) + require.Error(t, err) + require.Contains(t, err.Error(), tc.errMsg) + }) + + } +} + +func TestFailuresOnResponder(t *testing.T) { + ctx := context.Background() + + tcs := map[string]struct { + initiator func(s network.Stream) + errMsg string + holePunchTimeout time.Duration + }{ + "initiator does NOT send a CONNECT message": { + initiator: func(s network.Stream) { + w := protoio.NewDelimitedWriter(s) + msg := new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_SYNC.Enum() + w.WriteMsg(msg) + + }, + errMsg: "expected CONNECT message", + }, + + "initiator does NOT send a SYNC message after a Connect message": { + initiator: func(s network.Stream) { + w := protoio.NewDelimitedWriter(s) + msg := new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + w.WriteMsg(msg) + + msg = new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + w.WriteMsg(msg) + }, + errMsg: "expected SYNC message", + }, + + "initiator does NOT reply within hole punch deadline": { + holePunchTimeout: 10 * time.Millisecond, + initiator: func(s network.Stream) { + w := protoio.NewDelimitedWriter(s) + msg := new(holepunch_pb.HolePunch) + msg.Type = holepunch_pb.HolePunch_CONNECT.Enum() + w.WriteMsg(msg) + for { + + } + + }, + errMsg: "i/o deadline reached", + }, + } + + for name, tc := range tcs { + t.Run(name, func(t *testing.T) { + if tc.holePunchTimeout != 0 { + cpy := holepunch.HolePunchTimeout + holepunch.HolePunchTimeout = tc.holePunchTimeout + defer func() { + holepunch.HolePunchTimeout = cpy + }() + } + + h1, _ := mkHostWithHolePunchSvc(t, ctx) + h2, h2ps := mkHostWithHolePunchSvc(t, ctx) + connect(t, ctx, h1, h2) + + s, err := h1.NewStream(ctx, h2.ID(), holepunch.Protocol) + require.NoError(t, err) + + go tc.initiator(s) + + require.Eventually(t, func() bool { + return len(h2ps.HandlerErrors()) != 0 + }, 5*time.Second, 100*time.Millisecond) + + errs := h2ps.HandlerErrors() + require.Len(t, errs, 1) + err = errs[0] + require.Contains(t, err.Error(), tc.errMsg) + }) + + } +} + +func TestObservedAddressesAreExchanged(t *testing.T) { + ctx := context.Background() + + obsAddrs1 := ma.StringCast("/ip4/8.8.8.8/tcp/1234") + obsAddrs2 := ma.StringCast("/ip4/9.8.8.8/tcp/1234") + + h1, h1ps := mkHostWithHolePunchSvc(t, ctx) + h2, _ := mkHostWithHolePunchSvc(t, ctx) + + // modify identify handlers to send our fake observed addresses + h1.SetStreamHandler(identify.ID, func(s network.Stream) { + writer := protoio.NewDelimitedWriter(s) + msg := new(identify_pb.Identify) + msg.ObservedAddr = obsAddrs2.Bytes() + writer.WriteMsg(msg) + s.Close() + }) + + h2.SetStreamHandler(identify.ID, func(s network.Stream) { + writer := protoio.NewDelimitedWriter(s) + msg := new(identify_pb.Identify) + msg.ObservedAddr = obsAddrs1.Bytes() + writer.WriteMsg(msg) + s.Close() + }) + + connect(t, ctx, h1, h2) + + // hole punch so both peers exchange each other's observed addresses and save to peerstore + require.NoError(t, h1ps.HolePunch(h2.ID())) + + require.Eventually(t, func() bool { + h2Addrs := h1.Peerstore().Addrs(h2.ID()) + h1Addrs := h2.Peerstore().Addrs(h1.ID()) + + b1 := false + b2 := false + for _, a := range h1Addrs { + if a.Equal(obsAddrs1) { + b1 = true + break + } + } + + for _, a := range h2Addrs { + if a.Equal(obsAddrs2) { + b2 = true + break + } + } + + return b1 && b2 + }, 2*time.Second, 100*time.Millisecond) +} + +func ensureNoHolePunchingStream(t *testing.T, h1, h2 host.Host) { + require.Eventually(t, func() bool { + for _, c := range h1.Network().ConnsToPeer(h2.ID()) { + for _, s := range c.GetStreams() { + if s.ID() == string(holepunch.Protocol) { + return false + } + } + } + + return true + + }, 5*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + for _, c := range h2.Network().ConnsToPeer(h1.ID()) { + for _, s := range c.GetStreams() { + if s.ID() == string(holepunch.Protocol) { + return false + } + } + } + + return true + + }, 5*time.Second, 200*time.Millisecond) +} + +func ensureDirectConn(t *testing.T, h1, h2 host.Host) { + require.Eventually(t, func() bool { + cs := h1.Network().ConnsToPeer(h2.ID()) + if len(cs) != 2 { + return false + } + for _, c := range cs { + if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil { + return true + } + } + return false + }, 5*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + cs := h2.Network().ConnsToPeer(h1.ID()) + if len(cs) != 2 { + return false + } + for _, c := range cs { + if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil { + return true + } + } + return false + }, 5*time.Second, 200*time.Millisecond) +} + +func TestNoHolePunchingIfDirectConnAlreadyExists(t *testing.T) { + +} + +func connect(t *testing.T, ctx context.Context, h1, h2 host.Host) network.Conn { + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + })) + + cs := h1.Network().ConnsToPeer(h2.ID()) + require.Len(t, cs, 1) + return cs[0] +} + +func mkHostWithStaticAutoRelay(t *testing.T, ctx context.Context, relay host.Host) (host.Host, *holepunch.HolePunchService) { + pi := peer.AddrInfo{ + ID: relay.ID(), + Addrs: relay.Addrs(), + } + + h, err := libp2p.New(ctx, libp2p.EnableRelay(), libp2p.EnableAutoRelay(), libp2p.ForceReachabilityPrivate(), + libp2p.StaticRelays([]peer.AddrInfo{pi})) + require.NoError(t, err) + ids, err := identify.NewIDService(h) + require.NoError(t, err) + hps, err := holepunch.NewHolePunchService(h, ids, withTest) + require.NoError(t, err) + + // wait till we have a relay addr + require.Eventually(t, func() bool { + for _, a := range h.Addrs() { + if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil { + return true + } + } + + return false + }, 5*time.Second, 200*time.Millisecond) + + return h, hps +} + +func mkRelay(t *testing.T, ctx context.Context) host.Host { + h, err := libp2p.New(ctx, libp2p.EnableRelay(circuit.OptHop)) + require.NoError(t, err) + return h +} + +func mkHostWithHolePunchSvc(t *testing.T, ctx context.Context) (host.Host, *holepunch.HolePunchService) { + h, err := libp2p.New(ctx) + require.NoError(t, err) + ids, err := identify.NewIDService(h) + require.NoError(t, err) + hps, err := holepunch.NewHolePunchService(h, ids, withTest) + require.NoError(t, err) + + return h, hps +} + +func withTest(hps *HolePunchService) error { + hps.isTest = true +} diff --git a/p2p/protocol/holepunch/e2e_test.go b/p2p/protocol/holepunch/e2e_test.go new file mode 100644 index 0000000000..fae2d1d667 --- /dev/null +++ b/p2p/protocol/holepunch/e2e_test.go @@ -0,0 +1,9 @@ +package holepunch + +import ( + "testing" +) + +func TestHolePunchOnRelayedConnection(t *testing.T) { + +} diff --git a/p2p/protocol/holepunch/pb/Makefile b/p2p/protocol/holepunch/pb/Makefile new file mode 100644 index 0000000000..60d9dd4434 --- /dev/null +++ b/p2p/protocol/holepunch/pb/Makefile @@ -0,0 +1,11 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $< + +clean: + rm -f *.pb.go + rm -f *.go \ No newline at end of file diff --git a/p2p/protocol/holepunch/pb/holepunch.pb.go b/p2p/protocol/holepunch/pb/holepunch.pb.go new file mode 100644 index 0000000000..0f2452ebeb --- /dev/null +++ b/p2p/protocol/holepunch/pb/holepunch.pb.go @@ -0,0 +1,414 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: holepunch.proto + +package holepunch_pb + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type HolePunch_Type int32 + +const ( + HolePunch_CONNECT HolePunch_Type = 100 + HolePunch_SYNC HolePunch_Type = 300 +) + +var HolePunch_Type_name = map[int32]string{ + 100: "CONNECT", + 300: "SYNC", +} + +var HolePunch_Type_value = map[string]int32{ + "CONNECT": 100, + "SYNC": 300, +} + +func (x HolePunch_Type) Enum() *HolePunch_Type { + p := new(HolePunch_Type) + *p = x + return p +} + +func (x HolePunch_Type) String() string { + return proto.EnumName(HolePunch_Type_name, int32(x)) +} + +func (x *HolePunch_Type) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(HolePunch_Type_value, data, "HolePunch_Type") + if err != nil { + return err + } + *x = HolePunch_Type(value) + return nil +} + +func (HolePunch_Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_290ddea0f23ef64a, []int{0, 0} +} + +type HolePunch struct { + Type *HolePunch_Type `protobuf:"varint,1,opt,name=type,enum=holepunch.pb.HolePunch_Type" json:"type,omitempty"` + // For hole punching, we'll send some additional observed addresses to the remote peer + // that could have been filtered by the Host address factory (for example: AutoRelay removes all public addresses if peer has private reachability). + // This is a hack! + // We plan to have a better address discovery and advertisement mechanism in the future. + // See https://github.com/libp2p/go-libp2p-autonat/pull/98 + ObsAddrs [][]byte `protobuf:"bytes,2,rep,name=ObsAddrs" json:"ObsAddrs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HolePunch) Reset() { *m = HolePunch{} } +func (m *HolePunch) String() string { return proto.CompactTextString(m) } +func (*HolePunch) ProtoMessage() {} +func (*HolePunch) Descriptor() ([]byte, []int) { + return fileDescriptor_290ddea0f23ef64a, []int{0} +} +func (m *HolePunch) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *HolePunch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_HolePunch.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *HolePunch) XXX_Merge(src proto.Message) { + xxx_messageInfo_HolePunch.Merge(m, src) +} +func (m *HolePunch) XXX_Size() int { + return m.Size() +} +func (m *HolePunch) XXX_DiscardUnknown() { + xxx_messageInfo_HolePunch.DiscardUnknown(m) +} + +var xxx_messageInfo_HolePunch proto.InternalMessageInfo + +func (m *HolePunch) GetType() HolePunch_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return HolePunch_CONNECT +} + +func (m *HolePunch) GetObsAddrs() [][]byte { + if m != nil { + return m.ObsAddrs + } + return nil +} + +func init() { + proto.RegisterEnum("holepunch.pb.HolePunch_Type", HolePunch_Type_name, HolePunch_Type_value) + proto.RegisterType((*HolePunch)(nil), "holepunch.pb.HolePunch") +} + +func init() { proto.RegisterFile("holepunch.proto", fileDescriptor_290ddea0f23ef64a) } + +var fileDescriptor_290ddea0f23ef64a = []byte{ + // 153 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xc8, 0xcf, 0x49, + 0x2d, 0x28, 0xcd, 0x4b, 0xce, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x41, 0x12, 0x48, + 0x52, 0xaa, 0xe4, 0xe2, 0xf4, 0xc8, 0xcf, 0x49, 0x0d, 0x00, 0xf1, 0x85, 0x0c, 0xb8, 0x58, 0x4a, + 0x2a, 0x0b, 0x52, 0x25, 0x18, 0x15, 0x18, 0x35, 0xf8, 0x8c, 0x64, 0xf4, 0x90, 0x55, 0xea, 0xc1, + 0x95, 0xe9, 0x85, 0x54, 0x16, 0xa4, 0x06, 0x81, 0x55, 0x0a, 0x49, 0x71, 0x71, 0xf8, 0x27, 0x15, + 0x3b, 0xa6, 0xa4, 0x14, 0x15, 0x4b, 0x30, 0x29, 0x30, 0x6b, 0xf0, 0x04, 0xc1, 0xf9, 0x4a, 0x72, + 0x5c, 0x2c, 0x20, 0x95, 0x42, 0xdc, 0x5c, 0xec, 0xce, 0xfe, 0x7e, 0x7e, 0xae, 0xce, 0x21, 0x02, + 0x29, 0x42, 0x9c, 0x5c, 0x2c, 0xc1, 0x91, 0x7e, 0xce, 0x02, 0x6b, 0x98, 0x9c, 0x78, 0x4e, 0x3c, + 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0x46, 0x40, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x62, 0xf4, 0xc8, 0x7c, 0xa8, 0x00, 0x00, 0x00, +} + +func (m *HolePunch) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HolePunch) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *HolePunch) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.ObsAddrs) > 0 { + for iNdEx := len(m.ObsAddrs) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ObsAddrs[iNdEx]) + copy(dAtA[i:], m.ObsAddrs[iNdEx]) + i = encodeVarintHolepunch(dAtA, i, uint64(len(m.ObsAddrs[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if m.Type != nil { + i = encodeVarintHolepunch(dAtA, i, uint64(*m.Type)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintHolepunch(dAtA []byte, offset int, v uint64) int { + offset -= sovHolepunch(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *HolePunch) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != nil { + n += 1 + sovHolepunch(uint64(*m.Type)) + } + if len(m.ObsAddrs) > 0 { + for _, b := range m.ObsAddrs { + l = len(b) + n += 1 + l + sovHolepunch(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovHolepunch(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozHolepunch(x uint64) (n int) { + return sovHolepunch(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *HolePunch) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHolepunch + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HolePunch: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HolePunch: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var v HolePunch_Type + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHolepunch + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= HolePunch_Type(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Type = &v + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ObsAddrs", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHolepunch + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHolepunch + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthHolepunch + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ObsAddrs = append(m.ObsAddrs, make([]byte, postIndex-iNdEx)) + copy(m.ObsAddrs[len(m.ObsAddrs)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHolepunch(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHolepunch + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthHolepunch + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipHolepunch(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHolepunch + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHolepunch + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHolepunch + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthHolepunch + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupHolepunch + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthHolepunch + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthHolepunch = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowHolepunch = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupHolepunch = fmt.Errorf("proto: unexpected end of group") +) diff --git a/p2p/protocol/holepunch/pb/holepunch.proto b/p2p/protocol/holepunch/pb/holepunch.proto new file mode 100644 index 0000000000..5e9cfab0cb --- /dev/null +++ b/p2p/protocol/holepunch/pb/holepunch.proto @@ -0,0 +1,19 @@ +syntax = "proto2"; + +package holepunch.pb; + +message HolePunch { + enum Type { + CONNECT = 100; + SYNC = 300; + } + + optional Type type=1; + + // For hole punching, we'll send some additional observed addresses to the remote peer + // that could have been filtered by the Host address factory (for example: AutoRelay removes all public addresses if peer has private reachability). + // This is a hack! + // We plan to have a better address discovery and advertisement mechanism in the future. + // See https://github.com/libp2p/go-libp2p-autonat/pull/98 + repeated bytes ObsAddrs = 2; +} diff --git a/p2p/protocol/holepunch/tracer.go b/p2p/protocol/holepunch/tracer.go new file mode 100644 index 0000000000..c37610068a --- /dev/null +++ b/p2p/protocol/holepunch/tracer.go @@ -0,0 +1,181 @@ +package holepunch + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +// WithTracer is a HolePunchService option that enables hole punching tracing +func WithTracer(tr EventTracer) Option { + return func(hps *HolePunchService) error { + hps.tracer = &Tracer{tr: tr, self: hps.host.ID()} + return nil + } +} + +type Tracer struct { + tr EventTracer + self peer.ID +} + +type EventTracer interface { + Trace(evt *Event) +} + +type Event struct { + Timestamp int64 // UNIX nanos + Peer peer.ID // local peer ID + Remote peer.ID // remote peer ID + Type string // event type + Evt interface{} // the actual event +} + +// Event Types +const ( + DirectDialEvtT = "DirectDial" + ProtocolErrorEvtT = "ProtocolError" + StartHolePunchEvtT = "StartHolePunch" + EndHolePunchEvtT = "EndHolePunch" + HolePunchAttemptEvtT = "HolePunchAttempt" +) + +// Event Objects +type DirectDialEvt struct { + Success bool + EllapsedTime time.Duration + Error string `json:",omitempty"` +} + +type ProtocolErrorEvt struct { + Error string +} + +type StartHolePunchEvt struct { + RemoteAddrs []string + RTT time.Duration +} + +type EndHolePunchEvt struct { + Success bool + EllapsedTime time.Duration + Error string `json:",omitempty"` +} + +type HolePunchAttemptEvt struct { + Attempt int +} + +// Tracer interface +func (t *Tracer) DirectDialSuccessful(p peer.ID, dt time.Duration) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: t.self, + Remote: p, + Type: DirectDialEvtT, + Evt: &DirectDialEvt{ + Success: true, + EllapsedTime: dt, + }, + }) +} + +func (t *Tracer) DirectDialFailed(p peer.ID, dt time.Duration, err error) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: t.self, + Remote: p, + Type: DirectDialEvtT, + Evt: &DirectDialEvt{ + Success: false, + EllapsedTime: dt, + Error: err.Error(), + }, + }) +} + +func (t *Tracer) ProtocolError(p peer.ID, err error) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: t.self, + Remote: p, + Type: ProtocolErrorEvtT, + Evt: &ProtocolErrorEvt{ + Error: err.Error(), + }, + }) +} + +func (t *Tracer) StartHolePunch(p peer.ID, obsAddrs []ma.Multiaddr, rtt time.Duration) { + if t == nil { + return + } + + addrs := make([]string, 0, len(obsAddrs)) + for _, a := range obsAddrs { + addrs = append(addrs, a.String()) + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: t.self, + Remote: p, + Type: StartHolePunchEvtT, + Evt: &StartHolePunchEvt{ + RemoteAddrs: addrs, + RTT: rtt, + }, + }) +} + +func (t *Tracer) EndHolePunch(p peer.ID, dt time.Duration, err error) { + if t == nil { + return + } + + evt := &EndHolePunchEvt{ + Success: err == nil, + EllapsedTime: dt, + } + if err != nil { + evt.Error = err.Error() + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: t.self, + Remote: p, + Type: EndHolePunchEvtT, + Evt: evt, + }) +} + +func (t *Tracer) HolePunchAttempt(p peer.ID, attempt int) { + if t == nil { + return + } + + t.tr.Trace(&Event{ + Timestamp: time.Now().UnixNano(), + Peer: t.self, + Remote: p, + Type: HolePunchAttemptEvtT, + Evt: &HolePunchAttemptEvt{ + Attempt: attempt, + }, + }) +} diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 9970112be6..224688cf91 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -357,7 +357,7 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) { } }() - s, err = c.NewStream(context.TODO()) + s, err = c.NewStream(network.WithUseTransient(context.TODO(), "identify")) if err != nil { log.Debugw("error opening identify stream", "error", err) // the connection is probably already closed if we hit this. diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 279800ab01..2ea645dc0a 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -200,6 +200,21 @@ func (oas *ObservedAddrManager) filter(observedAddrs []*observedAddr) []ma.Multi } } + // For certain use cases such as hole punching, it's better to advertise even unactivated observed addresses rather than none at all + // because we don't want to wait for a hole-punch till we make enough connections with other peers to discover our activated addresses. + // If we have activated addresses, we will use them, otherwise, let's use whatever observed addresses we do have. + if len(pmap) == 0 { + for i := range observedAddrs { + a := observedAddrs[i] + if now.Sub(a.lastSeen) <= oas.ttl { + // group addresses by their IPX/Transport Protocol(TCP or UDP) pattern. + pat := a.groupKey() + pmap[pat] = append(pmap[pat], a) + + } + } + } + addrs := make([]ma.Multiaddr, 0, len(observedAddrs)) for pat := range pmap { s := pmap[pat]