Skip to content

Commit

Permalink
eth, eth/protocols/eth: added metadium message handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
sadoci committed Oct 31, 2021
1 parent 117f254 commit 0121483
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 9 deletions.
5 changes: 2 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package eth

import (
"errors"
"fmt"
"math"
"math/big"
"sync"
Expand Down Expand Up @@ -538,7 +537,7 @@ func (h *handler) txBroadcastLoop() {

// RequestMinerStatus sends GetStatusExMsg to the given peer
func (h *handler) RequestMinerStatus(id enode.ID) error {
if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil {
if p := h.peers.peer(id.String()); p != nil {
return p.RequestStatusEx()
} else {
return ethereum.NotFound
Expand All @@ -547,7 +546,7 @@ func (h *handler) RequestMinerStatus(id enode.ID) error {

// RequestEtcdAddMember is an internal protocol level command to add a node to the etcd cluster
func (h *handler) RequestEtcdAddMember(id enode.ID) error {
if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil {
if p := h.peers.peer(id.String()); p != nil {
return p.RequestEtcdAddMember()
} else {
return ethereum.NotFound
Expand Down
14 changes: 14 additions & 0 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ var eth65 = map[uint64]msgHandler{
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
// metadium message handlers
GetPendingTxsMsg: handleGetPendingTxs,
GetStatusExMsg: handleGetStatusEx,
StatusExMsg: handleStatusEx,
EtcdAddMemberMsg: handleEtcdAddMember,
EtcdClusterMsg: handleEtcdCluster,
TransactionsExMsg: handleTransactionsEx,
}

var eth66 = map[uint64]msgHandler{
Expand All @@ -204,6 +211,13 @@ var eth66 = map[uint64]msgHandler{
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
// metadium message handlers - not eth/66 yet
GetPendingTxsMsg: handleGetPendingTxs,
GetStatusExMsg: handleGetStatusEx,
StatusExMsg: handleStatusEx,
EtcdAddMemberMsg: handleEtcdAddMember,
EtcdClusterMsg: handleEtcdCluster,
TransactionsExMsg: handleTransactionsEx,
}

// handleMessage is invoked whenever an inbound message is received from a remote
Expand Down
28 changes: 28 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
metaminer "github.com/ethereum/go-ethereum/metadium/miner"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -477,6 +478,33 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
return backend.Handle(peer, &txs)
}

func handleTransactionsEx(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
// Transactions can be processed, parse all of them and deliver to the pool
var txexs TransactionsExPacket
if err := msg.Decode(&txexs); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}

go func() error {
signer := types.MakeSigner(backend.Chain().Config(), backend.Chain().CurrentBlock().Number())
txs := types.TxExs2Txs(signer, txexs, metaminer.IsPartner(peer.ID()))
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
}
txsp := TransactionsPacket(txs)
return backend.Handle(peer, &txsp)
}()
return nil
}

func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
Expand Down
78 changes: 78 additions & 0 deletions eth/protocols/eth/metadium_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package eth

import (
"fmt"

metaapi "github.com/ethereum/go-ethereum/metadium/api"
metaminer "github.com/ethereum/go-ethereum/metadium/miner"
)

func handleGetPendingTxs(backend Backend, msg Decoder, peer *Peer) error {
// not supported, just ignore it.
return nil
}

func handleGetStatusEx(backend Backend, msg Decoder, peer *Peer) error {
if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) {
return nil
}

go func() {
statusEx := metaapi.GetMinerStatus()
statusEx.LatestBlockTd = backend.Chain().GetTd(statusEx.LatestBlockHash,
statusEx.LatestBlockHeight.Uint64())
if err := peer.SendStatusEx(statusEx); err != nil {
// ignore the error
}
}()

return nil
}

func handleStatusEx(backend Backend, msg Decoder, peer *Peer) error {
if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) {
return nil
}
var status metaapi.MetadiumMinerStatus
if err := msg.Decode(&status); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}

go func() {
if _, td := peer.Head(); status.LatestBlockTd.Cmp(td) > 0 {
peer.SetHead(status.LatestBlockHash, status.LatestBlockTd)
}
metaapi.GotStatusEx(&status)
}()

return nil
}

func handleEtcdAddMember(backend Backend, msg Decoder, peer *Peer) error {
if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) {
return nil
}

go func() {
cluster, _ := metaapi.EtcdAddMember(peer.ID())
if err := peer.SendEtcdCluster(cluster); err != nil {
// ignore the error
}
}()

return nil
}

func handleEtcdCluster(backend Backend, msg Decoder, peer *Peer) error {
if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) {
return nil
}
var cluster string
if err := msg.Decode(&cluster); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}

go metaapi.GotEtcdCluster(cluster)

return nil
}
33 changes: 33 additions & 0 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
metaapi "github.com/ethereum/go-ethereum/metadium/api"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
Expand Down Expand Up @@ -402,6 +403,32 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
})
}

// SendStatusEx sends this node's miner status
func (p *Peer) SendStatusEx(status *metaapi.MetadiumMinerStatus) error {
return p2p.Send(p.rw, StatusExMsg, status)
}

// ReplyStatusEx is the eth/66 response to GetStatusEx
func (p *Peer) ReplyStatusEx(id uint64, status *metaapi.MetadiumMinerStatus) error {
return p2p.Send(p.rw, StatusExMsg, StatusExPacket66{
RequestId: id,
StatusExPacket: StatusExPacket(*status),
})
}

// SendEtcdCluster sends this node's etcd cluster
func (p *Peer) SendEtcdCluster(cluster string) error {
return p2p.Send(p.rw, EtcdClusterMsg, cluster)
}

