diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 0555389f8..4a7d80dfd 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -38,19 +38,31 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - res, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{}) + resCh := make(chan *ocabci.Response, 1) + err := env.Mempool.CheckTxSync(tx, func(res *ocabci.Response) { + select { + case <-ctx.Context().Done(): + case resCh <- res: + } + + }, mempl.TxInfo{}) if err != nil { return nil, err } - r := res.GetCheckTx() - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - Codespace: r.Codespace, - MempoolError: r.MempoolError, - Hash: tx.Hash(), - }, nil + + select { + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case res := <-resCh: + r := res.GetCheckTx() + return &ctypes.ResultBroadcastTx{ + Code: r.Code, + Data: r.Data, + Log: r.Log, + Codespace: r.Codespace, + Hash: tx.Hash(), + }, nil + } } // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. @@ -80,53 +92,64 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc } }() - // Broadcast tx and check tx - checkTxResMsg, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{}) + // Broadcast tx and wait for CheckTx result + checkTxResCh := make(chan *ocabci.Response, 1) + err = env.Mempool.CheckTxSync(tx, func(res *ocabci.Response) { + select { + case <-ctx.Context().Done(): + case checkTxResCh <- res: + } + }, mempl.TxInfo{}) if err != nil { env.Logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } - checkTxRes := checkTxResMsg.GetCheckTx() - if checkTxRes.Code != ocabci.CodeTypeOK { - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: abci.ResponseDeliverTx{}, - Hash: tx.Hash(), - }, nil - } - - // Wait for the tx to be included in a block or timeout. select { - case msg := <-deliverTxSub.Out(): // The tx was included in a block. - deliverTxRes := msg.Data().(types.EventDataTx) - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: deliverTxRes.Result, - Hash: tx.Hash(), - Height: deliverTxRes.Height, - }, nil - case <-deliverTxSub.Cancelled(): - var reason string - if deliverTxSub.Err() == nil { - reason = "Ostracon exited" - } else { - reason = deliverTxSub.Err().Error() + case <-ctx.Context().Done(): + return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Context().Err()) + case checkTxResMsg := <-checkTxResCh: + checkTxRes := checkTxResMsg.GetCheckTx() + if checkTxRes.Code != abci.CodeTypeOK { + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, nil + } + + // Wait for the tx to be included in a block or timeout. + select { + case msg := <-deliverTxSub.Out(): // The tx was included in a block. + deliverTxRes := msg.Data().(types.EventDataTx) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: deliverTxRes.Result, + Hash: tx.Hash(), + Height: deliverTxRes.Height, + }, nil + case <-deliverTxSub.Cancelled(): + var reason string + if deliverTxSub.Err() == nil { + reason = "Tendermint exited" + } else { + reason = deliverTxSub.Err().Error() + } + err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err + case <-time.After(env.Config.TimeoutBroadcastTxCommit): + err = errors.New("timed out waiting for tx to be included in a block") + env.Logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err } - err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) - env.Logger.Error("Error on broadcastTxCommit", "err", err) - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: abci.ResponseDeliverTx{}, - Hash: tx.Hash(), - }, err - case <-time.After(env.Config.TimeoutBroadcastTxCommit): - err = errors.New("timed out waiting for tx to be included in a block") - env.Logger.Error("Error on broadcastTxCommit", "err", err) - return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxRes, - DeliverTx: abci.ResponseDeliverTx{}, - Hash: tx.Hash(), - }, err } }