Skip to content

Commit

Permalink
feat: implements before best block by N voting rule
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Aug 16, 2022
1 parent c93dbe8 commit c401626
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 421 deletions.
8 changes: 1 addition & 7 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,7 @@ func (s *Service) createNotificationsMessageHandler(
return fmt.Errorf("could not check if message was seen before: %w", err)
}

// sometimes substrate can send prevote/precommit messages already seen by
// gossamer but those messages are related to another round, for example to
// finalize a block Y substrate sends prevotes and precommits to Y in round r
// and in the round r + 1 it is possible to receive prevotes for block Y again, this
// is not a problem and we can improve the gossamer behaviour implementing Polite GRANDPA #2505
_, isConsensusMsg := msg.(*ConsensusMessage)
if hasSeen && !isConsensusMsg {
if hasSeen {
// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Expand Down
210 changes: 101 additions & 109 deletions lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type Service struct {
bestFinalCandidate map[uint64]*Vote // map of round number -> best final candidate

// channels for communication with other services
in chan *networkVoteMessage // only used to receive *VoteMessage
finalisedCh chan *types.FinalisationInfo
in chan *networkVoteMessage // only used to receive *VoteMessage
finalisedCh chan *types.FinalisationInfo
neighbourMessage *NeighbourMessage // cached neighbour message

telemetry telemetry.Client
}
Expand Down Expand Up @@ -199,7 +200,7 @@ func (s *Service) Start() error {
}
}()

go s.notifyNeighbor(neighbourMessageInterval)
go s.sendNeighbourMessage(neighbourMessageInterval)

return nil
}
Expand Down Expand Up @@ -343,13 +344,6 @@ func (s *Service) initiateRound() error {
s.precommits = new(sync.Map)
s.pvEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote)
s.pcEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote)

s.sendNeighborMessage(&NeighbourMessage{
Version: 1,
Round: s.state.round,
SetID: s.state.setID,
Number: uint32(s.head.Number),
})
s.roundLock.Unlock()

best, err := s.blockState.BestBlockHeader()
Expand Down Expand Up @@ -484,6 +478,7 @@ func (s *Service) playGrandpaRound() error {
return err
}

logger.Debug("receiving pre-vote messages...")
go s.receiveVoteMessages(ctx)
time.Sleep(s.interval)

Expand All @@ -506,15 +501,25 @@ func (s *Service) playGrandpaRound() error {
s.prevotes.Store(s.publicKeyBytes(), spv)
}

prevoteRoundComplete := make(chan struct{})
go s.sendPrevoteMessage(vm, prevoteRoundComplete)
logger.Debugf("sending pre-vote message %s...", pv)
roundComplete := make(chan struct{})
// roundComplete is a signal channel which is closed when the round completes
// (will receive the default value of channel's type), so we don't need to
// explicitly send a value.
defer close(roundComplete)

// continue to send prevote messages until round is done
go s.sendVoteMessage(prevote, vm, roundComplete)

logger.Debug("receiving pre-commit messages...")
// through goroutine s.receiveVoteMessages(ctx)
time.Sleep(s.interval)

if s.paused.Load().(bool) {
return ErrServicePaused
}

