Skip to content

Commit

Permalink
fix useful difflayer item in cache been prune issue
Browse files Browse the repository at this point in the history
add logs

fix useful difflayer item in cache been prune issue

fix too many open files
  • Loading branch information
unclezoro committed Nov 8, 2021
1 parent 176407a commit 0aec16e
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 67 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ var (
DiffBlockFlag = cli.Uint64Flag{
Name: "diffblock",
Usage: "The number of blocks should be persisted in db (default = 864000 )",
Value: uint64(864000),
Value: uint64(86400),
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Expand Down
15 changes: 15 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
bodyCacheLimit = 256
blockCacheLimit = 256
diffLayerCacheLimit = 1024
diffLayerPruneRatio = 90 // 90%
oneHundredPercent = 100
diffLayerRLPCacheLimit = 256
receiptsCacheLimit = 10000
txLookupCacheLimit = 1024
Expand Down Expand Up @@ -478,6 +480,9 @@ func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) {
}

func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) {
if bc.diffLayerCache.Len() >= diffLayerCacheLimit {
bc.diffLayerCache.RemoveOldest()
}
bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer)
if bc.db.DiffStore() != nil {
// push to priority queue before persisting
Expand Down Expand Up @@ -2051,6 +2056,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

//Process block using the parent state as reference point
substart := time.Now()
execStart := time.Now()
statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
activeState = statedb
if err != nil {
Expand All @@ -2076,6 +2082,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}
}

bc.cacheReceipts(block.Hash(), receipts)
bc.cacheBlock(block.Hash(), block)
proctime := time.Since(start)
Expand All @@ -2086,6 +2093,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

blockValidationTimer.Update(time.Since(substart))

if statedb.IsLightProcessed() {
log.Info("===debug light process and verify", "number", block.Number(), "time", time.Since(execStart))
} else {
log.Info("===debug full process and verify", "number", block.Number(), "time", time.Since(execStart))
}

// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
Expand All @@ -2099,6 +2112,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

blockWriteTimer.Update(time.Since(substart))
blockInsertTimer.UpdateSince(start)
log.Info("===debug write block state", "number", block.Number(), "time", time.Since(substart).String())

switch status {
case CanonStatTy:
Expand Down Expand Up @@ -2756,6 +2770,7 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu
}
bc.blockHashToDiffLayers[diffLayer.BlockHash][diffLayer.DiffHash] = diffLayer
bc.diffHashToBlockHash[diffLayer.DiffHash] = diffLayer.BlockHash
log.Info("===debug process and verify, handle difflayer", "num", diffLayer.Number, "time", time.Now().String())

return nil
}
Expand Down
56 changes: 27 additions & 29 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,34 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
elapsed = now.Sub(st.startTime)
)
// If we're at the last block of the batch or report period reached, log
if index == len(chain)-1 || elapsed >= statsReportLimit {
// Count the number of transactions in this segment
var txs int
for _, block := range chain[st.lastIndex : index+1] {
txs += len(block.Transactions())
}
end := chain[index]

// Assemble the log context and send it to the logger
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"number", end.Number(), "hash", end.Hash(),
}
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"dirty", dirty}...)

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
}
if st.ignored > 0 {
context = append(context, []interface{}{"ignored", st.ignored}...)
}
log.Info("Imported new chain segment", context...)

// Bump the stats reported to the next section
*st = insertStats{startTime: now, lastIndex: index + 1}
// Count the number of transactions in this segment
var txs int
for _, block := range chain[st.lastIndex : index+1] {
txs += len(block.Transactions())
}
end := chain[index]

// Assemble the log context and send it to the logger
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
"number", end.Number(), "hash", end.Hash(),
}
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"dirty", dirty}...)

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
}
if st.ignored > 0 {
context = append(context, []interface{}{"ignored", st.ignored}...)
}
log.Info("===debug Imported new chain segment", context...)

// Bump the stats reported to the next section
*st = insertStats{startTime: now, lastIndex: index + 1}
}

