Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: p2p sync with lagging peer #1301

Merged
merged 8 commits into from
Feb 9, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
var (
errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer is unknown or unhealthy")
errLaggingPeer = errors.New("peer is lagging")
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errUnsyncedPeer = errors.New("unsynced peer")
Expand Down Expand Up @@ -492,7 +493,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, pivot, err := d.fetchHead(p)
remoteHeader, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
Expand All @@ -503,22 +504,38 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, latest)
// If the remote peer is lagging behind, no need to sync with it, drop the peer.
remoteHeight := remoteHeader.Number.Uint64()
var localHeight uint64
switch mode {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case SnapSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}
if localHeight >= remoteHeight {
if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil {
p.log.Warn("syncWithPeer", "local", localHeight, "remote", remoteHeight, "mode", mode, "err", errLaggingPeer)
return errLaggingPeer
}
}
origin, err := d.findAncestor(p, localHeight, remoteHeader)
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = height
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()

// Ensure our origin point is below any snap sync pivot point
if mode == SnapSync {
if height <= uint64(fsMinFullBlocks) {
if remoteHeight <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivotNumber := pivot.Number.Uint64()
Expand Down Expand Up @@ -551,8 +568,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// could cause issues.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else if remoteHeight > fullMaxForkAncestry+1 {
d.ancientLimit = remoteHeight - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
Expand All @@ -576,12 +593,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
d.syncInitHook(origin, remoteHeight)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.fetchHeaders(p, origin+1, remoteHeader.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == SnapSync {
Expand Down Expand Up @@ -766,22 +783,14 @@ func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, ui
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteHeader *types.Header) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
var (
floor = int64(-1)
localHeight uint64
remoteHeight = remoteHeader.Number.Uint64()
)
mode := d.getMode()
switch mode {
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case SnapSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}

p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)

// Recap floor value for binary search
Expand Down