-
Notifications
You must be signed in to change notification settings - Fork 263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Improve IAVL Import speed for state sync by 2x~3x #664
Changes from 13 commits
a38d7a5
ebf75d3
6cb6cab
1827f77
30468ce
9d2601c
fac3f37
cc6268f
3704703
48e454a
c8b1f81
994aba4
7b76765
0e163d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,12 +4,17 @@ import ( | |
"bytes" | ||
"errors" | ||
"fmt" | ||
|
||
db "github.com/cosmos/cosmos-db" | ||
"sync" | ||
) | ||
|
||
// maxBatchSize is the maximum size of the import batch before flushing it to the database | ||
const maxBatchSize = 10000 | ||
// desiredBatchSize is the desired batch write size of the import batch before flushing it to the database. | ||
// The actual batch write size could exceed this value when the previous batch is still flushing. | ||
const defaultDesiredBatchSize = 20000 | ||
|
||
// If there's an ongoing pending batch write, we will keep batching more writes | ||
// until the ongoing batch write completes or we reach maxBatchSize | ||
const defaultMaxBatchSize = 500000 | ||
|
||
// ErrNoImport is returned when calling methods on a closed importer | ||
var ErrNoImport = errors.New("no import in progress") | ||
|
@@ -22,15 +27,47 @@ var ErrNoImport = errors.New("no import in progress") | |
// Importer is not concurrency-safe, it is the caller's responsibility to ensure the tree is not | ||
// modified while performing an import. | ||
type Importer struct { | ||
tree *MutableTree | ||
version int64 | ||
batch db.Batch | ||
batchSize uint32 | ||
stack []*Node | ||
nonces []uint32 | ||
tree *MutableTree | ||
version int64 | ||
batch db.Batch | ||
batchSize uint32 | ||
batchMtx sync.RWMutex | ||
stack []*Node | ||
nonces []uint32 | ||
chanConfig ChannelConfig | ||
} | ||
|
||
type ChannelConfig struct { | ||
desiredBatchSize uint32 | ||
maxBatchSize uint32 | ||
chNodeData chan NodeData | ||
chNodeDataWg sync.WaitGroup | ||
chBatch chan db.Batch | ||
chBatchWg sync.WaitGroup | ||
chError chan error | ||
allChannelClosed bool | ||
} | ||
|
||
func DefaultChannelConfig() ChannelConfig { | ||
return ChannelConfig{ | ||
desiredBatchSize: defaultDesiredBatchSize, | ||
maxBatchSize: defaultMaxBatchSize, | ||
chNodeData: make(chan NodeData, 2*defaultDesiredBatchSize), | ||
chNodeDataWg: sync.WaitGroup{}, | ||
chBatch: make(chan db.Batch, 1), | ||
chBatchWg: sync.WaitGroup{}, | ||
chError: make(chan error, 1), | ||
allChannelClosed: false, | ||
} | ||
} | ||
|
||
type NodeData struct { | ||
node *Node | ||
data []byte | ||
} | ||
|
||
// newImporter creates a new Importer for an empty MutableTree. | ||
// Underneath it spawns three goroutines to process the data import flow. | ||
// | ||
// version should correspond to the version that was initially exported. It must be greater than | ||
// or equal to the highest ExportNode version number given. | ||
|
@@ -45,13 +82,70 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) { | |
return nil, errors.New("tree must be empty") | ||
} | ||
|
||
return &Importer{ | ||
tree: tree, | ||
version: version, | ||
batch: tree.ndb.db.NewBatch(), | ||
stack: make([]*Node, 0, 8), | ||
nonces: make([]uint32, version+1), | ||
}, nil | ||
importer := &Importer{ | ||
tree: tree, | ||
version: version, | ||
batch: tree.ndb.db.NewBatch(), | ||
stack: make([]*Node, 0, 8), | ||
nonces: make([]uint32, version+1), | ||
batchMtx: sync.RWMutex{}, | ||
chanConfig: DefaultChannelConfig(), | ||
} | ||
|
||
importer.chanConfig.chNodeDataWg.Add(1) | ||
go setBatchData(importer) | ||
|
||
importer.chanConfig.chBatchWg.Add(1) | ||
go batchWrite(importer) | ||
|
||
return importer, nil | ||
} | ||
|
||
// setBatchData get the next serialized node data from channel, and write the data to the current batch | ||
func setBatchData(i *Importer) { | ||
for i.batch != nil { | ||
if nodeData, open := <-i.chanConfig.chNodeData; open { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can reduce nesting and ease reading by inverting this condition per nodeData, open := ...
if !open {
break
}
i.batchMtx.Lock()
... |
||
i.batchMtx.RLock() | ||
if i.batch != nil { | ||
err := i.batch.Set(i.tree.ndb.nodeKey(nodeData.node.GetKey()), nodeData.data) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a mutating operation being invoked under a read lock. One of the reasons why it is important to just use a plain mutex, and also please see https://cyber.orijtech.com/scsec/cosmos-go-coding-guide#avoid-syncrwmutex |
||
if err != nil { | ||
i.batchMtx.RUnlock() | ||
i.chanConfig.chError <- err | ||
break | ||
} | ||
} | ||
i.batchSize++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another mutation while a read lock is being held |
||
i.batchMtx.RUnlock() | ||
// Only commit a new batch if size meet desiredBatchSize and there's no pending batch write or we exceed maxBatchSize | ||
if (i.batchSize >= i.chanConfig.desiredBatchSize && len(i.chanConfig.chBatch) < 1) || i.batchSize >= i.chanConfig.maxBatchSize { | ||
i.chanConfig.chBatch <- i.batch | ||
i.batch = i.tree.ndb.db.NewBatch() | ||
i.batchSize = 0 | ||
} | ||
} else { | ||
break | ||
} | ||
} | ||
i.chanConfig.chNodeDataWg.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please make this a defer and move it to the top too. |
||
} | ||
|
||
// batchWrite get a new batch from the channel and execute the batch write to the underlying DB. | ||
func batchWrite(i *Importer) { | ||
for i.batch != nil { | ||
if nextBatch, open := <-i.chanConfig.chBatch; open { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please invert this condition and break early for !open instead of that else after many lines of code. |
||
err := nextBatch.Write() | ||
if err != nil { | ||
i.chanConfig.chError <- err | ||
break | ||
} | ||
i.batchMtx.Lock() | ||
nextBatch.Close() | ||
i.batchMtx.Unlock() | ||
} else { | ||
break | ||
} | ||
} | ||
i.chanConfig.chBatchWg.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use defer |
||
} | ||
|
||
// writeNode writes the node content to the storage. | ||
|
@@ -65,27 +159,18 @@ func (i *Importer) writeNode(node *Node) error { | |
|
||
buf := bufPool.Get().(*bytes.Buffer) | ||
buf.Reset() | ||
defer bufPool.Put(buf) | ||
|
||
if err := node.writeBytes(buf); err != nil { | ||
return err | ||
} | ||
|
||
bytesCopy := make([]byte, buf.Len()) | ||
copy(bytesCopy, buf.Bytes()) | ||
bufPool.Put(buf) | ||
|
||
if err := i.batch.Set(i.tree.ndb.nodeKey(node.GetKey()), bytesCopy); err != nil { | ||
return err | ||
} | ||
|
||
i.batchSize++ | ||
if i.batchSize >= maxBatchSize { | ||
if err := i.batch.Write(); err != nil { | ||
return err | ||
} | ||
i.batch.Close() | ||
i.batch = i.tree.ndb.db.NewBatch() | ||
i.batchSize = 0 | ||
// Handle the remaining steps in a separate goroutine | ||
i.chanConfig.chNodeData <- NodeData{ | ||
node: node, | ||
data: bytesCopy, | ||
} | ||
|
||
return nil | ||
|
@@ -94,8 +179,9 @@ func (i *Importer) writeNode(node *Node) error { | |
// Close frees all resources. It is safe to call multiple times. Uncommitted nodes may already have | ||
// been flushed to the database, but will not be visible. | ||
func (i *Importer) Close() { | ||
_ = i.waitAndCloseChannels() | ||
if i.batch != nil { | ||
i.batch.Close() | ||
_ = i.batch.Close() | ||
} | ||
i.batch = nil | ||
i.tree = nil | ||
|
@@ -149,6 +235,14 @@ func (i *Importer) Add(exportNode *ExportNode) error { | |
if err := i.writeNode(rightNode); err != nil { | ||
return err | ||
} | ||
|
||
// Check errors from channel | ||
select { | ||
case err := <-i.chanConfig.chError: | ||
return err | ||
default: | ||
} | ||
|
||
i.stack = i.stack[:stackSize-2] | ||
|
||
// remove the recursive references to avoid memory leak | ||
|
@@ -177,6 +271,11 @@ func (i *Importer) Commit() error { | |
return ErrNoImport | ||
} | ||
|
||
err := i.waitAndCloseChannels() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch len(i.stack) { | ||
case 0: | ||
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil { | ||
|
@@ -197,7 +296,7 @@ func (i *Importer) Commit() error { | |
len(i.stack)) | ||
} | ||
|
||
err := i.batch.WriteSync() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can transform this into: if err := ...; err != nil |
||
err = i.batch.WriteSync() | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -211,3 +310,24 @@ func (i *Importer) Commit() error { | |
i.Close() | ||
return nil | ||
} | ||
|
||
// waitAndCloseChannels will try to close all the channels for importer and wait for remaining work to be done. | ||
// This function should only be called in the Commit or Close action. If any error happens when draining the remaining data in the channel, | ||
// The error will be popped out and returned. | ||
func (i *Importer) waitAndCloseChannels() error { | ||
// Make sure all pending works are drained and close the channels in order | ||
if !i.chanConfig.allChannelClosed { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please invert then return early if i.chanConfig.allChannelsClosed { // Already closed
return nil
} |
||
i.chanConfig.allChannelClosed = true | ||
close(i.chanConfig.chNodeData) | ||
i.chanConfig.chNodeDataWg.Wait() | ||
close(i.chanConfig.chBatch) | ||
i.chanConfig.chBatchWg.Wait() | ||
// Check errors | ||
select { | ||
case err := <-i.chanConfig.chError: | ||
return err | ||
default: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you intend to add this default case? I ask because if an error wasn't already in the channel under multiplexing, this select default clause immediately executes and doesn't wait for anything on the channel. |
||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this function is not intended to be run from outside this package, unexport it.