Skip to content

Commit

Permalink
fix doube close channel of subfetcher (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro authored Aug 13, 2021
1 parent 8d5f2ba commit 504424d
Showing 1 changed file with 33 additions and 5 deletions.
38 changes: 33 additions & 5 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/ethereum/go-ethereum/metrics"
)

const abortChanSize = 64

var (
// triePrefetchMetricsPrefix is the prefix under which to publis the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"
Expand All @@ -41,6 +43,9 @@ type triePrefetcher struct {
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher
closeChan chan struct{}

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
Expand All @@ -56,9 +61,11 @@ type triePrefetcher struct {
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeChan: make(chan struct{}),

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
Expand All @@ -70,14 +77,34 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
go p.abortLoop()
return p
}

func (p *triePrefetcher) abortLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeChan:
// drain fetcher channel
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
default:
return
}
}
}
}
}

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
for _, fetcher := range p.fetchers {
fetcher.abort() // safe to do multiple times
p.abortChan <- fetcher // safe to do multiple times

if metrics.Enabled {
if fetcher.root == p.root {
Expand All @@ -101,6 +128,7 @@ func (p *triePrefetcher) close() {
}
}
}
close(p.closeChan)
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
Expand Down Expand Up @@ -174,7 +202,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
}
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
fetcher.abort() // safe to do multiple times
p.abortChan <- fetcher // safe to do multiple times

trie := fetcher.peek()
if trie == nil {
Expand Down

0 comments on commit 504424d

Please sign in to comment.