Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
24823: distsqlrun: add benchmarks for distsql infrastructure r=jordanlewis a=jordanlewis

For cockroachdb#24690 — this adds benchmarks for all of the distsql infrastructure components except the outbox (`RowChannel` was already benchmarked).

```
BenchmarkRouter/BY_RANGE/outputs=2-8                  30          35692346 ns/op          29.38 MB/s
BenchmarkRouter/BY_RANGE/outputs=4-8                  30          34937864 ns/op          60.03 MB/s
BenchmarkRouter/BY_RANGE/outputs=8-8                  30          35112627 ns/op         119.45 MB/s
BenchmarkRouter/BY_HASH/outputs=2-8                  100          19840820 ns/op          52.85 MB/s
BenchmarkRouter/BY_HASH/outputs=4-8                  100          18774629 ns/op         111.70 MB/s
BenchmarkRouter/BY_HASH/outputs=8-8                   50          27249006 ns/op         153.93 MB/s
BenchmarkRouter/MIRROR/outputs=2-8                    30          34873995 ns/op          30.07 MB/s
BenchmarkRouter/MIRROR/outputs=4-8                    20          59782584 ns/op          35.08 MB/s
BenchmarkRouter/MIRROR/outputs=8-8                    10         153532931 ns/op          27.32 MB/s
```

```
BenchmarkMultiplexedRowChannel/senders=2-8                   500           2841863 ns/op          56.30 MB/s
BenchmarkMultiplexedRowChannel/senders=4-8                   100          10030833 ns/op          31.90 MB/s
BenchmarkMultiplexedRowChannel/senders=8-8                    50          32353212 ns/op          19.78 MB/s
```

```
BenchmarkStreamEncoder/cols=1-8                      500           3074456 ns/op         170.53 MB/s
BenchmarkStreamEncoder/cols=4-8                      200           8792420 ns/op         238.52 MB/s
BenchmarkStreamEncoder/cols=16-8                      50          34338533 ns/op         244.29 MB/s
BenchmarkStreamEncoder/cols=64-8                      10         131163270 ns/op         255.82 MB/s
```

```
BenchmarkStreamDecoder/cols=1-8                  1000000              1058 ns/op         120.92 MB/s
BenchmarkStreamDecoder/cols=4-8                   500000              2792 ns/op         183.32 MB/s
BenchmarkStreamDecoder/cols=16-8                  200000              9592 ns/op         213.50 MB/s
BenchmarkStreamDecoder/cols=64-8                   50000             35636 ns/op         229.88 MB/s
```

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
  • Loading branch information