// ReplyEtcdCluster is the eth/66 response to EtcdAddMember
func (p *Peer) ReplyEtcdCluster(id uint64, cluster string) error {
return p2p.Send(p.rw, EtcdClusterMsg, EtcdClusterPacket66{
RequestId: id,
EtcdClusterPacket: EtcdClusterPacket(cluster),
})
}

// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash) error {
Expand Down Expand Up @@ -545,11 +572,17 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
// RequestStatusEx fetches extended status of the peer
func (p *Peer) RequestStatusEx() error {
p.Log().Debug("Fetching extended status")
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetStatusExMsg, StatusExMsg, id)
return p2p.Send(p.rw, GetStatusExMsg, common.Big1)
}

// RequestEtcdAddMember requests the peer to add this node to the cluster
func (p *Peer) RequestEtcdAddMember() error {
p.Log().Debug("Trying to join etcd network")
id := rand.Uint64()

requestTracker.Track(p.id, p.version, EtcdAddMemberMsg, EtcdClusterMsg, id)
return p2p.Send(p.rw, EtcdAddMemberMsg, common.Big1)
}
67 changes: 61 additions & 6 deletions eth/protocols/eth/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/types"
metaapi "github.com/ethereum/go-ethereum/metadium/api"
"github.com/ethereum/go-ethereum/rlp"
)

Expand Down Expand Up @@ -70,12 +71,12 @@ const (
PooledTransactionsMsg = 0x0a

// Added by Metadium, meta/64
GetPendingTxsMsg = 0x11
GetStatusExMsg = 0x12
StatusExMsg = 0x13
EtcdAddMemberMsg = 0x14
EtcdClusterMsg = 0x15
TxExMsg = 0x16
GetPendingTxsMsg = 0x11
GetStatusExMsg = 0x12
StatusExMsg = 0x13
EtcdAddMemberMsg = 0x14
EtcdClusterMsg = 0x15
TransactionsExMsg = 0x16
)

var (
Expand Down Expand Up @@ -128,6 +129,9 @@ func (p *NewBlockHashesPacket) Unpack() ([]common.Hash, []uint64) {
// TransactionsPacket is the network packet for broadcasting new transactions.
type TransactionsPacket []*types.Transaction

// TransactionsExPacket is the network packet for broadcasting new extended transactions.
type TransactionsExPacket []*types.TransactionEx

// GetBlockHeadersPacket represents a block header query.
type GetBlockHeadersPacket struct {
Origin HashOrNumber // Block from which to retrieve headers
Expand Down Expand Up @@ -329,6 +333,42 @@ type PooledTransactionsRLPPacket66 struct {
PooledTransactionsRLPPacket
}

// GetStatusExPacket is the network packet for GetStatusEx
type GetStatusExPacket int

// GetStatusExPacket66 is the eth/66 form of GetSTatusExPacket
type GetStatusExPacket66 struct {
RequestId uint64
GetStatusExPacket
}

// StatusExPacket is the network packet for extended status of a node
type StatusExPacket metaapi.MetadiumMinerStatus

// StatusExPacket66 is the eth/66 form of StatusExPacket
type StatusExPacket66 struct {
RequestId uint64
StatusExPacket
}

// EtcdAddMemberPacket is the netowkr packet for EtcdAddMember
type EtcdAddMemberPacket int

// EtcdAddMemberPacket66 is the eth/66 form of EtcdAddMember
type EtcdAddMemberPacket66 struct {
RequestId uint64
EtcdAddMemberPacket
}

// EtcdClusterPacket is the network packet for EtcdAddMember / EtcdCluster exchange
type EtcdClusterPacket string

// EtcdClusterPacket66 is the eth/66 form of EtcdClusterPacket
type EtcdClusterPacket66 struct {
RequestId uint64
EtcdClusterPacket
}

func (*StatusPacket) Name() string { return "Status" }
func (*StatusPacket) Kind() byte { return StatusMsg }

Expand All @@ -338,6 +378,9 @@ func (*NewBlockHashesPacket) Kind() byte { return NewBlockHashesMsg }
func (*TransactionsPacket) Name() string { return "Transactions" }
func (*TransactionsPacket) Kind() byte { return TransactionsMsg }

func (*TransactionsExPacket) Name() string { return "TransactionsEx" }
func (*TransactionsExPacket) Kind() byte { return TransactionsExMsg }

func (*GetBlockHeadersPacket) Name() string { return "GetBlockHeaders" }
func (*GetBlockHeadersPacket) Kind() byte { return GetBlockHeadersMsg }

Expand Down Expand Up @@ -373,3 +416,15 @@ func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactions

func (*PooledTransactionsPacket) Name() string { return "PooledTransactions" }
func (*PooledTransactionsPacket) Kind() byte { return PooledTransactionsMsg }

func (*GetStatusExPacket) Name() string { return "GetStatusEx" }
func (*GetStatusExPacket) Kind() byte { return GetStatusExMsg }

func (*StatusExPacket) Name() string { return "StatusEx" }
func (*StatusExPacket) Kind() byte { return StatusExMsg }

func (*EtcdAddMemberPacket) Name() string { return "EtcdAddMember" }
func (*EtcdAddMemberPacket) Kind() byte { return EtcdAddMemberMsg }

func (*EtcdClusterPacket) Name() string { return "EtcdCluster" }
func (*EtcdClusterPacket) Kind() byte { return EtcdClusterMsg }

0 comments on commit 0121483

Please sign in to comment.