Skip to content

Commit

Permalink
fix: p2p sync with lagging peer (bnb-chain#1301)
Browse files Browse the repository at this point in the history
* fix: p2p sync with lagging peer

No need to sync with a lagging peer, which could make the local chain stall as well.

* fix: do not drop lagging peer, will retry it later.

The lagging peer is probably already the best peer with the largest total difficulty.
We should not remove it: since p2p is a bidirectional connection, dropping it could
make that peer unable to sync from this node as well.
And the lagging peer could catch up later, so keep it.

* p2p: add lagging field in Peer

A lagging peer stays connected, but won't be used to sync.
The lagging flag is cleared once the peer updates its latest block state.

* test: fix UT compile issue

* fix: lagging peer func rename

* test: fix a UT fail of download test

errStallingPeer is replaced by errLaggingPeer in this case

* fix: lagging issue in light mode

* test: add and resolve UT of lagging peer
  • Loading branch information
setunapo authored and j75689 committed Feb 13, 2023
1 parent 0b653d6 commit abee97e
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 29 deletions.
58 changes: 36 additions & 22 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
var (
errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer is unknown or unhealthy")
errLaggingPeer = errors.New("peer is lagging")
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errUnsyncedPeer = errors.New("unsynced peer")
Expand Down Expand Up @@ -395,7 +396,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
}
return err
}
log.Warn("Synchronisation failed, retrying", "err", err)
log.Warn("Synchronisation failed, retrying", "peer", id, "err", err)
return err
}

Expand Down Expand Up @@ -492,7 +493,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, pivot, err := d.fetchHead(p)
remoteHeader, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
Expand All @@ -503,22 +504,43 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, latest)
// If the remote peer is lagging behind, no need to sync with it, drop the peer.
remoteHeight := remoteHeader.Number.Uint64()
var localHeight uint64
switch mode {
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case SnapSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}

origin, err := d.findAncestor(p, localHeight, remoteHeader)
if err != nil {
return err
}

if localHeight >= remoteHeight {
// if remoteHeader does not exist in local chain, will move on to insert it as a side chain.
if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil ||
(mode == LightSync && d.blockchain.GetHeaderByHash(remoteHeader.Hash()) != nil) {
p.log.Warn("syncWithPeer", "local", localHeight, "remote", remoteHeight, "mode", mode, "err", errLaggingPeer)
p.peer.MarkLagging()
return errLaggingPeer
}
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = height
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()

// Ensure our origin point is below any snap sync pivot point
if mode == SnapSync {
if height <= uint64(fsMinFullBlocks) {
if remoteHeight <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivotNumber := pivot.Number.Uint64()
Expand Down Expand Up @@ -551,8 +573,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// could cause issues.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else if remoteHeight > fullMaxForkAncestry+1 {
d.ancientLimit = remoteHeight - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
Expand All @@ -576,12 +598,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
d.syncInitHook(origin, remoteHeight)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.fetchHeaders(p, origin+1, remoteHeader.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == SnapSync {
Expand Down Expand Up @@ -766,22 +788,14 @@ func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, ui
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteHeader *types.Header) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
var (
floor = int64(-1)
localHeight uint64
remoteHeight = remoteHeader.Number.Uint64()
)
mode := d.getMode()
switch mode {
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case SnapSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}

p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)

// Recap floor value for binary search
Expand Down
28 changes: 24 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type downloadTesterPeer struct {
withholdHeaders map[common.Hash]struct{}
}

// MarkLagging satisfies the downloader's peer interface (which gained a
// MarkLagging method in this change). The test peer tracks no lagging
// state, so this is intentionally a no-op.
func (dlp *downloadTesterPeer) MarkLagging() {
}

// Head constructs a function to retrieve a peer's current head hash
// and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
Expand Down Expand Up @@ -926,8 +929,8 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) {

chain := testChainBase.shorten(1)
tester.newPeer("attack", protocol, chain.blocks[1:])
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errLaggingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errLaggingPeer)
}
}

Expand Down Expand Up @@ -1235,9 +1238,26 @@ func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
pending.Wait()
afterFailedSync := tester.downloader.Progress()

// Synchronise with a good peer and check that the progress height has been reduced to
// it is no longer valid to sync to a lagging peer
laggingChain := chain.shorten(800 / 2)
tester.newPeer("lagging", protocol, laggingChain.blocks[1:])
pending.Add(1)
go func() {
defer pending.Done()
if err := tester.sync("lagging", nil, mode); err != errLaggingPeer {
panic(fmt.Sprintf("unexpected lagging synchronisation err:%v", err))
}
}()
// lagging peer will return before syncInitHook, skip <-starting and progress <- struct{}{}
checkProgress(t, tester.downloader, "lagging", ethereum.SyncProgress{
CurrentBlock: afterFailedSync.CurrentBlock,
HighestBlock: uint64(len(chain.blocks) - 1),
})
pending.Wait()

// Synchronise with a good peer and check that the progress height has been increased to
// the true value.
validChain := chain.shorten(len(chain.blocks) - numMissing)
validChain := chain.shorten(len(chain.blocks))
tester.newPeer("valid", protocol, validChain.blocks[1:])
pending.Add(1)

Expand Down
2 changes: 2 additions & 0 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type peerConnection struct {
// LightPeer encapsulates the methods required to synchronise with a remote light peer.
type LightPeer interface {
Head() (common.Hash, *big.Int)
MarkLagging()
RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
}
Expand All @@ -75,6 +76,7 @@ type lightPeerWrapper struct {
}

// Head delegates to the wrapped light peer's Head.
func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }

// MarkLagging delegates to the wrapped light peer's MarkLagging.
func (w *lightPeerWrapper) MarkLagging() { w.peer.MarkLagging() }

// RequestHeadersByHash delegates the header-by-hash request to the wrapped light peer.
func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
	return w.peer.RequestHeadersByHash(h, amount, skip, reverse, sink)
}
Expand Down
3 changes: 3 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ func (ps *peerSet) peerWithHighestTD() *eth.Peer {
bestTd *big.Int
)
for _, p := range ps.peers {
if p.Lagging() {
continue
}
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
bestPeer, bestTd = p.Peer, td
}
Expand Down
15 changes: 12 additions & 3 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type Peer struct {
version uint // Protocol version negotiated
statusExtension *UpgradeStatusExtension

head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -153,6 +154,14 @@ func (p *Peer) Version() uint {
return p.version
}

// Lagging reports whether the peer has been marked as lagging behind the
// local chain. A lagging peer stays connected but is skipped when picking
// a peer to sync with; the flag is cleared again by SetHead when the peer
// advertises a new head.
//
// The flag is read and written under p.lock: SetHead already mutates
// p.lagging while holding the write lock, and peer selection may call
// Lagging concurrently with status updates, so accessing the field
// without the lock would be a data race.
func (p *Peer) Lagging() bool {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.lagging
}

// MarkLagging flags the peer as lagging so it is not used for syncing
// until it advertises fresher chain state via SetHead.
func (p *Peer) MarkLagging() {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.lagging = true
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
p.lock.RLock()
Expand All @@ -166,7 +175,7 @@ func (p *Peer) Head() (hash common.Hash, td *big.Int) {
// SetHead records the peer's newly advertised head block hash and total
// difficulty. Receiving fresh head state also means the peer may have
// caught up, so any lagging mark is cleared here.
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
	p.lock.Lock()
	defer p.lock.Unlock()

	// A new status implies the peer might no longer be behind us.
	p.lagging = false
	copy(p.head[:], hash[:])
	p.td.Set(td)
}
Expand Down

0 comments on commit abee97e

Please sign in to comment.