Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rpc: correct eth_subscribe implementation #10027

Merged
merged 5 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
gateway: eth_subscribe support
  • Loading branch information
magik6k committed Jan 31, 2023
commit 1286d769889f10c72e2136f74f03dcf6e227ae12
3 changes: 1 addition & 2 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"time"

"github.com/filecoin-project/go-jsonrpc"

"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand All @@ -18,6 +16,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v8/paych"
Expand Down
2 changes: 1 addition & 1 deletion api/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ type Gateway interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
}
4 changes: 2 additions & 2 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func NewFullNodeRPCV0(ctx context.Context, addr string, requestHeader http.Heade
}

// NewFullNodeRPCV1 creates a new http jsonrpc client.
func NewFullNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header) (api.FullNode, jsonrpc.ClientCloser, error) {
func NewFullNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.FullNode, jsonrpc.ClientCloser, error) {
var res v1api.FullNodeStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, jsonrpc.WithErrors(api.RPCErrors))
api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(api.RPCErrors)}, opts...)...)

return &res, closer, err
}
Expand Down
4 changes: 2 additions & 2 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 15 additions & 13 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions api/v0api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
24 changes: 23 additions & 1 deletion cli/util/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,33 @@ func GetFullNodeAPIV1Single(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientClo
return v1API, closer, nil
}

func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
type GetFullNodeOptions struct {
ethSubHandler api.EthSubscriber
}

type GetFullNodeOption func(*GetFullNodeOptions)

func FullNodeWithEthSubscribtionHandler(sh api.EthSubscriber) GetFullNodeOption {
return func(opts *GetFullNodeOptions) {
opts.ethSubHandler = sh
}
}

func GetFullNodeAPIV1(ctx *cli.Context, opts ...GetFullNodeOption) (v1api.FullNode, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(v1api.FullNode), func() {}, nil
}

var options GetFullNodeOptions
for _, opt := range opts {
opt(&options)
}

var rpcOpts []jsonrpc.Option
if options.ethSubHandler != nil {
rpcOpts = append(rpcOpts, jsonrpc.WithClientHandler("Filecoin", options.ethSubHandler), jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"))
}

heads, err := GetRawAPIMulti(ctx, repo.FullNode, "v1")
if err != nil {
return nil, nil, err
Expand Down
6 changes: 4 additions & 2 deletions cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ var runCmd = &cli.Command{
log.Fatalf("Cannot register the view: %v", err)
}

api, closer, err := lcli.GetFullNodeAPIV1(cctx)
subHnd := gateway.NewEthSubHandler()

api, closer, err := lcli.GetFullNodeAPIV1(cctx, cliutil.FullNodeWithEthSubscribtionHandler(subHnd))
if err != nil {
return err
}
Expand Down Expand Up @@ -195,7 +197,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err)
}

gwapi := gateway.NewNode(api, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
Expand Down
71 changes: 34 additions & 37 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -3073,43 +3073,40 @@ Inputs:

Response:
```json
{
"subscription": [
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
],
"result": {}
}
[
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
```

### EthUninstallFilter
Expand Down
70 changes: 70 additions & 0 deletions gateway/eth_sub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package gateway

import (
"context"
"sync"

"github.com/filecoin-project/go-jsonrpc"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)

type EthSubHandler struct {
queued map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse
sinks map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error

lk sync.Mutex
}

func NewEthSubHandler() *EthSubHandler {
return &EthSubHandler{
queued: make(map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse),
sinks: make(map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error),
}
}

func (e *EthSubHandler) addSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
e.lk.Lock()
defer e.lk.Unlock()

for _, p := range e.queued[id] {
if err := sink(ctx, &p); err != nil {
return err
}
}
delete(e.queued, id)
e.sinks[id] = sink
return nil
}

func (e *EthSubHandler) removeSub(id ethtypes.EthSubscriptionID) {
e.lk.Lock()
defer e.lk.Unlock()

delete(e.sinks, id)
delete(e.queued, id)
}

func (e *EthSubHandler) EthSubscription(ctx context.Context, r jsonrpc.RawParams) error {
p, err := jsonrpc.DecodeParams[ethtypes.EthSubscriptionResponse](r)
if err != nil {
return err
}

e.lk.Lock()

geoff-vball marked this conversation as resolved.
Show resolved Hide resolved
sink := e.sinks[p.SubscriptionID]

if sink == nil {
e.queued[p.SubscriptionID] = append(e.queued[p.SubscriptionID], p)
e.lk.Unlock()
return nil
}

e.lk.Unlock()

return sink(ctx, &p) // todo track errors and auto-unsubscribe on rpc conn close?
}

var _ api.EthSubscriber = (*EthSubHandler)(nil)
4 changes: 2 additions & 2 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const perConnLimiterKey perConnLimiterKeyType = "limiter"

type filterTrackerKeyType string

const filterTrackerKey filterTrackerKeyType = "filterTracker"
const statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker"

// Handler returns a gateway http.Handler, to be mounted as-is on the server.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) {
Expand Down Expand Up @@ -90,7 +90,7 @@ func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter))

// also add a filter tracker to the context
r = r.WithContext(context.WithValue(r.Context(), filterTrackerKey, newFilterTracker()))
r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker()))

h.handler.ServeHTTP(w, r)
}
Expand Down
Loading