Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rpc: correct eth_subscribe implementation #10027

Merged
merged 5 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
eth rpc: Params are optional in eth_subscribe
  • Loading branch information
magik6k committed Jan 31, 2023
commit ad14d71978f9d63ff6268b5fbedb781a05fbe384
2 changes: 1 addition & 1 deletion api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ type FullNode interface {
// - logs: notify new event logs that match a criteria
// params contains additional parameters used with the log event type
// The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called.
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) //perm:write
EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) //perm:write

// Unsubscribe from a websocket subscription
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) //perm:write
Expand Down
3 changes: 2 additions & 1 deletion api/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/go-state-types/dline"
Expand Down Expand Up @@ -102,6 +103,6 @@ type Gateway interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error)
EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
}
9 changes: 5 additions & 4 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
37 changes: 37 additions & 0 deletions chain/types/ethtypes/eth_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,43 @@ type EthLog struct {
BlockNumber EthUint64 `json:"blockNumber"`
}

// EthSubscribeParams handles raw jsonrpc params for eth_subscribe
type EthSubscribeParams struct {
EventType string
Params *EthSubscriptionParams
}

func (e *EthSubscribeParams) UnmarshalJSON(b []byte) error {
var params []json.RawMessage
err := json.Unmarshal(b, &params)
if err != nil {
return err
}
switch len(params) {
case 2:
err = json.Unmarshal(params[1], &e.Params)
if err != nil {
return err
}
fallthrough
case 1:
err = json.Unmarshal(params[0], &e.EventType)
if err != nil {
return err
}
default:
return xerrors.Errorf("expected 1 or 2 params, got %d", len(params))
}
return nil
}

func (e EthSubscribeParams) MarshalJSON() ([]byte, error) {
if e.Params != nil {
return json.Marshal([]interface{}{e.EventType, e.Params})
}
return json.Marshal([]interface{}{e.EventType})
}

type EthSubscriptionParams struct {
// List of topics to be matched.
// Optional, default: empty list
Expand Down
9 changes: 1 addition & 8 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -2886,14 +2886,7 @@ Perms: write
Inputs:
```json
[
"string value",
{
"topics": [
[
"0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"
]
]
}
"Bw=="
]
```

Expand Down
3 changes: 2 additions & 1 deletion gateway/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"
Expand Down Expand Up @@ -117,7 +118,7 @@ type TargetAPI interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error)
EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
}

Expand Down
10 changes: 8 additions & 2 deletions gateway/proxy_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,13 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID)
return ok, nil
}

func (gw *Node) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) {
func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
// validate params
_, err := jsonrpc.DecodeParams[ethtypes.EthSubscribeParams](p)
if err != nil {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("decoding params: %w", err)
}

if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return ethtypes.EthSubscriptionID{}, err
}
Expand All @@ -458,7 +464,7 @@ func (gw *Node) EthSubscribe(ctx context.Context, eventType string, params *etht
return ethtypes.EthSubscriptionID{}, fmt.Errorf("too many subscriptions")
}

sub, err := gw.target.EthSubscribe(ctx, eventType, params)
sub, err := gw.target.EthSubscribe(ctx, p)
if err != nil {
return ethtypes.EthSubscriptionID{}, err
}
Expand Down
45 changes: 20 additions & 25 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"sort"
Expand All @@ -21,6 +22,7 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"

Expand All @@ -29,6 +31,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/itests/kit"
res "github.com/filecoin-project/lotus/lib/result"
)

// SolidityContractDef holds information about one of the test contracts
Expand Down Expand Up @@ -438,7 +441,7 @@ func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
t.Logf("actor ID address is %s", idAddr)

// install filter
subId, err := client.EthSubscribe(ctx, "logs", nil)
subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "logs"})).Assert(require.NoError))
require.NoError(err)

var subResponses []ethtypes.EthSubscriptionResponse
Expand Down Expand Up @@ -567,51 +570,43 @@ func TestEthSubscribeLogs(t *testing.T) {

testResponses := map[string]chan ethtypes.EthSubscriptionResponse{}

// quit is used to signal that we're ready to start testing collected results
quit := make(chan struct{})

// Create all the filters
for _, tc := range testCases {

// subscribe to topics in filter
subCh, err := client.EthSubscribe(ctx, "logs", &ethtypes.EthSubscriptionParams{Topics: tc.spec.Topics})
subParam, err := json.Marshal(ethtypes.EthSubscribeParams{
EventType: "logs",
Params: &ethtypes.EthSubscriptionParams{Topics: tc.spec.Topics},
})
require.NoError(err)

subId, err := client.EthSubscribe(ctx, subParam)
require.NoError(err)

responseCh := make(chan ethtypes.EthSubscriptionResponse, len(invocations))
testResponses[tc.name] = responseCh

// start a goroutine to forward responses from subscription to a buffered channel with guaranteed capacity
go func(subCh <-chan ethtypes.EthSubscriptionResponse, responseCh chan<- ethtypes.EthSubscriptionResponse, quit chan struct{}) {
defer func() {
close(responseCh)
}()
for {
select {
case resp := <-subCh:
responseCh <- resp
case <-quit:
return
case <-ctx.Done():
return
}
}
}(subCh, responseCh, quit)

err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
responseCh <- *resp
return nil
})
require.NoError(err)
}

// Perform all the invocations
messages := invokeAndWaitUntilAllOnChain(t, client, invocations)

// wait a little for subscriptions to gather results and then tell all the goroutines to stop
// wait a little for subscriptions to gather results
time.Sleep(blockTime * 6)
close(quit)

for _, tc := range testCases {
tc := tc // appease the lint despot
t.Run(tc.name, func(t *testing.T) {
responseCh, ok := testResponses[tc.name]
require.True(ok)

// don't expect any more responses
close(responseCh)

var elogs []*ethtypes.EthLog
for resp := range responseCh {
rlist, ok := resp.Result.([]interface{})
Expand Down
8 changes: 7 additions & 1 deletion lib/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func Wrap[T any](value T, err error) Result[T] {
}
}

func (r *Result[T]) Unwrap() (T, error) {
func (r Result[T]) Unwrap() (T, error) {
return r.Value, r.Error
}

func (r Result[T]) Assert(noErrFn func(err error, msgAndArgs ...interface{})) T {
noErrFn(r.Error)

return r.Value
}
19 changes: 12 additions & 7 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type EthEventAPI interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error)
EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
}

Expand Down Expand Up @@ -1103,7 +1103,12 @@ const (
EthSubscribeEventTypeLogs = "logs"
)

func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) {
func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
params, err := jsonrpc.DecodeParams[ethtypes.EthSubscribeParams](p)
if err != nil {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("decoding params: %w", err)
}

if e.SubManager == nil {
return ethtypes.EthSubscriptionID{}, api.ErrNotSupported
}
Expand All @@ -1118,7 +1123,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
return ethtypes.EthSubscriptionID{}, err
}

switch eventType {
switch params.EventType {
case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx)
if err != nil {
Expand All @@ -1130,13 +1135,13 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e

case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{}
if params != nil {
if params.Params != nil {
var err error
keys, err = parseEthTopics(params.Topics)
keys, err = parseEthTopics(params.Params.Topics)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err
return ethtypes.EthSubscriptionID{}, err
}
}

Expand All @@ -1148,7 +1153,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
}
sub.addFilter(ctx, f)
default:
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", eventType)
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", params.EventType)
}

return sub.id, nil
Expand Down