Skip to content

Commit

Permalink
Add subscribe event or transaction by websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
0xblockhash committed May 8, 2023
1 parent 9aeb437 commit 5c79b52
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Powred by [BlockVision](https://blockvision.org/) & [SuiVision](https://suivisio
+ Customized request method `SuiCall`.
+ Unsigned methods can be executed without loading your keystore file.
+ Provide the method `SignAndExecuteTransactionBlock` to send signed transaction.
+ Support subscriptions to events or transactions via websockets.

## Quick Start

Expand Down
9 changes: 9 additions & 0 deletions common/wsconn/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package wsconn

type SubscriptionResp struct {
Jsonrpc string `json:"jsonrpc"`
Result int64 `json:"result"`
Id int64 `json:"id"`
}


91 changes: 91 additions & 0 deletions common/wsconn/wsconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package wsconn

import (
"context"
"encoding/json"
"fmt"
"github.com/block-vision/sui-go-sdk/models"
"github.com/gorilla/websocket"
"github.com/tidwall/gjson"
"log"
"time"
)

type WsConn struct {
Conn *websocket.Conn
wsUrl string
}

type CallOp struct {
Method string
Params []interface{}
}

func NewWsConn(wsUrl string) *WsConn {
dialer := websocket.Dialer{}
conn, _, err := dialer.Dial(wsUrl, nil)

if err != nil {
log.Fatal("Error connecting to Websocket Server:", err, wsUrl)
}

return &WsConn{
Conn: conn,
wsUrl: wsUrl,
}
}

func (w *WsConn) Call(ctx context.Context, op CallOp, receiveMsgCh chan []byte) error {
jsonRPCCall := models.JsonRPCRequest{
JsonRPC: "2.0",
ID: time.Now().UnixMilli(),
Method: op.Method,
Params: op.Params,
}

callBytes, err := json.Marshal(jsonRPCCall)
if err != nil {
return err
}

err = w.Conn.WriteMessage(websocket.TextMessage, callBytes)
if nil != err {
return err
}

_, messageData, err := w.Conn.ReadMessage()
if nil != err {
return err
}

var rsp SubscriptionResp
if gjson.ParseBytes(messageData).Get("error").Exists() {
return fmt.Errorf(gjson.ParseBytes(messageData).Get("error").String())
}

err = json.Unmarshal([]byte(gjson.ParseBytes(messageData).String()), &rsp)
if err != nil {
return err
}

fmt.Printf("establish successfully, subscriptionID: %d, Waiting to accept data...\n", rsp.Result)

go func(conn *websocket.Conn) {
for {
messageType, messageData, err := conn.ReadMessage()
if nil != err {
log.Println(err)
break
}
switch messageType {
case websocket.TextMessage:
receiveMsgCh <- messageData

default:
continue
}
}
}(w.Conn)

return nil
}
5 changes: 5 additions & 0 deletions constant/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ const (
BvMainnetEndpoint = "https://sui-mainnet-endpoint.blockvision.org"
SuiTestnetEndpoint = "https://fullnode.testnet.sui.io"
SuiMainnetEndpoint = "https://fullnode.mainnet.sui.io"

WssBvTestnetEndpoint = "wss://sui-testnet-endpoint.blockvision.org/websocket"
WssBvMainnetEndpoint = "wss://sui-mainnet-endpoint.blockvision.org/websocket"
WssSuiTestnetEndpoint = "wss://fullnode.testnet.sui.io"
WssSuiMainnetEndpoint = "wss://fullnode.mainnet.sui.io"
)
63 changes: 63 additions & 0 deletions examples/subscribe/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"context"
"github.com/block-vision/sui-go-sdk/constant"
"github.com/block-vision/sui-go-sdk/models"
"github.com/block-vision/sui-go-sdk/sui"
"github.com/block-vision/sui-go-sdk/utils"
)

func main() {
go SubscribeEvent()
go SubscribeTransaction()
select {}
}

func SubscribeEvent() {
var ctx = context.Background()
var cli = sui.NewSuiWebsocketClient(constant.WssBvTestnetEndpoint)

receiveMsgCh := make(chan models.SuiEventResponse, 10)
err := cli.SubscribeEvent(ctx, models.SuiXSubscribeEventsRequest{
SuiEventFilter: map[string]interface{}{
"All": []string{},
},
}, receiveMsgCh)
if err != nil {
panic(err)
}

for {
select {
case msg := <-receiveMsgCh:
utils.PrettyPrint(msg)
case <-ctx.Done():
return
}
}
}

