Skip to content

Commit

Permalink
Parallel encoding reworked
Browse files Browse the repository at this point in the history
- Add `Type()` method on `Node`
- Encode in parallel only branches, up to N goroutines
- Set the max N to the number of CPUs
- Only write to buffer if no error was encountered
  • Loading branch information
qdm12 committed Nov 30, 2021
1 parent 91ed716 commit 718d796
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 38 deletions.
9 changes: 9 additions & 0 deletions lib/trie/branch/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ type Branch struct {
sync.RWMutex
}

// Type returns node.BranchType if the branch value
// is nil, and node.BranchWithValueType otherwise.
func (b *Branch) Type() node.Type {
if b.Value == nil {
return node.BranchType
}
return node.BranchWithValueType
}

func (b *Branch) String() string {
if len(b.Value) > 1024 {
return fmt.Sprintf("key=%x childrenBitmap=%16b value (hashed)=%x dirty=%v",
Expand Down
97 changes: 65 additions & 32 deletions lib/trie/branch/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash"
"io"
"runtime"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/trie/encode"
Expand Down Expand Up @@ -120,43 +121,72 @@ func (b *Branch) Encode(buffer encode.Buffer) (err error) {
}
}

const parallel = false // TODO
if parallel {
err = encodeChildrenInParallel(b.Children, buffer)
} else {
err = encodeChildrenSequentially(b.Children, buffer)
}
err = encodeChildrenInParallelPlease(b.Children, buffer)
if err != nil {
return fmt.Errorf("cannot encode children of branch: %w", err)
}

return nil
}

func encodeChildrenInParallel(children [16]node.Node, buffer io.Writer) (err error) {
type result struct {
index int
buffer *bytes.Buffer
err error
type encodingAsyncResult struct {
index int
buffer *bytes.Buffer
err error
}

func runEncodeChild(child node.Node, index int,
results chan<- encodingAsyncResult, rateLimit <-chan struct{}) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

results <- encodingAsyncResult{
index: index,
buffer: buffer,
err: err,
}
if rateLimit != nil {
// Only run if runEncodeChild is launched
// in its own goroutine.
<-rateLimit
}
}

var parallelLimit = runtime.NumCPU()

var parallelEncodingRateLimit = make(chan struct{}, parallelLimit)

resultsCh := make(chan result)
// encodeChildrenInParallelPlease encodes children in parallel eventually.
// Leaves are encoded in a blocking way, and branches are encoded in separate
// goroutines IF they are less than the parallelLimit number of goroutines already
// running. This is designed to limit the total number of goroutines in order to
// avoid using too much memory on the stack.
func encodeChildrenInParallelPlease(children [16]node.Node, buffer io.Writer) (err error) {
// Buffered channels since children might be encoded in this
// goroutine or another one.
resultsCh := make(chan encodingAsyncResult, len(children))

for i, child := range children {
go func(index int, child node.Node) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

resultsCh <- result{
index: index,
buffer: buffer,
err: err,
}
}(i, child)
if isNodeNil(child) || child.Type() == node.LeafType {
runEncodeChild(child, i, resultsCh, nil)
continue
}

// Branch child
select {
case parallelEncodingRateLimit <- struct{}{}:
// We have a goroutine available to encode
// the branch in parallel.
go runEncodeChild(child, i, resultsCh, parallelEncodingRateLimit)
default:
// we reached the maximum parallel goroutines
// so encode this branch in this goroutine
runEncodeChild(child, i, resultsCh, nil)
}
}

currentIndex := 0
Expand All @@ -173,7 +203,7 @@ func encodeChildrenInParallel(children [16]node.Node, buffer io.Writer) (err err
for currentIndex < len(children) &&
resultBuffers[currentIndex] != nil {
bufferSlice := resultBuffers[currentIndex].Bytes()
if len(bufferSlice) > 0 {
if err == nil && len(bufferSlice) > 0 {
// note buffer.Write copies the byte slice given as argument
_, writeErr := buffer.Write(bufferSlice)
if writeErr != nil && err == nil {
Expand Down Expand Up @@ -210,17 +240,20 @@ func encodeChildrenSequentially(children [16]node.Node, buffer io.Writer) (err e
return nil
}

func encodeChild(child node.Node, buffer io.Writer) (err error) {
var isNil bool
switch impl := child.(type) {
func isNodeNil(n node.Node) (isNil bool) {
switch impl := n.(type) {
case *Branch:
isNil = impl == nil
case *leaf.Leaf:
isNil = impl == nil
default:
isNil = child == nil
isNil = n == nil
}
if isNil {
return isNil
}

func encodeChild(child node.Node, buffer io.Writer) (err error) {
if isNodeNil(child) {
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions lib/trie/branch/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func Test_Branch_Encode(t *testing.T) {
wrappedErr: errTest,
errMessage: "cannot write encoded value to buffer: test error",
},
"buffer write error for children encoded sequentially": {
"buffer write error for children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -175,10 +175,10 @@ func Test_Branch_Encode(t *testing.T) {
},
wrappedErr: errTest,
errMessage: "cannot encode children of branch: " +
"cannot encode child at index 3: " +
"failed to write child to buffer: test error",
"cannot write encoding of child at index 3: " +
"test error",
},
"success with sequential children encoding": {
"success with children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -241,7 +241,7 @@ func Test_Branch_Encode(t *testing.T) {
}
}

func Test_encodeChildrenInParallel(t *testing.T) {
func Test_encodeChildrenInParallelPlease(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
Expand Down Expand Up @@ -329,7 +329,7 @@ func Test_encodeChildrenInParallel(t *testing.T) {
previousCall = call
}

err := encodeChildrenInParallel(testCase.children, buffer)
err := encodeChildrenInParallelPlease(testCase.children, buffer)

if testCase.wrappedErr != nil {
assert.ErrorIs(t, err, testCase.wrappedErr)
Expand Down
5 changes: 5 additions & 0 deletions lib/trie/leaf/leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Leaf struct {
sync.RWMutex
}

// Type returns node.LeafType.
func (l *Leaf) Type() node.Type {
return node.LeafType
}

func (l *Leaf) String() string {
if len(l.Value) > 1024 {
return fmt.Sprintf("leaf key=%x value (hashed)=%x dirty=%v", l.Key, common.MustBlake2bHash(l.Value), l.Dirty)
Expand Down
1 change: 1 addition & 0 deletions lib/trie/node/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ type Node interface {
GetGeneration() uint64
SetGeneration(uint64)
Copy() Node
Type() Type
}

0 comments on commit 718d796

Please sign in to comment.