diff --git a/dot/network/helpers_test.go b/dot/network/helpers_test.go index e65855836a..ba57e81e7d 100644 --- a/dot/network/helpers_test.go +++ b/dot/network/helpers_test.go @@ -81,14 +81,14 @@ func (s *testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg Messa func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder messageDecoder, handler messageHandler) { - msgBytes := make([]byte, maxBlockResponseSize) + msgBytes := make([]byte, MaxBlockResponseSize) defer func() { s.exit = true }() for { - tot, err := readStream(stream, &msgBytes, maxBlockResponseSize) + tot, err := readStream(stream, &msgBytes, MaxBlockResponseSize) if errors.Is(err, io.EOF) { return } else if err != nil { diff --git a/dot/network/light.go b/dot/network/light.go index 6be6b4b910..28d0fb3b38 100644 --- a/dot/network/light.go +++ b/dot/network/light.go @@ -16,7 +16,7 @@ import ( // handleLightStream handles streams with the /light/2 protocol ID func (s *Service) handleLightStream(stream libp2pnetwork.Stream) { - s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, maxBlockResponseSize) + s.readStream(stream, s.decodeLightMessage, s.handleLightMsg, MaxBlockResponseSize) } func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) { diff --git a/dot/network/message.go b/dot/network/message.go index 7afa554738..144abe78cf 100644 --- a/dot/network/message.go +++ b/dot/network/message.go @@ -46,7 +46,7 @@ const ( RequestedDataJustification = byte(16) ) -var _ Message = &BlockRequestMessage{} +var _ Message = (*BlockRequestMessage)(nil) // SyncDirection is the direction of data in a block response type SyncDirection byte @@ -167,7 +167,7 @@ func (bm *BlockRequestMessage) Decode(in []byte) error { return nil } -var _ Message = &BlockResponseMessage{} +var _ ResponseMessage = (*BlockResponseMessage)(nil) // BlockResponseMessage is sent in response to a BlockRequestMessage type BlockResponseMessage struct { diff --git a/dot/network/request_response.go b/dot/network/request_response.go new file mode 100644 index 0000000000..1671bca2f2 --- /dev/null +++ b/dot/network/request_response.go @@ -0,0 +1,92 @@ +// Copyright 2021 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package network + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ChainSafe/gossamer/dot/peerset" + libp2pnetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +type RequestMaker interface { + Do(to peer.ID, req Message, res ResponseMessage) error +} + +type RequestResponseProtocol struct { + ctx context.Context + host *host + requestTimeout time.Duration + maxResponseSize uint64 + protocolID protocol.ID + responseBufMu sync.Mutex + responseBuf []byte +} + +func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMessage) error { + rrp.host.p2pHost.ConnManager().Protect(to, "") + defer rrp.host.p2pHost.ConnManager().Unprotect(to, "") + + ctx, cancel := context.WithTimeout(rrp.ctx, rrp.requestTimeout) + defer cancel() + + stream, err := rrp.host.p2pHost.NewStream(ctx, to, rrp.protocolID) + if err != nil { + return err + } + + defer func() { + err := stream.Close() + if err != nil { + logger.Warnf("failed to close stream: %s", err) + } + }() + + if err = rrp.host.writeToStream(stream, req); err != nil { + return err + } + + return rrp.receiveResponse(stream, res) +} + +func (rrp *RequestResponseProtocol) receiveResponse(stream libp2pnetwork.Stream, msg ResponseMessage) error { + // allocating a new (large) buffer every time slows down receiving response by a dramatic amount, + // as malloc is one of the most CPU intensive tasks. + // thus we should allocate buffers at startup and re-use them instead of allocating new ones each time. + rrp.responseBufMu.Lock() + defer rrp.responseBufMu.Unlock() + + buf := rrp.responseBuf + + n, err := readStream(stream, &buf, rrp.maxResponseSize) + if err != nil { + return fmt.Errorf("read stream error: %w", err) + } + + if n == 0 { + return fmt.Errorf("received empty message") + } + + err = msg.Decode(buf[:n]) + if err != nil { + rrp.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadMessageValue, + Reason: peerset.BadMessageReason, + }, stream.Conn().RemotePeer()) + return fmt.Errorf("failed to decode block response: %w", err) + } + + return nil +} + +type ResponseMessage interface { + String() string + Encode() ([]byte, error) + Decode(in []byte) (err error) +} diff --git a/dot/network/service.go b/dot/network/service.go index 1cd9587c79..e2da8d43d3 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -30,7 +30,7 @@ const ( NetworkStateTimeout = time.Minute // the following are sub-protocols used by the node - syncID = "/sync/2" + SyncID = "/sync/2" lightID = "/light/2" blockAnnounceID = "/block-announces/1" transactionsID = "/transactions/1" @@ -139,9 +139,7 @@ type Service struct { telemetryInterval time.Duration closeCh chan struct{} - blockResponseBuf []byte - blockResponseBufMu sync.Mutex - telemetry Telemetry + telemetry Telemetry } // NewService creates a new network service from the configuration and message channels @@ -220,12 +218,11 @@ func NewService(cfg *Config) (*Service, error) { closeCh: make(chan struct{}), bufPool: bufPool, streamManager: newStreamManager(ctx), - blockResponseBuf: make([]byte, maxBlockResponseSize), telemetry: cfg.Telemetry, Metrics: cfg.Metrics, } - return network, err + return network, nil } // SetSyncer sets the Syncer used by the network service @@ -252,7 +249,7 @@ func (s *Service) Start() error { s.ctx, s.cancel = context.WithCancel(context.Background()) } - s.host.registerStreamHandler(s.host.protocolID+syncID, s.handleSyncStream) + s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream) s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream) // register block announce protocol @@ -576,6 +573,21 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error { return errors.New("message not supported by any notifications protocol") } +func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration, + maxResponseSize uint64) *RequestResponseProtocol { + + protocolID := s.host.protocolID + protocol.ID(subprotocol) + return &RequestResponseProtocol{ + ctx: s.ctx, + host: s.host, + requestTimeout: requestTimeout, + maxResponseSize: maxResponseSize, + protocolID: protocolID, + responseBuf: make([]byte, maxResponseSize), + responseBufMu: sync.Mutex{}, + } +} + // Health returns information about host needed for the rpc server func (s *Service) Health() common.Health { return common.Health{ diff --git a/dot/network/sync.go b/dot/network/sync.go index 54e32c461f..efef281203 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -4,91 +4,23 @@ package network import ( - "context" - "fmt" "time" - "github.com/ChainSafe/gossamer/dot/peerset" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" ) var ( - blockRequestTimeout = time.Second * 20 + BlockRequestTimeout = time.Second * 20 ) -// DoBlockRequest sends a request to the given peer. -// If a response is received within a certain time period, it is returned, -// otherwise an error is returned. -func (s *Service) DoBlockRequest(to peer.ID, req *BlockRequestMessage) (*BlockResponseMessage, error) { - fullSyncID := s.host.protocolID + syncID - - s.host.p2pHost.ConnManager().Protect(to, "") - defer s.host.p2pHost.ConnManager().Unprotect(to, "") - - ctx, cancel := context.WithTimeout(s.ctx, blockRequestTimeout) - defer cancel() - - stream, err := s.host.p2pHost.NewStream(ctx, to, fullSyncID) - if err != nil { - return nil, err - } - - defer func() { - err := stream.Close() - if err != nil { - logger.Warnf("failed to close stream: %s", err) - } - }() - - if err = s.host.writeToStream(stream, req); err != nil { - return nil, err - } - - return s.receiveBlockResponse(stream) -} - -func (s *Service) receiveBlockResponse(stream libp2pnetwork.Stream) (*BlockResponseMessage, error) { - // allocating a new (large) buffer every time slows down the syncing by a dramatic amount, - // as malloc is one of the most CPU intensive tasks. - // thus we should allocate buffers at startup and re-use them instead of allocating new ones each time. - // - // TODO: should we create another buffer pool for block response buffers? - // for bootstrap this is ok since it's not parallelized, but will need to be updated for tip-mode (#1858) - s.blockResponseBufMu.Lock() - defer s.blockResponseBufMu.Unlock() - - buf := s.blockResponseBuf - - n, err := readStream(stream, &buf, maxBlockResponseSize) - if err != nil { - return nil, fmt.Errorf("read stream error: %w", err) - } - - if n == 0 { - return nil, fmt.Errorf("received empty message") - } - - msg := new(BlockResponseMessage) - err = msg.Decode(buf[:n]) - if err != nil { - s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ - Value: peerset.BadMessageValue, - Reason: peerset.BadMessageReason, - }, stream.Conn().RemotePeer()) - return nil, fmt.Errorf("failed to decode block response: %w", err) - } - - return msg, nil -} - // handleSyncStream handles streams with the /sync/2 protocol ID func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) { if stream == nil { return } - s.readStream(stream, decodeSyncMessage, s.handleSyncMessage, maxBlockResponseSize) + s.readStream(stream, decodeSyncMessage, s.handleSyncMessage, MaxBlockResponseSize) } func decodeSyncMessage(in []byte, _ peer.ID, _ bool) (Message, error) { diff --git a/dot/network/utils.go b/dot/network/utils.go index 61a8414839..1391a1f91c 100644 --- a/dot/network/utils.go +++ b/dot/network/utils.go @@ -21,7 +21,7 @@ import ( const ( // maxBlockRequestSize uint64 = 1024 * 1024 // 1mb - maxBlockResponseSize uint64 = 1024 * 1024 * 16 // 16mb + MaxBlockResponseSize uint64 = 1024 * 1024 * 16 // 16mb // MaxGrandpaNotificationSize is maximum size for a grandpa notification message. MaxGrandpaNotificationSize uint64 = 1024 * 1024 // 1mb maxTransactionsNotificationSize uint64 = 1024 * 1024 * 16 // 16mb diff --git a/dot/services.go b/dot/services.go index 4c7a2f96e1..89ecf571eb 100644 --- a/dot/services.go +++ b/dot/services.go @@ -492,7 +492,10 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg Bloc BadBlocks: genesisData.BadBlocks, } - return sync.NewService(syncCfg) + blockReqRes := net.GetRequestResponseProtocol(network.SyncID, network.BlockRequestTimeout, + network.MaxBlockResponseSize) + + return sync.NewService(syncCfg, blockReqRes) } func (nodeBuilder) createDigestHandler(st *state.Service) (*digest.Handler, error) { diff --git a/dot/services_integration_test.go b/dot/services_integration_test.go index 6750742428..0f2c34ff38 100644 --- a/dot/services_integration_test.go +++ b/dot/services_integration_test.go @@ -363,6 +363,17 @@ func Test_createRuntime(t *testing.T) { func Test_nodeBuilder_newSyncService(t *testing.T) { t.Parallel() finalityGadget := &grandpa.Service{} + + ctrl := gomock.NewController(t) + stateSrvc := newStateService(t, ctrl) + networkConfig := &network.Config{ + BasePath: t.TempDir(), + BlockState: stateSrvc.Block, + RandSeed: 2, + } + networkService, err := network.NewService(networkConfig) + require.NoError(t, err) + type args struct { fg BlockJustificationVerifier verifier *babe.VerificationManager @@ -382,7 +393,7 @@ func Test_nodeBuilder_newSyncService(t *testing.T) { fg: finalityGadget, verifier: nil, cs: nil, - net: nil, + net: networkService, telemetryMailer: nil, }, expectNil: false, @@ -546,10 +557,15 @@ func TestCreateSyncService(t *testing.T) { dh, err := builder.createDigestHandler(stateSrvc) require.NoError(t, err) - coreSrvc, err := builder.createCoreService(config, ks, stateSrvc, &network.Service{}, dh) + networkService, err := network.NewService(&network.Config{ + BlockState: stateSrvc.Block, + }) + require.NoError(t, err) + + coreSrvc, err := builder.createCoreService(config, ks, stateSrvc, networkService, dh) require.NoError(t, err) - _, err = builder.newSyncService(config, stateSrvc, &grandpa.Service{}, ver, coreSrvc, &network.Service{}, nil) + _, err = builder.newSyncService(config, stateSrvc, &grandpa.Service{}, ver, coreSrvc, networkService, nil) require.NoError(t, err) } diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index 0629cb9400..48e5c1232e 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -167,6 +167,8 @@ type chainSync struct { logSyncStarted bool logSyncDone chan struct{} badBlocks []string + + blockReqRes network.RequestMaker } type chainSyncConfig struct { @@ -179,7 +181,7 @@ type chainSyncConfig struct { badBlocks []string } -func newChainSync(cfg chainSyncConfig) *chainSync { +func newChainSync(cfg chainSyncConfig, blockReqRes network.RequestMaker) *chainSync { ctx, cancel := context.WithCancel(context.Background()) const syncSamplesToKeep = 30 const logSyncPeriod = 5 * time.Second @@ -208,6 +210,7 @@ func newChainSync(cfg chainSyncConfig) *chainSync { logSyncTickerC: logSyncTicker.C, logSyncDone: make(chan struct{}), badBlocks: cfg.badBlocks, + blockReqRes: blockReqRes, } } @@ -695,7 +698,10 @@ func (cs *chainSync) doSync(req *network.BlockRequestMessage, peersTried map[pee // TODO: use scoring to determine what peer to try to sync from first (#1399) idx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(peers)))) who := peers[idx.Int64()] - resp, err := cs.network.DoBlockRequest(who, req) + + resp := new(network.BlockResponseMessage) + + err := cs.blockReqRes.Do(who, req, resp) if err != nil { return &workerError{ err: err, @@ -703,13 +709,6 @@ func (cs *chainSync) doSync(req *network.BlockRequestMessage, peersTried map[pee } } - if resp == nil { - return &workerError{ - err: errNilResponse, - who: who, - } - } - if req.Direction == network.Descending { // reverse blocks before pre-validating and placing in ready queue reverseBlockData(resp.BlockData) diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go index 1633371d4c..a53aa8ba4c 100644 --- a/dot/sync/chain_sync_test.go +++ b/dot/sync/chain_sync_test.go @@ -398,12 +398,15 @@ func TestChainSync_sync_bootstrap_withWorkerError(t *testing.T) { mockNetwork := NewMockNetwork(ctrl) startingBlock := variadic.MustNewUint32OrHash(1) max := uint32(128) - mockNetwork.EXPECT().DoBlockRequest(peer.ID("noot"), &network.BlockRequestMessage{ + + mockReqRes := NewMockRequestMaker(ctrl) + mockReqRes.EXPECT().Do(peer.ID("noot"), &network.BlockRequestMessage{ RequestedData: 19, StartingBlock: *startingBlock, Direction: 0, Max: &max, - }) + }, &network.BlockResponseMessage{}) + cs.blockReqRes = mockReqRes cs.network = mockNetwork go cs.sync() @@ -419,7 +422,7 @@ func TestChainSync_sync_bootstrap_withWorkerError(t *testing.T) { select { case res := <-cs.resultQueue: expected := &workerError{ - err: errNilResponse, // since MockNetwork returns a nil response + err: errEmptyBlockData, // since MockNetwork returns a nil response who: testPeer, } require.Equal(t, expected, res.err) @@ -889,7 +892,9 @@ func TestChainSync_validateResponse(t *testing.T) { badBlockHash.String(), }, } - cs := newChainSync(cfg) + mockReqRes := NewMockRequestMaker(ctrl) + + cs := newChainSync(cfg, mockReqRes) err := cs.validateResponse(tt.req, tt.resp, "") if tt.expectedError != nil { @@ -931,19 +936,23 @@ func TestChainSync_doSync(t *testing.T) { mockNetwork := NewMockNetwork(ctrl) startingBlock := variadic.MustNewUint32OrHash(1) max1 := uint32(1) - mockNetwork.EXPECT().DoBlockRequest(peer.ID("noot"), &network.BlockRequestMessage{ + + mockReqRes := NewMockRequestMaker(ctrl) + mockReqRes.EXPECT().Do(peer.ID("noot"), &network.BlockRequestMessage{ RequestedData: 19, StartingBlock: *startingBlock, Direction: 0, Max: &max1, - }) + }, &network.BlockResponseMessage{}) + cs.blockReqRes = mockReqRes + cs.network = mockNetwork workerErr = cs.doSync(req, make(map[peer.ID]struct{})) require.NotNil(t, workerErr) - require.Equal(t, errNilResponse, workerErr.err) + require.Equal(t, errEmptyBlockData, workerErr.err) - resp := &network.BlockResponseMessage{ + expectedResp := &network.BlockResponseMessage{ BlockData: []*types.BlockData{ { Hash: common.Hash{0x1}, @@ -955,26 +964,28 @@ func TestChainSync_doSync(t *testing.T) { }, } - mockNetwork = NewMockNetwork(ctrl) - mockNetwork.EXPECT().DoBlockRequest(peer.ID("noot"), &network.BlockRequestMessage{ + mockReqRes.EXPECT().Do(peer.ID("noot"), &network.BlockRequestMessage{ RequestedData: 19, StartingBlock: *startingBlock, Direction: 0, Max: &max1, - }).Return(resp, nil) - cs.network = mockNetwork + }, &network.BlockResponseMessage{}).Do( + func(_ peer.ID, _ *network.BlockRequestMessage, resp *network.BlockResponseMessage) { + *resp = *expectedResp + }, + ) workerErr = cs.doSync(req, make(map[peer.ID]struct{})) require.Nil(t, workerErr) bd, err := readyBlocks.pop(context.Background()) require.NotNil(t, bd) require.NoError(t, err) - require.Equal(t, resp.BlockData[0], bd) + require.Equal(t, expectedResp.BlockData[0], bd) parent := (&types.Header{ Number: 2, }).Hash() - resp = &network.BlockResponseMessage{ + expectedResp = &network.BlockResponseMessage{ BlockData: []*types.BlockData{ { Hash: common.Hash{0x3}, @@ -996,25 +1007,30 @@ func TestChainSync_doSync(t *testing.T) { // test to see if descending blocks get reversed req.Direction = network.Descending - mockNetwork = NewMockNetwork(ctrl) - mockNetwork.EXPECT().DoBlockRequest(peer.ID("noot"), &network.BlockRequestMessage{ + + mockReqRes.EXPECT().Do(peer.ID("noot"), &network.BlockRequestMessage{ RequestedData: 19, StartingBlock: *startingBlock, Direction: 1, Max: &max1, - }).Return(resp, nil) + }, &network.BlockResponseMessage{}).Do( + func(_ peer.ID, _ *network.BlockRequestMessage, resp *network.BlockResponseMessage) { + *resp = *expectedResp + }, + ) + cs.network = mockNetwork workerErr = cs.doSync(req, make(map[peer.ID]struct{})) require.Nil(t, workerErr) bd, err = readyBlocks.pop(context.Background()) require.NotNil(t, bd) - require.Equal(t, resp.BlockData[0], bd) + require.Equal(t, expectedResp.BlockData[0], bd) require.NoError(t, err) bd, err = readyBlocks.pop(context.Background()) require.NotNil(t, bd) - require.Equal(t, resp.BlockData[1], bd) + require.Equal(t, expectedResp.BlockData[1], bd) require.NoError(t, err) } @@ -1634,8 +1650,9 @@ func newTestChainSyncWithReadyBlocks(ctrl *gomock.Controller, readyBlocks *block maxPeers: 5, slotDuration: defaultSlotDuration, } + mockReqRes := NewMockRequestMaker(ctrl) - return newChainSync(cfg) + return newChainSync(cfg, mockReqRes) } func newTestChainSync(ctrl *gomock.Controller) *chainSync { diff --git a/dot/sync/errors.go b/dot/sync/errors.go index 1a90802d68..fa4b4ebcad 100644 --- a/dot/sync/errors.go +++ b/dot/sync/errors.go @@ -22,7 +22,6 @@ var ( // chainSync errors errEmptyBlockData = errors.New("empty block data") errNilBlockData = errors.New("block data is nil") - errNilResponse = errors.New("block response is nil") errNilHeaderInResponse = errors.New("expected header, received none") errNilBodyInResponse = errors.New("expected body, received none") errNoPeers = errors.New("no peers to sync with") diff --git a/dot/sync/interfaces.go b/dot/sync/interfaces.go index ebc3cf7192..db38d06e3e 100644 --- a/dot/sync/interfaces.go +++ b/dot/sync/interfaces.go @@ -7,7 +7,6 @@ import ( "encoding/json" "sync" - "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" @@ -71,11 +70,6 @@ type BlockImportHandler interface { // Network is the interface for the network type Network interface { - // DoBlockRequest sends a request to the given peer. - // If a response is received within a certain time period, - // it is returned, otherwise an error is returned. - DoBlockRequest(to peer.ID, req *network.BlockRequestMessage) (*network.BlockResponseMessage, error) - // Peers returns a list of currently connected peers Peers() []common.PeerInfo diff --git a/dot/sync/mock_req_res.go b/dot/sync/mock_req_res.go new file mode 100644 index 0000000000..b4d133f7e7 --- /dev/null +++ b/dot/sync/mock_req_res.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ChainSafe/gossamer/dot/network (interfaces: RequestMaker) + +// Package sync is a generated GoMock package. +package sync + +import ( + reflect "reflect" + + network "github.com/ChainSafe/gossamer/dot/network" + gomock "github.com/golang/mock/gomock" + peer "github.com/libp2p/go-libp2p/core/peer" +) + +// MockRequestMaker is a mock of RequestMaker interface. +type MockRequestMaker struct { + ctrl *gomock.Controller + recorder *MockRequestMakerMockRecorder +} + +// MockRequestMakerMockRecorder is the mock recorder for MockRequestMaker. +type MockRequestMakerMockRecorder struct { + mock *MockRequestMaker +} + +// NewMockRequestMaker creates a new mock instance. +func NewMockRequestMaker(ctrl *gomock.Controller) *MockRequestMaker { + mock := &MockRequestMaker{ctrl: ctrl} + mock.recorder = &MockRequestMakerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRequestMaker) EXPECT() *MockRequestMakerMockRecorder { + return m.recorder +} + +// Do mocks base method. +func (m *MockRequestMaker) Do(arg0 peer.ID, arg1 network.Message, arg2 network.ResponseMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Do", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Do indicates an expected call of Do. +func (mr *MockRequestMakerMockRecorder) Do(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockRequestMaker)(nil).Do), arg0, arg1, arg2) +} diff --git a/dot/sync/mocks_generate_test.go b/dot/sync/mocks_generate_test.go index 0889fd2845..7d4e8cb064 100644 --- a/dot/sync/mocks_generate_test.go +++ b/dot/sync/mocks_generate_test.go @@ -6,6 +6,7 @@ package sync //go:generate mockgen -destination=mocks_test.go -package=$GOPACKAGE . BlockState,StorageState,TransactionState,BabeVerifier,FinalityGadget,BlockImportHandler,Network //go:generate mockgen -destination=mock_telemetry_test.go -package $GOPACKAGE . Telemetry //go:generate mockgen -destination=mock_runtime_test.go -package $GOPACKAGE github.com/ChainSafe/gossamer/lib/runtime Instance +//go:generate mockgen -destination=mock_req_res.go -package $GOPACKAGE github.com/ChainSafe/gossamer/dot/network RequestMaker //go:generate mockgen -destination=mock_chain_processor_test.go -package=$GOPACKAGE . ChainProcessor //go:generate mockgen -destination=mock_chain_sync_test.go -package $GOPACKAGE -source chain_sync.go . ChainSync,workHandler //go:generate mockgen -destination=mock_disjoint_block_set_test.go -package=$GOPACKAGE . DisjointBlockSet diff --git a/dot/sync/mocks_test.go b/dot/sync/mocks_test.go index 2a424f0c71..57a85eb954 100644 --- a/dot/sync/mocks_test.go +++ b/dot/sync/mocks_test.go @@ -7,7 +7,6 @@ package sync import ( reflect "reflect" - network "github.com/ChainSafe/gossamer/dot/network" peerset "github.com/ChainSafe/gossamer/dot/peerset" types "github.com/ChainSafe/gossamer/dot/types" common "github.com/ChainSafe/gossamer/lib/common" @@ -609,21 +608,6 @@ func (m *MockNetwork) EXPECT() *MockNetworkMockRecorder { return m.recorder } -// DoBlockRequest mocks base method. -func (m *MockNetwork) DoBlockRequest(arg0 peer.ID, arg1 *network.BlockRequestMessage) (*network.BlockResponseMessage, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DoBlockRequest", arg0, arg1) - ret0, _ := ret[0].(*network.BlockResponseMessage) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DoBlockRequest indicates an expected call of DoBlockRequest. -func (mr *MockNetworkMockRecorder) DoBlockRequest(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoBlockRequest", reflect.TypeOf((*MockNetwork)(nil).DoBlockRequest), arg0, arg1) -} - // Peers mocks base method. func (m *MockNetwork) Peers() []common.PeerInfo { m.ctrl.T.Helper() diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index cc268beb79..8cb53ef044 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -40,7 +40,7 @@ type Config struct { } // NewService returns a new *sync.Service -func NewService(cfg *Config) (*Service, error) { +func NewService(cfg *Config, blockReqRes network.RequestMaker) (*Service, error) { logger.Patch(log.SetLevel(cfg.LogLvl)) readyBlocks := newBlockQueue(maxResponseSize * 30) @@ -55,7 +55,7 @@ func NewService(cfg *Config) (*Service, error) { maxPeers: cfg.MaxPeers, slotDuration: cfg.SlotDuration, } - chainSync := newChainSync(csCfg) + chainSync := newChainSync(csCfg, blockReqRes) cpCfg := chainProcessorConfig{ readyBlocks: readyBlocks, diff --git a/dot/sync/syncer_integration_test.go b/dot/sync/syncer_integration_test.go index 76a6e816ed..9c33f79370 100644 --- a/dot/sync/syncer_integration_test.go +++ b/dot/sync/syncer_integration_test.go @@ -116,7 +116,8 @@ func newTestSyncer(t *testing.T) *Service { cfg.FinalityGadget = mockFinalityGadget cfg.Network = NewMockNetwork(ctrl) cfg.Telemetry = mockTelemetryClient - syncer, err := NewService(cfg) + mockReqRes := NewMockRequestMaker(ctrl) + syncer, err := NewService(cfg, mockReqRes) require.NoError(t, err) return syncer } diff --git a/dot/sync/syncer_test.go b/dot/sync/syncer_test.go index 89372bc3ba..750d8886e1 100644 --- a/dot/sync/syncer_test.go +++ b/dot/sync/syncer_test.go @@ -46,8 +46,9 @@ func TestNewService(t *testing.T) { ctrl := gomock.NewController(t) config := tt.cfgBuilder(ctrl) + mockReqRes := NewMockRequestMaker(ctrl) - got, err := NewService(config) + got, err := NewService(config, mockReqRes) if tt.err != nil { assert.EqualError(t, err, tt.err.Error()) } else {