craig[bot] and jordanlewis committed Apr 18, 2018
2 parents 617b3a1 + 67607b2 commit 56ee335
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 4 deletions.
39 changes: 36 additions & 3 deletions pkg/sql/distsqlrun/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"sync"
"testing"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -63,7 +62,7 @@ func BenchmarkRowChannelPipeline(b *testing.B) {
wg.Add(len(rc))

for i := range rc {
rc[i].Init([]sqlbase.ColumnType{columnTypeInt})
rc[i].Init(oneIntCol)

go func(i int) {
defer wg.Done()
Expand All @@ -90,7 +89,7 @@ func BenchmarkRowChannelPipeline(b *testing.B) {
row := sqlbase.EncDatumRow{
sqlbase.DatumToEncDatum(columnTypeInt, tree.NewDInt(tree.DInt(1))),
}
b.SetBytes(int64(unsafe.Sizeof(tree.DInt(1))))
b.SetBytes(int64(8 * 1 * 1))
for i := 0; i < b.N; i++ {
_ = rc[0].Push(row, nil /* meta */)
}
Expand All @@ -99,3 +98,37 @@ func BenchmarkRowChannelPipeline(b *testing.B) {
})
}
}

// BenchmarkMultiplexedRowChannel measures the throughput of a
// MultiplexedRowChannel with a varying number of concurrent senders.
// Each iteration pushes numRows one-int rows from every sender and
// drains them all through a single consumer goroutine.
func BenchmarkMultiplexedRowChannel(b *testing.B) {
	const numRows = 1 << 16
	row := sqlbase.EncDatumRow{intEncDatum(0)}
	for _, numSenders := range []int{2, 4, 8} {
		b.Run(fmt.Sprintf("senders=%d", numSenders), func(b *testing.B) {
			// 8 bytes per int datum, numRows rows per sender.
			b.SetBytes(int64(numSenders * numRows * 8))
			for i := 0; i < b.N; i++ {
				var wg sync.WaitGroup
				wg.Add(numSenders + 1)
				mrc := &MultiplexedRowChannel{}
				mrc.Init(numSenders, oneIntCol)
				// Consumer: drain rows until the channel is exhausted.
				go func() {
					for {
						r, _ := mrc.Next()
						if r == nil {
							break
						}
					}
					wg.Done()
				}()
				// Producers: each pushes numRows rows, then signals done.
				for s := 0; s < numSenders; s++ {
					go func() {
						for k := 0; k < numRows; k++ {
							mrc.Push(row, nil /* meta */)
						}
						mrc.ProducerDone()
						wg.Done()
					}()
				}
				wg.Wait()
			}
		})
	}
}
55 changes: 54 additions & 1 deletion pkg/sql/distsqlrun/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
// setupRouter creates and starts a router. Returns the router and a WaitGroup
// that tracks the lifetime of the background router goroutines.
func setupRouter(
t *testing.T,
t testing.TB,
evalCtx *tree.EvalContext,
spec OutputRouterSpec,
inputTypes []sqlbase.ColumnType,
Expand Down Expand Up @@ -697,3 +697,56 @@ func TestRangeRouterInit(t *testing.T) {
})
}
}

// BenchmarkRouter measures the throughput of each router type
// (BY_RANGE, BY_HASH, MIRROR) over a varying number of outputs. Each
// iteration routes numRows rows of numCols int columns through the
// router while one goroutine per output drains its RowChannel.
func BenchmarkRouter(b *testing.B) {
	numCols := 1
	numRows := 1 << 16
	colTypes := makeIntCols(numCols)

	ctx := context.Background()
	evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
	defer evalCtx.Stop(context.Background())

	// Derive the input's column types from numCols (rather than the
	// hard-coded oneIntCol) so the source stays consistent with
	// colTypes if numCols is ever changed.
	input := NewRepeatableRowSource(colTypes, makeIntRows(numRows, numCols))

	for _, spec := range []OutputRouterSpec{
		{
			Type: OutputRouterSpec_BY_RANGE,
			RangeRouterSpec: OutputRouterSpec_RangeRouterSpec{
				Spans:     testRangeRouterSpec.Spans,
				Encodings: testRangeRouterSpec.Encodings,
			},
		},
		{
			Type:        OutputRouterSpec_BY_HASH,
			HashColumns: []uint32{0},
		},
		{
			Type: OutputRouterSpec_MIRROR,
		},
	} {
		b.Run(spec.Type.String(), func(b *testing.B) {
			for _, nOutputs := range []int{2, 4, 8} {
				chans := make([]RowChannel, nOutputs)
				recvs := make([]RowReceiver, nOutputs)
				b.Run(fmt.Sprintf("outputs=%d", nOutputs), func(b *testing.B) {
					// 8 bytes per int datum, across all outputs.
					b.SetBytes(int64(nOutputs * numCols * numRows * 8))
					for i := 0; i < b.N; i++ {
						input.Reset()
						// Use j here to avoid shadowing the benchmark
						// iteration index i above.
						for j := 0; j < nOutputs; j++ {
							chans[j].InitWithBufSize(colTypes, rowChannelBufSize)
							recvs[j] = &chans[j]
						}
						r, wg := setupRouter(b, evalCtx, spec, colTypes, recvs)
						for j := range chans {
							go drainRowChannel(&chans[j])
						}
						Run(ctx, input, r)
						r.ProducerDone()
						wg.Wait()
					}
				})
			}
		})
	}
}
83 changes: 83 additions & 0 deletions pkg/sql/distsqlrun/stream_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,86 @@ func TestEmptyStreamEncodeDecode(t *testing.T) {
t.Errorf("received bogus row %v %v", row, meta)
}
}

