Skip to content

Commit

Permalink
Merge pull request perlin-network#116 from perlin-network/close-signal
Browse files Browse the repository at this point in the history
Separate out the close signal to avoid sending on closed channels.
  • Loading branch information
iwasaki-kenta committed Jul 16, 2018
2 parents 2004863 + 96144c1 commit 1da1970
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
46 changes: 28 additions & 18 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type PeerClient struct {
ID *peer.ID
Address string

Requests *sync.Map
Requests sync.Map // uint64 -> *RequestState
RequestNonce uint64

stream StreamState
Expand All @@ -31,6 +31,7 @@ type PeerClient struct {
jobs chan func()

closed uint32 // for atomic ops
closeSignal chan struct{}
}

type StreamState struct {
Expand All @@ -42,6 +43,11 @@ type StreamState struct {
writeDeadline time.Time
}

type RequestState struct {
data chan proto.Message
closeSignal chan struct{}
}

// createPeerClient creates a stub peer client.
func createPeerClient(network *Network, address string) (*PeerClient, error) {
// Ensure the address is valid.
Expand All @@ -52,7 +58,6 @@ func createPeerClient(network *Network, address string) (*PeerClient, error) {
client := &PeerClient{
Network: network,
Address: address,
Requests: new(sync.Map),
RequestNonce: 0,

incomingReady: make(chan struct{}),
Expand All @@ -64,6 +69,7 @@ func createPeerClient(network *Network, address string) (*PeerClient, error) {
},

jobs: make(chan func(), 128),
closeSignal: make(chan struct{}),
}

return client, nil
Expand All @@ -78,19 +84,20 @@ func (c *PeerClient) Init() {
}

func (c *PeerClient) Submit(job func()) {
// FIXME: This is a hack to prevent closed c.jobs from panicking the program.
defer func() {
if err := recover(); err != nil {

}
}()

c.jobs <- job
select {
case c.jobs <- job:
case <-c.closeSignal:
}
}

func (c *PeerClient) executeJobs() {
for job := range c.jobs {
job()
for {
select {
case job := <-c.jobs:
job()
case <-c.closeSignal:
return
}
}
}

Expand All @@ -101,14 +108,12 @@ func (c *PeerClient) Close() error {
return nil
}

close(c.closeSignal)

c.stream.Lock()
c.stream.closed = true
c.stream.Unlock()

if c.jobs != nil {
close(c.jobs)
}

// Handle 'on peer disconnect' callback for plugins.
c.Network.Plugins.Each(func(plugin PluginInterface) {
plugin.PeerDisconnect(c)
Expand Down Expand Up @@ -161,10 +166,15 @@ func (c *PeerClient) Request(req *rpc.Request) (proto.Message, error) {

// Start tracking the request.
channel := make(chan proto.Message, 1)
c.Requests.Store(signed.RequestNonce, channel)
closeSignal := make(chan struct{})

c.Requests.Store(signed.RequestNonce, &RequestState {
data: channel,
closeSignal: closeSignal,
})

// Stop tracking the request.
defer close(channel)
defer close(closeSignal)
defer c.Requests.Delete(signed.RequestNonce)

select {
Expand Down
8 changes: 6 additions & 2 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,12 @@ func (n *Network) dispatchMessage(client *PeerClient, msg *protobuf.Message) {
return
}

if channel, exists := client.Requests.Load(msg.RequestNonce); exists && msg.RequestNonce > 0 {
channel.(chan proto.Message) <- ptr.Message
if _state, exists := client.Requests.Load(msg.RequestNonce); exists && msg.RequestNonce > 0 {
state := _state.(*RequestState)
select {
case state.data <- ptr.Message:
case <-state.closeSignal:
}
return
}

Expand Down

0 comments on commit 1da1970

Please sign in to comment.