// insertIterator is a helper to assist during chain import.
Expand Down
19 changes: 15 additions & 4 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (

const (
fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
recentTime = 2048 * 3
recentDiffLayerTimeout = 20
recentTime = 1024 * 3
recentDiffLayerTimeout = 5
farDiffLayerTimeout = 2
)

Expand Down Expand Up @@ -86,6 +86,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
}
// random fallback to full process
start := time.Now()
if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 {
var pid string
if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
Expand All @@ -104,6 +105,10 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
}
time.Sleep(time.Millisecond)
}
if diffLayer == nil {
log.Info("===debug failed to find difflayer", "number", block.NumberU64())

}
if diffLayer != nil {
if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", block.NumberU64(), "err", err)
Expand All @@ -113,7 +118,9 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB

receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb)
if err == nil {
log.Info("do light process success at block", "num", block.NumberU64())
end := time.Now()
elaps := end.Sub(start)
log.Info("===debug do light process success at block", "num", block.NumberU64(), "elaps", elaps.String())
return statedb, receipts, logs, gasUsed, nil
}
log.Error("do light process err at block", "num", block.NumberU64(), "err", err)
Expand All @@ -130,7 +137,11 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
}
}
// fallback to full process
return p.StateProcessor.Process(block, statedb, cfg)
statedb, receipts, logs, gasUsed, err := p.StateProcessor.Process(block, statedb, cfg)
end := time.Now()
elaps := end.Sub(start)
log.Info("===debug do full process success at block", "num", block.NumberU64(), "elaps", elaps.String())
return statedb, receipts, logs, gasUsed, err
}

func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) {
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
chainDb, err := stack.OpenAndMergeDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles,
config.DatabaseFreezer, config.DatabaseDiff, "eth/db/chaindata/", false, config.PersistDiff)
if err != nil {
panic(err)
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.OverrideBerlin)
Expand Down
43 changes: 18 additions & 25 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -232,27 +232,20 @@ type IPeerSet interface {

func EnableDiffFetchOp(peers IPeerSet) DownloadOption {
return func(dl *Downloader) *Downloader {
var hook = func(headers []*types.Header, args ...interface{}) {
if len(args) < 2 {
return
}
peerID, ok := args[1].(string)
if !ok {
return
}
mode, ok := args[0].(SyncMode)
if !ok {
return
}
if ep := peers.GetDiffPeer(peerID); mode == FullSync && ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.RequestDiffLayers(hashes)
var hook = func(results []*fetchResult) {
if dl.getMode() == FullSync {
go func() {
for _, r := range results {
if ep := peers.GetDiffPeer(r.pid); ep != nil {
// It turns out a diff layer is 5x larger than block, we just request one diffLayer each time
ep.RequestDiffLayers([]common.Hash{r.Header.Hash()})
log.Info("===debug process and verify, send difflayer pull", "num", r.Header.Number, "time", time.Now().String())
}
}
}()
}
}
dl.bodyFetchHook = hook
dl.chainInsertHook = hook
return dl
}
}
Expand Down Expand Up @@ -1405,7 +1398,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1554,7 +1547,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers, d.getMode(), peer.id)
fetchHook(request.Headers)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down
6 changes: 4 additions & 2 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,18 @@ type fetchRequest struct {
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding
pid string

Header *types.Header
Uncles []*types.Header
Transactions types.Transactions
Receipts types.Receipts
}

func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult {
item := &fetchResult{
Header: header,
pid: pid,
}
if !header.EmptyBody() {
item.pending |= (1 << bodyType)
Expand Down Expand Up @@ -503,7 +505,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
// we can ask the resultcache if this header is within the
// "prioritized" segment of blocks. If it is not, we need to throttle

stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync, p.id)
if stale {
// Don't put back in the task queue, this item has already been
// delivered upstream
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
func (r *resultStore) AddFetch(header *types.Header, fastSync bool, pid string) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()

Expand All @@ -85,7 +85,7 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, fastSync)
item = newFetchResult(header, fastSync, pid)
r.items[index] = item
}
return stale, throttled, item, err
Expand Down
2 changes: 1 addition & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute,
TriesInMemory: 128,
SnapshotCache: 102,
DiffBlock: uint64(864000),
DiffBlock: uint64(86400),
Miner: miner.Config{
GasFloor: 8000000,
GasCeil: 8000000,
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/diff/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
softResponseLimit = 2 * 1024 * 1024

// maxDiffLayerServe is the maximum number of diff layers to serve.
maxDiffLayerServe = 1024
maxDiffLayerServe = 128
)

var requestTracker = NewTracker(time.Minute)
Expand Down
10 changes: 8 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
closedState
)

const chainDataHandlesPercentage = 80

// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
// Copy config and resolve the datadir so future changes to the current
Expand Down Expand Up @@ -580,12 +582,16 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
}

func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, diff, namespace string, readonly, persistDiff bool) (ethdb.Database, error) {
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, handles, freezer, namespace, readonly)
chainDataHandles := handles
if persistDiff {
chainDataHandles = handles * chainDataHandlesPercentage / 100
}
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly)
if err != nil {
return nil, err
}
if persistDiff {
diffStore, err := n.OpenDiffDatabase(name, handles, diff, namespace, readonly)
diffStore, err := n.OpenDiffDatabase(name, handles-chainDataHandles, diff, namespace, readonly)
if err != nil {
chainDB.Close()
return nil, err
Expand Down

0 comments on commit 0aec16e

Please sign in to comment.