diff --git a/node/node.go b/node/node.go index 1ec3fa5fc..20a832e6b 100644 --- a/node/node.go +++ b/node/node.go @@ -163,6 +163,21 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option { n.sw.RemoveReactor(name, existingReactor) } n.sw.AddReactor(name, reactor) + // register the new channels to the nodeInfo + // NOTE: This is a bit messy now with the type casting but is + // cleaned up in the following version when NodeInfo is changed from + // and interface to a concrete type + if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok { + for _, chDesc := range reactor.GetChannels() { + if !ni.HasChannel(chDesc.ID) { + ni.Channels = append(ni.Channels, chDesc.ID) + n.transport.AddChannel(chDesc.ID) + } + } + n.nodeInfo = ni + } else { + n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.") + } } } } @@ -1265,7 +1280,7 @@ func makeNodeInfo( txIndexer txindex.TxIndexer, genDoc *types.GenesisDoc, state sm.State, -) (p2p.NodeInfo, error) { +) (p2p.DefaultNodeInfo, error) { txIndexerStatus := "on" if _, ok := txIndexer.(*null.TxIndex); ok { txIndexerStatus = "off" @@ -1280,7 +1295,7 @@ func makeNodeInfo( case "v2": bcChannel = bcv2.BlockchainChannel default: - return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) + return p2p.DefaultNodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) } nodeInfo := p2p.DefaultNodeInfo{ diff --git a/node/node_test.go b/node/node_test.go index 53902514e..49edadb32 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -23,6 +23,7 @@ import ( tmrand "github.com/line/ostracon/libs/rand" mempl "github.com/line/ostracon/mempool" "github.com/line/ostracon/p2p" + "github.com/line/ostracon/p2p/conn" p2pmock "github.com/line/ostracon/p2p/mock" "github.com/line/ostracon/privval" "github.com/line/ostracon/proxy" @@ -388,6 +389,14 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { defer os.RemoveAll(config.RootDir) cr := p2pmock.NewReactor() + cr.Channels = []*conn.ChannelDescriptor{ + { + ID: byte(0x31), + Priority: 5, + SendQueueCapacity: 100, + RecvMessageCapacity: 100, + }, + } customBlockchainReactor := p2pmock.NewReactor() nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) @@ -416,6 +425,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { assert.True(t, customBlockchainReactor.IsRunning()) assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN")) + + channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels + assert.Contains(t, channels, mempl.MempoolChannel) + assert.Contains(t, channels, cr.Channels[0].ID) } func TestNodeNewNodeTxIndexIndexer(t *testing.T) { diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index cb80a924a..06aed901f 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -8,6 +8,8 @@ import ( type Reactor struct { p2p.BaseReactor + + Channels []*conn.ChannelDescriptor } func NewReactor() *Reactor { @@ -17,7 +19,7 @@ func NewReactor() *Reactor { return r } -func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} } +func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } func (r *Reactor) AddPeer(peer p2p.Peer) {} func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} diff --git a/p2p/node_info.go b/p2p/node_info.go index 3ea41fcaf..3f9183e9d 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -1,11 +1,12 @@ package p2p import ( + "bytes" "errors" "fmt" "reflect" - "github.com/line/ostracon/libs/bytes" + tmbytes "github.com/line/ostracon/libs/bytes" tmstrings "github.com/line/ostracon/libs/strings" tmp2p "github.com/line/ostracon/proto/ostracon/p2p" "github.com/line/ostracon/version" @@ -85,9 +86,9 @@ type DefaultNodeInfo struct { // Check compatibility. // Channels are HexBytes so easier to read as JSON - Network string `json:"network"` // network/chain ID - Version string `json:"version"` // major.minor.revision - Channels bytes.HexBytes `json:"channels"` // channels this node knows about + Network string `json:"network"` // network/chain ID + Version string `json:"version"` // major.minor.revision + Channels tmbytes.HexBytes `json:"channels"` // channels this node knows about // ASCIIText fields Moniker string `json:"moniker"` // arbitrary moniker @@ -222,6 +223,10 @@ func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) { return NewNetAddressString(idAddr) } +func (info DefaultNodeInfo) HasChannel(chID byte) bool { + return bytes.Contains(info.Channels, []byte{chID}) +} + func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo { dni := new(tmp2p.DefaultNodeInfo) diff --git a/p2p/node_info_test.go b/p2p/node_info_test.go index 69df09322..ed0ec44af 100644 --- a/p2p/node_info_test.go +++ b/p2p/node_info_test.go @@ -102,7 +102,8 @@ func TestNodeInfoCompatible(t *testing.T) { assert.NoError(t, ni1.CompatibleWith(ni2)) // add another channel; still compatible - ni2.Channels = []byte{newTestChannel, testCh} + ni2.Channels = append(ni2.Channels, newTestChannel) + assert.True(t, ni2.HasChannel(newTestChannel)) assert.NoError(t, ni1.CompatibleWith(ni2)) // wrong NodeInfo type is not compatible diff --git a/p2p/peer.go b/p2p/peer.go index 5a6b4480c..bacf7cddf 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -135,15 +135,10 @@ func newPeer( onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { - var channs = make([]byte, 0, len(chDescs)) - for _, desc := range chDescs { - channs = append(channs, desc.ID) - } - p := &peer{ peerConn: pc, nodeInfo: nodeInfo, - channels: channs, + channels: nodeInfo.(DefaultNodeInfo).Channels, Data: cmap.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), diff --git a/p2p/transport.go b/p2p/transport.go index a9e735f1d..7c2c0493d 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -261,6 +261,19 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error { return nil } +// AddChannel registers a channel to nodeInfo. +// NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated +// This is a bit messy at the moment but is cleaned up in the following version +// when NodeInfo changes from an interface to a concrete type +func (mt *MultiplexTransport) AddChannel(chID byte) { + if ni, ok := mt.nodeInfo.(DefaultNodeInfo); ok { + if !ni.HasChannel(chID) { + ni.Channels = append(ni.Channels, chID) + } + mt.nodeInfo = ni + } +} + func (mt *MultiplexTransport) acceptPeers() { for { c, err := mt.listener.Accept() diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 4efd78ba8..789ed81c9 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -621,6 +621,21 @@ func TestTransportHandshake(t *testing.T) { } } +func TestTransportAddChannel(t *testing.T) { + mt := newMultiplexTransport( + emptyNodeInfo(), + NodeKey{ + PrivKey: ed25519.GenPrivKey(), + }, + ) + testChannel := byte(0x01) + + mt.AddChannel(testChannel) + if !mt.nodeInfo.(DefaultNodeInfo).HasChannel(testChannel) { + t.Errorf("missing added channel %v. Got %v", testChannel, mt.nodeInfo.(DefaultNodeInfo).Channels) + } +} + // create listener func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { var (