Skip to content

Commit

Permalink
worker: enhancement of the current block generation logic.
Browse files Browse the repository at this point in the history
Currently, validator only try once to get transactions from TxPool to produce the block.
However, new transactions could arrive while the validator is committing transaction.
Validator should be allowed to add these new arrived transactions as long as
Header.Timestamp is not reached

This commit will:
** commitTransactions return with error code
** drop current mining block on new block imported
** try fillTransactions several times for the best
   not use append mode to follow the GasPrice rule.
** check if there is enough time for another fillTransactions.
  • Loading branch information
setunapo committed Nov 17, 2022
1 parent 139eb3f commit 5c88f9f
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 30 deletions.
4 changes: 4 additions & 0 deletions consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func (beacon *Beacon) SetThreads(threads int) {
}
}

func (p *Beacon) DropOnNewBlock(*types.Header) bool {
return true
}

// IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block.
// It depends on the parentHash already being stored in the database.
// If the parentHash is not stored in the database a UnknownAncestor error is returned.
Expand Down
5 changes: 5 additions & 0 deletions consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API {
}}
}

func (p *Clique) DropOnNewBlock(header *types.Header) bool {
// drop the block if it is not in turn.
return header.Difficulty.Cmp(diffNoTurn) == 0
}

// SealHash returns the hash of a block prior to it being sealed.
func SealHash(header *types.Header) (hash common.Hash) {
hasher := sha3.NewLegacyKeccak256()
Expand Down
6 changes: 6 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ type Engine interface {

// Close terminates any background threads maintained by the consensus engine.
Close() error

// DropOnNewBlock determine the action of mining when it is interrupted by new imported block.
// Return
// true: the mining result will be dropped
// false: the mining result will be kept and move on to the next mine step.
DropOnNewBlock(header *types.Header) bool
}

// PoW is a consensus engine based on proof-of-work.
Expand Down
4 changes: 4 additions & 0 deletions consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,10 @@ var (
big32 = big.NewInt(32)
)

func (p *Ethash) DropOnNewBlock(*types.Header) bool {
return true
}

// AccumulateRewards credits the coinbase of the given block with the mining
// reward. The total reward consists of the static block reward and rewards for
// included uncles. The coinbase of each uncle block is also rewarded.
Expand Down
5 changes: 5 additions & 0 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,11 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64,
return CalcDifficulty(snap, p.val)
}

func (p *Parlia) DropOnNewBlock(header *types.Header) bool {
// drop the block if it is not in turn.
return header.Difficulty.Cmp(diffNoTurn) == 0
}

// CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
// that a new block should have based on the previous blocks in the chain and the
// current signer.
Expand Down
181 changes: 151 additions & 30 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package miner
import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,6 +68,11 @@ const (
var (
writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil)
finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil)

errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
errBlockInterruptedByOutOfGas = errors.New("out of gas while building block")
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -142,8 +148,11 @@ type task struct {
}

const (
commitInterruptNewHead int32 = 1
commitInterruptResubmit int32 = 2
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptTimeout
commitInterruptOutOfGas
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
Expand Down Expand Up @@ -754,7 +763,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) bool {
interruptCh chan int32, stopTimer *time.Timer) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -766,7 +775,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}

var coalescedLogs []*types.Log
// initilise bloom processors
// initialize bloom processors
processorCapacity := 100
if txs.CurrentSize() < processorCapacity {
processorCapacity = txs.CurrentSize()
Expand All @@ -781,6 +790,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txCurr := &tx
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr)

signal := commitInterruptNone
LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand All @@ -791,25 +801,27 @@ LOOP:
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interruptCh != nil {
select {
case reason, ok := <-interruptCh:
case signal, ok := <-interruptCh:
if !ok {
// should never be here, since interruptCh should not be read before
log.Warn("commit transactions stopped unknown")
}
return reason == commitInterruptNewHead
return signalToErr(signal)
default:
}
}
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
signal = commitInterruptOutOfGas
break
}
if stopTimer != nil {
select {
case <-stopTimer.C:
log.Info("Not enough time for further transactions", "txs", len(env.txs))
stopTimer.Reset(0) // re-active the timer, in case it will be used later.
signal = commitInterruptTimeout
break LOOP
default:
}
Expand Down Expand Up @@ -885,7 +897,7 @@ LOOP:
}
w.pendingLogsFeed.Send(cpy)
}
return false
return signalToErr(signal)
}

