perf: Improve IAVL Import speed for state sync by 2x~3x #664

Closed
wants to merge 14 commits into from
184 changes: 152 additions & 32 deletions import.go
@@ -4,12 +4,17 @@ import (
"bytes"
"errors"
"fmt"

db "github.com/cosmos/cosmos-db"
"sync"
)

// maxBatchSize is the maximum size of the import batch before flushing it to the database
const maxBatchSize = 10000
// desiredBatchSize is the desired batch write size of the import batch before flushing it to the database.
// The actual batch write size could exceed this value when the previous batch is still flushing.
const defaultDesiredBatchSize = 20000

// If there's an ongoing pending batch write, we keep batching more writes
// until the ongoing batch write completes or we reach maxBatchSize.
const defaultMaxBatchSize = 500000

// ErrNoImport is returned when calling methods on a closed importer
var ErrNoImport = errors.New("no import in progress")
Expand All @@ -22,15 +27,47 @@ var ErrNoImport = errors.New("no import in progress")
// Importer is not concurrency-safe, it is the caller's responsibility to ensure the tree is not
// modified while performing an import.
type Importer struct {
tree *MutableTree
version int64
batch db.Batch
batchSize uint32
stack []*Node
nonces []uint32
tree *MutableTree
version int64
batch db.Batch
batchSize uint32
batchMtx sync.RWMutex
stack []*Node
nonces []uint32
chanConfig ChannelConfig
}

type ChannelConfig struct {
desiredBatchSize uint32
maxBatchSize uint32
chNodeData chan NodeData
chNodeDataWg sync.WaitGroup
chBatch chan db.Batch
chBatchWg sync.WaitGroup
chError chan error
allChannelClosed bool
}

func DefaultChannelConfig() ChannelConfig {
Contributor comment: If this function is not intended to be run from outside this package, unexport it.

return ChannelConfig{
desiredBatchSize: defaultDesiredBatchSize,
maxBatchSize: defaultMaxBatchSize,
chNodeData: make(chan NodeData, 2*defaultDesiredBatchSize),
chNodeDataWg: sync.WaitGroup{},
chBatch: make(chan db.Batch, 1),
chBatchWg: sync.WaitGroup{},
chError: make(chan error, 1),
allChannelClosed: false,
}
}

type NodeData struct {
node *Node
data []byte
}

// newImporter creates a new Importer for an empty MutableTree.
// Underneath it spawns two goroutines to process the data import flow.
//
// version should correspond to the version that was initially exported. It must be greater than
// or equal to the highest ExportNode version number given.
@@ -45,13 +82,70 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) {
return nil, errors.New("tree must be empty")
}

return &Importer{
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
}, nil
importer := &Importer{
tree: tree,
version: version,
batch: tree.ndb.db.NewBatch(),
stack: make([]*Node, 0, 8),
nonces: make([]uint32, version+1),
batchMtx: sync.RWMutex{},
chanConfig: DefaultChannelConfig(),
}

importer.chanConfig.chNodeDataWg.Add(1)
go setBatchData(importer)

importer.chanConfig.chBatchWg.Add(1)
go batchWrite(importer)

return importer, nil
}

// setBatchData gets the next serialized node data from the channel and writes it to the current batch.
func setBatchData(i *Importer) {
for i.batch != nil {
if nodeData, open := <-i.chanConfig.chNodeData; open {
Contributor comment: You can reduce nesting and ease reading by inverting this condition per

nodeData, open := ...
if !open {
   break
}

i.batchMtx.Lock()
...

i.batchMtx.RLock()
if i.batch != nil {
err := i.batch.Set(i.tree.ndb.nodeKey(nodeData.node.GetKey()), nodeData.data)
Contributor comment: This is a mutating operation being invoked under a read lock. One of the reasons why it is important to just use a plain mutex; also please see https://cyber.orijtech.com/scsec/cosmos-go-coding-guide#avoid-syncrwmutex

if err != nil {
i.batchMtx.RUnlock()
i.chanConfig.chError <- err
break
}
}
i.batchSize++
Contributor comment: Another mutation while a read lock is being held.

i.batchMtx.RUnlock()
// Only submit a new batch if its size meets desiredBatchSize and there's no pending batch write, or if it exceeds maxBatchSize.
if (i.batchSize >= i.chanConfig.desiredBatchSize && len(i.chanConfig.chBatch) < 1) || i.batchSize >= i.chanConfig.maxBatchSize {
i.chanConfig.chBatch <- i.batch
i.batch = i.tree.ndb.db.NewBatch()
i.batchSize = 0
}
} else {
break
}
}
i.chanConfig.chNodeDataWg.Done()
Contributor comment: Please make this a defer and move it to the top too.

}

