Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental dht client #8061

Merged
merged 3 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestCommands(t *testing.T) {
"/stats/bitswap",
"/stats/bw",
"/stats/dht",
"/stats/provide",
"/stats/repo",
"/swarm",
"/swarm/addrs",
Expand Down
56 changes: 34 additions & 22 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const (
dhtVerboseOptionName = "verbose"
)

// kademlia extends the routing interface with a command to get the peers closest to the target
type kademlia interface {
routing.Routing
GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error)
}

var queryDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
Expand All @@ -63,7 +69,7 @@ var queryDhtCmd = &cmds.Command{
return err
}

if nd.DHT == nil {
if nd.DHTClient == nil {
return ErrNotDHT
}

Expand All @@ -73,40 +79,46 @@ var queryDhtCmd = &cmds.Command{
}

ctx, cancel := context.WithCancel(req.Context)
defer cancel()
ctx, events := routing.RegisterForQueryEvents(ctx)

dht := nd.DHT.WAN
if !nd.DHT.WANActive() {
dht = nd.DHT.LAN
client := nd.DHTClient
if client == nd.DHT {
client = nd.DHT.WAN
if !nd.DHT.WANActive() {
client = nd.DHT.LAN
}
}

errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := dht.GetClosestPeers(ctx, string(id))
if closestPeers != nil {
for p := range closestPeers {
if d, ok := client.(kademlia); !ok {
return fmt.Errorf("dht client does not support GetClosestPeers")
} else {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := d.GetClosestPeers(ctx, string(id))
for _, p := range closestPeers {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
ID: p,
Type: routing.FinalPeer,
})
}
}

if err != nil {
errCh <- err
return
}
}()
if err != nil {
errCh <- err
return
}
}()

for e := range events {
if err := res.Emit(e); err != nil {
return err
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
}

return <-errCh
return <-errCh
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
Expand Down
1 change: 1 addition & 0 deletions core/commands/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ for your IPFS node.`,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
},
}

Expand Down
54 changes: 53 additions & 1 deletion core/commands/stat_dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

Expand Down Expand Up @@ -43,7 +44,8 @@ This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wan or lan). Defaults to both."),
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+
"wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."),
Comment on lines +47 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are wanserver/lanserver and are they new? Can the description explain 🙏

},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
Expand All @@ -67,12 +69,62 @@ This interface is not stable and may change from release to release.
dhts = []string{"wan", "lan"}
}

dhttypeloop:
for _, name := range dhts {
var dht *dht.IpfsDHT

var separateClient bool
if nd.DHTClient != nd.DHT {
separateClient = true
}

switch name {
case "wan":
if separateClient {
client, ok := nd.DHTClient.(*fullrt.FullRT)
if !ok {
return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type")
}
peerMap := client.Stat()
buckets := make([]dhtBucket, 1)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
b := &dhtBucket{}
for _, p := range peerMap {
info := dhtPeerInfo{ID: p.String()}

if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil {
info.AgentVersion, _ = ver.(string)
} else if err == pstore.ErrNotFound {
// ignore
} else {
// this is a bug, usually.
log.Errorw(
"failed to get agent version from peerstore",
"error", err,
)
}

info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected
b.Peers = append(b.Peers, info)
}
buckets[0] = *b

if err := res.Emit(dhtStat{
Name: name,
Buckets: buckets,
}); err != nil {
return err
}
continue dhttypeloop
}
fallthrough
case "wanserver":
dht = nd.DHT.WAN
case "lan":
if separateClient {
return cmds.Errorf(cmds.ErrClient, "no LAN client found")
}
fallthrough
case "lanserver":
dht = nd.DHT.LAN
default:
return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name)
Expand Down
51 changes: 51 additions & 0 deletions core/commands/stat_provide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package commands

import (
"fmt"

cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"

"github.com/ipfs/go-ipfs-provider/batched"
)

var statProvideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's (re)provider system.",
ShortDescription: `
Returns statistics about the content the node is advertising.

This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}

if !nd.IsOnline {
return ErrNotOnline
}

sys, ok := nd.Provider.(*batched.BatchProvidingSystem)
if !ok {
return fmt.Errorf("can only return stats if Experimental.AcceleratedDHTClient is enabled")
}

stats, err := sys.Stat(req.Context)
if err != nil {
return err
}

if err := res.Emit(stats); err != nil {
return err
}

return nil
},
Encoders: cmds.EncoderMap{},
Type: batched.BatchedProviderStats{},
}
7 changes: 5 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ type IpfsNode struct {

PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`
DHT *ddht.DHT `optional:"true"`
P2P *p2p.P2P `optional:"true"`

DHT *ddht.DHT `optional:"true"`
DHTClient routing.Routing `name:"dhtc" optional:"true"`

P2P *p2p.P2P `optional:"true"`

Process goprocess.Process
ctx context.Context
Expand Down
6 changes: 3 additions & 3 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),

fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting),
fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),

maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
Expand Down Expand Up @@ -275,7 +275,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(p2p.New),

LibP2P(bcfg, cfg),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

Expand All @@ -286,7 +286,7 @@ func Offline(cfg *config.Config) fx.Option {
fx.Provide(DNSResolver),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Experimental.AcceleratedDHTClient, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/node/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type P2PHostOut struct {
fx.Out

Host host.Host
Routing BaseIpfsRouting
Routing routing.Routing `name:"initialrouting"`
}

func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
Expand Down
Loading