Skip to content

Commit

Permalink
colexec: fix repeatable batch source
Browse files (browse the repository at this point in the history)
RepeatableBatchSource is a utility operator that returns the batch it was
initialized with on every call, forever. Previously, it stored only the
length and the selection vector of the input batch separately, but that
was not sufficient: downstream operators are allowed to modify the batch
that the repeatable batch source outputs in any way, including
overwriting the actual data. This is now fixed by storing the contents
of the input batch and populating a separate output batch on every call
to Next.

Release note: None
  • Loading branch information
yuzefovich committed Feb 5, 2020
1 parent c88434b commit a2c697e
Show file tree
Hide file tree
Showing 23 changed files with 139 additions and 97 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colcontainer/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func BenchmarkDiskQueue(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
typs := []coltypes.T{coltypes.Int64}
batch := colexec.RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0, 0)
op := colexec.NewRepeatableBatchSource(batch)
op := colexec.NewRepeatableBatchSource(testAllocator, batch)
ctx := context.Background()
for i := 0; i < b.N; i++ {
op.ResetBatchesToReturn(numBatches)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/and_or_projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func benchmarkLogicalProjOp(
sel[i] = uint16(i)
}
}
input := NewRepeatableBatchSource(batch)
input := NewRepeatableBatchSource(testAllocator, batch)

spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: []types.T{*types.Bool, *types.Bool}}},
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/colexec/builtin_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b
}
}

source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

expr, err := parser.ParseExpr("abs(@1)")
Expand Down Expand Up @@ -175,18 +175,17 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) {
ctx := context.Background()
tctx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())

batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Bytes, coltypes.Int64, coltypes.Int64, coltypes.Bytes})
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Bytes, coltypes.Int64, coltypes.Int64})
bCol := batch.ColVec(0).Bytes()
sCol := batch.ColVec(1).Int64()
eCol := batch.ColVec(2).Int64()
outCol := batch.ColVec(3).Bytes()
for i := 0; i < int(coldata.BatchSize()); i++ {
bCol.Set(i, []byte("hello there"))
sCol[i] = 1
eCol[i] = 4
}
batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

// Set up the default operator.
Expand Down Expand Up @@ -229,21 +228,21 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) {
b.SetBytes(int64(len("hello there") * int(coldata.BatchSize())))
b.ResetTimer()
for i := 0; i < b.N; i++ {
defaultOp.Next(ctx)
b := defaultOp.Next(ctx)
// Due to the flat byte updates, we have to reset the output
// bytes col after each next call.
outCol.Reset()
b.ColVec(3).Bytes().Reset()
}
})

