Skip to content

Commit

Permalink
chore(dot/network): added common request-response protocol (ChainSafe…
Browse files Browse the repository at this point in the history
…#3334)

So far, we were using only one request-response protocol (block request and block response). But now, we need to add quite a few more. Just like we did for notification protocol, we need some common code-infrastructure for adding new request-response protocol. 

This PR introduces interfaces for Request and Response to follow and common functions to make request and decode response.
  • Loading branch information
kishansagathiya authored Jul 4, 2023
1 parent 457fe39 commit ac52090
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 143 deletions.
4 changes: 2 additions & 2 deletions dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ func (s *testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg Messa

func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream,
peer peer.ID, decoder messageDecoder, handler messageHandler) {
msgBytes := make([]byte, maxBlockResponseSize)
msgBytes := make([]byte, MaxBlockResponseSize)

defer func() {
s.exit = true
}()

for {
tot, err := readStream(stream, &msgBytes, maxBlockResponseSize)
tot, err := readStream(stream, &msgBytes, MaxBlockResponseSize)
if errors.Is(err, io.EOF) {
return
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dot/network/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// handleLightStream handles streams with the <protocol-id>/light/2 protocol ID
func (s *Service) handleLightStream(stream libp2pnetwork.Stream) {
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, maxBlockResponseSize)
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, MaxBlockResponseSize)
}

func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) {
Expand Down
4 changes: 2 additions & 2 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
RequestedDataJustification = byte(16)
)

var _ Message = &BlockRequestMessage{}
var _ Message = (*BlockRequestMessage)(nil)

// SyncDirection is the direction of data in a block response
type SyncDirection byte
Expand Down Expand Up @@ -167,7 +167,7 @@ func (bm *BlockRequestMessage) Decode(in []byte) error {
return nil
}

var _ Message = &BlockResponseMessage{}
var _ ResponseMessage = (*BlockResponseMessage)(nil)

// BlockResponseMessage is sent in response to a BlockRequestMessage
type BlockResponseMessage struct {
Expand Down
92 changes: 92 additions & 0 deletions dot/network/request_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2021 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"context"
"fmt"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

type RequestMaker interface {
Do(to peer.ID, req Message, res ResponseMessage) error
}

type RequestResponseProtocol struct {
ctx context.Context
host *host
requestTimeout time.Duration
maxResponseSize uint64
protocolID protocol.ID
responseBufMu sync.Mutex
responseBuf []byte
}

func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMessage) error {
rrp.host.p2pHost.ConnManager().Protect(to, "")
defer rrp.host.p2pHost.ConnManager().Unprotect(to, "")

ctx, cancel := context.WithTimeout(rrp.ctx, rrp.requestTimeout)
defer cancel()

stream, err := rrp.host.p2pHost.NewStream(ctx, to, rrp.protocolID)
if err != nil {
return err
}

defer func() {
err := stream.Close()
if err != nil {
logger.Warnf("failed to close stream: %s", err)
}
}()

if err = rrp.host.writeToStream(stream, req); err != nil {
return err
}

return rrp.receiveResponse(stream, res)
}

func (rrp *RequestResponseProtocol) receiveResponse(stream libp2pnetwork.Stream, msg ResponseMessage) error {
// allocating a new (large) buffer every time slows down receiving response by a dramatic amount,
// as malloc is one of the most CPU intensive tasks.
// thus we should allocate buffers at startup and re-use them instead of allocating new ones each time.
rrp.responseBufMu.Lock()
defer rrp.responseBufMu.Unlock()

buf := rrp.responseBuf

n, err := readStream(stream, &buf, rrp.maxResponseSize)
if err != nil {
return fmt.Errorf("read stream error: %w", err)
}

if n == 0 {
return fmt.Errorf("received empty message")
}

err = msg.Decode(buf[:n])
if err != nil {
rrp.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadMessageValue,
Reason: peerset.BadMessageReason,
}, stream.Conn().RemotePeer())
return fmt.Errorf("failed to decode block response: %w", err)
}

return nil
}

type ResponseMessage interface {
String() string
Encode() ([]byte, error)
Decode(in []byte) (err error)
}
26 changes: 19 additions & 7 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
NetworkStateTimeout = time.Minute

// the following are sub-protocols used by the node
syncID = "/sync/2"
SyncID = "/sync/2"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -139,9 +139,7 @@ type Service struct {
telemetryInterval time.Duration
closeCh chan struct{}

blockResponseBuf []byte
blockResponseBufMu sync.Mutex
telemetry Telemetry
telemetry Telemetry
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -220,12 +218,11 @@ func NewService(cfg *Config) (*Service, error) {
closeCh: make(chan struct{}),
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
telemetry: cfg.Telemetry,
Metrics: cfg.Metrics,
}

return network, err
return network, nil
}

// SetSyncer sets the Syncer used by the network service
Expand All @@ -252,7 +249,7 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

s.host.registerStreamHandler(s.host.protocolID+syncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)

// register block announce protocol
Expand Down Expand Up @@ -576,6 +573,21 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
return errors.New("message not supported by any notifications protocol")
}

