Skip to content

Commit

Permalink
Merge pull request ipfs#10 from libp2p/feat/client-mode
Browse files Browse the repository at this point in the history
Add support for a 'client mode' dht
  • Loading branch information
whyrusleeping committed Sep 26, 2016
2 parents d09b2e7 + d90061d commit 2f5770b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 35 deletions.
57 changes: 41 additions & 16 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,49 +61,74 @@ type IpfsDHT struct {
ctx context.Context
proc goprocess.Process

clientOnly bool

strmap map[peer.ID]*messageSender
smlk sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := new(IpfsDHT)
dht.datastore = dstore
dht.self = h.ID()
dht.peerstore = h.Peerstore()
dht.host = h
dht := makeDHT(ctx, h, dstore)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.proc = goprocess.WithTeardown(func() error {
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})

dht.strmap = make(map[peer.ID]*messageSender)
dht.ctx = ctx
dht.proc.AddChild(dht.providers.Process())

h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream)

dht.providers = providers.NewProviderManager(dht.ctx, dht.self, dstore)
dht.proc.AddChild(dht.providers.Process())
goprocessctx.CloseAfterContext(dht.proc, ctx)
dht.Validator["pk"] = record.PublicKeyValidator
dht.Selector["pk"] = record.PublicKeySelector

dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
dht.birth = time.Now()
return dht
}

dht.Validator = make(record.Validator)
dht.Validator["pk"] = record.PublicKeyValidator
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := makeDHT(ctx, h, dstore)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.Selector = make(record.Selector)
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})

dht.proc.AddChild(dht.providers.Process())

dht.Validator["pk"] = record.PublicKeyValidator
dht.Selector["pk"] = record.PublicKeySelector

return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
return &IpfsDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()),

Validator: make(record.Validator),
Selector: make(record.Selector),
}
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key key.Key, rec *recpb.Record) error {
Expand Down
55 changes: 45 additions & 10 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ func init() {
}
}

func setupDHT(ctx context.Context, t *testing.T) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
h := netutil.GenHostSwarm(t, ctx)

dss := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, h, dss)
var d *IpfsDHT
if client {
d = NewDHTClient(ctx, h, dss)
} else {
d = NewDHT(ctx, h, dss)
}

d.Validator["v"] = &record.ValidChecker{
Func: func(key.Key, []byte) error {
Expand All @@ -61,7 +66,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
sanityPeersMap := make(map[string]struct{})

for i := 0; i < n; i++ {
dhts[i] = setupDHT(ctx, t)
dhts[i] = setupDHT(ctx, t, false)
peers[i] = dhts[i].self
addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0]

Expand All @@ -80,8 +85,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
return addrs, peers, dhts
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {

func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
idB := b.self
addrB := b.peerstore.Addrs(idB)
if len(addrB) == 0 {
Expand All @@ -93,6 +97,10 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
if err := a.host.Connect(ctx, pi); err != nil {
t.Fatal(err)
}
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
connectNoSync(t, ctx, a, b)

// loop until connection notification has been received.
// under high load, this may not happen as immediately as we would like.
Expand Down Expand Up @@ -132,8 +140,8 @@ func TestValueGetSet(t *testing.T) {

ctx := context.Background()

dhtA := setupDHT(ctx, t)
dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)

defer dhtA.Close()
defer dhtB.Close()
Expand Down Expand Up @@ -780,8 +788,8 @@ func TestConnectCollision(t *testing.T) {

ctx := context.Background()

dhtA := setupDHT(ctx, t)
dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)

addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
Expand Down Expand Up @@ -832,10 +840,37 @@ func TestBadProtoMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d := setupDHT(ctx, t)
d := setupDHT(ctx, t, false)

nilrec := new(pb.Message)
if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
t.Fatal("should have errored on nil record")
}
}

func TestClientModeConnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)

connectNoSync(t, ctx, a, b)

k := key.Key("TestHash")
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, k, p)

provs, err := b.FindProviders(ctx, k)
if err != nil {
t.Fatal(err)
}

if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}

if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
}
19 changes: 18 additions & 1 deletion notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
ma "github.com/jbenet/go-multiaddr"
inet "github.com/libp2p/go-libp2p/p2p/net"
mstream "github.com/whyrusleeping/go-multistream"
)

// netNotifiee defines methods to be used with the IpfsDHT
Expand All @@ -19,7 +20,23 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
return
default:
}
dht.Update(dht.Context(), v.RemotePeer())

// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
s, err := dht.host.NewStream(dht.Context(), v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)
switch err {
case nil:
s.Close()
// connected fine? full dht node
dht.Update(dht.Context(), v.RemotePeer())
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
default:
// real error? thats odd
log.Errorf("checking dht client type: %#v", err)
}
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
Expand Down
17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2",
"hash": "QmYkwVGkwoPbMVQEbf6LonZg4SsCxGP3H7PBEtdNCNRyxD",
"name": "go-libp2p-peerstore",
"version": "1.2.4"
"version": "1.2.5"
},
{
"author": "whyrusleeping",
Expand Down Expand Up @@ -107,15 +107,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmTZsN8hysGnbakvK6mS8rwDQ9uwokxmWFBv94pig6zGd1",
"hash": "QmVsCNFD32GzZ6Q5XD1TVGPRviNYqDdoNvgq853TU9hhzP",
"name": "go-libp2p-kbucket",
"version": "1.0.2"
"version": "1.0.3"
},
{
"author": "whyrusleeping",
"hash": "QmcoQiBzRaaVv1DZbbXoDWiEtvDN94Ca1DcwnQKK2tP92s",
"hash": "QmemZcG8WprPbnVX3AM43GhhSUiA3V6NjcTLAguvWzkdpQ",
"name": "go-libp2p-routing",
"version": "1.0.2"
"version": "1.0.3"
},
{
"author": "whyrusleeping",
Expand All @@ -125,15 +125,16 @@
},
{
"author": "whyrusleeping",
"hash": "QmR61Ut9oN9mEacVUDWpvvhRPYXSxHEAZVbZkiLy9tKmdr",
"hash": "QmbiRCGZqhfcSjnm9icGz3oNQQdPLAnLWnKHXixaEWXVCN",
"name": "go-libp2p",
"version": "3.5.3"
"version": "3.5.4"
}
],
"gxVersion": "0.4.0",
"language": "go",
"license": "MIT",
"name": "go-libp2p-kad-dht",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "1.1.1"
}

0 comments on commit 2f5770b

Please sign in to comment.