b.Run("SpecializedSubstringOperator", func(b *testing.B) {
b.SetBytes(int64(len("hello there") * int(coldata.BatchSize())))
b.ResetTimer()
for i := 0; i < b.N; i++ {
specOp.Next(ctx)
b := specOp.Next(ctx)
// Due to the flat byte updates, we have to reset the output
// bytes col after each next call.
outCol.Reset()
b.ColVec(3).Bytes().Reset()
}
})
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/cancel_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestCancelChecker(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx, cancel := context.WithCancel(context.Background())
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64})
op := NewCancelChecker(NewNoop(NewRepeatableBatchSource(batch)))
op := NewCancelChecker(NewNoop(NewRepeatableBatchSource(testAllocator, batch)))
cancel()
err := execerror.CatchVectorizedRuntimeError(func() {
op.Next(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/deselector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func BenchmarkDeselector(b *testing.B) {
batch.SetSelection(true)
copy(batch.Selection(), sel)
batch.SetLength(batchLen)
input := NewRepeatableBatchSource(batch)
input := NewRepeatableBatchSource(testAllocator, batch)
op := NewDeselectorOp(testAllocator, input, inputTypes)
op.Init()
b.ResetTimer()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func BenchmarkSortedDistinct(b *testing.B) {
bCol[i] = lastB
}
batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

distinct, err := NewOrderedDistinct(source, []uint32{1, 2}, []coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64})
Expand Down
82 changes: 41 additions & 41 deletions pkg/sql/colexec/external_hash_joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,12 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
}

batch := testAllocator.NewMemBatch(sourceTypes)

for colIdx := 0; colIdx < nCols; colIdx++ {
col := batch.ColVec(colIdx).Int64()
for i := 0; i < int(coldata.BatchSize()); i++ {
col[i] = int64(i)
}
}

batch.SetLength(coldata.BatchSize())

var (
Expand All @@ -119,48 +117,50 @@ func BenchmarkExternalHashJoiner(b *testing.B) {
vec.Nulls().UnsetNulls()
}
}
leftSource := newFiniteBatchSource(batch, 0)
rightSource := newFiniteBatchSource(batch, 0)
for _, fullOuter := range []bool{false, true} {
for _, nBatches := range []int{1 << 2, 1 << 7} {
for _, memoryLimit := range []int64{0, 1} {
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit
shouldSpill := memoryLimit > 0
name := fmt.Sprintf(
"nulls=%t/fullOuter=%t/batches=%d/spilled=%t",
hasNulls, fullOuter, nBatches, shouldSpill)
joinType := sqlbase.JoinType_INNER
if fullOuter {
joinType = sqlbase.JoinType_FULL_OUTER
}
spec := createSpecForHashJoiner(joinTestCase{
joinType: joinType,
leftTypes: sourceTypes,
leftOutCols: []uint32{0, 1},
leftEqCols: []uint32{0, 2},
rightTypes: sourceTypes,
rightOutCols: []uint32{2, 3},
rightEqCols: []uint32{0, 1},
})
b.Run(name, func(b *testing.B) {
// 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows /
// batch) * nCols (number of columns / row) * 2 (number of sources).
b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2))
b.ResetTimer()
for i := 0; i < b.N; i++ {
leftSource := newFiniteBatchSource(batch, nBatches)
rightSource := newFiniteBatchSource(batch, nBatches)
hj, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource}, func() {},
)
memAccounts = append(memAccounts, accounts...)
memMonitors = append(memMonitors, monitors...)
require.NoError(b, err)
hj.Init()
for b := hj.Next(ctx); b.Length() > 0; b = hj.Next(ctx) {
}
}
})
for _, nBatches := range []int{1 << 2, 1 << 7} {
for _, memoryLimit := range []int64{0, 1} {
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit
shouldSpill := memoryLimit > 0
name := fmt.Sprintf(
"nulls=%t/fullOuter=%t/batches=%d/spilled=%t",
hasNulls, fullOuter, nBatches, shouldSpill)
joinType := sqlbase.JoinType_INNER
if fullOuter {
joinType = sqlbase.JoinType_FULL_OUTER
}
spec := createSpecForHashJoiner(joinTestCase{
joinType: joinType,
leftTypes: sourceTypes,
leftOutCols: []uint32{0, 1},
leftEqCols: []uint32{0, 2},
rightTypes: sourceTypes,
rightOutCols: []uint32{2, 3},
rightEqCols: []uint32{0, 1},
})
b.Run(name, func(b *testing.B) {
// 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows /
// batch) * nCols (number of columns / row) * 2 (number of sources).
b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2))
b.ResetTimer()
for i := 0; i < b.N; i++ {
leftSource.reset(nBatches)
rightSource.reset(nBatches)
hj, accounts, monitors, err := createDiskBackedHashJoiner(
ctx, flowCtx, spec, []Operator{leftSource, rightSource}, func() {},
)
memAccounts = append(memAccounts, accounts...)
memMonitors = append(memMonitors, monitors...)
require.NoError(b, err)
hj.Init()
for b := hj.Next(ctx); b.Length() > 0; b = hj.Next(ctx) {
}
}
})
}
}
}
}
for _, memAccount := range memAccounts {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ func BenchmarkHashJoiner(b *testing.B) {
b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2))
b.ResetTimer()
for i := 0; i < b.N; i++ {
leftSource := NewRepeatableBatchSource(batch)
leftSource := NewRepeatableBatchSource(testAllocator, batch)
rightSource := newFiniteBatchSource(batch, nBatches)
joinType := sqlbase.JoinType_INNER
if fullOuter {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/like_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func BenchmarkLikeOps(b *testing.B) {
}

batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

base := selConstOpBase{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func BenchmarkOffset(b *testing.B) {
ctx := context.Background()
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64})
batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

o := NewOffsetOp(source, 1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/orderedsynchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) {

inputs := make([]Operator, len(batches))
for i := range batches {
inputs[i] = NewRepeatableBatchSource(batches[i])
inputs[i] = NewRepeatableBatchSource(testAllocator, batches[i])
}

op := OrderedSynchronizer{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/ordinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func BenchmarkOrdinality(b *testing.B) {

batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64})
batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
ordinality := NewOrdinalityOp(testAllocator, source, batch.Width())
ordinality.Init()

Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {

inputs := make([]Operator, numInputs)
for i := range inputs {
source := NewRepeatableBatchSource(RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64()))
source := NewRepeatableBatchSource(
testAllocator,
RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64()),
)
source.ResetBatchesToReturn(numBatches)
inputs[i] = source
}
Expand Down Expand Up @@ -132,7 +135,7 @@ func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
for i := range inputs {
batch := testAllocator.NewMemBatchWithSize(typs, int(coldata.BatchSize()))
batch.SetLength(coldata.BatchSize())
inputs[i] = NewRepeatableBatchSource(batch)
inputs[i] = NewRepeatableBatchSource(testAllocator, batch)
}
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/projection_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func benchmarkProjPlusInt64Int64ConstOp(b *testing.B, useSelectionVector bool, h
sel[i] = uint16(i)
}
}
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

plusOp := &projPlusInt64Int64ConstOp{
Expand Down Expand Up @@ -336,7 +336,7 @@ func benchmarkProjOp(
sel[i] = i
}
}
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

op := makeProjOp(source, intType)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func TestHashRouterCancellation(t *testing.T) {
// Never-ending input of 0s.
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64})
batch.SetLength(coldata.BatchSize())
in := NewRepeatableBatchSource(batch)
in := NewRepeatableBatchSource(testAllocator, batch)

