Skip to content

Commit

Permalink
Parallel encoding reworked
Browse files Browse the repository at this point in the history
- Encode in parallel only branches, up to N goroutines
- Set the max N to the number of CPUs
- Only write to buffer if no error was encountered
  • Loading branch information
qdm12 committed Dec 7, 2021
1 parent 5036b34 commit 3663e83
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 38 deletions.
97 changes: 65 additions & 32 deletions internal/trie/node/branch_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash"
"io"
"runtime"

"github.com/ChainSafe/gossamer/internal/trie/codec"
"github.com/ChainSafe/gossamer/internal/trie/pools"
Expand Down Expand Up @@ -113,43 +114,72 @@ func (b *Branch) Encode(buffer Buffer) (err error) {
}
}

const parallel = false // TODO
if parallel {
err = encodeChildrenInParallel(b.Children, buffer)
} else {
err = encodeChildrenSequentially(b.Children, buffer)
}
err = encodeChildrenOpportunisticParallel(b.Children, buffer)
if err != nil {
return fmt.Errorf("cannot encode children of branch: %w", err)
}

return nil
}

func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) {
type result struct {
index int
buffer *bytes.Buffer
err error
type encodingAsyncResult struct {
index int
buffer *bytes.Buffer
err error
}

func runEncodeChild(child Node, index int,
results chan<- encodingAsyncResult, rateLimit <-chan struct{}) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

results <- encodingAsyncResult{
index: index,
buffer: buffer,
err: err,
}
if rateLimit != nil {
// Only run if runEncodeChild is launched
// in its own goroutine.
<-rateLimit
}
}

var parallelLimit = runtime.NumCPU()

var parallelEncodingRateLimit = make(chan struct{}, parallelLimit)

resultsCh := make(chan result)
// encodeChildrenOpportunisticParallel encodes children in parallel eventually.
// Leaves are encoded in a blocking way, and branches are encoded in separate
// goroutines IF they are less than the parallelLimit number of goroutines already
// running. This is designed to limit the total number of goroutines in order to
// avoid using too much memory on the stack.
func encodeChildrenOpportunisticParallel(children [16]Node, buffer io.Writer) (err error) {
// Buffered channels since children might be encoded in this
// goroutine or another one.
resultsCh := make(chan encodingAsyncResult, len(children))

for i, child := range children {
go func(index int, child Node) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

resultsCh <- result{
index: index,
buffer: buffer,
err: err,
}
}(i, child)
if isNodeNil(child) || child.Type() == LeafType {
runEncodeChild(child, i, resultsCh, nil)
continue
}

// Branch child
select {
case parallelEncodingRateLimit <- struct{}{}:
// We have a goroutine available to encode
// the branch in parallel.
go runEncodeChild(child, i, resultsCh, parallelEncodingRateLimit)
default:
// we reached the maximum parallel goroutines
// so encode this branch in this goroutine
runEncodeChild(child, i, resultsCh, nil)
}
}

currentIndex := 0
Expand All @@ -166,7 +196,7 @@ func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) {
for currentIndex < len(children) &&
resultBuffers[currentIndex] != nil {
bufferSlice := resultBuffers[currentIndex].Bytes()
if len(bufferSlice) > 0 {
if err == nil && len(bufferSlice) > 0 {
// note buffer.Write copies the byte slice given as argument
_, writeErr := buffer.Write(bufferSlice)
if writeErr != nil && err == nil {
Expand Down Expand Up @@ -203,17 +233,20 @@ func encodeChildrenSequentially(children [16]Node, buffer io.Writer) (err error)
return nil
}

func encodeChild(child Node, buffer io.Writer) (err error) {
var isNil bool
switch impl := child.(type) {
func isNodeNil(n Node) (isNil bool) {
switch impl := n.(type) {
case *Branch:
isNil = impl == nil
case *Leaf:
isNil = impl == nil
default:
isNil = child == nil
isNil = n == nil
}
if isNil {
return isNil
}

func encodeChild(child Node, buffer io.Writer) (err error) {
if isNodeNil(child) {
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions internal/trie/node/branch_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func Test_Branch_Encode(t *testing.T) {
wrappedErr: errTest,
errMessage: "cannot write encoded value to buffer: test error",
},
"buffer write error for children encoded sequentially": {
"buffer write error for children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -152,10 +152,10 @@ func Test_Branch_Encode(t *testing.T) {
},
wrappedErr: errTest,
errMessage: "cannot encode children of branch: " +
"cannot encode child at index 3: " +
"failed to write child to buffer: test error",
"cannot write encoding of child at index 3: " +
"test error",
},
"success with sequential children encoding": {
"success with children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -218,7 +218,7 @@ func Test_Branch_Encode(t *testing.T) {
}
}

func Test_encodeChildrenInParallel(t *testing.T) {
func Test_encodeChildrenOpportunisticParallel(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
Expand Down Expand Up @@ -306,7 +306,7 @@ func Test_encodeChildrenInParallel(t *testing.T) {
previousCall = call
}

err := encodeChildrenInParallel(testCase.children, buffer)
err := encodeChildrenOpportunisticParallel(testCase.children, buffer)

if testCase.wrappedErr != nil {
assert.ErrorIs(t, err, testCase.wrappedErr)
Expand Down

0 comments on commit 3663e83

Please sign in to comment.