diff --git a/go.mod b/go.mod index 0c738651e..fea57f68c 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( require ( github.com/Finschia/r2ishiguro_vrf v0.1.2 github.com/bufbuild/buf v1.25.0 + github.com/creachadair/taskgroup v0.3.2 github.com/golangci/golangci-lint v1.53.3 github.com/klauspost/pgzip v1.2.6 // indirect github.com/oasisprotocol/curve25519-voi v0.0.0-20230110094441-db37f07504ce diff --git a/go.sum b/go.sum index eb8f2c86c..f3665b86e 100644 --- a/go.sum +++ b/go.sum @@ -302,6 +302,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM= +github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 96925e219..41687890b 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -4,12 +4,13 @@ package v1 import ( "fmt" - "reflect" + "runtime" "sort" "sync" "sync/atomic" "time" + "github.com/creachadair/taskgroup" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" @@ -42,8 +43,7 @@ type TxMempool struct { cache mempool.TxCache // seen transactions // Atomically-updated fields - txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes - txRecheck int64 // atomic: the number of pending recheck calls + txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes // Synchronized fields, protected by mtx. mtx *sync.RWMutex @@ -84,8 +84,6 @@ func NewTxMempool( txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize) } - proxyAppConn.SetResponseCallback(txmp.recheckTxCallback) - for _, opt := range options { opt(txmp) } @@ -220,30 +218,23 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp return err } - // Initiate an ABCI CheckTx for this transaction. The callback is - // responsible for adding the transaction to the pool if it survives. - // - // N.B.: We have to issue the call outside the lock. In a local client, - // even an "async" call invokes its callback immediately which will make - // the callback deadlock trying to acquire the same lock. This isn't a - // problem with out-of-process calls, but this has to work for both. - reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - if err := txmp.proxyAppConn.FlushSync(); err != nil { + // Invoke an ABCI CheckTx for this transaction. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx}) + if err != nil { + txmp.cache.Remove(tx) return err } - reqRes.SetCallback(func(res *abci.Response) { - wtx := &WrappedTx{ - tx: tx, - hash: tx.Key(), - timestamp: time.Now().UTC(), - height: height, - } - wtx.SetPeer(txInfo.SenderID) - txmp.initialTxCallback(wtx, res) - if cb != nil { - cb(res) - } - }) + wtx := &WrappedTx{ + tx: tx, + hash: tx.Key(), + timestamp: time.Now().UTC(), + height: height, + } + wtx.SetPeer(txInfo.SenderID) + txmp.addNewTransaction(wtx, rsp) + if cb != nil { + cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}}) + } return nil } @@ -299,10 +290,6 @@ func (txmp *TxMempool) Flush() { cur = next } txmp.cache.Reset() - - // Discard any pending recheck calls that may be in flight. The calls will - // still complete, but will have no effect on the mempool. - atomic.StoreInt64(&txmp.txRecheck, 0) } // allEntriesSorted returns a slice of all the transactions currently in the @@ -398,12 +385,6 @@ func (txmp *TxMempool) Update( newPreFn mempool.PreCheckFunc, newPostFn mempool.PostCheckFunc, ) error { - // TODO(creachadair): This would be a nice safety check but requires Go 1.18. - // // Safety check: The caller is required to hold the lock. - // if txmp.mtx.TryLock() { - // txmp.mtx.Unlock() - // panic("mempool: Update caller does not hold the lock") - // } // Safety check: Transactions and responses must match in number. if len(blockTxs) != len(deliverTxResponses) { panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses", @@ -451,9 +432,9 @@ func (txmp *TxMempool) Update( return nil } -// initialTxCallback handles the ABCI CheckTx response for the first time a +// addNewTransaction handles the ABCI CheckTx response for the first time a // transaction is added to the mempool. A recheck after a block is committed -// goes to the default callback (see recheckTxCallback). +// goes to handleRecheckResult. // // If either the application rejected the transaction or a post-check hook is // defined and rejects the transaction, it is discarded. @@ -464,31 +445,22 @@ func (txmp *TxMempool) Update( // transactions are evicted. // // Finally, the new transaction is added and size stats updated. -func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - txmp.logger.Error("mempool: received incorrect result type in CheckTx callback", - "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(), - "got", reflect.TypeOf(res.Value).Name(), - ) - return - } - +func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) { txmp.mtx.Lock() defer txmp.mtx.Unlock() var err error if txmp.postCheck != nil { - err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx) + err = txmp.postCheck(wtx.tx, checkTxRes) } - if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK { + if err != nil || checkTxRes.Code != abci.CodeTypeOK { txmp.logger.Info( "rejected bad transaction", "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "peer_id", wtx.peers, - "code", checkTxRes.CheckTx.Code, + "code", checkTxRes.Code, "post_check_err", err, ) @@ -503,13 +475,13 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { // If there was a post-check error, record its text in the result for // debugging purposes. if err != nil { - checkTxRes.CheckTx.MempoolError = err.Error() + checkTxRes.MempoolError = err.Error() } return } - priority := checkTxRes.CheckTx.Priority - sender := checkTxRes.CheckTx.Sender + priority := checkTxRes.Priority + sender := checkTxRes.Sender // Disallow multiple concurrent transactions from the same sender assigned // by the ABCI application. As a special case, an empty sender is not @@ -523,7 +495,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { "tx", fmt.Sprintf("%X", w.tx.Hash()), "sender", sender, ) - checkTxRes.CheckTx.MempoolError = + checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)", sender, w.tx.Hash()) txmp.metrics.RejectedTxs.Add(1) @@ -558,7 +530,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err.Error(), ) - checkTxRes.CheckTx.MempoolError = + checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", wtx.tx.Hash()) txmp.metrics.RejectedTxs.Add(1) @@ -604,7 +576,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { } } - wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted) + wtx.SetGasWanted(checkTxRes.GasWanted) wtx.SetPriority(priority) wtx.SetSender(sender) txmp.insertTx(wtx) @@ -631,33 +603,14 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) { atomic.AddInt64(&txmp.txsBytes, wtx.Size()) } -// recheckTxCallback handles the responses from ABCI CheckTx calls issued -// during the recheck phase of a block Update. It updates the recheck counter -// and removes any transactions invalidated by the application. +// handleRecheckResult handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It removes any transactions +// invalidated by the application. // -// This callback is NOT executed for the initial CheckTx on a new transaction; -// that case is handled by initialTxCallback instead. -func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - // Don't log this; this is the default callback and other response types - // can safely be ignored. - return - } - - // Check whether we are expecting recheck responses at this point. - // If not, we will ignore the response, this usually means the mempool was Flushed. - // If this is the "last" pending recheck, trigger a notification when it's been processed. - numLeft := atomic.AddInt64(&txmp.txRecheck, -1) - if numLeft == 0 { - defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty - } else if numLeft < 0 { - return - } - +// This method is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by addNewTransaction instead. +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { txmp.metrics.RecheckTimes.Add(1) - tx := types.Tx(req.GetCheckTx().Tx) - txmp.mtx.Lock() defer txmp.mtx.Unlock() @@ -673,11 +626,11 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) // If a postcheck hook is defined, call it before checking the result. var err error if txmp.postCheck != nil { - err = txmp.postCheck(tx, checkTxRes.CheckTx) + err = txmp.postCheck(tx, checkTxRes) } - if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { - wtx.SetPriority(checkTxRes.CheckTx.Priority) + if checkTxRes.Code == abci.CodeTypeOK && err == nil { + wtx.SetPriority(checkTxRes.Priority) return // N.B. Size of mempool did not change } @@ -686,7 +639,7 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err, - "code", checkTxRes.CheckTx.Code, + "code", checkTxRes.Code, ) txmp.removeTxByElement(elt) txmp.metrics.FailedTxs.Add(1) @@ -711,29 +664,43 @@ func (txmp *TxMempool) recheckTransactions() { "num_txs", txmp.Size(), "height", txmp.height, ) - // N.B.: We have to issue the calls outside the lock. In a local client, - // even an "async" call invokes its callback immediately which will make the - // callback deadlock trying to acquire the same lock. This isn't a problem - // with out-of-process calls, but this has to work for both. - txmp.mtx.Unlock() - defer txmp.mtx.Lock() - atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len())) + // Collect transactions currently in the mempool requiring recheck. + wtxs := make([]*WrappedTx, 0, txmp.txs.Len()) for e := txmp.txs.Front(); e != nil; e = e.Next() { - wtx := e.Value.(*WrappedTx) - - // The response for this CheckTx is handled by the default recheckTxCallback. - _ = txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, - }) - if err := txmp.proxyAppConn.FlushSync(); err != nil { - atomic.AddInt64(&txmp.txRecheck, -1) - txmp.logger.Error("mempool: error flushing re-CheckTx", "key", wtx.tx.Key(), "err", err) + wtxs = append(wtxs, e.Value.(*WrappedTx)) + } + + // Issue CheckTx calls for each remaining transaction, and when all the + // rechecks are complete signal watchers that transactions may be available. + go func() { + g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) + + for _, wtx := range wtxs { + wtx := wtx + start(func() error { + // The response for this CheckTx is handled by the default recheckTxCallback. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) + } else { + txmp.handleRecheckResult(wtx.tx, rsp) + } + return nil + }) } - } + _ = txmp.proxyAppConn.FlushAsync() - txmp.proxyAppConn.FlushAsync() + // When recheck is complete, trigger a notification for more transactions. + _ = g.Wait() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + txmp.notifyTxsAvailable() + }() } // canAddTx returns an error if we cannot insert the provided *WrappedTx into