perf: Improve IAVL Import speed for state sync by 2x~3x #664
Conversation
import.go
Outdated
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
if err := currNode.writeBytes(buf); err != nil {
panic(err)
How does this change impact the application's error handling holistically? Previously we'd get an error back from Importer.Add
if there was a problem, now we'll have a panic in a goroutine.
Good point! One solution would be to send any error that occurs in a goroutine to a channel. Another would be to add an error slice to the Importer struct and append to it whenever an error occurs.
import.go
Outdated
i.batchMtx.RUnlock()
i.batchSize++
// Only commit a new batch if size meet desiredBatchSize and there's no pending batch write
if i.batchSize >= desiredBatchSize && len(i.chBatch) < 1 {
If write I/O is blocked for any reason then the batch could inflate (in memory) to the size of the remaining dataset to import (on disk) right?
That is a valid concern. We can add back a max batch size to prevent unbounded memory usage, so the batch size would be kept between the desired size and the max size. What do you think?
Thanks for the contribution! Leveraging concurrency is a good direction, and the improvements and benchmarks are impressive.
Just some questions about error handling and how we're buffering batches if the write is blocked; I think we should limit that somehow.
Also, tests now appear to be breaking with
WARNING: DATA RACE
Read at 0x00c01b1f2650 by goroutine 399:
Looks great. But I guess step 4 is pretty fast, so a separate goroutine for it may not be justified.
Yeah, I'll look into the test failures. Thanks for taking a look and providing the feedback!
I'm not that familiar with RocksDB, but if we can improve the ingestion itself, it's definitely going to help the most. AFAIK, we allow users to choose their own underlying database implementation, so optimizing for one type of DB might not help other DB engines. However, if RocksDB outperforms the other DBs by a lot, we could switch the default DB engine to RocksDB and recommend users to migrate? And yeah, you are absolutely right, step 4 doesn't have to be in a separate goroutine; I'll move it back to the Add function.
Addressed most of the comments, could you please take another look? Thanks!
import.go
Outdated
if err = node.writeBytes(buf); err != nil {
return err
if err := node.writeBytes(buf); err != nil {
panic(err)
why panic here?
ah good catch, let me fix it
@cool-develope Should be fixed now! Please take another look, thanks!
Sorry for the delay, we will merge this work after merging the node key refactor. The rebase should not be large, if needed at all. Thank you for the PR
@yzang2019 could you rebase and then we can take care of merging this
@elias-orijtech could i get your eyes on this
The speedup and general approach seem good. Two issues:
- A test would be nice. From a glance at import_test.go I see only tests for small trees and edge cases, which don't exercise this new code much.
- Import is (partially) mutable during an ongoing import. That seems like unwanted complexity to me, and would explain (some of) the rather large amount of new state needed for managing the new goroutines. I suggest a design where you set up parameters and then start the import, which from then on can only be Canceled or Committed. That may involve an ImportConfig type, or you could even make desiredBatchSize and desiredMaxBatchSize non-configurable per the YAGNI principle.
import.go
Outdated
i.chNodeDataWg.Done()
}

// batchWrite get a new batch from the channel and execute the batch write to the underline DB.
Nit: "underline" -> "underlying"
Fixed
import.go
Outdated
// WithDesiredBatchSize set the desired batch size for write
func (i *Importer) WithDesiredBatchSize(batchSize uint32) *Importer {
i.desiredBatchSize = batchSize
The mutability of desiredBatchSize and maxBatchSize during an ongoing import seems like too much complexity for little gain.
Good point, removed the function to configure the batch size, and will stick to the tuned value then
import.go
Outdated
// WithMaxBatchSize set the maximum allowed batch size for write, should be greater than desired batch size.
// Consider increase max batch size to reduce overall import time.
func (i *Importer) WithMaxBatchSize(batchSize uint32) *Importer {
i.maxBatchSize = batchSize
Same.
Addressed
import.go
Outdated
batchMtx: sync.RWMutex{},
desiredBatchSize: defaultDesiredBatchSize,
maxBatchSize: defaultMaxBatchSize,
chNodeData: make(chan NodeData, 2*defaultDesiredBatchSize),
chNodeDataWg: sync.WaitGroup{},
chBatch: make(chan db.Batch, 1),
chBatchWg: sync.WaitGroup{},
chError: make(chan error, 1),
allChannelClosed: false,
Without digging into details, this seems like a lot of state to keep 2 goroutines in check. See also my PR comment.
Moved them to another struct called ChannelConfig
@yzang2019 can you help update and address comments please
Yes, let me do a rebase to merge the latest changes and address the comments
@yihuang @yzang2019 would be great to include this in the next release
yup we will include this in the 1.0 release. abci++ work in the sdk is concluding so there is more time for these things
Will work on it this weekend, sorry for the delay
LGTM!
Could you please add the benchmark code? Also, it is preferred to add some tests of the batch channel with a small custom config, since the default config is too large
What benchmark code shall I add? I can see there's already a BenchmarkImport test.
yeah, but it doesn't demonstrate the current updates |
I still think this PR is more complicated than it needs to be. From your PR description, steps (5) and (6) are expensive,
(5) Append node data to the current batch (heavy ops)
(6) If batch size is full, flush the whole batch and rotate a new batch (heavy ops)
and the goal of the PR is to start a batch (5) while the previous batch is in (6). Correct?
If so, I don't see the need for 2 extra goroutines. I think (5) can be done as before this PR (synchronously from Importer.Add) and only the commit phase (6) needs a separate goroutine.
In the details, there still seems to be too much synchronization state. For example, why have batchMtx when you communicate with the worker goroutines through chNodeData and chBatch? Similarly, why have the WaitGroups chNodeDataWg and chBatchWg when you can close, say, chBatch and wait for a value on chError?
If one goroutine is enough, I expect the only state you need is a chan error. A sketch:
type Importer struct {
	...
	commitBatch <-chan error
	...
}

func (i *Importer) Add(exportNode *ExportNode) error {
	...
	i.writeNode(...)
	if batchSize >= commitBatchSize {
		// Wait for previous batch.
		if i.commitBatch != nil {
			if err := <-i.commitBatch; err != nil {
				return err
			}
		}
		result := make(chan error)
		i.commitBatch = result
		go i.writeBatch(result, i.batch) // writes the batch and sends its error to result
		// Start new batch.
		i.batch = ...
	}
}
allChannelClosed bool
}

func DefaultChannelConfig() ChannelConfig {
If this function is not intended to be run from outside this package, unexport it.
As described in cosmos#664, both the construction and subsequent flushing of transaction batches are slow. This change is a minimal alternative that keeps things simple and hopefully gains most of the performance improvement from cosmos#664. On my macOS M1 machine:
goos: darwin
goarch: arm64
pkg: github.com/cosmos/iavl
BenchmarkImportBatch-8 4 300864156 ns/op
Running the same benchmark without this change:
BenchmarkImportBatch-8 3 353229292 ns/op
Hey, we recently merged a fix that helps with performance, would you want to test that and see how it compares?
@yzang2019 I'd like to test it myself
Thanks for this update @yzang2019, just some initial review comments.
if nodeData, open := <-i.chanConfig.chNodeData; open {
i.batchMtx.RLock()
if i.batch != nil {
err := i.batch.Set(i.tree.ndb.nodeKey(nodeData.node.GetKey()), nodeData.data)
This is a mutating operation being invoked under a read lock. This is one of the reasons why it is important to just use a plain mutex; please also see https://cyber.orijtech.com/scsec/cosmos-go-coding-guide#avoid-syncrwmutex
// setBatchData get the next serialized node data from channel, and write the data to the current batch
func setBatchData(i *Importer) {
for i.batch != nil {
if nodeData, open := <-i.chanConfig.chNodeData; open {
You can reduce nesting and ease reading by inverting this condition per
nodeData, open := ...
if !open {
break
}
i.batchMtx.Lock()
...
break
}
}
i.batchSize++
Another mutation while a read lock is being held
break
}
}
i.chanConfig.chNodeDataWg.Done()
Please make this a defer and move it to the top too.
// batchWrite get a new batch from the channel and execute the batch write to the underlying DB.
func batchWrite(i *Importer) {
for i.batch != nil {
if nextBatch, open := <-i.chanConfig.chBatch; open {
Please invert this condition and break early for !open instead of that else after many lines of code.
if nodeData, err := i.getDataBytes(node); err != nil {
return err
} else {
No need for this else after the prior return err, for idiomatic Go
return nil
}

func (i *Importer) validate(node *Node) error {
if _, err := node._hash(node.nodeKey.version); err != nil {
return err
}
if err := node.validate(); err != nil {
Or you can simply invoke: return node.validate()
Then delete the following 4 lines
@@ -197,7 +319,7 @@ func (i *Importer) Commit() error {
len(i.stack))
}

err := i.batch.WriteSync()
You can transform this into: if err := ...; err != nil
// The error will be popped out and returned.
func (i *Importer) waitAndCloseChannels() error {
// Make sure all pending works are drained and close the channels in order
if !i.chanConfig.allChannelClosed {
Please invert then return early
if i.chanConfig.allChannelsClosed { // Already closed
return nil
}
select {
case err := <-i.chanConfig.chError:
return err
default:
Did you intend to add this default case? I ask because if an error wasn't already in the channel, this default clause executes immediately and doesn't wait for anything on the channel.
Seems like this patch was already cherry-picked in #793, going to close this original PR now
Problem Statement
Currently, the bottleneck in cosmos state sync when restoring the offered snapshot chunks is on the IAVL Store side.
This is because, in order to restore all the chunks from the snapshot, we need to load all the items from the chunk data into the IAVL Store to rebuild the tree bottom-up, in order, one chunk at a time. Since rebuilding the tree requires a strict ordering guarantee, all the steps to import a new node into the tree are currently processed by a single goroutine.
While the single-threaded model guarantees correctness, it is extremely inefficient and cannot fully utilize multiple cores or faster disk IOPS to speed things up. In scenarios where we want to bootstrap a new pruned node as fast as possible (e.g. an AWS autoscaling group), the slowness of state sync becomes a bottleneck and a blocker.
Context
In the current architecture, whenever we import a new node into the IAVL tree, it needs to go through the following process:
One major problem with using a single thread for all these steps is that steps 5 and 6 can become really slow; when a batch write takes dozens of seconds or up to a minute to complete, the whole thread sits idle, waiting for the previous batch write I/O to complete.
Proposal
The proposal is to parallelize the node import steps as much as possible while still keeping the minimum ordering guarantee needed to rebuild the IAVL tree. Upon deeper investigation, we found that the logic of building the tree from the stack and the remaining logic to persist node data can happen in parallel. That means we can use a few goroutines with channels to process the nodes in parallel while maintaining the necessary order for each step.
Another big gain comes from using a larger batch write size. We found that the batch size can impact the total time by a lot; with a properly tuned batch size, we can speed up the import significantly.
Here's what the threading model would look like:
Add: [Step1 -> Step2 -> Step3 -> Step4 -> channelA -> Step7]
goroutineA: [channelA -> Step5 -> channelB] (acts as a buffer to reduce preprocessing time)
goroutineB: [channelB -> Step6] (does the batch write in the background)
With this new threading model, while we are busy doing disk I/O writes, the CPU does not go idle: it can continue building the tree structure, computing hashes, and serializing data, as well as appending node data to the next batch in parallel. This saves a huge amount of time compared to before, where all the remaining work had to stop and wait for the batch to complete.
Of course, there is still a limit we will hit. Memory is finite and we cannot buffer everything in the channel. When batch writes become very slow, the channel buffer may still fill up; the side effect is increased memory usage and a very large next batch. But under the assumption that bigger batches are more efficient than small ones, we think this still helps improve the speed of the whole process.
Benchmark
We've run a few benchmarks for the sei atlantic-1 chain. Here are the benchmark results:
Testing Setup:
Testing Result:
Key Takeaway: