Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

Commit

Permalink
perf(handler): add retry logic to broadcasting to cometbft mempool (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
itsdevbear committed Oct 23, 2023
1 parent 216cf48 commit 12f300b
Showing 1 changed file with 63 additions and 19 deletions.
82 changes: 63 additions & 19 deletions cosmos/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package txpool
import (
"errors"
"sync/atomic"
"time"

"cosmossdk.io/log"

Expand All @@ -36,7 +37,11 @@ import (

// txChanSize is the size of channel listening to NewTxsEvent. The number is referenced from the
// size of tx pool.
const txChanSize = 4096
const (
txChanSize = 4096
maxRetries = 5
retryDelay = 50 * time.Millisecond
)

// SdkTx is used to generate mocks.
type SdkTx interface {
Expand Down Expand Up @@ -64,6 +69,12 @@ type Subscription interface {
event.Subscription
}

// failedTx represents a transaction that failed to broadcast.
type failedTx struct {
tx *coretypes.Transaction
retries int
}

// handler listens for new insertions into the geth txpool and broadcasts them to the CometBFT
// layer for p2p and ABCI.
type handler struct {
Expand All @@ -78,6 +89,9 @@ type handler struct {
stopCh chan struct{}
txsSub Subscription
running atomic.Bool

// Queue for failed transactions
failedTxs chan *failedTx
}

// newHandler creates a new handler.
Expand All @@ -91,6 +105,7 @@ func newHandler(
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
}
return h
}
Expand All @@ -100,7 +115,8 @@ func (h *handler) Start() error {
if h.running.Load() {
return errors.New("handler already started")
}
go h.eventLoop()
go h.mainLoop()
go h.failedLoop() // Start the retry policy
return nil
}

Expand All @@ -109,17 +125,19 @@ func (h *handler) Stop() error {
if !h.Running() {
return errors.New("handler already stopped")
}

// Push two stop signals to the stop channel to ensure that both loops stop.
h.stopCh <- struct{}{}
h.stopCh <- struct{}{}
return nil
}

// start handles the subscription to the txpool and broadcasts transactions.
func (h *handler) eventLoop() {
func (h *handler) mainLoop() {
// Connect to the subscription.
h.txsSub = h.txPool.SubscribeNewTxsEvent(h.txsCh)
h.logger.With("module", "txpool-handler").Info("starting txpool handler")
h.running.Store(true)

// Handle events.
var err error
for {
Expand All @@ -135,6 +153,25 @@ func (h *handler) eventLoop() {
}
}

// failedLoop will periodically attempt to re-broadcast failed transactions.
func (h *handler) failedLoop() {
for {
select {
case <-h.stopCh:
return
case failed := <-h.failedTxs:
if failed.retries == 0 {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.broadcastTransaction(failed.tx, failed.retries-1)
}

// We slightly space out the retries in order to prioritize new transactions.
time.Sleep(retryDelay)
}
}

// Running returns true if the handler is running.
func (h *handler) Running() bool {
return h.running.Load()
Expand All @@ -156,28 +193,35 @@ func (h *handler) stop(err error) {
// Close channels.
close(h.txsCh)
close(h.stopCh)
close(h.failedTxs)
}

// broadcastTransactions will propagate a batch of transactions to the CometBFT mempool.
func (h *handler) broadcastTransactions(txs coretypes.Transactions) {
h.logger.Debug("broadcasting transactions", "num_txs", len(txs))
for _, signedEthTx := range txs {
// Serialize the transaction to Bytes
txBytes, err := h.serializer.ToSdkTxBytes(signedEthTx, signedEthTx.Gas())
if err != nil {
h.logger.Error("failed to serialize transaction", "err", err)
continue
}
h.broadcastTransaction(signedEthTx, maxRetries)
}
}

// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)
// broadcastTransaction will propagate a transaction to the CometBFT mempool.
func (h *handler) broadcastTransaction(tx *coretypes.Transaction, retries int) {
txBytes, err := h.serializer.ToSdkTxBytes(tx, tx.Gas())
if err != nil {
h.logger.Error("failed to serialize transaction", "err", err)
return
}

// If we see an ABCI response error.
if rsp != nil && rsp.Code != 0 {
h.logger.Error("failed to broadcast transaction", "rsp", rsp, "err", err)
} else if err != nil {
h.logger.Error("error on transactions broadcast", "err", err)
}
// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)

// If we see an ABCI response error.
if rsp != nil && rsp.Code != 0 {
h.logger.Error("failed to broadcast transaction", "rsp", rsp, "err", err)
h.failedTxs <- &failedTx{tx: tx, retries: retries}
} else if err != nil {
h.logger.Error("error on transactions broadcast", "err", err)
h.failedTxs <- &failedTx{tx: tx, retries: retries}
}
}

0 comments on commit 12f300b

Please sign in to comment.