Skip to content

Commit

Permalink
feat: concurrent merge difflayer into nodebuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
joeylichang committed Nov 20, 2023
1 parent 86b81b2 commit 0b6284b
Showing 1 changed file with 58 additions and 27 deletions.
85 changes: 58 additions & 27 deletions trie/triedb/pathdb/asyncnodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,43 +236,74 @@ func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) err
if atomic.LoadUint64(&nc.immutable) == 1 {
return errWriteImmutable
}
var (
type subTree struct {
owner common.Hash
pathTree map[string]*trienode.Node
delta int64
overwrite int64
overwriteSize int64
}

var (
mx sync.RWMutex // local concurrent mux
wg sync.WaitGroup
mergeSubResCh = make(chan *subTree)
)
wg.Add(len(nodes))
for owner, subset := range nodes {
current, exist := nc.nodes[owner]
if !exist {
// Allocate a new map for the subset instead of claiming it directly
// from the passed map to avoid potential concurrent map read/write.
// The nodes belong to original diff layer are still accessible even
// after merging, thus the ownership of nodes map should still belong
// to original layer and any mutation on it should be prevented.
current = make(map[string]*trienode.Node)
for path, n := range subset {
o := owner
sub := subset
go func(account common.Hash, storage map[string]*trienode.Node) {
subRes := &subTree{owner: account, pathTree: nil}
mx.RLock()
current, exist := nc.nodes[account]
mx.RUnlock()
if !exist {
// Allocate a new map for the subset instead of claiming it directly
// from the passed map to avoid potential concurrent map read/write.
// The nodes belong to original diff layer are still accessible even
// after merging, thus the ownership of nodes map should still belong
// to original layer and any mutation on it should be prevented.
current = make(map[string]*trienode.Node)
for path, n := range storage {
current[path] = n
subRes.delta += int64(len(n.Blob) + len(path))
}
subRes.pathTree = current
mergeSubResCh <- subRes
return
}
for path, n := range storage {
if orig, exist := current[path]; !exist {
subRes.delta += int64(len(n.Blob) + len(path))
} else {
subRes.delta += int64(len(n.Blob) - len(orig.Blob))
subRes.overwrite++
subRes.overwriteSize += int64(len(orig.Blob) + len(path))
}
current[path] = n
delta += int64(len(n.Blob) + len(path))
}
nc.nodes[owner] = current
continue
}
for path, n := range subset {
if orig, exist := current[path]; !exist {
delta += int64(len(n.Blob) + len(path))
} else {
delta += int64(len(n.Blob) - len(orig.Blob))
overwrite++
overwriteSize += int64(len(orig.Blob) + len(path))
mergeSubResCh <- subRes
return
}(o, sub)
}
go func() {
for res := range mergeSubResCh {
if res.pathTree != nil {
mx.Lock()
nc.nodes[res.owner] = res.pathTree
mx.Unlock()
}
current[path] = n
nc.updateSize(res.delta)
gcNodesMeter.Mark(res.overwrite)
gcBytesMeter.Mark(res.overwriteSize)
wg.Done()
}
nc.nodes[owner] = current
}
nc.updateSize(delta)
}()
wg.Wait()
close(mergeSubResCh)
nc.layers++
gcNodesMeter.Mark(overwrite)
gcBytesMeter.Mark(overwriteSize)

return nil
}

Expand Down

0 comments on commit 0b6284b

Please sign in to comment.