From 96144c1ecd64f175152f6498ec46e2a562788650 Mon Sep 17 00:00:00 2001 From: losfair Date: Mon, 16 Jul 2018 13:05:32 +0800 Subject: [PATCH] network: Separate out the close signal to avoid sending on closed channels. --- go.mod | 2 -- network/client.go | 46 ++++++++++++++++++++++++++++------------------ network/network.go | 8 ++++++-- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index c6d2409a..e5be5564 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,6 @@ module github.com/perlin-network/noise require ( - github.com/davecgh/go-spew v1.1.0 github.com/fd/go-nat v1.0.0 github.com/gogo/protobuf v1.0.0 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b @@ -10,7 +9,6 @@ require ( github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e github.com/klauspost/reedsolomon v0.0.0-20180704173009-925cb01d6510 github.com/pkg/errors v0.8.0 - github.com/pmezard/go-difflib v1.0.0 github.com/stretchr/objx v0.1.1 github.com/stretchr/testify v1.2.2 github.com/templexxx/cpufeat v0.0.0-20170927014610-3794dfbfb047 diff --git a/network/client.go b/network/client.go index 3906e5c3..61cd795a 100644 --- a/network/client.go +++ b/network/client.go @@ -20,7 +20,7 @@ type PeerClient struct { ID *peer.ID Address string - Requests *sync.Map + Requests sync.Map // uint64 -> *RequestState RequestNonce uint64 stream StreamState @@ -31,6 +31,7 @@ type PeerClient struct { jobs chan func() closed uint32 // for atomic ops + closeSignal chan struct{} } type StreamState struct { @@ -42,6 +43,11 @@ type StreamState struct { writeDeadline time.Time } +type RequestState struct { + data chan proto.Message + closeSignal chan struct{} +} + // createPeerClient creates a stub peer client. func createPeerClient(network *Network, address string) (*PeerClient, error) { // Ensure the address is valid. @@ -52,7 +58,6 @@ func createPeerClient(network *Network, address string) (*PeerClient, error) { client := &PeerClient{ Network: network, Address: address, - Requests: new(sync.Map), RequestNonce: 0, incomingReady: make(chan struct{}), @@ -64,6 +69,7 @@ func createPeerClient(network *Network, address string) (*PeerClient, error) { }, jobs: make(chan func(), 128), + closeSignal: make(chan struct{}), } return client, nil @@ -78,19 +84,20 @@ func (c *PeerClient) Init() { } func (c *PeerClient) Submit(job func()) { - // FIXME: This is a hack to prevent closed c.jobs from panicking the program. - defer func() { - if err := recover(); err != nil { - - } - }() - - c.jobs <- job + select { + case c.jobs <- job: + case <-c.closeSignal: + } } func (c *PeerClient) executeJobs() { - for job := range c.jobs { - job() + for { + select { + case job := <-c.jobs: + job() + case <-c.closeSignal: + return + } } } @@ -101,14 +108,12 @@ func (c *PeerClient) Close() error { return nil } + close(c.closeSignal) + c.stream.Lock() c.stream.closed = true c.stream.Unlock() - if c.jobs != nil { - close(c.jobs) - } - // Handle 'on peer disconnect' callback for plugins. c.Network.Plugins.Each(func(plugin PluginInterface) { plugin.PeerDisconnect(c) @@ -161,10 +166,15 @@ func (c *PeerClient) Request(req *rpc.Request) (proto.Message, error) { // Start tracking the request. channel := make(chan proto.Message, 1) - c.Requests.Store(signed.RequestNonce, channel) + closeSignal := make(chan struct{}) + + c.Requests.Store(signed.RequestNonce, &RequestState { + data: channel, + closeSignal: closeSignal, + }) // Stop tracking the request. - defer close(channel) + defer close(closeSignal) defer c.Requests.Delete(signed.RequestNonce) select { diff --git a/network/network.go b/network/network.go index 1d060ce0..1b77d1a3 100644 --- a/network/network.go +++ b/network/network.go @@ -131,8 +131,12 @@ func (n *Network) dispatchMessage(client *PeerClient, msg *protobuf.Message) { return } - if channel, exists := client.Requests.Load(msg.RequestNonce); exists && msg.RequestNonce > 0 { - channel.(chan proto.Message) <- ptr.Message + if _state, exists := client.Requests.Load(msg.RequestNonce); exists && msg.RequestNonce > 0 { + state := _state.(*RequestState) + select { + case state.data <- ptr.Message: + case <-state.closeSignal: + } return }