// generateParams wraps various of settings for generating sealing task.
Expand Down Expand Up @@ -988,7 +1000,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stopTimer *time.Timer) (err error) {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(false)
Expand All @@ -1000,26 +1012,23 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
}
}

var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver)
if delay != nil {
stopTimer = time.NewTimer(*delay)
log.Debug("Time left for mining work", "delay", delay.String())
defer stopTimer.Stop()
}

err = nil
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout {
return
}
}

return
}

// generateWork generates a sealing block based on the given parameters.
Expand All @@ -1030,7 +1039,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()

w.fillTransactions(nil, work)
w.fillTransactions(nil, work, nil)
block, _, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
return block, err
}
Expand All @@ -1049,24 +1058,117 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) {
}
coinbase = w.coinbase // Use the preset address as the fee recipient
}
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}

// Fill pending transactions from the txpool
w.fillTransactions(interruptCh, work)
w.commit(work, w.fullTaskHook, true, start)
stopTimer := time.NewTimer(0)
defer stopTimer.Stop()
<-stopTimer.C // discard the initial tick

// validator can try several times to get the most profitable block,
// as long as the timestamp is not reached.
workList := make([]*environment, 0, 10)
var bestWork *environment
// workList clean up
defer func() {
for _, w := range workList {
// only keep the best work, discard others.
if w == bestWork {
continue
}
w.discard()
}
}()
LOOP:
for {
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}

workList = append(workList, work)

delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
if delay == nil {
log.Warn("commitWork delay is nil, something is wrong")
stopTimer = nil
} else if *delay <= 0 {
log.Debug("Not enough time for commitWork")
break
} else {
log.Debug("commitWork stopTimer", "block", work.header.Number,
"header time", time.Until(time.Unix(int64(work.header.Time), 0)),
"commit delay", *delay, "DelayLeftOver", w.config.DelayLeftOver)
stopTimer.Reset(*delay)
}

// subscribe before fillTransactions
txsCh := make(chan core.NewTxsEvent, txChanSize)
sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh)
defer sub.Unsubscribe()

// Fill pending transactions from the txpool
fillStart := time.Now()
err = w.fillTransactions(interruptCh, work, stopTimer)
fillDuration := time.Since(fillStart)
switch {
case errors.Is(err, errBlockInterruptedByNewHead):
// For Parlia, it will drop the work on receiving new block if it is not inturn.
if w.engine.DropOnNewBlock(work.header) {
log.Debug("drop the block, when new block is imported")
return
}
case errors.Is(err, errBlockInterruptedByTimeout):
// break the loop to get the best work
log.Debug("commitWork timeout")
break LOOP
case errors.Is(err, errBlockInterruptedByOutOfGas):
log.Debug("commitWork out of gas")
break LOOP
}

if interruptCh == nil || stopTimer == nil {
// it is single commit work, no need to try several time.
log.Info("commitWork interruptCh or stopTimer is nil")
break
}

select {
case <-txsCh:
delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(), "delay", delay.String())
if fillDuration > *delay {
// there may not have enough time for another fillTransactions
break LOOP
}
case <-stopTimer.C:
log.Debug("commitWork stopTimer expired")
break LOOP
case <-interruptCh:
log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered")
return
}
}
// get the most profitable work
bestWork = workList[0]
bestReward := new(big.Int)
for i, w := range workList {
balance := w.state.GetBalance(consensus.SystemAddress)
log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward)
if balance.Cmp(bestReward) > 0 {
bestWork = w
bestReward = balance
}
}
w.commit(bestWork, w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
// prefetcher processes in the mean time and starting a new one.
if w.current != nil {
w.current.discard()
}
w.current = work
w.current = bestWork
}

// commit runs any post-transaction state modifications, assembles the final block
Expand Down Expand Up @@ -1167,3 +1269,22 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) {
case <-w.exitCh:
}
}

// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNone:
return nil
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
case commitInterruptOutOfGas:
return errBlockInterruptedByOutOfGas
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}

0 comments on commit 5c88f9f

Please sign in to comment.