diff --git a/routing/dht/dht.go b/routing/dht/dht.go index f9d3963dc5f..4ab7b8f32e8 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -117,34 +117,6 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, return nil } -// putProvider sends a message to peer 'p' saying that the local node -// can provide the value of 'key' -func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) error { - - // add self as the provider - pi := pstore.PeerInfo{ - ID: dht.self, - Addrs: dht.host.Addrs(), - } - - // // only share WAN-friendly addresses ?? - // pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs) - if len(pi.Addrs) < 1 { - // log.Infof("%s putProvider: %s for %s error: no wan-friendly addresses", dht.self, p, key.Key(key), pi.Addrs) - return fmt.Errorf("no known addresses for self. cannot put provider.") - } - - pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey, 0) - pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi}) - err := dht.sendMessage(ctx, p, pmes) - if err != nil { - return err - } - - log.Debugf("%s putProvider: %s for %s (%s)", dht.self, p, key.Key(skey), pi.Addrs) - return nil -} - var errInvalidRecord = errors.New("received invalid record") // getValueOrPeers queries a particular peer p for the value for diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 9f56fdba14e..7298490df42 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -2,6 +2,7 @@ package dht import ( "bytes" + "fmt" "sync" "time" @@ -243,13 +244,18 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { return err } + mes, err := dht.makeProvRecord(key) + if err != nil { + return err + } + wg := sync.WaitGroup{} for p := range peers { wg.Add(1) go func(p peer.ID) { defer wg.Done() log.Debugf("putProvider(%s, %s)", key, p) - err := dht.putProvider(ctx, p, string(key)) + err := dht.sendMessage(ctx, p, mes) if err != nil { log.Debug(err) } @@ -258,6 +264,22 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error { wg.Wait() return nil } +func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) { + pi := pstore.PeerInfo{ + ID: dht.self, + Addrs: dht.host.Addrs(), + } + + // // only share WAN-friendly addresses ?? + // pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs) + if len(pi.Addrs) < 1 { + return nil, fmt.Errorf("no known addresses for self. cannot put provider.") + } + + pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(skey), 0) + pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi}) + return pmes, nil +} // FindProviders searches until the context expires. func (dht *IpfsDHT) FindProviders(ctx context.Context, key key.Key) ([]pstore.PeerInfo, error) { diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index ff77275d9ef..0dfdff45784 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -48,11 +48,11 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m pstore // Update adds or moves the given peer to the front of its respective bucket // If a peer gets removed from a bucket, it is returned func (rt *RoutingTable) Update(p peer.ID) { - rt.tabLock.Lock() - defer rt.tabLock.Unlock() peerID := ConvertPeerID(p) cpl := commonPrefixLen(peerID, rt.local) + rt.tabLock.Lock() + defer rt.tabLock.Unlock() bucketID := cpl if bucketID >= len(rt.Buckets) { bucketID = len(rt.Buckets) - 1 @@ -144,10 +144,10 @@ func (rt *RoutingTable) NearestPeer(id ID) peer.ID { // NearestPeers returns a list of the 'count' closest peers to the given ID func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() cpl := commonPrefixLen(id, rt.local) + rt.tabLock.RLock() + // Get bucket at cpl index or last bucket var bucket *Bucket if cpl >= len(rt.Buckets) { @@ -170,6 +170,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID { peerArr = copyPeersFromList(id, peerArr, plist) } } + rt.tabLock.RUnlock() // Sort by distance to local peer sort.Sort(peerArr)