Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: p2p sync with lagging peer #1301

Merged
merged 8 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
var (
errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer is unknown or unhealthy")
errLaggingPeer = errors.New("peer is lagging")
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errUnsyncedPeer = errors.New("unsynced peer")
Expand Down Expand Up @@ -395,7 +396,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
}
return err
}
log.Warn("Synchronisation failed, retrying", "err", err)
log.Warn("Synchronisation failed, retrying", "peer", id, "err", err)
return err
}

Expand Down Expand Up @@ -492,7 +493,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, pivot, err := d.fetchHead(p)
remoteHeader, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
Expand All @@ -503,22 +504,43 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, latest)
// If the remote peer is lagging behind, no need to sync with it, drop the peer.
remoteHeight := remoteHeader.Number.Uint64()
var localHeight uint64
switch mode {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case SnapSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}

origin, err := d.findAncestor(p, localHeight, remoteHeader)
if err != nil {
return err
}

if localHeight >= remoteHeight {
// if remoteHeader does not exist in local chain, will move on to insert it as a side chain.
if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil ||
(mode == LightSync && d.blockchain.GetHeaderByHash(remoteHeader.Hash()) != nil) {
p.log.Warn("syncWithPeer", "local", localHeight, "remote", remoteHeight, "mode", mode, "err", errLaggingPeer)
p.peer.MarkLagging()
return errLaggingPeer
}
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = height
d.syncStatsChainHeight = remoteHeight
d.syncStatsLock.Unlock()

// Ensure our origin point is below any snap sync pivot point
if mode == SnapSync {
if height <= uint64(fsMinFullBlocks) {
if remoteHeight <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivotNumber := pivot.Number.Uint64()
Expand Down Expand Up @@ -551,8 +573,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// could cause issues.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else if remoteHeight > fullMaxForkAncestry+1 {
d.ancientLimit = remoteHeight - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
Expand All @@ -576,12 +598,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
d.syncInitHook(origin, remoteHeight)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.fetchHeaders(p, origin+1, remoteHeader.Number.Uint64()) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == SnapSync {
Expand Down Expand Up @@ -766,22 +788,14 @@ func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, ui
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteHeader *types.Header) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks
var (
floor = int64(-1)
localHeight uint64
remoteHeight = remoteHeader.Number.Uint64()
)
mode := d.getMode()
switch mode {
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case SnapSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}

p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)

// Recap floor value for binary search
Expand Down
28 changes: 24 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type downloadTesterPeer struct {
withholdHeaders map[common.Hash]struct{}
}

func (dlp *downloadTesterPeer) MarkLagging() {
}

// Head constructs a function to retrieve a peer's current head hash
// and total difficulty.
func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
Expand Down Expand Up @@ -926,8 +929,8 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) {

chain := testChainBase.shorten(1)
tester.newPeer("attack", protocol, chain.blocks[1:])
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
if err := tester.sync("attack", big.NewInt(1000000), mode); err != errLaggingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errLaggingPeer)
}
}

Expand Down Expand Up @@ -1235,9 +1238,26 @@ func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
pending.Wait()
afterFailedSync := tester.downloader.Progress()

// Synchronise with a good peer and check that the progress height has been reduced to
// it is no longer valid to sync to a lagging peer
laggingChain := chain.shorten(800 / 2)
tester.newPeer("lagging", protocol, laggingChain.blocks[1:])
pending.Add(1)
go func() {
defer pending.Done()
if err := tester.sync("lagging", nil, mode); err != errLaggingPeer {
panic(fmt.Sprintf("unexpected lagging synchronisation err:%v", err))
}
}()
// lagging peer will return before syncInitHook, skip <-starting and progress <- struct{}{}
checkProgress(t, tester.downloader, "lagging", ethereum.SyncProgress{
CurrentBlock: afterFailedSync.CurrentBlock,
HighestBlock: uint64(len(chain.blocks) - 1),
})
pending.Wait()

// Synchronise with a good peer and check that the progress height has been increased to
// the true value.
validChain := chain.shorten(len(chain.blocks) - numMissing)
validChain := chain.shorten(len(chain.blocks))
tester.newPeer("valid", protocol, validChain.blocks[1:])
pending.Add(1)

Expand Down
2 changes: 2 additions & 0 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type peerConnection struct {
// LightPeer encapsulates the methods required to synchronise with a remote light peer.
type LightPeer interface {
Head() (common.Hash, *big.Int)
MarkLagging()
RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
}
Expand All @@ -75,6 +76,7 @@ type lightPeerWrapper struct {
}

func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() }
func (w *lightPeerWrapper) MarkLagging() { w.peer.MarkLagging() }
func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) {
return w.peer.RequestHeadersByHash(h, amount, skip, reverse, sink)
}
Expand Down
3 changes: 3 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ func (ps *peerSet) peerWithHighestTD() *eth.Peer {
bestTd *big.Int
)
for _, p := range ps.peers {
if p.Lagging() {
continue
}
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
bestPeer, bestTd = p.Peer, td
}
Expand Down
15 changes: 12 additions & 3 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type Peer struct {
version uint // Protocol version negotiated
statusExtension *UpgradeStatusExtension

head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty
lagging bool // lagging peer is still connected, but won't be used to sync.
head common.Hash // Latest advertised head block hash
td *big.Int // Latest advertised head block total difficulty

knownBlocks *knownCache // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -153,6 +154,14 @@ func (p *Peer) Version() uint {
return p.version
}

func (p *Peer) Lagging() bool {
return p.lagging
}

func (p *Peer) MarkLagging() {
p.lagging = true
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
p.lock.RLock()
Expand All @@ -166,7 +175,7 @@ func (p *Peer) Head() (hash common.Hash, td *big.Int) {
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
p.lock.Lock()
defer p.lock.Unlock()

p.lagging = false
copy(p.head[:], hash[:])
p.td.Set(td)
}
Expand Down