diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 135efbf5a0..a5ac153afc 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -68,6 +68,7 @@ var ( var ( errBusy = errors.New("busy") errUnknownPeer = errors.New("peer is unknown or unhealthy") + errLaggingPeer = errors.New("peer is lagging") errBadPeer = errors.New("action from bad peer ignored") errStallingPeer = errors.New("peer is stalling") errUnsyncedPeer = errors.New("unsynced peer") @@ -395,7 +396,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode } return err } - log.Warn("Synchronisation failed, retrying", "err", err) + log.Warn("Synchronisation failed, retrying", "peer", id, "err", err) return err } @@ -492,7 +493,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - latest, pivot, err := d.fetchHead(p) + remoteHeader, pivot, err := d.fetchHead(p) if err != nil { return err } @@ -503,22 +504,43 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // nil panics on an access. pivot = d.blockchain.CurrentBlock().Header() } - height := latest.Number.Uint64() - origin, err := d.findAncestor(p, latest) + // If the remote peer is lagging behind, no need to sync with it, drop the peer. + remoteHeight := remoteHeader.Number.Uint64() + var localHeight uint64 + switch mode { + case FullSync: + localHeight = d.blockchain.CurrentBlock().NumberU64() + case SnapSync: + localHeight = d.blockchain.CurrentFastBlock().NumberU64() + default: + localHeight = d.lightchain.CurrentHeader().Number.Uint64() + } + + origin, err := d.findAncestor(p, localHeight, remoteHeader) if err != nil { return err } + + if localHeight >= remoteHeight { + // if remoteHeader does not exist in local chain, will move on to insert it as a side chain. + if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil || + (mode == LightSync && d.blockchain.GetHeaderByHash(remoteHeader.Hash()) != nil) { + p.log.Warn("syncWithPeer", "local", localHeight, "remote", remoteHeight, "mode", mode, "err", errLaggingPeer) + p.peer.MarkLagging() + return errLaggingPeer + } + } d.syncStatsLock.Lock() if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { d.syncStatsChainOrigin = origin } - d.syncStatsChainHeight = height + d.syncStatsChainHeight = remoteHeight d.syncStatsLock.Unlock() // Ensure our origin point is below any snap sync pivot point if mode == SnapSync { - if height <= uint64(fsMinFullBlocks) { + if remoteHeight <= uint64(fsMinFullBlocks) { origin = 0 } else { pivotNumber := pivot.Number.Uint64() @@ -551,8 +573,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // could cause issues. if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 { d.ancientLimit = d.checkpoint - } else if height > fullMaxForkAncestry+1 { - d.ancientLimit = height - fullMaxForkAncestry - 1 + } else if remoteHeight > fullMaxForkAncestry+1 { + d.ancientLimit = remoteHeight - fullMaxForkAncestry - 1 } else { d.ancientLimit = 0 } @@ -576,12 +598,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, mode) if d.syncInitHook != nil { - d.syncInitHook(origin, height) + d.syncInitHook(origin, remoteHeight) } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync + func() error { return d.fetchHeaders(p, origin+1, remoteHeader.Number.Uint64()) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync func() error { return d.processHeaders(origin+1, td) }, } if mode == SnapSync { @@ -766,22 +788,14 @@ func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, ui // on the correct chain, checking the top N links should already get us a match. // In the rare scenario when we ended up on a long reorganisation (i.e. none of // the head links match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) { +func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteHeader *types.Header) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks var ( floor = int64(-1) - localHeight uint64 remoteHeight = remoteHeader.Number.Uint64() ) mode := d.getMode() - switch mode { - case FullSync: - localHeight = d.blockchain.CurrentBlock().NumberU64() - case SnapSync: - localHeight = d.blockchain.CurrentFastBlock().NumberU64() - default: - localHeight = d.lightchain.CurrentHeader().Number.Uint64() - } + p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight) // Recap floor value for binary search diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 88951fba11..8b4a8504cf 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -147,6 +147,9 @@ type downloadTesterPeer struct { withholdHeaders map[common.Hash]struct{} } +func (dlp *downloadTesterPeer) MarkLagging() { +} + // Head constructs a function to retrieve a peer's current head hash // and total difficulty. func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { @@ -926,8 +929,8 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { chain := testChainBase.shorten(1) tester.newPeer("attack", protocol, chain.blocks[1:]) - if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) + if err := tester.sync("attack", big.NewInt(1000000), mode); err != errLaggingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errLaggingPeer) } } @@ -1235,9 +1238,26 @@ func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { pending.Wait() afterFailedSync := tester.downloader.Progress() - // Synchronise with a good peer and check that the progress height has been reduced to + // it is no longer valid to sync to a lagging peer + laggingChain := chain.shorten(800 / 2) + tester.newPeer("lagging", protocol, laggingChain.blocks[1:]) + pending.Add(1) + go func() { + defer pending.Done() + if err := tester.sync("lagging", nil, mode); err != errLaggingPeer { + panic(fmt.Sprintf("unexpected lagging synchronisation err:%v", err)) + } + }() + // lagging peer will return before syncInitHook, skip <-starting and progress <- struct{}{} + checkProgress(t, tester.downloader, "lagging", ethereum.SyncProgress{ + CurrentBlock: afterFailedSync.CurrentBlock, + HighestBlock: uint64(len(chain.blocks) - 1), + }) + pending.Wait() + + // Synchronise with a good peer and check that the progress height has been increased to // the true value. - validChain := chain.shorten(len(chain.blocks) - numMissing) + validChain := chain.shorten(len(chain.blocks)) tester.newPeer("valid", protocol, validChain.blocks[1:]) pending.Add(1) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 2d140abfd0..b22ce89941 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -58,6 +58,7 @@ type peerConnection struct { // LightPeer encapsulates the methods required to synchronise with a remote light peer. type LightPeer interface { Head() (common.Hash, *big.Int) + MarkLagging() RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error) RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error) } @@ -75,6 +76,7 @@ type lightPeerWrapper struct { } func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } +func (w *lightPeerWrapper) MarkLagging() { w.peer.MarkLagging() } func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) { return w.peer.RequestHeadersByHash(h, amount, skip, reverse, sink) } diff --git a/eth/peerset.go b/eth/peerset.go index b68d0e7783..6484dae8dd 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -478,6 +478,9 @@ func (ps *peerSet) peerWithHighestTD() *eth.Peer { bestTd *big.Int ) for _, p := range ps.peers { + if p.Lagging() { + continue + } if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { bestPeer, bestTd = p.Peer, td } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 2f6b0293f6..8bf973f9c3 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -74,8 +74,9 @@ type Peer struct { version uint // Protocol version negotiated statusExtension *UpgradeStatusExtension - head common.Hash // Latest advertised head block hash - td *big.Int // Latest advertised head block total difficulty + lagging bool // lagging peer is still connected, but won't be used to sync. + head common.Hash // Latest advertised head block hash + td *big.Int // Latest advertised head block total difficulty knownBlocks *knownCache // Set of block hashes known to be known by this peer queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer @@ -153,6 +154,14 @@ func (p *Peer) Version() uint { return p.version } +func (p *Peer) Lagging() bool { + return p.lagging +} + +func (p *Peer) MarkLagging() { + p.lagging = true +} + // Head retrieves the current head hash and total difficulty of the peer. func (p *Peer) Head() (hash common.Hash, td *big.Int) { p.lock.RLock() @@ -166,7 +175,7 @@ func (p *Peer) Head() (hash common.Hash, td *big.Int) { func (p *Peer) SetHead(hash common.Hash, td *big.Int) { p.lock.Lock() defer p.lock.Unlock() - + p.lagging = false copy(p.head[:], hash[:]) p.td.Set(td) }