
perf: Improve IAVL Import speed for state sync by 2x~3x #664

Closed
wants to merge 14 commits into from

Conversation


@yzang2019 yzang2019 commented Jan 23, 2023

Problem Statement
Currently, the bottleneck in cosmos state sync when restoring the offered snapshot chunks is on the IAVL Store side.

This is because restoring all the chunks from the snapshot requires loading every item from the chunk data into the IAVL Store, rebuilding the tree bottom-up in order, one chunk at a time. Since rebuilding the tree requires a strict ordering guarantee, every step of importing a new node into the tree is currently processed by a single goroutine.

While the single-threaded model guarantees correctness, it is extremely inefficient and cannot fully utilize multiple cores or faster disk IOPS. In scenarios where we want to bootstrap a new pruned node as fast as possible (e.g. an AWS autoscaling group), the slowness of state sync becomes a bottleneck and a blocker.

Context
In the current architecture, whenever we import a new node into the IAVL tree, it goes through the following process:

  1. Build the tree structure using a stack (fast)
  2. Compute the hash of the node (fast)
  3. Validate the node contents (fast)
  4. Serialize the node data into byte array (medium)
  5. Append node data to the current batch (heavy ops)
  6. If batch size is full, flush the whole batch and rotate a new batch (heavy ops)
  7. Update the stack (fast)

One major problem with using a single thread for all of these steps is that steps 5 and 6 can become really slow: when a batch write takes dozens of seconds or up to a minute to complete, the whole thread sits idle, waiting for the previous batch write I/O to finish.

Proposal

  • Optimize threading model
  • Optimize batch write efficiency with larger configurable batch size

The proposal is to parallelize the node import steps as much as possible while still keeping the minimum ordering guarantee needed to rebuild the IAVL tree. Upon deeper investigation, we found that the logic of building the tree from the stack and the remaining logic to persist node data can happen in parallel. That means we can use a few goroutines with channels to process nodes in parallel while still maintaining the necessary order for each step.

The other big gain comes from using a larger batch write size. We found that batch size has a large impact on total time; with a properly tuned batch size, we can speed up the import significantly.

[Diagram: IAVL import threading model (draw.io)]

Here's what the threading model would look like:
Add: [Step1 -> Step2 -> Step3 -> Step4 -> channelA] -> Step7
goroutineA: [channelA -> Step5 -> channelB] (acts as a buffer to reduce preprocessing time)
goroutineB: [channelB -> Step6] (does the batch write in the background)

With this new threading model, while we are busy doing disk write I/O, the CPU does not go idle: it can keep building the tree structure, computing hashes, serializing data, and appending node data to the next batch in parallel. This saves a huge amount of time compared to before, when all remaining work had to stop and wait for the batch write to complete.

Of course, there's still a limit we will hit. Memory is finite, so we cannot buffer everything in the channel. If the batch write becomes very slow, the channel buffer may still fill up; the side effect is increased memory usage, and the next batch becomes very large. But under the assumption that bigger batches are more efficient than small ones, we think this still helps improve the speed of the whole process.

Benchmark
We've run a few benchmarks for the sei atlantic-1 chain. Here are the results:

Testing Setup:

  • Total compressed snapshot size: 650MB
  • Total state key-value entries: 28,000,000
  • Total decompressed state size: 7GB

Testing Result:

| Instance Type | CPU | RAM | Instance Cost | Boot Drive | 1.2.2 Beta | 1.2.2 Beta with IAVL Patch |
|---------------|-----|-----|---------------|------------|------------|----------------------------|
| c6a.4xlarge   | 16  | 32G | 0.76/h        | No         | 15 min     | 5 min                      |
| c6a.2xlarge   | 8   | 16G | 0.38/h        | No         | 16 min     | 5 min 30s                  |
| m5.4xlarge    | 16  | 64G | 0.89/h        | No         | 21 min     | 8 min                      |
| m5.4xlarge    | 16  | 64G | 0.89/h        | Yes        | 26 min     | 14 min                     |
| m4.2xlarge    | 8   | 32G | 0.47/h        | No         | 23 min     | 8 min 30s                  |
| m4.2xlarge    | 8   | 32G | 0.47/h        | Yes        | 27 min     | 15 min                     |

Key Takeaway:

  • With this IAVL patch, we are able to reduce total state sync time by 60%-67%
  • Per-core CPU speed matters more for state sync than core count or RAM size
  • Whether data is stored on the snapshot boot drive or on a newly created EBS volume also matters a lot

