From c401626b22f5f33b31b0b92b36cdc07ce2682f8c Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Tue, 16 Aug 2022 14:30:34 -0400 Subject: [PATCH] feat: implements before best block by N voting rule --- dot/network/notifications.go | 8 +- lib/grandpa/grandpa.go | 210 ++++++++++++++--------------- lib/grandpa/mocks_generate_test.go | 2 +- lib/grandpa/network.go | 44 +++--- lib/grandpa/network_test.go | 127 +++++++---------- lib/grandpa/round_test.go | 202 +-------------------------- lib/grandpa/vote_message.go | 1 - 7 files changed, 173 insertions(+), 421 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index e737d372bfa..97179b3f3f6 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -139,13 +139,7 @@ func (s *Service) createNotificationsMessageHandler( return fmt.Errorf("could not check if message was seen before: %w", err) } - // sometimes substrate can send prevote/precommit messages already seen by - // gossamer but those messages are related to another round, for example to - // finalize a block Y substrate sends prevotes and precommits to Y in round r - // and in the round r + 1 it is possible to receive prevotes for block Y again, this - // is not a problem and we can improve the gossamer behaviour implementing Polite GRANDPA #2505 - _, isConsensusMsg := msg.(*ConsensusMessage) - if hasSeen && !isConsensusMsg { + if hasSeen { // report peer if we get duplicate gossip message. s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ Value: peerset.DuplicateGossipValue, diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 778489ecbba..4e4d290525c 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -74,8 +74,9 @@ type Service struct { bestFinalCandidate map[uint64]*Vote // map of round number -> best final candidate // channels for communication with other services - in chan *networkVoteMessage // only used to receive *VoteMessage - finalisedCh chan *types.FinalisationInfo + in chan *networkVoteMessage // only used to receive *VoteMessage + finalisedCh chan *types.FinalisationInfo + neighbourMessage *NeighbourMessage // cached neighbour message telemetry telemetry.Client } @@ -199,7 +200,7 @@ func (s *Service) Start() error { } }() - go s.notifyNeighbor(neighbourMessageInterval) + go s.sendNeighbourMessage(neighbourMessageInterval) return nil } @@ -343,13 +344,6 @@ func (s *Service) initiateRound() error { s.precommits = new(sync.Map) s.pvEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote) s.pcEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote) - - s.sendNeighborMessage(&NeighbourMessage{ - Version: 1, - Round: s.state.round, - SetID: s.state.setID, - Number: uint32(s.head.Number), - }) s.roundLock.Unlock() best, err := s.blockState.BestBlockHeader() @@ -484,6 +478,7 @@ func (s *Service) playGrandpaRound() error { return err } + logger.Debug("receiving pre-vote messages...") go s.receiveVoteMessages(ctx) time.Sleep(s.interval) @@ -506,15 +501,25 @@ func (s *Service) playGrandpaRound() error { s.prevotes.Store(s.publicKeyBytes(), spv) } - prevoteRoundComplete := make(chan struct{}) - go s.sendPrevoteMessage(vm, prevoteRoundComplete) + logger.Debugf("sending pre-vote message %s...", pv) + roundComplete := make(chan struct{}) + // roundComplete is a signal channel which is closed when the round completes + // (will receive the default value of channel's type), so we don't need to + // explicitly send a value. 
+ defer close(roundComplete) + + // continue to send prevote messages until round is done + go s.sendVoteMessage(prevote, vm, roundComplete) + + logger.Debug("receiving pre-commit messages...") + // through goroutine s.receiveVoteMessages(ctx) + time.Sleep(s.interval) if s.paused.Load().(bool) { return ErrServicePaused } - // determine and broadcast pre-commit only after seen prevote messages - <-prevoteRoundComplete + // broadcast pre-commit pc, err := s.determinePreCommit() if err != nil { return err @@ -526,13 +531,12 @@ func (s *Service) playGrandpaRound() error { } s.precommits.Store(s.publicKeyBytes(), spc) + logger.Debugf("sending pre-commit message %s...", pc) // continue to send precommit messages until round is done - precommitDoneCh := make(chan struct{}) - go s.sendPrecommitMessage(pcm, precommitDoneCh) + go s.sendVoteMessage(precommit, pcm, roundComplete) - err = s.attemptToFinalize(precommitDoneCh) - if err != nil { + if err = s.attemptToFinalize(); err != nil { logger.Errorf("failed to finalise: %s", err) return err } @@ -541,14 +545,10 @@ func (s *Service) playGrandpaRound() error { return nil } -func (s *Service) sendPrecommitMessage(vm *VoteMessage, done <-chan struct{}) { - logger.Debugf("sending pre-commit message %s...", vm.Message) - +func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplete <-chan struct{}) { ticker := time.NewTicker(s.interval * 4) defer ticker.Stop() - threshold := s.state.threshold() - // Though this looks like we are sending messages multiple times, // caching would make sure that they are being sent only once. for { @@ -556,62 +556,24 @@ func (s *Service) sendPrecommitMessage(vm *VoteMessage, done <-chan struct{}) { return } - if err := s.sendMessage(vm); err != nil { - logger.Warnf("could not send message for stage %s: %s", precommit, err) + if err := s.sendMessage(msg); err != nil { + logger.Warnf("could not send message for stage %s: %s", stage, err) } else { - logger.Warnf("sent vote message for stage %s: %s", precommit, vm.Message) - } - - if uint64(s.lenVotes(precommit)) >= threshold { - <-done - return + logger.Tracef("sent vote message for stage %s: %s", stage, msg.Message) } select { - case <-ticker.C: - case <-done: - return - } - } -} - -func (s *Service) sendPrevoteMessage(vm *VoteMessage, done chan<- struct{}) { - defer close(done) - logger.Debugf("sending pre-vote message %s...", vm) - - ticker := time.NewTicker(s.interval * 4) - defer ticker.Stop() - - threshold := s.state.threshold() - - // Though this looks like we are sending messages multiple times, - // caching would make sure that they are being sent only once. 
- for { - if err := s.sendMessage(vm); err != nil { - logger.Warnf("could not send message for stage %s: %s", prevote, err) - } else { - logger.Warnf("sent vote message for stage %s: %s", prevote, vm.Message) - } - - if s.paused.Load().(bool) || uint64(s.lenVotes(prevote)) >= threshold { + case <-roundComplete: return + case <-ticker.C: } - - <-ticker.C } } // attemptToFinalize loops until the round is finalisable -func (s *Service) attemptToFinalize(precommitDoneCh chan<- struct{}) error { - defer close(precommitDoneCh) - +func (s *Service) attemptToFinalize() error { ticker := time.NewTicker(s.interval / 100) - var ( - bestFinalCandidate *types.GrandpaVote - precommitCount uint64 - ) - for { select { case <-s.ctx.Done(): @@ -642,54 +604,50 @@ func (s *Service) attemptToFinalize(precommitDoneCh chan<- struct{}) error { return nil // a block was finalised, seems like we missed some messages } - var err error - bestFinalCandidate, err = s.getBestFinalCandidate() + bfc, err := s.getBestFinalCandidate() if err != nil { return err } - precommitCount, err = s.getTotalVotesForBlock(bestFinalCandidate.Hash, precommit) + pc, err := s.getTotalVotesForBlock(bfc.Hash, precommit) if err != nil { return err } - // once we reach the threshold we should stop sending precommit messages to other peers - if bestFinalCandidate.Number < uint32(s.head.Number) || precommitCount <= s.state.threshold() { + if bfc.Number < uint32(s.head.Number) || pc <= s.state.threshold() { continue } - break - } - - if err := s.finalise(); err != nil { - return err - } + if err = s.finalise(); err != nil { + return err + } - // if we haven't received a finalisation message for this block yet, broadcast a finalisation message - votes := s.getDirectVotes(precommit) - logger.Debugf("block was finalised for round %d and set id %d. "+ - "Head hash is %s, %d direct votes for bfc and %d total votes for bfc", - s.state.round, s.state.setID, s.head.Hash(), votes[*bestFinalCandidate], precommitCount) + // if we haven't received a finalisation message for this block yet, broadcast a finalisation message + votes := s.getDirectVotes(precommit) + logger.Debugf("block was finalised for round %d and set id %d. "+ + "Head hash is %s, %d direct votes for bfc and %d total votes for bfc", + s.state.round, s.state.setID, s.head.Hash(), votes[*bfc], pc) - cm, err := s.newCommitMessage(s.head, s.state.round) - if err != nil { - return err - } + cm, err := s.newCommitMessage(s.head, s.state.round) + if err != nil { + return err + } - msg, err := cm.ToConsensusMessage() - if err != nil { - return err - } + msg, err := cm.ToConsensusMessage() + if err != nil { + return err + } - logger.Debugf("sending CommitMessage: %v", cm) - s.network.GossipMessage(msg) + logger.Debugf("sending CommitMessage: %v", cm) + s.network.GossipMessage(msg) - s.telemetry.SendMessage(telemetry.NewAfgFinalizedBlocksUpTo( - s.head.Hash(), - fmt.Sprint(s.head.Number), - )) + s.telemetry.SendMessage(telemetry.NewAfgFinalizedBlocksUpTo( + s.head.Hash(), + fmt.Sprint(s.head.Number), + )) - return nil + return nil + } } func (s *Service) loadVote(key ed25519.PublicKeyBytes, stage Subround) (*SignedVote, bool) { @@ -721,6 +679,44 @@ func (s *Service) deleteVote(key ed25519.PublicKeyBytes, stage Subround) { } } +// implements `BeforeBestBlockBy` a custom voting rule that guarantees that our vote is always +// behind the best block by at least N blocks, unless the base number is < N blocks behind the +// best, in which case it votes for the base. 
+// (https://github.com/paritytech/substrate/blob/1fd71c7845d6c28c532795ec79106d959dd1fe30/client/finality-grandpa/src/voting_rule.rs#L92)
+func (s *Service) determineBestHeaderToPrevote(finalizedHeader, bestBlockHeader *types.Header) (
+	headerToPrevote *types.Header, err error) {
+	genesisHash := s.blockState.GenesisHash()
+	isGenesisHash := genesisHash.Equal(bestBlockHeader.Hash())
+	if isGenesisHash || finalizedHeader.Hash().Equal(bestBlockHeader.Hash()) {
+		return bestBlockHeader, nil
+	}
+
+	isDescendant, err := s.blockState.IsDescendantOf(finalizedHeader.Hash(), bestBlockHeader.Hash())
+	if err != nil {
+		return headerToPrevote, fmt.Errorf("determine ancestry: %w", err)
+	}
+
+	if !isDescendant {
+		return headerToPrevote, fmt.Errorf("%w: %s is not ancestor of %s",
+			blocktree.ErrNoCommonAncestor, finalizedHeader.Hash().Short(), bestBlockHeader.Hash().Short())
+	}
+
+	headerToPrevote = bestBlockHeader
+	for i := 0; i < 2; i++ {
+		headerToPrevote, err = s.blockState.GetHeader(headerToPrevote.ParentHash)
+		if err != nil {
+			return headerToPrevote, fmt.Errorf("get parent header: %w", err)
+		}
+
+		isGenesisHash := genesisHash.Equal(headerToPrevote.Hash())
+		if finalizedHeader.Hash().Equal(headerToPrevote.Hash()) || isGenesisHash {
+			break
+		}
+	}
+
+	return headerToPrevote, nil
+}
+
 // determinePreVote determines what block is our pre-voted block for the current round
 func (s *Service) determinePreVote() (*Vote, error) {
 	var vote *Vote
@@ -730,19 +726,15 @@ func (s *Service) determinePreVote() (*Vote, error) {
 		return nil, fmt.Errorf("cannot get best block header: %w", err)
 	}
 
-	// if we receive a vote message from the primary with a
-	// block that's greater than or equal to the current pre-voted block
-	// and greater than the best final candidate from the last round, we choose that.
-	// otherwise, we simply choose the head of our chain.
-	primary := s.derivePrimary()
-	prm, has := s.loadVote(primary.PublicKeyBytes(), prevote)
-	if has && prm.Vote.Number >= uint32(s.head.Number) {
-		vote = &prm.Vote
-	} else {
-		vote = NewVoteFromHeader(bestBlockHeader)
+	headerToPrevote, err := s.determineBestHeaderToPrevote(s.head, bestBlockHeader)
+	if err != nil {
+		return nil, fmt.Errorf("determine best header to prevote: %w", err)
 	}
 
-	nextChange, err := s.grandpaState.NextGrandpaAuthorityChange(bestBlockHeader.Hash(), bestBlockHeader.Number)
+	vote = NewVoteFromHeader(headerToPrevote)
+	nextChange, err := s.grandpaState.NextGrandpaAuthorityChange(
+		headerToPrevote.Hash(), headerToPrevote.Number)
+
 	if errors.Is(err, state.ErrNoNextAuthorityChange) {
 		return vote, nil
 	} else if err != nil {
diff --git a/lib/grandpa/mocks_generate_test.go b/lib/grandpa/mocks_generate_test.go
index 5ffc6451cb7..84d6188cefe 100644
--- a/lib/grandpa/mocks_generate_test.go
+++ b/lib/grandpa/mocks_generate_test.go
@@ -3,4 +3,4 @@
 
 package grandpa
 
-//go:generate mockgen -destination=mocks_test.go -package $GOPACKAGE . BlockState,GrandpaState,Network
+//go:generate mockgen -destination=mocks_test.go -package $GOPACKAGE .
BlockState,GrandpaState diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index c4ae4019ca7..84d40e2df11 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -174,43 +174,39 @@ func (s *Service) sendMessage(msg GrandpaMessage) error { return nil } -// notifyNeighbor will gossip a NeighbourMessage every 5 minutes, however we reset the ticker -// whenever a finalisation occur meaning that a neighbour message already was sent by s.initiateRound() -func (s *Service) notifyNeighbor(interval time.Duration) { +func (s *Service) sendNeighbourMessage(interval time.Duration) { t := time.NewTicker(interval) defer t.Stop() - for { select { case <-s.ctx.Done(): return case <-t.C: - s.roundLock.Lock() - nm := &NeighbourMessage{ - Version: 1, - Round: s.state.round, - SetID: s.state.setID, - Number: uint32(s.head.Number), + if s.neighbourMessage == nil { + continue + } + case info, ok := <-s.finalisedCh: + if !ok { + // channel was closed + return } - s.roundLock.Unlock() - s.sendNeighborMessage(nm) - case <-s.finalisedCh: - t = time.NewTicker(interval) + s.neighbourMessage = &NeighbourMessage{ + Version: 1, + Round: info.Round, + SetID: info.SetID, + Number: uint32(info.Header.Number), + } } - } -} -func (s *Service) sendNeighborMessage(nm *NeighbourMessage) { - logger.Tracef("send neighbour message: %v", nm) + cm, err := s.neighbourMessage.ToConsensusMessage() + if err != nil { + logger.Warnf("failed to convert NeighbourMessage to network message: %s", err) + continue + } - cm, err := nm.ToConsensusMessage() - if err != nil { - logger.Warnf("failed to convert NeighbourMessage to network message: %s", err) - return + s.network.GossipMessage(cm) } - - s.network.GossipMessage(cm) } // decodeMessage decodes a network-level consensus message into a GRANDPA VoteMessage or CommitMessage diff --git a/lib/grandpa/network_test.go b/lib/grandpa/network_test.go index 18caa23197d..4684cbf6d84 100644 --- a/lib/grandpa/network_test.go +++ b/lib/grandpa/network_test.go @@ -4,17 +4,13 @@ package grandpa import ( - "context" - "sync" "testing" "time" - "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/types" "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -82,82 +78,57 @@ func TestHandleNetworkMessage(t *testing.T) { require.False(t, propagate) } -func TestNotifyNeighbor(t *testing.T) { - const interval = 2 * time.Second - - tests := map[string]struct { - notifyInterval time.Duration - finalizeBlock bool - finalizeBlockAfter time.Duration - expectWithin time.Duration - }{ - "should_send_neighbour_message": { - expectWithin: 2 * time.Second, - notifyInterval: interval, - }, - "should_reset_timer_and_then_send_neighbour_message": { - finalizeBlock: true, - finalizeBlockAfter: 1 * time.Second, - notifyInterval: interval, - expectWithin: 3 * time.Second, +func TestSendNeighbourMessage(t *testing.T) { + gs, st := newTestService(t) + go gs.sendNeighbourMessage(time.Second) + + digest := types.NewDigest() + prd, err := types.NewBabeSecondaryPlainPreDigest(0, 1).ToPreRuntimeDigest() + require.NoError(t, err) + err = digest.Add(*prd) + require.NoError(t, err) + block := &types.Block{ + Header: types.Header{ + ParentHash: st.Block.GenesisHash(), + Number: 1, + Digest: digest, }, + Body: types.Body{}, + } + + err = st.Block.AddBlock(block) + require.NoError(t, err) + + hash := block.Header.Hash() + round := uint64(7) + setID := uint64(33) + err = 
st.Block.SetFinalisedHash(hash, round, setID) + require.NoError(t, err) + + expected := &NeighbourMessage{ + Version: 1, + SetID: setID, + Round: round, + Number: 1, } - for tname, tt := range tests { - tt := tt - t.Run(tname, func(t *testing.T) { - ctrl := gomock.NewController(t) - mockedNet := NewMockNetwork(ctrl) - mockedCh := make(chan *types.FinalisationInfo) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - s := Service{ - ctx: ctx, - state: &State{ - round: 1, - setID: 0, - }, - finalisedCh: mockedCh, - head: &types.Header{Number: 0}, - network: mockedNet, - } - - expectedNeighborMessage := &NeighbourMessage{ - Version: 1, - Round: s.state.round, - SetID: s.state.setID, - Number: uint32(s.head.Number), - } - cm, err := expectedNeighborMessage.ToConsensusMessage() - require.NoError(t, err) - - timecheck := new(time.Time) - - wg := new(sync.WaitGroup) - wg.Add(1) - - ensureGossipMessageCalledRightTime := func(_ network.NotificationsMessage) { - defer wg.Done() - const roundAround = 1 * time.Second - - calledWithin := time.Since(*timecheck) - calledWithin = calledWithin.Round(roundAround) // avoid decimal points - assert.Equal(t, tt.expectWithin, calledWithin) - } - - mockedNet.EXPECT().GossipMessage(cm).Times(1).DoAndReturn(ensureGossipMessageCalledRightTime) - - *timecheck = time.Now() - go s.notifyNeighbor(tt.notifyInterval) - - if tt.finalizeBlock { - <-time.After(tt.finalizeBlockAfter) - mockedCh <- &types.FinalisationInfo{} - } - - wg.Wait() - }) + select { + case <-time.After(time.Second): + t.Fatal("did not send message") + case msg := <-gs.network.(*testNetwork).out: + nm, ok := msg.(*NeighbourMessage) + require.True(t, ok) + require.Equal(t, expected, nm) + } + + require.Equal(t, expected, gs.neighbourMessage) + + select { + case <-time.After(time.Second * 2): + t.Fatal("did not send message") + case msg := <-gs.network.(*testNetwork).out: + nm, ok := msg.(*NeighbourMessage) + require.True(t, ok) + require.Equal(t, expected, nm) } } diff --git a/lib/grandpa/round_test.go b/lib/grandpa/round_test.go index fd16f7d043b..e8493e4f8b0 100644 --- a/lib/grandpa/round_test.go +++ b/lib/grandpa/round_test.go @@ -5,17 +5,14 @@ package grandpa import ( //"fmt" - "context" - "fmt" + "math/rand" "sync" - "sync/atomic" "testing" "time" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" - "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" @@ -24,7 +21,6 @@ import ( "github.com/golang/mock/gomock" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -610,199 +606,3 @@ func TestPlayGrandpaRound_MultipleRounds(t *testing.T) { } } - -func TestSendingVotesInRightStage(t *testing.T) { - ed25519Keyring, err := keystore.NewEd25519Keyring() - require.NoError(t, err) - - currentAuthority := ed25519Keyring.Bob().(*ed25519.Keypair) - votersPublicKeys := []*ed25519.PublicKey{ - ed25519Keyring.Alice().(*ed25519.Keypair).Public().(*ed25519.PublicKey), - currentAuthority.Public().(*ed25519.PublicKey), - ed25519Keyring.Charlie().(*ed25519.Keypair).Public().(*ed25519.PublicKey), - ed25519Keyring.Dave().(*ed25519.Keypair).Public().(*ed25519.PublicKey), - } - - grandpaVoters := make([]types.GrandpaVoter, len(votersPublicKeys)) - for idx, pk := range votersPublicKeys { - grandpaVoters[idx] = types.GrandpaVoter{ - 
Key: *pk, - } - } - - ctrl := gomock.NewController(t) - mockedGrandpaState := NewMockGrandpaState(ctrl) - mockedGrandpaState.EXPECT(). - NextGrandpaAuthorityChange(testGenesisHeader.Hash(), testGenesisHeader.Number). - Return(uint(0), state.ErrNoNextAuthorityChange). - Times(2) - mockedGrandpaState.EXPECT(). - SetPrevotes(uint64(0), uint64(0), gomock.AssignableToTypeOf([]types.GrandpaSignedVote{})). - Return(nil). - Times(1) - mockedGrandpaState.EXPECT(). - SetPrecommits(uint64(0), uint64(0), gomock.AssignableToTypeOf([]types.GrandpaSignedVote{})). - Return(nil). - Times(1) - mockedGrandpaState.EXPECT(). - SetLatestRound(uint64(0)). - Return(nil). - Times(1) - mockedGrandpaState.EXPECT(). - GetPrecommits(uint64(0), uint64(0)). - Return([]types.GrandpaSignedVote{}, nil). - Times(1) - - mockedState := NewMockBlockState(ctrl) - mockedState.EXPECT(). - GenesisHash(). - Return(testGenesisHeader.Hash()). - Times(2) - // since the next 3 function has been called based on the amount of time we wait until we get enough - // prevotes is hard to define a corret amount of times this function shoud be called - mockedState.EXPECT(). - HasFinalisedBlock(uint64(0), uint64(0)). - Return(false, nil). - AnyTimes() - mockedState.EXPECT(). - GetHighestRoundAndSetID(). - Return(uint64(0), uint64(0), nil). - AnyTimes() - mockedState.EXPECT(). - IsDescendantOf(testGenesisHeader.Hash(), testGenesisHeader.Hash()). - Return(true, nil). - AnyTimes() - mockedState.EXPECT(). - BestBlockHeader(). - Return(testGenesisHeader, nil). - Times(2) - // we cannot assert the bytes since some votes is defined while playing grandpa round - mockedState.EXPECT(). - SetJustification(testGenesisHeader.Hash(), gomock.AssignableToTypeOf([]byte{})). - Return(nil). - Times(1) - mockedState.EXPECT(). - GetHeader(testGenesisHeader.Hash()). - Return(testGenesisHeader, nil). - Times(1) - mockedState.EXPECT(). - SetFinalisedHash(testGenesisHeader.Hash(), uint64(0), uint64(0)). - Return(nil). - Times(1) - - mockedTelemetry := NewMockClient(ctrl) - expectedFinalizedTelemetryMessage := telemetry.NewAfgFinalizedBlocksUpTo( - testGenesisHeader.Hash(), - fmt.Sprint(testGenesisHeader.Number), - ) - mockedTelemetry.EXPECT(). - SendMessage(expectedFinalizedTelemetryMessage). 
- Times(1) - - mockedNet := NewMockNetwork(ctrl) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // gossamer gossip a prevote/precommit message and then waits `subroundInterval` * 4 - // to issue another prevote/precommit message - const subroundInterval = 100 * time.Millisecond - grandpa := &Service{ - ctx: ctx, - cancel: cancel, - paused: atomic.Value{}, - network: mockedNet, - blockState: mockedState, - grandpaState: mockedGrandpaState, - in: make(chan *networkVoteMessage), - interval: subroundInterval, - state: &State{ - round: 0, - setID: 0, - voters: grandpaVoters, - }, - head: testGenesisHeader, - authority: true, - keypair: currentAuthority, - prevotes: new(sync.Map), - precommits: new(sync.Map), - preVotedBlock: make(map[uint64]*Vote), - bestFinalCandidate: make(map[uint64]*Vote), - telemetry: mockedTelemetry, - } - grandpa.paused.Store(false) - - ed25519Keyring.Bob().(*ed25519.Keypair).Public() - persistVote := func(grandpaSrvc *Service, pk ed25519.PublicKey, stage Subround) { - // dummy vote, the goal is ensure we stop sending - // messages when we reach a enough amount of prevotes - vote := NewVote(testGenesisHeader.Hash(), uint32(testGenesisHeader.Number)) - signedVote := &SignedVote{ - Vote: *vote, - Signature: [64]byte{}, - AuthorityID: pk.AsBytes(), - } - - var stageMap *sync.Map - switch stage { - case precommit: - stageMap = grandpaSrvc.precommits - case prevote: - stageMap = grandpaSrvc.prevotes - } - - stageMap.Store(pk.AsBytes(), signedVote) - } - - go func() { - expectedVote := NewVote(testGenesisHeader.Hash(), uint32(testGenesisHeader.Number)) - _, expectedPrevoteMessage, err := grandpa.createSignedVoteAndVoteMessage(expectedVote, prevote) - require.NoError(t, err) - - pv, err := expectedPrevoteMessage.ToConsensusMessage() - require.NoError(t, err) - mockedNet.EXPECT(). - GossipMessage(pv). - Times(2) - - // should send 2 prevote messages and then stop since we reach the enough amount of prevotes - time.Sleep(subroundInterval * 4) - - // given that we are BOB and we already had predetermined our prevote in a set - // of 4 authorities (ALICE, BOB, CHARLIE and DAVE) then we only need 2 more prevotes - persistVote(grandpa, *votersPublicKeys[0], prevote) // persiste prevote for alice - persistVote(grandpa, *votersPublicKeys[2], prevote) // persiste prevote for charlie - - _, expectedPrecommit, err := grandpa.createSignedVoteAndVoteMessage(expectedVote, precommit) - require.NoError(t, err) - - pc, err := expectedPrecommit.ToConsensusMessage() - require.NoError(t, err) - mockedNet.EXPECT(). - GossipMessage(pc). - Times(1) - - commitMessage := &CommitMessage{ - Round: 0, - Vote: *NewVoteFromHeader(testGenesisHeader), - Precommits: []types.GrandpaVote{}, - AuthData: []AuthData{}, - } - expectedGossipCommitMessage, err := commitMessage.ToConsensusMessage() - require.NoError(t, err) - mockedNet.EXPECT(). - GossipMessage(expectedGossipCommitMessage). 
- Times(1) - - // should send 1 precommit message and after we persit enough precommit - // votes we will close the `done` channel which will return from the `sendPrecommitMessage` goroutine - time.Sleep(subroundInterval * 2) - - // given that we are BOB and we already had predetermined the precommit given the prevotes - // we only need 2 more precommit messages - persistVote(grandpa, *votersPublicKeys[0], precommit) // persiste prevote for alice - persistVote(grandpa, *votersPublicKeys[2], precommit) // persiste prevote for charlie - }() - - err = grandpa.playGrandpaRound() - assert.NoError(t, err) -} diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 92e0e7f72a4..d54e605fa14 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -24,7 +24,6 @@ type networkVoteMessage struct { // receiveVoteMessages receives messages from the in channel until a grandpa round finishes. func (s *Service) receiveVoteMessages(ctx context.Context) { - logger.Debug("receiving pre-vote messages...") for { select { case msg, ok := <-s.in:
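
Note on the voting rule introduced above: determineBestHeaderToPrevote implements a `BeforeBestBlockBy`-style rule, prevoting for a block a couple of ancestors behind the current best block while never falling behind the last finalised block (or genesis). The sketch below only models that walk-back logic and is not part of the diff: the header struct, the beforeBestBlockBy helper and the pointer-linked chain are invented stand-ins for types.Header and the BlockState lookups the real method uses, with the depth of 2 mirroring the hard-coded loop bound in the patch.

package main

import (
	"errors"
	"fmt"
)

// header is a simplified stand-in for types.Header, holding only what the
// voting rule needs: a block number and a link to its parent.
type header struct {
	number uint64
	parent *header // nil for genesis
}

var errNotDescendant = errors.New("best block does not descend from the finalised block")

// beforeBestBlockBy returns the block to prevote for: it starts at the best
// block and walks back up to n parents, stopping early if it reaches the
// finalised block or genesis. The patch applies this shape of rule with n = 2.
func beforeBestBlockBy(n uint, finalised, best *header) (*header, error) {
	// voting for the best block directly is fine if it is already the
	// finalised block or genesis
	if best == finalised || best.parent == nil {
		return best, nil
	}

	// the finalised block must be an ancestor of the best block, mirroring
	// the IsDescendantOf check in determineBestHeaderToPrevote
	isDescendant := false
	for h := best; h != nil; h = h.parent {
		if h == finalised {
			isDescendant = true
			break
		}
	}
	if !isDescendant {
		return nil, errNotDescendant
	}

	target := best
	for i := uint(0); i < n; i++ {
		target = target.parent
		if target == finalised || target.parent == nil {
			break
		}
	}
	return target, nil
}

func main() {
	genesis := &header{number: 0}
	h1 := &header{number: 1, parent: genesis}
	h2 := &header{number: 2, parent: h1}
	h3 := &header{number: 3, parent: h2}
	h4 := &header{number: 4, parent: h3}

	// finalised at #1, best at #4: voting 2 blocks behind the best selects #2
	target, err := beforeBestBlockBy(2, h1, h4)
	if err != nil {
		panic(err)
	}
	fmt.Println("prevote target:", target.number) // prints: prevote target: 2
}

The same clamping shows up in the patched loop: it walks at most two parents back from the best header and breaks as soon as it lands on the finalised block or genesis, so the prevote target can never drop behind the last finalised block.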