Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dot/network): added common request-response protocol #3334

Merged
merged 12 commits into from
Jul 4, 2023
4 changes: 2 additions & 2 deletions dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ func (s *testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg Messa

func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream,
peer peer.ID, decoder messageDecoder, handler messageHandler) {
msgBytes := make([]byte, maxBlockResponseSize)
msgBytes := make([]byte, MaxBlockResponseSize)

defer func() {
s.exit = true
}()

for {
tot, err := readStream(stream, &msgBytes, maxBlockResponseSize)
tot, err := readStream(stream, &msgBytes, MaxBlockResponseSize)
if errors.Is(err, io.EOF) {
return
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dot/network/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// handleLightStream handles streams with the <protocol-id>/light/2 protocol ID
func (s *Service) handleLightStream(stream libp2pnetwork.Stream) {
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, maxBlockResponseSize)
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, MaxBlockResponseSize)
}

func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) {
Expand Down
4 changes: 2 additions & 2 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
RequestedDataJustification = byte(16)
)

var _ Message = &BlockRequestMessage{}
var _ Message = (*BlockRequestMessage)(nil)

// SyncDirection is the direction of data in a block response
type SyncDirection byte
Expand Down Expand Up @@ -167,7 +167,7 @@ func (bm *BlockRequestMessage) Decode(in []byte) error {
return nil
}

var _ Message = &BlockResponseMessage{}
var _ ResponseMessage = (*BlockResponseMessage)(nil)

// BlockResponseMessage is sent in response to a BlockRequestMessage
type BlockResponseMessage struct {
Expand Down
92 changes: 92 additions & 0 deletions dot/network/request_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2021 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

import (
"context"
"fmt"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

type RequestMaker interface {
Do(to peer.ID, req Message, res ResponseMessage) error
}

type RequestResponseProtocol struct {
ctx context.Context
host *host
requestTimeout time.Duration
maxResponseSize uint64
protocolID protocol.ID
responseBufMu sync.Mutex
responseBuf []byte
}

func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMessage) error {
rrp.host.p2pHost.ConnManager().Protect(to, "")
defer rrp.host.p2pHost.ConnManager().Unprotect(to, "")

ctx, cancel := context.WithTimeout(rrp.ctx, rrp.requestTimeout)
defer cancel()

stream, err := rrp.host.p2pHost.NewStream(ctx, to, rrp.protocolID)
if err != nil {
return err
}

defer func() {
err := stream.Close()
if err != nil {
logger.Warnf("failed to close stream: %s", err)
}
}()

if err = rrp.host.writeToStream(stream, req); err != nil {
return err
}

return rrp.receiveResponse(stream, res)
}

func (rrp *RequestResponseProtocol) receiveResponse(stream libp2pnetwork.Stream, msg ResponseMessage) error {
// allocating a new (large) buffer every time slows down receiving response by a dramatic amount,
// as malloc is one of the most CPU intensive tasks.
// thus we should allocate buffers at startup and re-use them instead of allocating new ones each time.
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
rrp.responseBufMu.Lock()
defer rrp.responseBufMu.Unlock()

buf := rrp.responseBuf

n, err := readStream(stream, &buf, rrp.maxResponseSize)
if err != nil {
return fmt.Errorf("read stream error: %w", err)
}

if n == 0 {
return fmt.Errorf("received empty message")
}

err = msg.Decode(buf[:n])
if err != nil {
rrp.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadMessageValue,
Reason: peerset.BadMessageReason,
}, stream.Conn().RemotePeer())
return fmt.Errorf("failed to decode block response: %w", err)
}

return nil
}

type ResponseMessage interface {
String() string
Encode() ([]byte, error)
Decode(in []byte) (err error)
}
26 changes: 19 additions & 7 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
NetworkStateTimeout = time.Minute

// the following are sub-protocols used by the node
syncID = "/sync/2"
SyncID = "/sync/2"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -139,9 +139,7 @@ type Service struct {
telemetryInterval time.Duration
closeCh chan struct{}

blockResponseBuf []byte
blockResponseBufMu sync.Mutex
telemetry Telemetry
telemetry Telemetry
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -220,12 +218,11 @@ func NewService(cfg *Config) (*Service, error) {
closeCh: make(chan struct{}),
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
telemetry: cfg.Telemetry,
Metrics: cfg.Metrics,
}

return network, err
return network, nil
}

// SetSyncer sets the Syncer used by the network service
Expand All @@ -252,7 +249,7 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

s.host.registerStreamHandler(s.host.protocolID+syncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)

// register block announce protocol
Expand Down Expand Up @@ -576,6 +573,21 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
return errors.New("message not supported by any notifications protocol")
}

