Skip to content

Commit

Permalink
network: Sync Network::Peers read/write with RWMutex.
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Jul 6, 2018
1 parent 3300549 commit c636392
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 50 deletions.
2 changes: 1 addition & 1 deletion network/builders/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (builder *NetworkBuilder) Build() (*network.Network, error) {

Plugins: builder.plugins,

Peers: new(sync.Map),
Peers: make(map[string]*network.PeerClient),

Connections: new(sync.Map),
SendQueue: make(chan *network.Packet),
Expand Down
4 changes: 3 additions & 1 deletion network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (c *PeerClient) Close() error {

// Remove entries from node's network.
if c.ID != nil {
c.Network.Peers.Delete(c.ID.Address)
c.Network.PeersMutex.Lock()
delete(c.Network.Peers, c.ID.Address)
c.Network.PeersMutex.Unlock()
c.Network.Connections.Delete(c.ID.Address)
}

Expand Down
68 changes: 20 additions & 48 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"time"
)

type InitAwaiter chan struct{}

type Packet struct {
Target string
Payload *protobuf.Message
Expand All @@ -43,7 +41,8 @@ type Network struct {

// Map of connection addresses (string) <-> *network.PeerClient
// so that the Network doesn't dial multiple times to the same ip
Peers *sync.Map
Peers map[string]*PeerClient
PeersMutex sync.RWMutex

SendQueue chan *Packet
RecvQueue chan *protobuf.Message
Expand Down Expand Up @@ -108,17 +107,11 @@ func (n *Network) handleRecvQueue() {
for {
select {
case packet := <-n.RecvQueue:
if client, exists := n.Peers.Load(packet.Sender.Address); exists {
if awaiter, ok := client.(InitAwaiter); ok {
<-awaiter
client, exists = n.Peers.Load(packet.Sender.Address)
if !exists {
continue
}
}

client := client.(*PeerClient)
n.PeersMutex.RLock()
client, exists := n.Peers[packet.Sender.Address]
n.PeersMutex.RUnlock()

if exists {
var ptr ptypes.DynamicAny
if err := ptypes.UnmarshalAny(packet.Message, &ptr); err != nil {
continue
Expand Down Expand Up @@ -214,39 +207,25 @@ func (n *Network) Client(address string) (*PeerClient, error) {
return nil, errors.New("peer should not dial itself")
}

thisAwaiter := make(InitAwaiter)
n.PeersMutex.Lock()
defer n.PeersMutex.Unlock()

if client, loaded := n.Peers.LoadOrStore(address, thisAwaiter); loaded {
if awaiter, ok := client.(InitAwaiter); ok {
<-awaiter
var exists bool
client, exists = n.Peers.Load(address)
if !exists {
return nil, errors.New("initialization failed")
}
}
return client.(*PeerClient), nil
if client, exists := n.Peers[address]; exists {
return client, nil
} else {
defer func() {
close(thisAwaiter)
}()

session, err := n.Dial(address)

if err != nil {
n.Peers.Delete(address)
return nil, err
}

n.Connections.Store(address, session)

client, err := createPeerClient(n, address)
if err != nil {
n.Peers.Delete(address)
return nil, err
}
n.Peers.Store(address, client)

n.Peers[address] = client
return client, nil
}
}
Expand Down Expand Up @@ -451,20 +430,16 @@ func (n *Network) Write(address string, message *protobuf.Message) error {

// Broadcast asynchronously broadcasts a message to all peer clients.
func (n *Network) Broadcast(message proto.Message) {
n.Peers.Range(func(key, value interface{}) bool {
client, ok := value.(*PeerClient)
if !ok {
return true
}
n.PeersMutex.RLock()
defer n.PeersMutex.RUnlock()

for _, client := range n.Peers {
err := client.Tell(message)

if err != nil {
glog.Warningf("Failed to send message to peer %v [err=%s]", client.ID, err)
}

return true
})
}
}

// BroadcastByAddresses broadcasts a message to a set of peer clients denoted by their addresses.
Expand Down Expand Up @@ -496,21 +471,18 @@ func (n *Network) BroadcastByIDs(message proto.Message, ids ...peer.ID) {
func (n *Network) BroadcastRandomly(message proto.Message, K int) {
var addresses []string

n.Peers.Range(func(key, value interface{}) bool {
client, ok := value.(*PeerClient)
if !ok {
return true
}
n.PeersMutex.RLock()

for _, client := range n.Peers {
addresses = append(addresses, client.Address)

// Limit total amount of addresses in case we have a lot of peers.
if len(addresses) > K*3 {
return false
break
}
}

return true
})
n.PeersMutex.RUnlock()

// Flip a coin and shuffle :).
rand.Shuffle(len(addresses), func(i, j int) {
Expand Down

0 comments on commit c636392

Please sign in to comment.