Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swarm/pss: forwarding function refactoring #1043

Closed
wants to merge 13 commits into from
103 changes: 53 additions & 50 deletions swarm/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,68 +886,71 @@ func (p *Pss) send(to []byte, topic Topic, msg []byte, asymmetric bool, key []by
return nil
}

// tries to send a message, returns true if successful
func (p *Pss) trySend(sp *network.Peer, msg *PssMsg) bool {
var isPssEnabled bool
info := sp.Info()
for _, capability := range info.Caps {
if capability == p.capstring {
isPssEnabled = true
break
}
}
if !isPssEnabled {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just occurred to me this should likely be a log.Error. Running swarm with Pss used to be optional, but it is no longer.

return false
}

// get the protocol peer from the forwarding peer cache
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()

err := pp.Send(context.TODO(), msg)
gluk256 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
}

return err == nil
gluk256 marked this conversation as resolved.
Show resolved Hide resolved
}

// Forwards a pss message to the peer(s) closest to the to recipient address in the PssMsg struct
// The recipient address can be of any length, and the byte slice will be matched to the MSB slice
// of the peer address of the equivalent length.
// If the recipient address (or partial address) is within the neighbourhood depth of the forwarding
// node, then it will be forwarded to all the nearest neighbours of the forwarding node. In case of
// partial address, it should be forwarded to all the peers matching the partial address, if there
// are any; otherwise only to one peer, closest to the recipient address. In any case, if the message
// forwarding fails, the node should try to forward it to the next best peer, until the message is
// successfully forwarded to at least one peer.
func (p *Pss) forward(msg *PssMsg) error {
metrics.GetOrRegisterCounter("pss.forward", nil).Inc(1)

sent := 0 // number of successful sends
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)
neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()

// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
// but the luminosity is less. here luminosity equals the number of bits present in the destination address.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

number of bits defined in the destination address

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think given is best

luminosityRadius := len(msg.To) * 8
pof := pot.DefaultPof(neighbourhoodDepth) // pof function matching up to neighbourhoodDepth bits (pof <= neighbourhoodDepth)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I suggest describing why and how exactly this mechanism is used? pof can be a rather alien term to many that will read this in the future.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have changed the description a bit. however, i think that description of POF belongs where POF is defined, and not where it is used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the relative abundance and frequent obscurity of terms in our code, I don't think it hurts to be generous with explanations ;)

depth, _ := pof(to, p.BaseAddr(), 0)
if depth > luminosityRadius {
depth = luminosityRadius
}

// send with kademlia
// find the closest peer to the recipient and attempt to send
sent := 0
p.Kademlia.EachConn(to, 256, func(sp *network.Peer, po int, isproxbin bool) bool {
info := sp.Info()

// check if the peer is running pss
var ispss bool
for _, cap := range info.Caps {
if cap == p.capstring {
ispss = true
break
}
}
if !ispss {
log.Trace("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
return true
}

// get the protocol peer from the forwarding peer cache
sendMsg := fmt.Sprintf("MSG TO %x FROM %x VIA %x", to, p.BaseAddr(), sp.Address())
p.fwdPoolMu.RLock()
pp := p.fwdPool[sp.Info().ID]
p.fwdPoolMu.RUnlock()

// attempt to send the message
err := pp.Send(context.TODO(), msg)
if err != nil {
metrics.GetOrRegisterCounter("pss.pp.send.error", nil).Inc(1)
log.Error(err.Error())
return true
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool {
if po < depth && sent > 0 {
return false // stop iterating
}
sent++
log.Trace(fmt.Sprintf("%v: successfully forwarded", sendMsg))

// continue forwarding if:
// - if the peer is end recipient but the full address has not been disclosed
// - if the peer address matches the partial address fully
// - if the peer is in proxbin
if len(msg.To) < addressLength && bytes.Equal(msg.To, sp.Address()[:len(msg.To)]) {
log.Trace(fmt.Sprintf("Pss keep forwarding: Partial address + full partial match"))
return true
} else if isproxbin {
log.Trace(fmt.Sprintf("%x is in proxbin, keep forwarding", common.ToHex(sp.Address())))
return true
if p.trySend(sp, msg) {
sent++
}
// at this point we stop forwarding, and the state is as follows:
// - the peer is end recipient and we have full address
// - we are not in proxbin (directed routing)
// - partial addresses don't fully match
return false
return true // continue
})

// if we failed to send to anyone, re-insert message in the send-queue
if sent == 0 {
log.Debug("unable to forward to any peers")
if err := p.enqueue(msg); err != nil {
Expand Down