@yzang2019 yzang2019 requested a review from a team as a code owner January 23, 2023 04:33
@yzang2019 yzang2019 changed the title [State Sync] Improve IAVL Import speed by 3~4X feat: [State Sync] Improve IAVL Import speed by 3~4X Jan 23, 2023
@yzang2019 yzang2019 changed the title feat: [State Sync] Improve IAVL Import speed by 3~4X feat: [State Sync] Improve IAVL Import speed by 3x~4x Jan 23, 2023
@yzang2019 yzang2019 changed the title feat: [State Sync] Improve IAVL Import speed by 3x~4x feat: [State Sync] Improve IAVL Import speed by 2x~3x Jan 23, 2023
@yzang2019 yzang2019 changed the title feat: [State Sync] Improve IAVL Import speed by 2x~3x feat: Improve IAVL Import speed for state sync by 2x~3x Jan 23, 2023
@yzang2019 yzang2019 changed the title feat: Improve IAVL Import speed for state sync by 2x~3x perf: Improve IAVL Import speed for state sync by 2x~3x Jan 23, 2023
@kocubinski kocubinski self-assigned this Jan 23, 2023
import.go Outdated
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
if err := currNode.writeBytes(buf); err != nil {
panic(err)
Member

How does this change impact the application's error handling holistically? Previously we'd get an error back from Importer.Add if there was a problem, now we'll have a panic in a goroutine.

Author
@yzang2019 Jan 25, 2023

Good point! One solution would be to send any error that happens in any goroutine to a channel. Another would be to add an error slice to the Importer struct and append to it whenever an error occurs.

import.go Outdated
i.batchMtx.RUnlock()
i.batchSize++
// Only commit a new batch if size meet desiredBatchSize and there's no pending batch write
if i.batchSize >= desiredBatchSize && len(i.chBatch) < 1 {
Member
@kocubinski Jan 23, 2023

If write I/O is blocked for any reason then the batch could inflate (in memory) to the size of the remaining dataset to import (on disk) right?

Author

That is a valid concern. We can add back a max batch size to prevent unbounded memory usage, so the batch size would be kept between the desired size and the max size. What do you think?

Member
@kocubinski left a comment

Thanks for the contribution! Leveraging concurrency is a good direction, and these are impressive improvements and benchmarks.

Just some questions about error handling and about how we're buffering batches when the write is blocked. I think we should limit that somehow.

Also, tests now appear to be breaking with

WARNING: DATA RACE
Read at 0x00c01b1f2650 by goroutine 399:

@yihuang
Collaborator

yihuang commented Jan 25, 2023

Looks great. But I guess step 4 is pretty fast, so it may not justify a separate goroutine.
And I guess the real bottleneck is bulk loading the data into the db itself. The current solution runs it in parallel, which definitely helps. There are also solutions to speed up the data ingestion itself; for example, there are several options for rocksdb1. IMO the best solution for rocksdb is writing out sst files separately using SSTFileWriter (you can even write multiple files concurrently if necessary), then ingesting them into the db in one step.

Footnotes

  1. https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/

@yzang2019
Author

Thanks for the contribution! Leveraging concurrency is a good direction, and these are impressive improvements and benchmarks.

Just some questions about error handling and about how we're buffering batches when the write is blocked. I think we should limit that somehow.

Also, tests now appear to be breaking with

WARNING: DATA RACE
Read at 0x00c01b1f2650 by goroutine 399:

Yeah, I'll look into the test failures. Thanks for taking a look and providing the feedback!

@yzang2019
Author

yzang2019 commented Jan 25, 2023

Looks great. But I guess step 4 is pretty fast, so it may not justify a separate goroutine. And I guess the real bottleneck is bulk loading the data into the db itself. The current solution runs it in parallel, which definitely helps. There are also solutions to speed up the data ingestion itself; for example, there are several options for rocksdb1. IMO the best solution for rocksdb is writing out sst files separately using SSTFileWriter (you can even write multiple files concurrently if necessary), then ingesting them into the db in one step.

Footnotes

  1. https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/

I'm not that familiar with RocksDB, but if we can improve the ingestion itself, it's definitely going to help the most. AFAIK, we let users choose their own underlying database implementation, so optimizing for one type of DB might not help other DB engines. However, if RocksDB significantly outperforms the other DBs, we could switch the default DB engine to RocksDB and recommend that users migrate?

And yeah, you are absolutely right, step 4 doesn't have to be in a separate goroutine. I'll move it back to the Add function.

@yzang2019 yzang2019 requested review from kocubinski and removed request for cool-develope February 2, 2023 16:46
@yzang2019
Author

Addressed most of the comments, could you please take another look? Thanks!

import.go Outdated
if err = node.writeBytes(buf); err != nil {
return err
if err := node.writeBytes(buf); err != nil {
panic(err)
Collaborator

why panic here?

Author

ah good catch, let me fix it

Author

@cool-develope Should be fixed now! Please take another look, thanks!

@tac0turtle
Member

Sorry for the delay, we will merge this work after merging the node key refactor. The rebase should not be large, if needed at all. Thank you for the PR!

@tac0turtle
Member

@yzang2019 could you rebase and then we can take care of merging this

@tac0turtle
Member

@elias-orijtech could i get your eyes on this

Contributor
@elias-orijtech left a comment

The speedup and general approach seem good. Two issues:

  1. A test would be nice. From a glance at import_test.go I see only tests for small trees and edge cases, which don't exercise this new code much.
  2. The Importer is (partially) mutable during an ongoing import. That seems like unwanted complexity to me, and would explain (some of) the rather large amount of new state needed for managing the new goroutines.

I suggest a design where you set up parameters and then start the import, which from then on can only be Canceled or Committed. That may involve an ImportConfig type, or you could even make desiredBatchSize and desiredMaxBatchSize non-configurable, per the YAGNI principle.

import.go Outdated
i.chNodeDataWg.Done()
}

// batchWrite get a new batch from the channel and execute the batch write to the underline DB.
Contributor

Nit: "underline" -> "underlying"

Author

Fixed

import.go Outdated

// WithDesiredBatchSize set the desired batch size for write
func (i *Importer) WithDesiredBatchSize(batchSize uint32) *Importer {
i.desiredBatchSize = batchSize
Contributor

The mutability of desiredBatchSize and maxBatchSize during an ongoing import seems like too much complexity for little gain.

Author

Good point, removed the function to configure batch size, and will stick to the tuned value then

import.go Outdated
// WithMaxBatchSize set the maximum allowed batch size for write, should be greater than desired batch size.
// Consider increase max batch size to reduce overall import time.
func (i *Importer) WithMaxBatchSize(batchSize uint32) *Importer {
i.maxBatchSize = batchSize
Contributor

Same.

Author

Addressed

import.go Outdated
Comment on lines 73 to 81
batchMtx: sync.RWMutex{},
desiredBatchSize: defaultDesiredBatchSize,
maxBatchSize: defaultMaxBatchSize,
chNodeData: make(chan NodeData, 2*defaultDesiredBatchSize),
chNodeDataWg: sync.WaitGroup{},
chBatch: make(chan db.Batch, 1),
chBatchWg: sync.WaitGroup{},
chError: make(chan error, 1),
allChannelClosed: false,
Contributor

Without digging into details, this seems like a lot of state to keep 2 goroutines in check. See also my PR comment.

Author

Moved them to another struct called ChannelConfig

@tac0turtle
Member

@yzang2019 can you help update and address comments please

@yzang2019
Author

@yzang2019 can you help update and address comments please

Yes, let me do a rebase to merge the latest changes and address the comments

@fedekunze
Contributor

@yihuang @yzang2019 would be great to include this in the next release

@tac0turtle
Member

yup we will include this in the 1.0 release. abci++ work in the sdk is concluding so there is more time for these things

@yzang2019
Author

@yihuang @yzang2019 would be great to include this in the next release

Will work on it this weekend, sorry for the delay

Collaborator
@cool-develope left a comment

LGTM!
Could you please add the benchmark code? Also, it would be preferable to add some tests of the batch channel with a small custom config, since the default config is too large.

@yzang2019
Author

LGTM! Could you please add the benchmark code? Also, it would be preferable to add some tests of the batch channel with a small custom config, since the default config is too large.

what benchmark code shall I add? I can see there's already a BenchmarkImport test?

@cool-develope
Collaborator

cool-develope commented May 22, 2023

what benchmark code shall I add? I can see there's already a BenchmarkImport test?

yeah, but it doesn't demonstrate the current updates

Contributor
@elias-orijtech left a comment

I still think this PR is more complicated than it needs to be. From your PR description, steps (5) and (6) are expensive,

(5) Append node data to the current batch (heavy ops)
(6) If batch size is full, flush the whole batch and rotate a new batch (heavy ops)

and the goal of the PR is to start a batch (5) while the previous batch is in (6). Correct?

If so, I don't see the need for 2 extra goroutines. I think (5) can be done as before this PR (synchronously from Importer.Add) and only the commit phase (6) needs a separate goroutine.

In the details, there still seems to be too much synchronization state. For example, why have batchMtx when you communicate with the worker goroutines through chNodeData and chBatch? Similarly, why have the WaitGroups chNodeDataWg and chBatchWg when you can close, say, chBatch and wait for a value on chError?

If one goroutine is enough, I expect the only state you need is a chan error. A sketch:

   type Importer struct {
      ...
      commitErr <-chan error
      ...
   }

   func (i *Importer) Add(exportNode *ExportNode) error {
      ...
      i.writeNode(...)
      if batchSize >= commitBatchSize {
         // Wait for the previous batch.
         if i.commitErr != nil {
            if err := <-i.commitErr; err != nil {
               return err
            }
         }
         result := make(chan error)
         i.commitErr = result
         go i.commitBatch(result, i.batch)
         // Start a new batch.
         i.batch = ...
      }
   }

allChannelClosed bool
}

func DefaultChannelConfig() ChannelConfig {
Contributor

If this function is not intended to be run from outside this package, unexport it.

elias-orijtech added a commit to elias-orijtech/iavl that referenced this pull request Jun 21, 2023
As described in cosmos#664, both the construction and subsequent flushing of
transaction batches are slow. This change is a minimal alternative to
nothing else. This ensures simplicity and hopefully gains most of
the performance improvement from cosmos#664.
elias-orijtech added a commit to elias-orijtech/iavl that referenced this pull request Jul 4, 2023
As described in cosmos#664, both the construction and subsequent flushing of
transaction batches are slow. This change is a minimal alternative to
nothing else. This ensures simplicity and hopefully gains most of
the performance improvement from cosmos#664.

On my macOS M1 machine:

goos: darwin
goarch: arm64
pkg: github.com/cosmos/iavl
BenchmarkImportBatch
BenchmarkImportBatch-8   	       4	 300864156 ns/op

running the same benchmark without this change:

BenchmarkImportBatch-8   	       3	 353229292 ns/op
@tac0turtle
Member

Hey, we recently merged a fix that helps with performance; would you want to test it and see how it compares?

@zsystm

zsystm commented Aug 29, 2023

@yzang2019
How can I set up a testing environment for the benchmark?

I'd like to test it myself

Testing Setup:

Total compressed snapshot size: 650MB
Total state key-value entries: 28,000,000
Total decompressed state size: 7GB

Contributor
@odeke-em left a comment

Thanks for this update @yzang2019, just some initial review comments.

if nodeData, open := <-i.chanConfig.chNodeData; open {
i.batchMtx.RLock()
if i.batch != nil {
err := i.batch.Set(i.tree.ndb.nodeKey(nodeData.node.GetKey()), nodeData.data)
Contributor

This is a mutating operation being invoked under a read lock. This is one of the reasons why it is important to just use a plain mutex; please also see https://cyber.orijtech.com/scsec/cosmos-go-coding-guide#avoid-syncrwmutex

// setBatchData get the next serialized node data from channel, and write the data to the current batch
func setBatchData(i *Importer) {
for i.batch != nil {
if nodeData, open := <-i.chanConfig.chNodeData; open {
Contributor

You can reduce nesting and ease reading by inverting this condition per

nodeData, open := ...
if !open {
   break
}

i.batchMtx.Lock()
...

break
}
}
i.batchSize++
Contributor

Another mutation while a read lock is being held

break
}
}
i.chanConfig.chNodeDataWg.Done()
Contributor

Please make this a defer and move it to the top too.

// batchWrite get a new batch from the channel and execute the batch write to the underlying DB.
func batchWrite(i *Importer) {
for i.batch != nil {
if nextBatch, open := <-i.chanConfig.chBatch; open {
Contributor

Please invert this condition and break early for !open instead of that else after many lines of code.


if nodeData, err := i.getDataBytes(node); err != nil {
return err
} else {
Contributor

No need for this else after the prior return err, for idiomatic Go

return nil
}

func (i *Importer) validate(node *Node) error {
if _, err := node._hash(node.nodeKey.version); err != nil {
return err
}
if err := node.validate(); err != nil {
Contributor

Or you can simply invoke: return node.validate()
Then delete the following 4 lines

@@ -197,7 +319,7 @@ func (i *Importer) Commit() error {
len(i.stack))
}

err := i.batch.WriteSync()
Contributor

You can transform this into: if err := ...; err != nil

// The error will be popped out and returned.
func (i *Importer) waitAndCloseChannels() error {
// Make sure all pending works are drained and close the channels in order
if !i.chanConfig.allChannelClosed {
Contributor

Please invert then return early

if i.chanConfig.allChannelsClosed { // Already closed
   return nil
}

select {
case err := <-i.chanConfig.chError:
return err
default:
Contributor

Did you intend to add this default case? I ask because if an error wasn't already in the channel under multiplexing, this select default clause immediately executes and doesn't wait for anything on the channel.

@yzang2019
Author

Seems like this patch has already been cherry-picked in #793, going to close this original PR now.

@yzang2019 yzang2019 closed this Oct 24, 2023