diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index fc75dd4310..47f649082c 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -32,10 +32,12 @@ import ( ) const ( - lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested - arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested - gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches - fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + reQueueBlockTimeout = 500 * time.Millisecond // Time allowance before blocks are requeued for import + ) const ( @@ -166,6 +168,8 @@ type BlockFetcher struct { done chan common.Hash quit chan struct{} + requeue chan *blockOrHeaderInject + // Announce states announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching @@ -206,6 +210,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr bodyFilter: make(chan chan *bodyFilterTask), done: make(chan common.Hash), quit: make(chan struct{}), + requeue: make(chan *blockOrHeaderInject), announces: make(map[string]int), announced: make(map[common.Hash][]*blockAnnounce), fetching: make(map[common.Hash]*blockAnnounce), @@ -370,9 +375,9 @@ func (f *BlockFetcher) loop() { continue } if f.light { - f.importHeaders(op.origin, op.header) + f.importHeaders(op) } else { - f.importBlocks(op.origin, op.block) + f.importBlocks(op) } } // Wait for an outside event to occur @@ -415,6 +420,21 @@ func (f *BlockFetcher) loop() { f.rescheduleFetch(fetchTimer) } + case op := <-f.requeue: + // Re-queue blocks that have not been written due to fork block competition + number := int64(0) + hash := "" + if op.header != nil { + number = op.header.Number.Int64() + hash = op.header.Hash().String() + } else if op.block != nil { + number = op.block.Number().Int64() + hash = op.block.Hash().String() + } + + log.Info("Re-queue blocks", "number", number, "hash", hash) + f.enqueue(op.origin, op.header, op.block) + case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps blockBroadcastInMeter.Mark(1) @@ -750,7 +770,9 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B // importHeaders spawns a new goroutine to run a header insertion into the chain. // If the header's number is at the same height as the current import phase, it // updates the phase states accordingly. -func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { +func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) { + peer := op.origin + header := op.header hash := header.Hash() log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) @@ -760,6 +782,8 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { parent := f.getHeader(header.ParentHash) if parent == nil { log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + time.Sleep(reQueueBlockTimeout) + f.requeue <- op return } // Validate the header and if something went wrong, drop the peer @@ -783,7 +807,9 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { // importBlocks spawns a new goroutine to run a block insertion into the chain. If the // block's number is at the same height as the current import phase, it updates // the phase states accordingly. -func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { +func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) { + peer := op.origin + block := op.block hash := block.Hash() // Run the import on a new thread @@ -795,6 +821,8 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { parent := f.getBlock(block.ParentHash()) if parent == nil { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + time.Sleep(reQueueBlockTimeout) + f.requeue <- op return } // Quickly validate the header and propagate the block if it passes