Skip to content

Commit

Permalink
Revert "eth/filters, ethclient/gethclient: add fullTx option to pendi…
Browse files Browse the repository at this point in the history
…ng tx filter (#25186) (bnb-chain#1626)"

This reverts commit 291cb8a.
  • Loading branch information
joseddg92 committed Sep 25, 2023
1 parent bb6bdc0 commit 58b1ac7
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 93 deletions.
38 changes: 14 additions & 24 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
txs []*types.Transaction
crit FilterCriteria
logs []*types.Log
s *Subscription // associated subscription in event system
Expand Down Expand Up @@ -100,7 +99,7 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
}
}

// NewPendingTransactionFilter creates a filter that fetches pending transactions
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used through the
Expand All @@ -109,20 +108,20 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
// https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan []*types.Transaction)
pendingTxs = make(chan []common.Hash)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)
api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filtersMu.Unlock()

gopool.Submit(func() {
for {
select {
case pTx := <-pendingTxs:
case ph := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.txs = append(f.txs, pTx...)
f.hashes = append(f.hashes, ph...)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
Expand All @@ -137,10 +136,9 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
return pendingTxSub.ID
}

// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -149,20 +147,16 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *
rpcSub := notifier.CreateSubscription()

gopool.Submit(func() {
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)

for {
select {
case txs := <-txs:
case hashes := <-txHashes:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
for _, tx := range txs {
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}
for _, h := range hashes {
notifier.Notify(rpcSub.ID, h)
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
Expand Down Expand Up @@ -557,14 +551,10 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.deadline.Reset(api.timeout)

switch f.typ {
case BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription:
case PendingTransactionsSubscription, BlocksSubscription, FinalizedHeadersSubscription, VotesSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
txs := f.txs
f.txs = nil
return txs, nil
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
Expand Down
32 changes: 18 additions & 14 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ const (
PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries for pending transactions entering
// the pending state
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
Expand Down Expand Up @@ -83,7 +83,7 @@ type subscription struct {
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
txs chan []*types.Transaction
hashes chan []common.Hash
headers chan *types.Header
finalizedHeaders chan *types.Header
votes chan *types.VoteEnvelope
Expand Down Expand Up @@ -187,7 +187,7 @@ func (sub *Subscription) Unsubscribe() {
case sub.es.uninstall <- sub.f:
break uninstallLoop
case <-sub.f.logs:
case <-sub.f.txs:
case <-sub.f.hashes:
case <-sub.f.headers:
case <-sub.f.votes:
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}),
Expand All @@ -273,7 +273,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}),
Expand All @@ -291,7 +291,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}),
Expand All @@ -308,7 +308,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
hashes: make(chan []common.Hash),
headers: headers,
votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}),
Expand All @@ -325,7 +325,7 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header)
typ: FinalizedHeadersSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
hashes: make(chan []common.Hash),
headers: headers,
votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}),
Expand All @@ -334,15 +334,15 @@ func (es *EventSystem) SubscribeNewFinalizedHeaders(headers chan *types.Header)
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transactions for
// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: txs,
hashes: hashes,
headers: make(chan *types.Header),
votes: make(chan *types.VoteEnvelope),
installed: make(chan struct{}),
Expand All @@ -359,7 +359,7 @@ func (es *EventSystem) SubscribeNewVotes(votes chan *types.VoteEnvelope) *Subscr
typ: VotesSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
votes: votes,
installed: make(chan struct{}),
Expand Down Expand Up @@ -404,8 +404,12 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog
}

func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.txs <- ev.Txs
f.hashes <- hashes
}
}

Expand Down
22 changes: 11 additions & 11 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

txs []*types.Transaction
hashes []common.Hash
)

fid0 := api.NewPendingTransactionFilter()
Expand All @@ -265,9 +265,9 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err)
}

tx := results.([]*types.Transaction)
txs = append(txs, tx...)
if len(txs) >= len(transactions) {
h := results.([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
break
}
// check timeout
Expand All @@ -278,13 +278,13 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

if len(txs) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs))
if len(hashes) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
return
}
for i := range txs {
if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
}
}
}
Expand Down Expand Up @@ -715,11 +715,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
txs, err := api.GetFilterChanges(fid)
hashes, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("Filter should exist: %v\n", err)
}
if len(txs.([]*types.Transaction)) > 0 {
if len(hashes.([]common.Hash)) > 0 {
break
}
runtime.Gosched()
Expand Down
7 changes: 1 addition & 6 deletions ethclient/gethclient/gethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,7 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) {
return &result, err
}

// SubscribeFullPendingTransactions subscribes to new pending transactions.
func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true)
}

// SubscribePendingTransactions subscribes to new pending transaction hashes.
// SubscribePendingTransactions subscribes to new pending transactions.
func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions")
}
Expand Down
39 changes: 1 addition & 38 deletions ethclient/gethclient/gethclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,9 @@ func TestGethClient(t *testing.T) {
}, {
"TestSetHead",
func(t *testing.T) { testSetHead(t, client) },
}, {
"TestSubscribePendingTxHashes",
func(t *testing.T) { testSubscribePendingTransactions(t, client) },
}, {
"TestSubscribePendingTxs",
func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) },
func(t *testing.T) { testSubscribePendingTransactions(t, client) },
}, {
"TestCallContract",
func(t *testing.T) { testCallContract(t, client) },
Expand Down Expand Up @@ -301,40 +298,6 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) {
}
}

func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) {
ec := New(client)
ethcl := ethclient.NewClient(client)
// Subscribe to Transactions
ch := make(chan *types.Transaction)
ec.SubscribeFullPendingTransactions(context.Background(), ch)
// Send a transaction
chainID, err := ethcl.ChainID(context.Background())
if err != nil {
t.Fatal(err)
}
// Create transaction
tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil)
signer := types.LatestSignerForChainID(chainID)
signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey)
if err != nil {
t.Fatal(err)
}
signedTx, err := tx.WithSignature(signer, signature)
if err != nil {
t.Fatal(err)
}
// Send transaction
err = ethcl.SendTransaction(context.Background(), signedTx)
if err != nil {
t.Fatal(err)
}
// Check that the transaction was send over the channel
tx = <-ch
if tx.Hash() != signedTx.Hash() {
t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash())
}
}

func testCallContract(t *testing.T, client *rpc.Client) {
ec := New(client)
msg := ethereum.CallMsg{
Expand Down

0 comments on commit 58b1ac7

Please sign in to comment.