From 0b6284bc7742159658e335afe664a49c767b06e8 Mon Sep 17 00:00:00 2001 From: joeylichang Date: Mon, 20 Nov 2023 20:36:23 +0800 Subject: [PATCH] feat: concurrent merge difflayer into nodebuffer --- trie/triedb/pathdb/asyncnodebuffer.go | 85 ++++++++++++++++++--------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go index 9311eb105a..45cc906a34 100644 --- a/trie/triedb/pathdb/asyncnodebuffer.go +++ b/trie/triedb/pathdb/asyncnodebuffer.go @@ -236,43 +236,74 @@ func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) err if atomic.LoadUint64(&nc.immutable) == 1 { return errWriteImmutable } - var ( + type subTree struct { + owner common.Hash + pathTree map[string]*trienode.Node delta int64 overwrite int64 overwriteSize int64 + } + + var ( + mx sync.RWMutex // local concurrent mux + wg sync.WaitGroup + mergeSubResCh = make(chan *subTree) ) + wg.Add(len(nodes)) for owner, subset := range nodes { - current, exist := nc.nodes[owner] - if !exist { - // Allocate a new map for the subset instead of claiming it directly - // from the passed map to avoid potential concurrent map read/write. - // The nodes belong to original diff layer are still accessible even - // after merging, thus the ownership of nodes map should still belong - // to original layer and any mutation on it should be prevented. - current = make(map[string]*trienode.Node) - for path, n := range subset { + o := owner + sub := subset + go func(account common.Hash, storage map[string]*trienode.Node) { + subRes := &subTree{owner: account, pathTree: nil} + mx.RLock() + current, exist := nc.nodes[account] + mx.RUnlock() + if !exist { + // Allocate a new map for the subset instead of claiming it directly + // from the passed map to avoid potential concurrent map read/write. + // The nodes belong to original diff layer are still accessible even + // after merging, thus the ownership of nodes map should still belong + // to original layer and any mutation on it should be prevented. + current = make(map[string]*trienode.Node) + for path, n := range storage { + current[path] = n + subRes.delta += int64(len(n.Blob) + len(path)) + } + subRes.pathTree = current + mergeSubResCh <- subRes + return + } + for path, n := range storage { + if orig, exist := current[path]; !exist { + subRes.delta += int64(len(n.Blob) + len(path)) + } else { + subRes.delta += int64(len(n.Blob) - len(orig.Blob)) + subRes.overwrite++ + subRes.overwriteSize += int64(len(orig.Blob) + len(path)) + } current[path] = n - delta += int64(len(n.Blob) + len(path)) } - nc.nodes[owner] = current - continue - } - for path, n := range subset { - if orig, exist := current[path]; !exist { - delta += int64(len(n.Blob) + len(path)) - } else { - delta += int64(len(n.Blob) - len(orig.Blob)) - overwrite++ - overwriteSize += int64(len(orig.Blob) + len(path)) + mergeSubResCh <- subRes + return + }(o, sub) + } + go func() { + for res := range mergeSubResCh { + if res.pathTree != nil { + mx.Lock() + nc.nodes[res.owner] = res.pathTree + mx.Unlock() } - current[path] = n + nc.updateSize(res.delta) + gcNodesMeter.Mark(res.overwrite) + gcBytesMeter.Mark(res.overwriteSize) + wg.Done() } - nc.nodes[owner] = current - } - nc.updateSize(delta) + }() + wg.Wait() + close(mergeSubResCh) nc.layers++ - gcNodesMeter.Mark(overwrite) - gcBytesMeter.Mark(overwriteSize) + return nil }