From ecf20f540b25905dc0a5b1505f72238de704bba2 Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Mon, 1 May 2023 15:29:34 -0400 Subject: [PATCH] fix: use default HTTP routers when FullRT DHT client is used (#9841) --- cmd/ipfs/daemon.go | 14 ++------- core/node/groups.go | 2 +- core/node/libp2p/routing.go | 47 ++++++++++++++++++----------- core/node/libp2p/routingopt.go | 55 ++++++++++++++++++++-------------- 4 files changed, 65 insertions(+), 53 deletions(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 52addcc072c..2964716a3b2 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -413,19 +413,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment case routingOptionSupernodeKwd: return errors.New("supernode routing was never fully implemented and has been removed") case routingOptionDefaultKwd, routingOptionAutoKwd: - ncfg.Routing = libp2p.ConstructDefaultRouting( - cfg.Identity.PeerID, - cfg.Addresses.Swarm, - cfg.Identity.PrivKey, - libp2p.DHTOption, - ) + ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTOption) case routingOptionAutoClientKwd: - ncfg.Routing = libp2p.ConstructDefaultRouting( - cfg.Identity.PeerID, - cfg.Addresses.Swarm, - cfg.Identity.PrivKey, - libp2p.DHTClientOption, - ) + ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTClientOption) case routingOptionDHTClientKwd: ncfg.Routing = libp2p.DHTClientOption case routingOptionDHTKwd: diff --git a/core/node/groups.go b/core/node/groups.go index 866204ce36b..086d71d1473 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -165,7 +165,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part fx.Provide(libp2p.Routing), fx.Provide(libp2p.ContentRouting), - fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)), + fx.Provide(libp2p.BaseRouting(cfg)), maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index 8014d4142ac..0fc138cadfb 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -63,35 +63,34 @@ type processInitialRoutingOut struct { type AddrInfoChan chan peer.AddrInfo -func BaseRouting(experimentalDHTClient bool) interface{} { +func BaseRouting(cfg *config.Config) interface{} { return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) { - var dr *ddht.DHT + var dualDHT *ddht.DHT if dht, ok := in.Router.(*ddht.DHT); ok { - dr = dht + dualDHT = dht lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return dr.Close() + return dualDHT.Close() }, }) } - if pr, ok := in.Router.(routinghelpers.ComposableRouter); ok { - for _, r := range pr.Routers() { + if cr, ok := in.Router.(routinghelpers.ComposableRouter); ok { + for _, r := range cr.Routers() { if dht, ok := r.(*ddht.DHT); ok { - dr = dht + dualDHT = dht lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return dr.Close() + return dualDHT.Close() }, }) - break } } } - if dr != nil && experimentalDHTClient { + if dualDHT != nil && cfg.Experimental.AcceleratedDHTClient { cfg, err := in.Repo.Config() if err != nil { return out, err @@ -101,7 +100,7 @@ func BaseRouting(experimentalDHTClient bool) interface{} { return out, err } - expClient, err := fullrt.NewFullRT(in.Host, + fullRTClient, err := fullrt.NewFullRT(in.Host, dht.DefaultPrefix, fullrt.DHTOption( dht.Validator(in.Validator), @@ -116,18 +115,30 @@ func BaseRouting(experimentalDHTClient bool) interface{} { lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { - return expClient.Close() + return fullRTClient.Close() }, }) + // we want to also use the default HTTP routers, so wrap the FullRT client + // in a parallel router that calls them in parallel + httpRouters, err := constructDefaultHTTPRouters(cfg) + if err != nil { + return out, err + } + routers := []*routinghelpers.ParallelRouter{ + {Router: fullRTClient}, + } + routers = append(routers, httpRouters...) + router := routinghelpers.NewComposableParallel(routers) + return processInitialRoutingOut{ Router: Router{ - Routing: expClient, Priority: 1000, + Routing: router, }, - DHT: dr, - DHTClient: expClient, - ContentRouter: expClient, + DHT: dualDHT, + DHTClient: fullRTClient, + ContentRouter: fullRTClient, }, nil } @@ -136,8 +147,8 @@ func BaseRouting(experimentalDHTClient bool) interface{} { Priority: 1000, Routing: in.Router, }, - DHT: dr, - DHTClient: dr, + DHT: dualDHT, + DHTClient: dualDHT, ContentRouter: in.Router, }, nil } diff --git a/core/node/libp2p/routingopt.go b/core/node/libp2p/routingopt.go index 8a69e181b66..62d3320f831 100644 --- a/core/node/libp2p/routingopt.go +++ b/core/node/libp2p/routingopt.go @@ -39,8 +39,35 @@ func init() { } } +func constructDefaultHTTPRouters(cfg *config.Config) ([]*routinghelpers.ParallelRouter, error) { + var routers []*routinghelpers.ParallelRouter + // Append HTTP routers for additional speed + for _, endpoint := range defaultHTTPRouters { + httpRouter, err := irouting.ConstructHTTPRouter(endpoint, cfg.Identity.PeerID, cfg.Addresses.Swarm, cfg.Identity.PrivKey) + if err != nil { + return nil, err + } + + r := &irouting.Composer{ + GetValueRouter: routinghelpers.Null{}, + PutValueRouter: routinghelpers.Null{}, + ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide + FindPeersRouter: routinghelpers.Null{}, + FindProvidersRouter: httpRouter, + } + + routers = append(routers, &routinghelpers.ParallelRouter{ + Router: r, + IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387 + Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529 + ExecuteAfter: 0, + }) + } + return routers, nil +} + // ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto" -func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func( +func ConstructDefaultRouting(cfg *config.Config, routingOpt RoutingOption) func( ctx context.Context, host host.Host, dstore datastore.Batching, @@ -68,29 +95,13 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout ExecuteAfter: 0, }) - // Append HTTP routers for additional speed - for _, endpoint := range defaultHTTPRouters { - httpRouter, err := irouting.ConstructHTTPRouter(endpoint, peerID, addrs, privKey) - if err != nil { - return nil, err - } - - r := &irouting.Composer{ - GetValueRouter: routinghelpers.Null{}, - PutValueRouter: routinghelpers.Null{}, - ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide - FindPeersRouter: routinghelpers.Null{}, - FindProvidersRouter: httpRouter, - } - - routers = append(routers, &routinghelpers.ParallelRouter{ - Router: r, - IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387 - Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529 - ExecuteAfter: 0, - }) + httpRouters, err := constructDefaultHTTPRouters(cfg) + if err != nil { + return nil, err } + routers = append(routers, httpRouters...) + routing := routinghelpers.NewComposableParallel(routers) return routing, nil }