// BenchmarkStreamEncoder measures how fast a StreamEncoder can encode
// rows of int columns into outgoing messages, for varying column counts.
func BenchmarkStreamEncoder(b *testing.B) {
	numRows := 1 << 16

	for _, numCols := range []int{1, 4, 16, 64} {
		b.Run(fmt.Sprintf("rows=%d,cols=%d", numRows, numCols), func(b *testing.B) {
			// 8 bytes per int datum.
			b.SetBytes(int64(numRows * numCols * 8))
			cols := makeIntCols(numCols)
			input := NewRepeatableRowSource(cols, makeIntRows(numRows, numCols))

			b.ResetTimer()
			ctx := context.Background()

			for i := 0; i < b.N; i++ {
				// Per-iteration setup is excluded from the timing.
				b.StopTimer()
				input.Reset()
				// Reset the EncDatums' encoded bytes cache so that each
				// iteration re-encodes from the Datum instead of reusing
				// bytes cached by a previous iteration.
				for _, row := range input.rows {
					for j := range row {
						row[j] = sqlbase.EncDatum{
							Datum: row[j].Datum,
						}
					}
				}
				var se StreamEncoder
				se.init(cols)
				b.StartTimer()

				// Add rows to the StreamEncoder until the input source is exhausted.
				// "Flush" every outboxBufRows.
				// NOTE(review): j%outboxBufRows == 0 also fires at j == 0, so
				// the first FormMessage happens after a single row — confirm
				// this is intended.
				for j := 0; ; j++ {
					row, _ := input.Next()
					if row == nil {
						break
					}
					if err := se.AddRow(row); err != nil {
						b.Fatal(err)
					}
					if j%outboxBufRows == 0 {
						// ignore output
						se.FormMessage(ctx)
					}
				}
			}
		})
	}
}

// BenchmarkStreamDecoder measures the throughput of decoding a single
// pre-encoded message of outboxBufRows rows, for varying column counts.
func BenchmarkStreamDecoder(b *testing.B) {
	ctx := context.Background()

	for _, numCols := range []int{1, 4, 16, 64} {
		b.Run(fmt.Sprintf("cols=%d", numCols), func(b *testing.B) {
			// 8 bytes per int datum.
			b.SetBytes(int64(outboxBufRows * numCols * 8))
			// Build one message containing outboxBufRows identical rows.
			colTypes := makeIntCols(numCols)
			inRow := makeIntRows(1, numCols)[0]
			var se StreamEncoder
			se.init(colTypes)
			for n := 0; n < outboxBufRows; n++ {
				if err := se.AddRow(inRow); err != nil {
					b.Fatal(err)
				}
			}
			msg := se.FormMessage(ctx)

			// Decode the same message b.N times.
			for i := 0; i < b.N; i++ {
				var sd StreamDecoder
				if err := sd.AddMessage(msg); err != nil {
					b.Fatal(err)
				}
				for n := 0; n < outboxBufRows; n++ {
					row, meta, err := sd.GetRow(nil)
					if err != nil {
						b.Fatal(err)
					}
					if row == nil && meta == nil {
						// Message exhausted.
						break
					}
				}
			}
		})
	}
}
8 changes: 8 additions & 0 deletions pkg/sql/distsqlrun/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ var oneIntCol = []sqlbase.ColumnType{intType}
var twoIntCols = []sqlbase.ColumnType{intType, intType}
var threeIntCols = []sqlbase.ColumnType{intType, intType, intType}

// makeIntCols returns a slice of numCols INT column types.
func makeIntCols(numCols int) []sqlbase.ColumnType {
	cols := make([]sqlbase.ColumnType, numCols)
	for i := range cols {
		cols[i] = intType
	}
	return cols
}

// intEncDatum wraps the given int in an EncDatum holding a DInt.
func intEncDatum(i int) sqlbase.EncDatum {
	d := tree.NewDInt(tree.DInt(i))
	return sqlbase.EncDatum{Datum: d}
}
Expand Down

0 comments on commit 56ee335

Please sign in to comment.