Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use default HTTP routers when FullRT DHT client is used #9841

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,19 +413,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
case routingOptionSupernodeKwd:
return errors.New("supernode routing was never fully implemented and has been removed")
case routingOptionDefaultKwd, routingOptionAutoKwd:
ncfg.Routing = libp2p.ConstructDefaultRouting(
cfg.Identity.PeerID,
cfg.Addresses.Swarm,
cfg.Identity.PrivKey,
libp2p.DHTOption,
)
ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTOption)
case routingOptionAutoClientKwd:
ncfg.Routing = libp2p.ConstructDefaultRouting(
cfg.Identity.PeerID,
cfg.Addresses.Swarm,
cfg.Identity.PrivKey,
libp2p.DHTClientOption,
)
ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTClientOption)
case routingOptionDHTClientKwd:
ncfg.Routing = libp2p.DHTClientOption
case routingOptionDHTKwd:
Expand Down
2 changes: 1 addition & 1 deletion core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),

fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
fx.Provide(libp2p.BaseRouting(cfg)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),

maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
Expand Down
47 changes: 29 additions & 18 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,34 @@ type processInitialRoutingOut struct {

type AddrInfoChan chan peer.AddrInfo

func BaseRouting(experimentalDHTClient bool) interface{} {
func BaseRouting(cfg *config.Config) interface{} {
return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
var dr *ddht.DHT
var dualDHT *ddht.DHT
if dht, ok := in.Router.(*ddht.DHT); ok {
dr = dht
dualDHT = dht

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
return dualDHT.Close()
},
})
}

if pr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
for _, r := range pr.Routers() {
if cr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
for _, r := range cr.Routers() {
if dht, ok := r.(*ddht.DHT); ok {
dr = dht
dualDHT = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
return dualDHT.Close()
},
})

break
}
}
}

if dr != nil && experimentalDHTClient {
if dualDHT != nil && cfg.Experimental.AcceleratedDHTClient {
cfg, err := in.Repo.Config()
if err != nil {
return out, err
Expand All @@ -101,7 +100,7 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
return out, err
}

expClient, err := fullrt.NewFullRT(in.Host,
fullRTClient, err := fullrt.NewFullRT(in.Host,
dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(in.Validator),
Expand All @@ -116,18 +115,30 @@ func BaseRouting(experimentalDHTClient bool) interface{} {

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return expClient.Close()
return fullRTClient.Close()
},
})

// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := constructDefaultHTTPRouters(cfg)
if err != nil {
return out, err
}
routers := []*routinghelpers.ParallelRouter{
{Router: fullRTClient},
}
routers = append(routers, httpRouters...)
router := routinghelpers.NewComposableParallel(routers)

return processInitialRoutingOut{
Router: Router{
Routing: expClient,
Priority: 1000,
Routing: router,
},
DHT: dr,
DHTClient: expClient,
ContentRouter: expClient,
DHT: dualDHT,
DHTClient: fullRTClient,
ContentRouter: fullRTClient,
}, nil
}

Expand All @@ -136,8 +147,8 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
Priority: 1000,
Routing: in.Router,
},
DHT: dr,
DHTClient: dr,
DHT: dualDHT,
DHTClient: dualDHT,
ContentRouter: in.Router,
}, nil
}
Expand Down
55 changes: 33 additions & 22 deletions core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,35 @@ func init() {
}
}

func constructDefaultHTTPRouters(cfg *config.Config) ([]*routinghelpers.ParallelRouter, error) {
var routers []*routinghelpers.ParallelRouter
// Append HTTP routers for additional speed
for _, endpoint := range defaultHTTPRouters {
httpRouter, err := irouting.ConstructHTTPRouter(endpoint, cfg.Identity.PeerID, cfg.Addresses.Swarm, cfg.Identity.PrivKey)
if err != nil {
return nil, err
}

r := &irouting.Composer{
GetValueRouter: routinghelpers.Null{},
PutValueRouter: routinghelpers.Null{},
ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide
FindPeersRouter: routinghelpers.Null{},
FindProvidersRouter: httpRouter,
}

routers = append(routers, &routinghelpers.ParallelRouter{
Router: r,
IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387
Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529
ExecuteAfter: 0,
})
}
return routers, nil
}

// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) RoutingOption {
func ConstructDefaultRouting(cfg *config.Config, routingOpt RoutingOption) RoutingOption {
return func(args RoutingOptionArgs) (routing.Routing, error) {
// Defined routers will be queried in parallel (optimizing for response speed)
// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
Expand All @@ -60,29 +87,13 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
ExecuteAfter: 0,
})

// Append HTTP routers for additional speed
for _, endpoint := range defaultHTTPRouters {
httpRouter, err := irouting.ConstructHTTPRouter(endpoint, peerID, addrs, privKey)
if err != nil {
return nil, err
}

r := &irouting.Composer{
GetValueRouter: routinghelpers.Null{},
PutValueRouter: routinghelpers.Null{},
ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide
FindPeersRouter: routinghelpers.Null{},
FindProvidersRouter: httpRouter,
}

routers = append(routers, &routinghelpers.ParallelRouter{
Router: r,
IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387
Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529
ExecuteAfter: 0,
})
httpRouters, err := constructDefaultHTTPRouters(cfg)
if err != nil {
return nil, err
}

routers = append(routers, httpRouters...)

routing := routinghelpers.NewComposableParallel(routers)
return routing, nil
}
Expand Down