func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *RequestResponseProtocol {

protocolID := s.host.protocolID + protocol.ID(subprotocol)
return &RequestResponseProtocol{
ctx: s.ctx,
host: s.host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
}

// Health returns information about host needed for the rpc server
func (s *Service) Health() common.Health {
return common.Health{
Expand Down
72 changes: 2 additions & 70 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,91 +4,23 @@
package network

import (
"context"
"fmt"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

var (
blockRequestTimeout = time.Second * 20
BlockRequestTimeout = time.Second * 20
)

// DoBlockRequest sends a request to the given peer.
// If a response is received within a certain time period, it is returned,
// otherwise an error is returned.
func (s *Service) DoBlockRequest(to peer.ID, req *BlockRequestMessage) (*BlockResponseMessage, error) {
fullSyncID := s.host.protocolID + syncID

s.host.p2pHost.ConnManager().Protect(to, "")
defer s.host.p2pHost.ConnManager().Unprotect(to, "")

ctx, cancel := context.WithTimeout(s.ctx, blockRequestTimeout)
defer cancel()

stream, err := s.host.p2pHost.NewStream(ctx, to, fullSyncID)
if err != nil {
return nil, err
}

defer func() {
err := stream.Close()
if err != nil {
logger.Warnf("failed to close stream: %s", err)
}
}()

if err = s.host.writeToStream(stream, req); err != nil {
return nil, err
}

return s.receiveBlockResponse(stream)
}

func (s *Service) receiveBlockResponse(stream libp2pnetwork.Stream) (*BlockResponseMessage, error) {
// allocating a new (large) buffer every time slows down the syncing by a dramatic amount,
// as malloc is one of the most CPU intensive tasks.
// thus we should allocate buffers at startup and re-use them instead of allocating new ones each time.
//
// TODO: should we create another buffer pool for block response buffers?
// for bootstrap this is ok since it's not parallelized, but will need to be updated for tip-mode (#1858)
s.blockResponseBufMu.Lock()
defer s.blockResponseBufMu.Unlock()

buf := s.blockResponseBuf

n, err := readStream(stream, &buf, maxBlockResponseSize)
if err != nil {
return nil, fmt.Errorf("read stream error: %w", err)
}

if n == 0 {
return nil, fmt.Errorf("received empty message")
}

msg := new(BlockResponseMessage)
err = msg.Decode(buf[:n])
if err != nil {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadMessageValue,
Reason: peerset.BadMessageReason,
}, stream.Conn().RemotePeer())
return nil, fmt.Errorf("failed to decode block response: %w", err)
}

return msg, nil
}

// handleSyncStream handles streams with the <protocol-id>/sync/2 protocol ID
func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
return
}

s.readStream(stream, decodeSyncMessage, s.handleSyncMessage, maxBlockResponseSize)
s.readStream(stream, decodeSyncMessage, s.handleSyncMessage, MaxBlockResponseSize)
}

func decodeSyncMessage(in []byte, _ peer.ID, _ bool) (Message, error) {
Expand Down
2 changes: 1 addition & 1 deletion dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

const (
// maxBlockRequestSize uint64 = 1024 * 1024 // 1mb
maxBlockResponseSize uint64 = 1024 * 1024 * 16 // 16mb
MaxBlockResponseSize uint64 = 1024 * 1024 * 16 // 16mb
// MaxGrandpaNotificationSize is maximum size for a grandpa notification message.
MaxGrandpaNotificationSize uint64 = 1024 * 1024 // 1mb
maxTransactionsNotificationSize uint64 = 1024 * 1024 * 16 // 16mb
Expand Down
5 changes: 4 additions & 1 deletion dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,10 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg Bloc
BadBlocks: genesisData.BadBlocks,
}

return sync.NewService(syncCfg)
blockReqRes := net.GetRequestResponseProtocol(network.SyncID, network.BlockRequestTimeout,
network.MaxBlockResponseSize)

return sync.NewService(syncCfg, blockReqRes)
}

func (nodeBuilder) createDigestHandler(st *state.Service) (*digest.Handler, error) {
Expand Down
22 changes: 19 additions & 3 deletions dot/services_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ func Test_createRuntime(t *testing.T) {
func Test_nodeBuilder_newSyncService(t *testing.T) {
t.Parallel()
finalityGadget := &grandpa.Service{}

ctrl := gomock.NewController(t)
stateSrvc := newStateService(t, ctrl)
networkConfig := &network.Config{
BasePath: t.TempDir(),
BlockState: stateSrvc.Block,
RandSeed: 2,
}
networkService, err := network.NewService(networkConfig)
require.NoError(t, err)

type args struct {
fg BlockJustificationVerifier
verifier *babe.VerificationManager
Expand All @@ -382,7 +393,7 @@ func Test_nodeBuilder_newSyncService(t *testing.T) {
fg: finalityGadget,
verifier: nil,
cs: nil,
net: nil,
net: networkService,
telemetryMailer: nil,
},
expectNil: false,
Expand Down Expand Up @@ -546,10 +557,15 @@ func TestCreateSyncService(t *testing.T) {
dh, err := builder.createDigestHandler(stateSrvc)
require.NoError(t, err)

coreSrvc, err := builder.createCoreService(config, ks, stateSrvc, &network.Service{}, dh)
networkService, err := network.NewService(&network.Config{
BlockState: stateSrvc.Block,
})
require.NoError(t, err)

coreSrvc, err := builder.createCoreService(config, ks, stateSrvc, networkService, dh)
require.NoError(t, err)

_, err = builder.newSyncService(config, stateSrvc, &grandpa.Service{}, ver, coreSrvc, &network.Service{}, nil)
_, err = builder.newSyncService(config, stateSrvc, &grandpa.Service{}, ver, coreSrvc, networkService, nil)
require.NoError(t, err)
}

Expand Down
Loading

0 comments on commit ac52090

Please sign in to comment.