func SubscribeTransaction() {
var ctx = context.Background()
var cli = sui.NewSuiWebsocketClient(constant.WssBvTestnetEndpoint)

receiveMsgCh := make(chan models.SuiEffects, 10)
err := cli.SubscribeTransaction(ctx, models.SuiXSubscribeTransactionsRequest{
TransactionFilter: models.TransactionFilterByFromAddress{
FromAddress: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
}, receiveMsgCh)
if err != nil {
panic(err)
}

for {
select {
case msg := <-receiveMsgCh:
utils.PrettyPrint(msg)
case <-ctx.Done():
return
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/go-playground/validator/v10 v10.12.0
github.com/gorilla/websocket v1.5.0
github.com/tidwall/gjson v1.14.4
github.com/tyler-smith/go-bip39 v1.1.0
golang.org/x/crypto v0.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.12.0 h1:E4gtWgxWxp8YSxExrQFv5BpCahla0PVF2oTTEYaWQGI=
github.com/go-playground/validator/v10 v10.12.0/go.mod h1:hCAPuzYvKdP33pxWa+2+6AIKXEKqjIUyqsNCtbsSJrA=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/leodido/go-urn v1.2.2 h1:7z68G0FCGvDk646jz1AelTYNYWrTNm0bEcFAo147wt4=
github.com/leodido/go-urn v1.2.2/go.mod h1:kUaIbLZWttglzwNuG0pgsh5vuV6u2YcGBYz1hIPjtOQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
43 changes: 43 additions & 0 deletions models/read_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,44 @@ type SuiTransactionBlockResponseQuery struct {

type TransactionFilter map[string]interface{}

// TransactionFilterByFromAddress is a filter for from address
type TransactionFilterByFromAddress struct {
FromAddress string `json:"FromAddress"`
}

// TransactionFilterByToAddress is a filter for to address
type TransactionFilterByToAddress struct {
ToAddress string `json:"ToAddress"`
}

// TransactionFilterByInputObject is a filter for input objects
type TransactionFilterByInputObject struct {
// InputObject is the id of the object
InputObject string `json:"InputObject"`
}

// TransactionFilterByChangedObjectFilter is a filter for changed objects
type TransactionFilterByChangedObjectFilter struct {
// ChangedObject is a filter for changed objects
ChangedObject string `json:"ChangedObject"`
}

// TransactionFilterByMoveFunction is a filter for move functions
type TransactionFilterByMoveFunction struct {
MoveFunction MoveFunction `json:"MoveFunction"`
}

type MoveFunction struct {
Package string `json:"package"`
Module *string `json:"module"`
Function *string `json:"function"`
}

type SuiXSubscribeTransactionsRequest struct {
// the transaction query criteria.
TransactionFilter interface{} `json:"filter"`
}

type SuiXQueryTransactionBlocksRequest struct {
SuiTransactionBlockResponseQuery SuiTransactionBlockResponseQuery
// optional paging cursor
Expand Down Expand Up @@ -246,3 +284,8 @@ type SuiDevInspectTransactionBlockRequest struct {
// The epoch to perform the call. Will be set from the system state object if not provided
Epoch string `json:"epoch"`
}

type SuiXSubscribeEventsRequest struct {
// the event query criteria.
SuiEventFilter interface{} `json:"suiEventFilter"`
}
7 changes: 0 additions & 7 deletions models/request.go

This file was deleted.

89 changes: 89 additions & 0 deletions sui/subscribe_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package sui

import (
"context"
"encoding/json"
"github.com/block-vision/sui-go-sdk/common/wsconn"
"github.com/block-vision/sui-go-sdk/models"
"github.com/tidwall/gjson"
"log"
)

type ISubscribeAPI interface {
SubscribeEvent(ctx context.Context, req models.SuiXSubscribeEventsRequest, msgCh chan models.SuiEventResponse) error
SubscribeTransaction(ctx context.Context, req models.SuiXSubscribeTransactionsRequest, msgCh chan models.SuiEffects) error
}

type suiSubscribeImpl struct {
conn *wsconn.WsConn
}

// SubscribeEvent implements the method `suix_subscribeEvent`, subscribe to a stream of Sui event.
func (s *suiSubscribeImpl) SubscribeEvent(ctx context.Context, req models.SuiXSubscribeEventsRequest, msgCh chan models.SuiEventResponse) error {
rsp := make(chan []byte, 10)
err := s.conn.Call(ctx, wsconn.CallOp{
Method: "suix_subscribeEvent",
Params: []interface{}{
req.SuiEventFilter,
},
}, rsp)
if err != nil {
return err
}

go func() {
for {
select {
case messageData := <-rsp:
var result models.SuiEventResponse
if gjson.ParseBytes(messageData).Get("error").Exists() {
log.Fatal(gjson.ParseBytes(messageData).Get("error").String())
}

err := json.Unmarshal([]byte(gjson.ParseBytes(messageData).Get("params.result").String()), &result)
if err != nil {
log.Fatal(err)
}

msgCh <- result
}
}
}()

return nil
}

// SubscribeTransaction implements the method `suix_subscribeTransaction`, subscribe to a stream of Sui transaction effects.
func (s *suiSubscribeImpl) SubscribeTransaction(ctx context.Context, req models.SuiXSubscribeTransactionsRequest, msgCh chan models.SuiEffects) error {
rsp := make(chan []byte, 10)
err := s.conn.Call(ctx, wsconn.CallOp{
Method: "suix_subscribeTransaction",
Params: []interface{}{
req.TransactionFilter,
},
}, rsp)
if err != nil {
return err
}

go func() {
for {
select {
case messageData := <-rsp:
var result models.SuiEffects
if gjson.ParseBytes(messageData).Get("error").Exists() {
log.Fatal(gjson.ParseBytes(messageData).Get("error").String())
}

err := json.Unmarshal([]byte(gjson.ParseBytes(messageData).Get("params.result").String()), &result)
if err != nil {
log.Fatal(err)
}

msgCh <- result
}
}
}()

return nil
}
28 changes: 28 additions & 0 deletions sui/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) BlockVision, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package sui

import (
"github.com/block-vision/sui-go-sdk/common/wsconn"
)

// ISuiWebsocketAPI defines the subscription API related interface, and then implement it by the WebsocketClient.
type ISuiWebsocketAPI interface {
ISubscribeAPI
}

// WebsocketClient implements SuiWebsocketAPI related interfaces.
type WebsocketClient struct {
ISubscribeAPI
}

// NewSuiWebsocketClient instantiates the WebsocketClient to call the methods of each module.
func NewSuiWebsocketClient(rpcUrl string) ISuiWebsocketAPI {
conn := wsconn.NewWsConn(rpcUrl)
return &WebsocketClient{
ISubscribeAPI: &suiSubscribeImpl{
conn: conn,
},
}
}

0 comments on commit 5c79b52

Please sign in to comment.