Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker enhancement: NewTxsEvent and triePrefetch reuse #1204

Merged
merged 2 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,25 @@ func (s *StateDB) EnableWriteOnSharedStorage() {
s.writeOnSharedStorage = true
}

// In mining mode, we will try multi-fillTransactions to get the most profitable one.
// StateDB will be created for each fillTransactions with same block height.
// Share a single triePrefetcher to avoid too much prefetch routines.
func (s *StateDB) TransferPrefetcher(prev *StateDB) {
if prev == nil {
return
}
var fetcher *triePrefetcher

prev.prefetcherLock.Lock()
fetcher = prev.prefetcher
prev.prefetcher = nil
prev.prefetcherLock.Unlock()

s.prefetcherLock.Lock()
s.prefetcher = fetcher
s.prefetcherLock.Unlock()
}

// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
Expand Down
62 changes: 23 additions & 39 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ type worker struct {

// Subscriptions
mux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
Expand Down Expand Up @@ -225,7 +223,6 @@ type worker struct {

// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.

// External functions
isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.
Expand Down Expand Up @@ -253,7 +250,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
Expand All @@ -265,8 +261,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration),
recentMinedBlocks: recentMinedBlocks,
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
Expand Down Expand Up @@ -397,7 +391,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
return
}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
Expand Down Expand Up @@ -442,10 +435,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
if w.isRunning() && ((w.chainConfig.Ethash != nil) || (w.chainConfig.Clique != nil &&
w.chainConfig.Clique.Period > 0) || (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period > 0)) {
// Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 {
timer.Reset(recommit)
continue
}
commit(commitInterruptResubmit)
}

Expand Down Expand Up @@ -473,7 +462,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// submit it or return task according to given parameters for various proposes.
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
defer func() {
Expand Down Expand Up @@ -539,24 +527,9 @@ func (w *worker) mainLoop() {
}
}

case ev := <-w.txsCh:
if w.isRunning() {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) ||
(w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period == 0) {
w.commitWork(nil, time.Now().Unix())
}
}

atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))

// System stopped
case <-w.exitCh:
return
case <-w.txsSub.Err():
return
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
Expand Down Expand Up @@ -718,14 +691,19 @@ func (w *worker) resultLoop() {
}

// makeEnv creates a new environment for the sealing block.
func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address) (*environment, error) {
func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address,
prevEnv *environment) (*environment, error) {
// Retrieve the parent state to execute on top and start a prefetcher for
// the miner to speed block sealing up a bit
state, err := w.chain.StateAtWithSharedPool(parent.Root())
if err != nil {
return nil, err
}
state.StartPrefetcher("miner")
if prevEnv == nil {
state.StartPrefetcher("miner")
} else {
state.TransferPrefetcher(prevEnv.state)
}

// Note the passed coinbase may be different with header.Coinbase.
env := &environment{
Expand Down Expand Up @@ -941,6 +919,7 @@ type generateParams struct {
random common.Hash // The randomness generated by beacon chain, empty before the merge
noUncle bool // Flag whether the uncle block inclusion is allowed
noExtra bool // Flag whether the extra field assignment is allowed
prevWork *environment
}

// prepareWork constructs the sealing task according to the given parameters,
Expand Down Expand Up @@ -999,7 +978,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// Could potentially happen if starting to mine in an odd state.
// Note genParams.coinbase can be different with header.Coinbase
// since clique algorithm can modify the coinbase field in header.
env, err := w.makeEnv(parent, header, genParams.coinbase)
env, err := w.makeEnv(parent, header, genParams.coinbase, genParams.prevWork)
if err != nil {
log.Error("Failed to create sealing context", "err", err)
return nil, err
Expand Down Expand Up @@ -1105,27 +1084,29 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) {
// validator can try several times to get the most profitable block,
// as long as the timestamp is not reached.
workList := make([]*environment, 0, 10)
var bestWork *environment
var prevWork *environment
// workList clean up
defer func() {
for _, w := range workList {
for _, wk := range workList {
// only keep the best work, discard others.
if w == bestWork {
if wk == w.current {
continue
}
w.discard()
wk.discard()
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
}
}()

LOOP:
for {
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
prevWork: prevWork,
})
if err != nil {
return
}

prevWork = work
workList = append(workList, work)

delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
Expand Down Expand Up @@ -1213,15 +1194,18 @@ LOOP:
}
}
}
// if sub's channel if full, it will block other NewTxsEvent subscribers,
// so unsubscribe ASAP and Unsubscribe() is re-enterable, safe to call several time.
sub.Unsubscribe()
}
// get the most profitable work
bestWork = workList[0]
bestWork := workList[0]
bestReward := new(big.Int)
for i, w := range workList {
balance := w.state.GetBalance(consensus.SystemAddress)
for i, wk := range workList {
balance := wk.state.GetBalance(consensus.SystemAddress)
log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward)
if balance.Cmp(bestReward) > 0 {
bestWork = w
bestWork = wk
bestReward = balance
}
}
Expand Down