Skip to content

Commit

Permalink
[R4R]prune bundle automatically && add error code (bnb-chain#14)
Browse files Browse the repository at this point in the history
* prune bundle automatically

* resolve comments

Co-authored-by: unclereal <walt@nodereal.io>
  • Loading branch information
unclezoro and unclereal authored Dec 13, 2021
1 parent bf027d2 commit 77abd63
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 36 deletions.
18 changes: 18 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,21 @@ var (
// current network configuration.
ErrTxTypeNotSupported = types.ErrTxTypeNotSupported
)

type CustomError struct {
msg string
code int
}

func NewCustomError(msg string, code int) *CustomError {
return &CustomError{
msg: msg,
code: code,
}
}

func (e *CustomError) ErrorCode() int { return e.code }

func (e *CustomError) Error() string {
return e.msg
}
35 changes: 30 additions & 5 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package core

import (
"errors"
"fmt"
"math"
"math/big"
"sort"
Expand Down Expand Up @@ -53,6 +54,9 @@ const (
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB

// Bundle Error Code
GasPriceTooLowErrorCode = -38011
)

var (
Expand Down Expand Up @@ -88,8 +92,10 @@ var (
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")

// ErrorBundlePoolIsFull is returned if the number of bundle exceed the limit
ErrorBundlePoolIsFull = errors.New("bundle pool is full")
BundleAlreadyExistError = NewCustomError("bundle already exist", -38001)
BundlePoolFullError = NewCustomError("bundle pool is full", -38002)
SimulatorMissingError = NewCustomError("bundle simulator is missing", -38003)
DifferentSendersError = NewCustomError("only one tx sender is allowed within one bundle", -38010)
)

var (
Expand Down Expand Up @@ -574,10 +580,29 @@ func (pool *TxPool) PruneBundle(bundle common.Hash) {
delete(pool.mevBundles, bundle)
}

// For testing
func (pool *TxPool) SetBundle(bundleHash common.Hash, bundle *types.MevBundle) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.mevBundles[bundleHash] = bundle
}

// AddMevBundle adds a mev bundle to the pool
func (pool *TxPool) AddMevBundle(txs types.Transactions, maxBlockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) (common.Hash, error) {
senders := make(map[common.Address]bool)
for _, tx := range txs {
txSender, _ := types.Sender(pool.signer, tx)
senders[txSender] = true
if tx.GasPrice() == nil || tx.GasPrice().Cmp(big.NewInt(int64(pool.config.PriceLimit))) < 0 {
return common.Hash{}, NewCustomError(fmt.Sprintf("tx gas price too low, expected %d at least", pool.config.PriceLimit), GasPriceTooLowErrorCode)
}
if len(senders) > 1 {
return common.Hash{}, DifferentSendersError
}
}

if pool.simulator == nil {
return common.Hash{}, errors.New("bundle simulator is nil")
return common.Hash{}, SimulatorMissingError
}
bundle := types.MevBundle{
Txs: txs,
Expand All @@ -600,7 +625,7 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, maxBlockNumber *big.Int
pool.mu.Lock()
defer pool.mu.Unlock()
if _, ok := pool.mevBundles[hash]; ok {
return common.Hash{}, errors.New("bundle already exist")
return common.Hash{}, BundleAlreadyExistError
}
if len(pool.mevBundles) > int(pool.config.BundleSlot) {
leastPrice := big.NewInt(math.MaxInt64)
Expand All @@ -612,7 +637,7 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, maxBlockNumber *big.Int
}
}
if bundle.Price.Cmp(leastPrice) < 0 {
return common.Hash{}, ErrorBundlePoolIsFull
return common.Hash{}, BundlePoolFullError
}
delete(pool.mevBundles, leastBundleHash)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions,

func (b *EthAPIBackend) BundlePrice() (*big.Int, error) {
bundles := b.eth.txPool.AllMevBundles()
if len(bundles) == 0 {
if len(bundles) < b.eth.config.Miner.MaxSimulatBundles/2 {
return big.NewInt(b.eth.config.Miner.MevGasPriceFloor), nil
}
sort.SliceStable(bundles, func(i, j int) bool {
Expand Down
36 changes: 10 additions & 26 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"math/big"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -55,10 +54,12 @@ import (

const (
UnHealthyTimeout = 5 * time.Second
MaxBundleBlockDelay = 1200
MaxBundleTimeDelay = 60 * 60 // second
MaxBundleBlockDelay = 100
MaxBundleTimeDelay = 5 * 60 // second
MaxOracleBlocks = 21
DropBlocks = 3

InvalidBundleParamError = -38000
)

// PublicEthereumAPI provides an API to access Ethereum related information.
Expand Down Expand Up @@ -2521,48 +2522,31 @@ func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) (*big.Int, error)
// SendBundle will add the signed transaction to the transaction pool.
// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity
func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) (common.Hash, error) {
gasUsedRatio := make([]int, 0, MaxOracleBlocks)
block := s.b.CurrentBlock()
var err error
for i := 0; i < MaxOracleBlocks && block.NumberU64() > 1; i++ {
gasUsedRatio = append(gasUsedRatio, int(block.GasUsed()*100/block.GasLimit()))
block, err = s.b.BlockByHash(context.Background(), block.ParentHash())
if err != nil {
break
}
}
sort.Ints(gasUsedRatio)
validGasUsedRatio := gasUsedRatio
if len(gasUsedRatio) > DropBlocks {
validGasUsedRatio = gasUsedRatio[DropBlocks:]
}
if len(validGasUsedRatio) == 0 {
return common.Hash{}, errors.New("no enough example ratio")
}

var txs types.Transactions
if len(args.Txs) == 0 {
return common.Hash{}, errors.New("bundle missing txs")
return common.Hash{}, core.NewCustomError("bundle missing txs", InvalidBundleParamError)
}
if args.MaxBlockNumber == 0 && (args.MaxTimestamp == nil || *args.MaxTimestamp == 0) {
maxTimeStamp := uint64(time.Now().Unix()) + MaxBundleTimeDelay
args.MaxTimestamp = &maxTimeStamp
}
currentBlock := s.b.CurrentBlock()
if args.MaxBlockNumber != 0 && args.MaxBlockNumber.Int64() > int64(currentBlock.NumberU64())+MaxBundleBlockDelay {
return common.Hash{}, errors.New("the maxBlockNumber should not be lager than currentBlockNum + 1200")
return common.Hash{}, core.NewCustomError("the maxBlockNumber should not be lager than currentBlockNum + 100", InvalidBundleParamError)
}
if args.MaxTimestamp != nil && *args.MaxTimestamp > currentBlock.Time()+uint64(MaxBundleTimeDelay) {
return common.Hash{}, errors.New("the maxTimestamp should not be later than currentBlockTimestamp + 1 hour")
return common.Hash{}, core.NewCustomError("the maxTimestamp should not be later than currentBlockTimestamp + 5 minutes", InvalidBundleParamError)
}
if args.MaxTimestamp != nil && args.MinTimestamp != nil && *args.MaxTimestamp != 0 && *args.MinTimestamp != 0 {
if *args.MaxTimestamp <= *args.MinTimestamp {
return common.Hash{}, errors.New("the maxTimestamp should not be less than minTimestamp")
return common.Hash{}, core.NewCustomError("the maxTimestamp should not be less than minTimestamp", InvalidBundleParamError)
}
}
if args.MinTimestamp != nil && *args.MinTimestamp > currentBlock.Time()+uint64(MaxBundleTimeDelay) {
return common.Hash{}, errors.New("the minTimestamp should not be later than currentBlockTimestamp + 1 hour")
return common.Hash{}, core.NewCustomError("the minTimestamp should not be later than currentBlockTimestamp + 5 minutes", InvalidBundleParamError)
}

for _, encodedTx := range args.Txs {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(encodedTx); err != nil {
Expand Down
57 changes: 53 additions & 4 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,19 @@ const (

// staleThreshold is the maximum depth of the acceptable stale block.
staleThreshold = 11

// bundlePruneInterval is the interval to do bundle prune check
bundlePruneInterval = 3 * time.Second

// ErrorCode
SimulatorTxFailedErrorCode = -38006
)

var (
commitTxsTimer = metrics.NewRegisteredTimer("worker/committxs", nil)

SimulatorReceiptFailedError = core.NewCustomError("simulate tx success, while status of receipt is failed", -38004)
BundlePriceTooLowError = core.NewCustomError("no enough gas price for the bundle", -38005)
)

// environment is the worker's current environment and holds all of the current state information.
Expand Down Expand Up @@ -241,6 +250,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
if worker.config.IsFlashbots {
go worker.bundlePruneLoop()
}

// Submit first work to initialize pending state.
if init {
Expand Down Expand Up @@ -547,6 +559,41 @@ func (w *worker) mainLoop() {
}
}

func (w *worker) bundlePruneLoop() {
ticker := time.NewTicker(bundlePruneInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
parent := w.chain.CurrentBlock()
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent, w.config.GasFloor, w.config.GasCeil),
Extra: w.extra,
Time: uint64(time.Now().Unix()),
Difficulty: big.NewInt(2),
}
pruneBundles := func() {
bundles, err := w.eth.TxPool().MevBundles(num.Add(num, common.Big1), uint64(time.Now().Unix()))
log.Info("Total bundles", "n", len(bundles))
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
sort.SliceStable(bundles, func(i, j int) bool {
return bundles[j].Price.Cmp(bundles[i].Price) < 0
})
w.simulateBundles(bundles, w.coinbase, parent, header)
}
pruneBundles()
case <-w.exitCh:
return
}
}
}

// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
Expand Down Expand Up @@ -979,7 +1026,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
}

bundles, err := w.eth.TxPool().MevBundles(header.Number, header.Time)
log.Error("Total bundles", "n", len(bundles))
log.Info("Total bundles", "n", len(bundles))
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
Expand Down Expand Up @@ -1195,18 +1242,20 @@ func (w *worker) computeBundleGas(bundle types.MevBundle, parent *types.Block, h
if err == core.ErrGasLimitReached && !pruneGasExceed {
log.Warn("bundle gas limit exceed", "hash", bundle.Hash.String(), "err", err)
} else {

log.Warn("Prune bundle because of err", "hash", bundle.Hash.String(), "err", err)
w.eth.TxPool().PruneBundle(bundle.Hash)
}
}
return simulatedBundle{}, err
return simulatedBundle{}, core.NewCustomError(err.Error(), SimulatorTxFailedErrorCode)
}
if receipt.Status == types.ReceiptStatusFailed && !containsHash(bundle.RevertingTxHashes, receipt.TxHash) {
if prune {
log.Warn("Prune bundle because of failed tx", "hash", bundle.Hash.String())

w.eth.TxPool().PruneBundle(bundle.Hash)
}
return simulatedBundle{}, errors.New("failed tx")
return simulatedBundle{}, SimulatorReceiptFailedError
}

totalGasUsed += receipt.GasUsed
Expand All @@ -1228,7 +1277,7 @@ func (w *worker) computeBundleGas(bundle types.MevBundle, parent *types.Block, h
log.Warn("Prune bundle because of not enough gas price", "hash", bundle.Hash.String())
w.eth.TxPool().PruneBundle(bundle.Hash)
}
return simulatedBundle{}, errors.New("no enough gas price")
return simulatedBundle{}, BundlePriceTooLowError
}

return simulatedBundle{
Expand Down
Loading

0 comments on commit 77abd63

Please sign in to comment.