Skip to content

Commit

Permalink
Websocket support batch request[v031] (#169)
Browse files Browse the repository at this point in the history
* add ws batch request

* rename validateWsRequest
  • Loading branch information
giskook authored Apr 10, 2024
1 parent 19e11a2 commit c107cba
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 2 deletions.
2 changes: 1 addition & 1 deletion jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) {
}

if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage {
resp, err := s.handler.HandleWs(message, wsConn, req)
resp, err := s.handleWsMessage(req, wsConn, message)
if err != nil {
log.Error(fmt.Sprintf("Unable to handle WS request, %s", err.Error()))
_ = wsConn.WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error())))
Expand Down
87 changes: 86 additions & 1 deletion jsonrpc/server_xlayer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package jsonrpc

import "github.com/0xPolygonHermez/zkevm-node/jsonrpc/nacos"
import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/0xPolygonHermez/zkevm-node/jsonrpc/metrics"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/nacos"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
)

func (s *Server) registerNacos() {
// start nacos client for registering restful service
Expand Down Expand Up @@ -30,3 +39,79 @@ func (s *Server) getBatchReqLimit() (bool, uint) {

return batchRequestEnable, batchRequestLimit
}

func (s *Server) handleWsMessage(httpRequest *http.Request, wsConn *concurrentWsConn, data []byte) ([]byte, error) {
if validateWsRequest(data) != nil {
return types.NewResponse(types.Request{}, nil, types.NewRPCError(types.InvalidRequestErrorCode, "Invalid json request")).Bytes()
}
single, err := s.isSingleRequest(data)
if err != nil {
return types.NewResponse(types.Request{}, nil, types.NewRPCError(types.InvalidRequestErrorCode, err.Error())).Bytes()
}
if single {
return s.handler.HandleWs(data, wsConn, httpRequest)
}
return s.handleWsBatch(httpRequest, wsConn, data)
}

func (s *Server) handleWsBatch(httpRequest *http.Request, wsConn *concurrentWsConn, data []byte) ([]byte, error) {
requests, err := s.parseRequests(data)
if err != nil {
return types.NewResponse(types.Request{}, nil, types.NewRPCError(types.InvalidRequestErrorCode, err.Error())).Bytes()
}

batchRequestEnable, batchRequestLimit := s.getBatchReqLimit()

if !batchRequestEnable {
return types.NewResponse(types.Request{}, nil, types.NewRPCError(types.InvalidRequestErrorCode, "Batch requests are disabled")).Bytes()
}

// Checking if batch requests limit is exceeded
if batchRequestLimit > 0 {
if len(requests) > int(batchRequestLimit) {
errMsg := fmt.Sprintf("Batch requests limit exceeded: %d", batchRequestLimit)
return types.NewResponse(types.Request{}, nil, types.NewRPCError(types.InvalidRequestErrorCode, errMsg)).Bytes()
}
}

responses := make([]types.Response, 0, len(requests))

for _, request := range requests {
if !methodRateLimitAllow(request.Method) {
responses = append(responses, types.NewResponse(request, nil, types.NewRPCError(types.InvalidParamsErrorCode, "server is too busy")))
continue
}
st := time.Now()
metrics.RequestMethodCount(request.Method)
req := handleRequest{Request: request, wsConn: wsConn, HttpRequest: httpRequest}
response := s.handler.Handle(req)
responses = append(responses, response)
metrics.RequestMethodDuration(request.Method, st)
}

return json.Marshal(responses)
}

func validateWsRequest(data []byte) error {
if len(data) > maxRequestContentLength {
return fmt.Errorf("content length too large (%d>%d)", len(data), maxRequestContentLength)
}
var valid bool
var req types.Request
if err := json.Unmarshal(data, &req); err != nil {
valid = true
}

if !valid {
var reqs []types.Request
if err := json.Unmarshal(data, &reqs); err != nil {
valid = true
}
}

if !valid {
return fmt.Errorf("invalid request")
}

return nil
}

0 comments on commit c107cba

Please sign in to comment.