// batchWrite gets a new batch from the channel and executes the batch write against the underlying DB.
func batchWrite(i *Importer) {
for i.batch != nil {
if nextBatch, open := <-i.chanConfig.chBatch; open {
Contributor comment: Please invert this condition and break early for !open instead of that else after many lines of code.

err := nextBatch.Write()
if err != nil {
i.chanConfig.chError <- err
break
}
i.batchMtx.Lock()
nextBatch.Close()
i.batchMtx.Unlock()
} else {
break
}
}
i.chanConfig.chBatchWg.Done()
Contributor comment: Please use defer.

}

// writeNode writes the node content to the storage.
@@ -65,27 +159,18 @@ func (i *Importer) writeNode(node *Node) error {

buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)

if err := node.writeBytes(buf); err != nil {
return err
}

bytesCopy := make([]byte, buf.Len())
copy(bytesCopy, buf.Bytes())
bufPool.Put(buf)

if err := i.batch.Set(i.tree.ndb.nodeKey(node.GetKey()), bytesCopy); err != nil {
return err
}

i.batchSize++
if i.batchSize >= maxBatchSize {
if err := i.batch.Write(); err != nil {
return err
}
i.batch.Close()
i.batch = i.tree.ndb.db.NewBatch()
i.batchSize = 0
// Handle the remaining steps in a separate goroutine
i.chanConfig.chNodeData <- NodeData{
node: node,
data: bytesCopy,
}

return nil
@@ -94,8 +179,9 @@ func (i *Importer) writeNode(node *Node) error {
// Close frees all resources. It is safe to call multiple times. Uncommitted nodes may already have
// been flushed to the database, but will not be visible.
func (i *Importer) Close() {
_ = i.waitAndCloseChannels()
if i.batch != nil {
i.batch.Close()
_ = i.batch.Close()
}
i.batch = nil
i.tree = nil
@@ -149,6 +235,14 @@ func (i *Importer) Add(exportNode *ExportNode) error {
if err := i.writeNode(rightNode); err != nil {
return err
}

// Check errors from channel
select {
case err := <-i.chanConfig.chError:
return err
default:
}

i.stack = i.stack[:stackSize-2]

// remove the recursive references to avoid memory leak
@@ -177,6 +271,11 @@ func (i *Importer) Commit() error {
return ErrNoImport
}

err := i.waitAndCloseChannels()
if err != nil {
return err
}

switch len(i.stack) {
case 0:
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil {
@@ -197,7 +296,7 @@ func (i *Importer) Commit() error {
len(i.stack))
}

err := i.batch.WriteSync()
Contributor comment: You can transform this into: if err := ...; err != nil

err = i.batch.WriteSync()
if err != nil {
return err
}
@@ -211,3 +310,24 @@ func (i *Importer) Commit() error {
i.Close()
return nil
}

// waitAndCloseChannels tries to close all of the importer's channels and waits for the remaining work to be done.
// This function should only be called from Commit or Close. If any error happens while draining the remaining
// data in the channels, the error is popped and returned.
func (i *Importer) waitAndCloseChannels() error {
// Make sure all pending work is drained and close the channels in order
if !i.chanConfig.allChannelClosed {
Contributor comment: Please invert then return early

if i.chanConfig.allChannelsClosed { // Already closed
   return nil
}

i.chanConfig.allChannelClosed = true
close(i.chanConfig.chNodeData)
i.chanConfig.chNodeDataWg.Wait()
close(i.chanConfig.chBatch)
i.chanConfig.chBatchWg.Wait()
// Check errors
select {
case err := <-i.chanConfig.chError:
return err
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intend to add this default case? I ask because if an error wasn't already in the channel under multiplexing, this select default clause immediately executes and doesn't wait for anything on the channel.

}
}
return nil
}