Skip to content

Commit

Permalink
p2p: fix using custom channels (#6339)
Browse files Browse the repository at this point in the history
Co-authored-by: Callum Waters <cmwaters19@gmail.com>
  • Loading branch information
tnasu and cmwaters committed Jan 6, 2022
1 parent 3154738 commit eb6d5ba
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 14 deletions.
19 changes: 17 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,21 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option {
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
// register the new channels to the nodeInfo
// NOTE: This is a bit messy now with the type casting but is
// cleaned up in the following version when NodeInfo is changed from
// and interface to a concrete type
if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
for _, chDesc := range reactor.GetChannels() {
if !ni.HasChannel(chDesc.ID) {
ni.Channels = append(ni.Channels, chDesc.ID)
n.transport.AddChannel(chDesc.ID)
}
}
n.nodeInfo = ni
} else {
n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
}
}
}
}
Expand Down Expand Up @@ -1265,7 +1280,7 @@ func makeNodeInfo(
txIndexer txindex.TxIndexer,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) {
) (p2p.DefaultNodeInfo, error) {
txIndexerStatus := "on"
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
Expand All @@ -1280,7 +1295,7 @@ func makeNodeInfo(
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
return p2p.DefaultNodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}

nodeInfo := p2p.DefaultNodeInfo{
Expand Down
13 changes: 13 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
tmrand "github.com/line/ostracon/libs/rand"
mempl "github.com/line/ostracon/mempool"
"github.com/line/ostracon/p2p"
"github.com/line/ostracon/p2p/conn"
p2pmock "github.com/line/ostracon/p2p/mock"
"github.com/line/ostracon/privval"
"github.com/line/ostracon/proxy"
Expand Down Expand Up @@ -388,6 +389,14 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer os.RemoveAll(config.RootDir)

cr := p2pmock.NewReactor()
cr.Channels = []*conn.ChannelDescriptor{
{
ID: byte(0x31),
Priority: 5,
SendQueueCapacity: 100,
RecvMessageCapacity: 100,
},
}
customBlockchainReactor := p2pmock.NewReactor()

nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
Expand Down Expand Up @@ -416,6 +425,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {

assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))

channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)
assert.Contains(t, channels, cr.Channels[0].ID)
}

func TestNodeNewNodeTxIndexIndexer(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion p2p/mock/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

type Reactor struct {
p2p.BaseReactor

Channels []*conn.ChannelDescriptor
}

func NewReactor() *Reactor {
Expand All @@ -17,7 +19,7 @@ func NewReactor() *Reactor {
return r
}

func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} }
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
13 changes: 9 additions & 4 deletions p2p/node_info.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package p2p

import (
"bytes"
"errors"
"fmt"
"reflect"

"github.com/line/ostracon/libs/bytes"
tmbytes "github.com/line/ostracon/libs/bytes"
tmstrings "github.com/line/ostracon/libs/strings"
tmp2p "github.com/line/ostracon/proto/ostracon/p2p"
"github.com/line/ostracon/version"
Expand Down Expand Up @@ -85,9 +86,9 @@ type DefaultNodeInfo struct {

// Check compatibility.
// Channels are HexBytes so easier to read as JSON
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels bytes.HexBytes `json:"channels"` // channels this node knows about
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels tmbytes.HexBytes `json:"channels"` // channels this node knows about

// ASCIIText fields
Moniker string `json:"moniker"` // arbitrary moniker
Expand Down Expand Up @@ -222,6 +223,10 @@ func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) {
return NewNetAddressString(idAddr)
}

func (info DefaultNodeInfo) HasChannel(chID byte) bool {
return bytes.Contains(info.Channels, []byte{chID})
}

func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo {

dni := new(tmp2p.DefaultNodeInfo)
Expand Down
3 changes: 2 additions & 1 deletion p2p/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func TestNodeInfoCompatible(t *testing.T) {
assert.NoError(t, ni1.CompatibleWith(ni2))

// add another channel; still compatible
ni2.Channels = []byte{newTestChannel, testCh}
ni2.Channels = append(ni2.Channels, newTestChannel)
assert.True(t, ni2.HasChannel(newTestChannel))
assert.NoError(t, ni1.CompatibleWith(ni2))

// wrong NodeInfo type is not compatible
Expand Down
7 changes: 1 addition & 6 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,10 @@ func newPeer(
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
var channs = make([]byte, 0, len(chDescs))
for _, desc := range chDescs {
channs = append(channs, desc.ID)
}

p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: channs,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
Expand Down
13 changes: 13 additions & 0 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,19 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return nil
}

// AddChannel registers a channel to nodeInfo.
// NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated
// This is a bit messy at the moment but is cleaned up in the following version
// when NodeInfo changes from an interface to a concrete type
func (mt *MultiplexTransport) AddChannel(chID byte) {
if ni, ok := mt.nodeInfo.(DefaultNodeInfo); ok {
if !ni.HasChannel(chID) {
ni.Channels = append(ni.Channels, chID)
}
mt.nodeInfo = ni
}
}

func (mt *MultiplexTransport) acceptPeers() {
for {
c, err := mt.listener.Accept()
Expand Down
15 changes: 15 additions & 0 deletions p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,21 @@ func TestTransportHandshake(t *testing.T) {
}
}

func TestTransportAddChannel(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
testChannel := byte(0x01)

mt.AddChannel(testChannel)
if !mt.nodeInfo.(DefaultNodeInfo).HasChannel(testChannel) {
t.Errorf("missing added channel %v. Got %v", testChannel, mt.nodeInfo.(DefaultNodeInfo).Channels)
}
}

// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
Expand Down

0 comments on commit eb6d5ba

Please sign in to comment.