func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *RequestResponseProtocol {

protocolID := s.host.protocolID + protocol.ID(subprotocol)
return &RequestResponseProtocol{
ctx: s.ctx,
host: s.host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
}

// Health returns information about host needed for the rpc server
func (s *Service) Health() common.Health {
return common.Health{
Expand Down
72 changes: 2 additions & 70 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,91 +4,23 @@
package network

import (
"context"
"fmt"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

var (
blockRequestTimeout = time.Second * 20
BlockRequestTimeout = time.Second * 20
)

// DoBlockRequest sends a request to the given peer.
// If a response is received within a certain time period, it is returned,
// otherwise an error is returned.
func (s *Service) DoBlockRequest(to peer.ID, req *BlockRequestMessage) (*BlockResponseMessage, error) {
fullSyncID := s.host.protocolID + syncID

s.host.p2pHost.ConnManager().Protect(to, "")
defer s.host.p2pHost.ConnManager().Unprotect(to, "")

ctx, cancel := context.WithTimeout(s.ctx, blockRequestTimeout)
defer cancel()

stream, err := s.host.p2pHost.NewStream(ctx, to, fullSyncID)
if err != nil {
return nil, err
}

defer func() {
err := stream.Close()
if err != nil {
logger.Warnf("failed to close stream: %s", err)
}
}()

if err = s.host.writeToStream(stream, req); err != nil {
return nil, err
}

return s.receiveBlockResponse(stream)
}

func (s *Service) receiveBlockResponse(stream libp2pnetwork.Stream) (*BlockResponseMessage, error) {
// allocating a new (large) buffer every time slows down the syncing by a dramatic amount,
// as malloc is one of the most CPU intensive tasks.
// thus we should allocate buffers at startup and re-use them instead of allocating new ones each time.
//
// TODO: should we create another buffer pool for block response buffers?
// for bootstrap this is ok since it's not parallelized, but will need to be updated for tip-mode (#1858)
s.blockResponseBufMu.Lock()
defer s.blockResponseBufMu.Unlock()

buf := s.blockResponseBuf

n, err := readStream(stream, &buf, maxBlockResponseSize)
if err != nil {
return nil, fmt.Errorf("read stream error: %w", err)
}

if n == 0 {
return nil, fmt.Errorf("received empty message")
}

msg := new(BlockResponseMessage)
err = msg.Decode(buf[:n])
if err != nil {
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.BadMessageValue,
Reason: peerset.BadMessageReason,
}, stream.Conn().RemotePeer())
return nil, fmt.Errorf("failed to decode block response: %w", err)
}

return msg, nil
}

// handleSyncStream handles streams with the <protocol-id>/sync/2 protocol ID
func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) {
if stream == nil {
return
}

s.readStream(stream, decodeSyncMessage, s.handleSyncMessage, maxBlockResponseSize)
s.readStream(stream, decodeSyncMessage, s.handleSyncMessage, MaxBlockResponseSize)
}

func decodeSyncMessage(in []byte, _ peer.ID, _ bool) (Message, error) {
Expand Down
2 changes: 1 addition & 1 deletion dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

const (
// maxBlockRequestSize uint64 = 1024 * 1024 // 1mb
maxBlockResponseSize uint64 = 1024 * 1024 * 16 // 16mb
MaxBlockResponseSize uint64 = 1024 * 1024 * 16 // 16mb
// MaxGrandpaNotificationSize is maximum size for a grandpa notification message.
MaxGrandpaNotificationSize uint64 = 1024 * 1024 // 1mb
maxTransactionsNotificationSize uint64 = 1024 * 1024 * 16 // 16mb
Expand Down
5 changes: 4 additions & 1 deletion dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,10 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg Bloc
BadBlocks: genesisData.BadBlocks,
}

return sync.NewService(syncCfg)
blockReqRes := net.GetRequestResponseProtocol(network.SyncID, network.BlockRequestTimeout,
network.MaxBlockResponseSize)

return sync.NewService(syncCfg, blockReqRes)
}

func (nodeBuilder) createDigestHandler(st *state.Service) (*digest.Handler, error) {
Expand Down
22 changes: 19 additions & 3 deletions dot/services_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ func Test_createRuntime(t *testing.T) {
func Test_nodeBuilder_newSyncService(t *testing.T) {
t.Parallel()
finalityGadget := &grandpa.Service{}

ctrl := gomock.NewController(t)
stateSrvc := newStateService(t, ctrl)
networkConfig := &network.Config{
BasePath: t.TempDir(),
BlockState: stateSrvc.Block,
RandSeed: 2,
}
networkService, err := network.NewService(networkConfig)
require.NoError(t, err)

type args struct {
fg BlockJustificationVerifier
verifier *babe.VerificationManager
Expand All @@ -382,7 +393,7 @@ func Test_nodeBuilder_newSyncService(t *testing.T) {
fg: finalityGadget,
verifier: nil,
cs: nil,
net: nil,
net: networkService,
telemetryMailer: nil,
},
expectNil: false,
Expand Down Expand Up @@ -546,10 +557,15 @@ func TestCreateSyncService(t *testing.T) {
dh, err := builder.createDigestHandler(stateSrvc)
require.NoError(t, err)

coreSrvc, err := builder.createCoreService(config, ks, stateSrvc, &network.Service{}, dh)
networkService, err := network.NewService(&network.Config{
BlockState: stateSrvc.Block,
})
require.NoError(t, err)

coreSrvc, err := builder.createCoreService(config, ks, stateSrvc, networkService, dh)
require.NoError(t, err)

_, err = builder.newSyncService(config, stateSrvc, &grandpa.Service{}, ver, coreSrvc, &network.Service{}, nil)
_, err = builder.newSyncService(config, stateSrvc, &grandpa.Service{}, ver, coreSrvc, networkService, nil)
require.NoError(t, err)
}

Expand Down
Loading
Loading