Skip to content

Commit

Permalink
p2p/discover: ensure no goroutines remain after shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
fjl committed Oct 11, 2018
1 parent 5adb6d0 commit 91025b7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
14 changes: 10 additions & 4 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {

// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
if tab.net != nil {
tab.net.close()
}

select {
case <-tab.closed:
// already closed.
Expand Down Expand Up @@ -337,8 +341,8 @@ func (tab *Table) loop() {
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
refreshDone = make(chan struct{}) // where doRefresh reports completion
revalidateDone chan struct{} // where doRevalidate reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
Expand Down Expand Up @@ -369,25 +373,27 @@ loop:
}
waiting, refreshDone = nil, nil
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
revalidateDone = nil
case <-copyNodes.C:
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}

if tab.net != nil {
tab.net.close()
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
if revalidateDone != nil {
<-revalidateDone
}
close(tab.closed)
}

Expand Down
12 changes: 8 additions & 4 deletions p2p/discover/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/rlp"
)
Expand Down Expand Up @@ -175,7 +175,7 @@ type udp struct {
localNode *enode.LocalNode
db *enode.DB
tab *Table
nat nat.Interface
wg sync.WaitGroup

addpending chan *pending
gotreply chan reply
Expand Down Expand Up @@ -262,6 +262,7 @@ func newUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, *udp, error) {
}
udp.tab = tab

udp.wg.Add(2)
go udp.loop()
go udp.readLoop(cfg.Unhandled)
return udp.tab, udp, nil
Expand All @@ -274,7 +275,7 @@ func (t *udp) self() *enode.Node {
func (t *udp) close() {
close(t.closing)
t.conn.Close()
// TODO: wait for the loops to end.
t.wg.Wait()
}

func (t *udp) ourEndpoint() rpcEndpoint {
Expand Down Expand Up @@ -379,6 +380,8 @@ func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool {
// loop runs in its own goroutine. it keeps track of
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
defer t.wg.Done()

var (
plist = list.New()
timeout = time.NewTimer(0)
Expand Down Expand Up @@ -540,10 +543,11 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet,

// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop(unhandled chan<- ReadPacket) {
defer t.conn.Close()
defer t.wg.Done()
if unhandled != nil {
defer close(unhandled)
}

// Discovery packets are defined to be no larger than 1280 bytes.
// Packets larger than this size will be cut at the end and treated
// as invalid because their hash won't match.
Expand Down

0 comments on commit 91025b7

Please sign in to comment.