// determine and broadcast pre-commit only after seen prevote messages
<-prevoteRoundComplete
// broadcast pre-commit
pc, err := s.determinePreCommit()
if err != nil {
return err
Expand All @@ -526,13 +531,12 @@ func (s *Service) playGrandpaRound() error {
}

s.precommits.Store(s.publicKeyBytes(), spc)
logger.Debugf("sending pre-commit message %s...", pc)

// continue to send precommit messages until round is done
precommitDoneCh := make(chan struct{})
go s.sendPrecommitMessage(pcm, precommitDoneCh)
go s.sendVoteMessage(precommit, pcm, roundComplete)

err = s.attemptToFinalize(precommitDoneCh)
if err != nil {
if err = s.attemptToFinalize(); err != nil {
logger.Errorf("failed to finalise: %s", err)
return err
}
Expand All @@ -541,77 +545,35 @@ func (s *Service) playGrandpaRound() error {
return nil
}

func (s *Service) sendPrecommitMessage(vm *VoteMessage, done <-chan struct{}) {
logger.Debugf("sending pre-commit message %s...", vm.Message)

func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplete <-chan struct{}) {
ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

threshold := s.state.threshold()

// Though this looks like we are sending messages multiple times,
// caching would make sure that they are being sent only once.
for {
if s.paused.Load().(bool) {
return
}

if err := s.sendMessage(vm); err != nil {
logger.Warnf("could not send message for stage %s: %s", precommit, err)
if err := s.sendMessage(msg); err != nil {
logger.Warnf("could not send message for stage %s: %s", stage, err)
} else {
logger.Warnf("sent vote message for stage %s: %s", precommit, vm.Message)
}

if uint64(s.lenVotes(precommit)) >= threshold {
<-done
return
logger.Tracef("sent vote message for stage %s: %s", stage, msg.Message)
}

select {
case <-ticker.C:
case <-done:
return
}
}
}

func (s *Service) sendPrevoteMessage(vm *VoteMessage, done chan<- struct{}) {
defer close(done)
logger.Debugf("sending pre-vote message %s...", vm)

ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

threshold := s.state.threshold()

// Though this looks like we are sending messages multiple times,
// caching would make sure that they are being sent only once.
for {
if err := s.sendMessage(vm); err != nil {
logger.Warnf("could not send message for stage %s: %s", prevote, err)
} else {
logger.Warnf("sent vote message for stage %s: %s", prevote, vm.Message)
}

if s.paused.Load().(bool) || uint64(s.lenVotes(prevote)) >= threshold {
case <-roundComplete:
return
case <-ticker.C:
}

<-ticker.C
}
}

// attemptToFinalize loops until the round is finalisable
func (s *Service) attemptToFinalize(precommitDoneCh chan<- struct{}) error {
defer close(precommitDoneCh)

func (s *Service) attemptToFinalize() error {
ticker := time.NewTicker(s.interval / 100)

var (
bestFinalCandidate *types.GrandpaVote
precommitCount uint64
)

for {
select {
case <-s.ctx.Done():
Expand Down Expand Up @@ -642,54 +604,50 @@ func (s *Service) attemptToFinalize(precommitDoneCh chan<- struct{}) error {
return nil // a block was finalised, seems like we missed some messages
}

var err error
bestFinalCandidate, err = s.getBestFinalCandidate()
bfc, err := s.getBestFinalCandidate()
if err != nil {
return err
}

precommitCount, err = s.getTotalVotesForBlock(bestFinalCandidate.Hash, precommit)
pc, err := s.getTotalVotesForBlock(bfc.Hash, precommit)
if err != nil {
return err
}

// once we reach the threshold we should stop sending precommit messages to other peers
if bestFinalCandidate.Number < uint32(s.head.Number) || precommitCount <= s.state.threshold() {
if bfc.Number < uint32(s.head.Number) || pc <= s.state.threshold() {
continue
}

break
}

if err := s.finalise(); err != nil {
return err
}
if err = s.finalise(); err != nil {
return err
}

// if we haven't received a finalisation message for this block yet, broadcast a finalisation message
votes := s.getDirectVotes(precommit)
logger.Debugf("block was finalised for round %d and set id %d. "+
"Head hash is %s, %d direct votes for bfc and %d total votes for bfc",
s.state.round, s.state.setID, s.head.Hash(), votes[*bestFinalCandidate], precommitCount)
// if we haven't received a finalisation message for this block yet, broadcast a finalisation message
votes := s.getDirectVotes(precommit)
logger.Debugf("block was finalised for round %d and set id %d. "+
"Head hash is %s, %d direct votes for bfc and %d total votes for bfc",
s.state.round, s.state.setID, s.head.Hash(), votes[*bfc], pc)

cm, err := s.newCommitMessage(s.head, s.state.round)
if err != nil {
return err
}
cm, err := s.newCommitMessage(s.head, s.state.round)
if err != nil {
return err
}

msg, err := cm.ToConsensusMessage()
if err != nil {
return err
}
msg, err := cm.ToConsensusMessage()
if err != nil {
return err
}

logger.Debugf("sending CommitMessage: %v", cm)
s.network.GossipMessage(msg)
logger.Debugf("sending CommitMessage: %v", cm)
s.network.GossipMessage(msg)

s.telemetry.SendMessage(telemetry.NewAfgFinalizedBlocksUpTo(
s.head.Hash(),
fmt.Sprint(s.head.Number),
))
s.telemetry.SendMessage(telemetry.NewAfgFinalizedBlocksUpTo(
s.head.Hash(),
fmt.Sprint(s.head.Number),
))

return nil
return nil
}
}

func (s *Service) loadVote(key ed25519.PublicKeyBytes, stage Subround) (*SignedVote, bool) {
Expand Down Expand Up @@ -721,6 +679,44 @@ func (s *Service) deleteVote(key ed25519.PublicKeyBytes, stage Subround) {
}
}

// implements `BeforeBestBlockBy` a custom voting rule that guarantees that our vote is always
// behind the best block by at least N blocks, unless the base number is < N blocks behind the
// best, in which case it votes for the base.
// (https://github.com/paritytech/substrate/blob/1fd71c7845d6c28c532795ec79106d959dd1fe30/client/finality-grandpa/src/voting_rule.rs#L92)
func (s *Service) determineBestHeaderToPrevote(finalizedHeader, bestBlockHeader *types.Header) (
headerToPrevote *types.Header, err error) {
gensisHash := s.blockState.GenesisHash()
isGenesisHash := gensisHash.Equal(bestBlockHeader.Hash())
if isGenesisHash || finalizedHeader.Hash().Equal(bestBlockHeader.Hash()) {
return bestBlockHeader, nil
}

isDescendant, err := s.blockState.IsDescendantOf(finalizedHeader.Hash(), bestBlockHeader.Hash())
if err != nil {
return headerToPrevote, fmt.Errorf("determine ancestry: %w", err)
}

if !isDescendant {
return headerToPrevote, fmt.Errorf("%w: %s is not ancestor of %s",
blocktree.ErrNoCommonAncestor, bestBlockHeader.Hash().Short(), bestBlockHeader.Hash().Short())
}

headerToPrevote = bestBlockHeader
for i := 0; i < 2; i++ {
headerToPrevote, err := s.blockState.GetHeader(headerToPrevote.ParentHash)
if err != nil {
return headerToPrevote, fmt.Errorf("get parent header: %w", err)
}

isGenesisHash := gensisHash.Equal(headerToPrevote.Hash())
if finalizedHeader.Hash().Equal(headerToPrevote.Hash()) || isGenesisHash {
break
}
}

return headerToPrevote, nil
}

// determinePreVote determines what block is our pre-voted block for the current round
func (s *Service) determinePreVote() (*Vote, error) {
var vote *Vote
Expand All @@ -730,19 +726,15 @@ func (s *Service) determinePreVote() (*Vote, error) {
return nil, fmt.Errorf("cannot get best block header: %w", err)
}

// if we receive a vote message from the primary with a
// block that's greater than or equal to the current pre-voted block
// and greater than the best final candidate from the last round, we choose that.
// otherwise, we simply choose the head of our chain.
primary := s.derivePrimary()
prm, has := s.loadVote(primary.PublicKeyBytes(), prevote)
if has && prm.Vote.Number >= uint32(s.head.Number) {
vote = &prm.Vote
} else {
vote = NewVoteFromHeader(bestBlockHeader)
headerToPrevote, err := s.determineBestHeaderToPrevote(s.head, bestBlockHeader)
if err != nil {
return nil, fmt.Errorf("determine best hash to prevote: %w", err)
}

nextChange, err := s.grandpaState.NextGrandpaAuthorityChange(bestBlockHeader.Hash(), bestBlockHeader.Number)
vote = NewVoteFromHeader(headerToPrevote)
nextChange, err := s.grandpaState.NextGrandpaAuthorityChange(
headerToPrevote.Hash(), headerToPrevote.Number)

if errors.Is(err, state.ErrNoNextAuthorityChange) {
return vote, nil
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/grandpa/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

package grandpa

//go:generate mockgen -destination=mocks_test.go -package $GOPACKAGE . BlockState,GrandpaState,Network
//go:generate mockgen -destination=mocks_test.go -package $GOPACKAGE . BlockState,GrandpaState
Loading

0 comments on commit c401626

Please sign in to comment.