unbufferedCh := make(chan struct{})
r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, unbufferedCh, outputs)
Expand Down Expand Up @@ -799,7 +799,7 @@ func BenchmarkHashRouter(b *testing.B) {
// numbers.
batch := testAllocator.NewMemBatch(types)
batch.SetLength(coldata.BatchSize())
input := NewRepeatableBatchSource(batch)
input := NewRepeatableBatchSource(testAllocator, batch)

var wg sync.WaitGroup
for _, numOutputs := range []int{2, 4, 8, 16} {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/select_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func benchmarkSelectInInt64(b *testing.B, useSelectionVector bool, hasNulls bool
}
}

source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()
inOp := &selectInOpInt64{
OneInputNode: NewOneInputNode(source),
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/selection_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func benchmarkSelLTInt64Int64ConstOp(b *testing.B, useSelectionVector bool, hasN
sel[i] = i
}
}
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

plusOp := &selLTInt64Int64ConstOp{
Expand Down Expand Up @@ -222,7 +222,7 @@ func benchmarkSelLTInt64Int64Op(b *testing.B, useSelectionVector bool, hasNulls
sel[i] = i
}
}
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

plusOp := &selLTInt64Int64Op{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/serial_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestSerialUnorderedSynchronizer(t *testing.T) {
inputs := make([]Operator, numInputs)
for i := range inputs {
batch := RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.ResetBatchesToReturn(numBatches)
inputs[i] = source
}
Expand Down
Loading

0 comments on commit a2c697e

Please sign in to comment.