From a2c697e8b22627a9ad6de03222c7d23ab0411ce9 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 31 Jan 2020 10:11:00 -0800 Subject: [PATCH] colexec: fix repeatable batch source RepeatableBatchSource is a utility operator that returns the batch it's initialized with forever. Previously it would only store the length and the selection vector of the input batch separately, but this is not sufficient - downstream operators are allowed to modify the batch which is output by the repeatable batch source in any way, including overwriting the actual data. Now this is fixed by storing the contents of the input batch and populating a separate output batch on every call to Next. Release note: None --- pkg/sql/colcontainer/diskqueue_test.go | 2 +- pkg/sql/colexec/and_or_projection_test.go | 2 +- pkg/sql/colexec/builtin_funcs_test.go | 15 ++-- pkg/sql/colexec/cancel_checker_test.go | 2 +- pkg/sql/colexec/deselector_test.go | 2 +- pkg/sql/colexec/distinct_test.go | 2 +- pkg/sql/colexec/external_hash_joiner_test.go | 82 +++++++++---------- pkg/sql/colexec/hashjoiner_test.go | 2 +- pkg/sql/colexec/like_ops_test.go | 2 +- pkg/sql/colexec/offset_test.go | 2 +- pkg/sql/colexec/orderedsynchronizer_test.go | 2 +- pkg/sql/colexec/ordinality_test.go | 2 +- .../parallel_unordered_synchronizer_test.go | 7 +- pkg/sql/colexec/projection_ops_test.go | 4 +- pkg/sql/colexec/routers_test.go | 4 +- pkg/sql/colexec/select_in_test.go | 2 +- pkg/sql/colexec/selection_ops_test.go | 4 +- .../serial_unordered_synchronizer_test.go | 2 +- pkg/sql/colexec/testutils.go | 73 ++++++++++++----- pkg/sql/colexec/utils_test.go | 12 ++- pkg/sql/colflow/colrpc/colrpc_test.go | 2 +- .../colflow/vectorized_flow_shutdown_test.go | 5 +- pkg/sql/colflow/vectorized_flow_space_test.go | 4 +- 23 files changed, 139 insertions(+), 97 deletions(-) diff --git a/pkg/sql/colcontainer/diskqueue_test.go b/pkg/sql/colcontainer/diskqueue_test.go index 589aaad72e5c..c4cc0a9b66a2 100644 --- a/pkg/sql/colcontainer/diskqueue_test.go +++ b/pkg/sql/colcontainer/diskqueue_test.go @@ -154,7 +154,7 @@ func BenchmarkDiskQueue(b *testing.B) { rng, _ := randutil.NewPseudoRand() typs := []coltypes.T{coltypes.Int64} batch := colexec.RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0, 0) - op := colexec.NewRepeatableBatchSource(batch) + op := colexec.NewRepeatableBatchSource(testAllocator, batch) ctx := context.Background() for i := 0; i < b.N; i++ { op.ResetBatchesToReturn(numBatches) diff --git a/pkg/sql/colexec/and_or_projection_test.go b/pkg/sql/colexec/and_or_projection_test.go index 3d16b94e82bd..122a191d9160 100644 --- a/pkg/sql/colexec/and_or_projection_test.go +++ b/pkg/sql/colexec/and_or_projection_test.go @@ -266,7 +266,7 @@ func benchmarkLogicalProjOp( sel[i] = uint16(i) } } - input := NewRepeatableBatchSource(batch) + input := NewRepeatableBatchSource(testAllocator, batch) spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: []types.T{*types.Bool, *types.Bool}}}, diff --git a/pkg/sql/colexec/builtin_funcs_test.go b/pkg/sql/colexec/builtin_funcs_test.go index a2103668586a..744f36162d89 100644 --- a/pkg/sql/colexec/builtin_funcs_test.go +++ b/pkg/sql/colexec/builtin_funcs_test.go @@ -136,7 +136,7 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b } } - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() expr, err := parser.ParseExpr("abs(@1)") @@ -175,18 +175,17 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) { ctx := context.Background() tctx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) - batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Bytes, coltypes.Int64, coltypes.Int64, coltypes.Bytes}) + batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Bytes, coltypes.Int64, coltypes.Int64}) bCol := batch.ColVec(0).Bytes() sCol := batch.ColVec(1).Int64() eCol := batch.ColVec(2).Int64() - outCol := batch.ColVec(3).Bytes() for i := 0; i < int(coldata.BatchSize()); i++ { bCol.Set(i, []byte("hello there")) sCol[i] = 1 eCol[i] = 4 } batch.SetLength(coldata.BatchSize()) - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() // Set up the default operator. @@ -229,10 +228,10 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) { b.SetBytes(int64(len("hello there") * int(coldata.BatchSize()))) b.ResetTimer() for i := 0; i < b.N; i++ { - defaultOp.Next(ctx) + b := defaultOp.Next(ctx) // Due to the flat byte updates, we have to reset the output // bytes col after each next call. - outCol.Reset() + b.ColVec(3).Bytes().Reset() } }) @@ -240,10 +239,10 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) { b.SetBytes(int64(len("hello there") * int(coldata.BatchSize()))) b.ResetTimer() for i := 0; i < b.N; i++ { - specOp.Next(ctx) + b := specOp.Next(ctx) // Due to the flat byte updates, we have to reset the output // bytes col after each next call. - outCol.Reset() + b.ColVec(3).Bytes().Reset() } }) } diff --git a/pkg/sql/colexec/cancel_checker_test.go b/pkg/sql/colexec/cancel_checker_test.go index 256b4508ca94..78815c1bbc56 100644 --- a/pkg/sql/colexec/cancel_checker_test.go +++ b/pkg/sql/colexec/cancel_checker_test.go @@ -27,7 +27,7 @@ func TestCancelChecker(t *testing.T) { defer leaktest.AfterTest(t)() ctx, cancel := context.WithCancel(context.Background()) batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64}) - op := NewCancelChecker(NewNoop(NewRepeatableBatchSource(batch))) + op := NewCancelChecker(NewNoop(NewRepeatableBatchSource(testAllocator, batch))) cancel() err := execerror.CatchVectorizedRuntimeError(func() { op.Next(ctx) diff --git a/pkg/sql/colexec/deselector_test.go b/pkg/sql/colexec/deselector_test.go index 96924c1652bb..5be5e266c3ad 100644 --- a/pkg/sql/colexec/deselector_test.go +++ b/pkg/sql/colexec/deselector_test.go @@ -103,7 +103,7 @@ func BenchmarkDeselector(b *testing.B) { batch.SetSelection(true) copy(batch.Selection(), sel) batch.SetLength(batchLen) - input := NewRepeatableBatchSource(batch) + input := NewRepeatableBatchSource(testAllocator, batch) op := NewDeselectorOp(testAllocator, input, inputTypes) op.Init() b.ResetTimer() diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index 4b2e6d10ea36..4a623a2020d9 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -133,7 +133,7 @@ func BenchmarkSortedDistinct(b *testing.B) { bCol[i] = lastB } batch.SetLength(coldata.BatchSize()) - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() distinct, err := NewOrderedDistinct(source, []uint32{1, 2}, []coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64}) diff --git a/pkg/sql/colexec/external_hash_joiner_test.go b/pkg/sql/colexec/external_hash_joiner_test.go index bb20752f5f7b..27a8c198b35d 100644 --- a/pkg/sql/colexec/external_hash_joiner_test.go +++ b/pkg/sql/colexec/external_hash_joiner_test.go @@ -93,14 +93,12 @@ func BenchmarkExternalHashJoiner(b *testing.B) { } batch := testAllocator.NewMemBatch(sourceTypes) - for colIdx := 0; colIdx < nCols; colIdx++ { col := batch.ColVec(colIdx).Int64() for i := 0; i < int(coldata.BatchSize()); i++ { col[i] = int64(i) } } - batch.SetLength(coldata.BatchSize()) var ( @@ -119,48 +117,50 @@ func BenchmarkExternalHashJoiner(b *testing.B) { vec.Nulls().UnsetNulls() } } + leftSource := newFiniteBatchSource(batch, 0) + rightSource := newFiniteBatchSource(batch, 0) for _, fullOuter := range []bool{false, true} { - for _, nBatches := range []int{1 << 2, 1 << 7} { - for _, memoryLimit := range []int64{0, 1} { - flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit - shouldSpill := memoryLimit > 0 - name := fmt.Sprintf( - "nulls=%t/fullOuter=%t/batches=%d/spilled=%t", - hasNulls, fullOuter, nBatches, shouldSpill) - joinType := sqlbase.JoinType_INNER - if fullOuter { - joinType = sqlbase.JoinType_FULL_OUTER - } - spec := createSpecForHashJoiner(joinTestCase{ - joinType: joinType, - leftTypes: sourceTypes, - leftOutCols: []uint32{0, 1}, - leftEqCols: []uint32{0, 2}, - rightTypes: sourceTypes, - rightOutCols: []uint32{2, 3}, - rightEqCols: []uint32{0, 1}, - }) - b.Run(name, func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / - // batch) * nCols (number of columns / row) * 2 (number of sources). - b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) - b.ResetTimer() - for i := 0; i < b.N; i++ { - leftSource := newFiniteBatchSource(batch, nBatches) - rightSource := newFiniteBatchSource(batch, nBatches) - hj, accounts, monitors, err := createDiskBackedHashJoiner( - ctx, flowCtx, spec, []Operator{leftSource, rightSource}, func() {}, - ) - memAccounts = append(memAccounts, accounts...) - memMonitors = append(memMonitors, monitors...) - require.NoError(b, err) - hj.Init() - for b := hj.Next(ctx); b.Length() > 0; b = hj.Next(ctx) { - } - } - }) + for _, nBatches := range []int{1 << 2, 1 << 7} { + for _, memoryLimit := range []int64{0, 1} { + flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = memoryLimit + shouldSpill := memoryLimit > 0 + name := fmt.Sprintf( + "nulls=%t/fullOuter=%t/batches=%d/spilled=%t", + hasNulls, fullOuter, nBatches, shouldSpill) + joinType := sqlbase.JoinType_INNER + if fullOuter { + joinType = sqlbase.JoinType_FULL_OUTER } + spec := createSpecForHashJoiner(joinTestCase{ + joinType: joinType, + leftTypes: sourceTypes, + leftOutCols: []uint32{0, 1}, + leftEqCols: []uint32{0, 2}, + rightTypes: sourceTypes, + rightOutCols: []uint32{2, 3}, + rightEqCols: []uint32{0, 1}, + }) + b.Run(name, func(b *testing.B) { + // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / + // batch) * nCols (number of columns / row) * 2 (number of sources). + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + leftSource.reset(nBatches) + rightSource.reset(nBatches) + hj, accounts, monitors, err := createDiskBackedHashJoiner( + ctx, flowCtx, spec, []Operator{leftSource, rightSource}, func() {}, + ) + memAccounts = append(memAccounts, accounts...) + memMonitors = append(memMonitors, monitors...) + require.NoError(b, err) + hj.Init() + for b := hj.Next(ctx); b.Length() > 0; b = hj.Next(ctx) { + } + } + }) } + } } } for _, memAccount := range memAccounts { diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index ab96cb1d8309..ceed34491a0b 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -1022,7 +1022,7 @@ func BenchmarkHashJoiner(b *testing.B) { b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) b.ResetTimer() for i := 0; i < b.N; i++ { - leftSource := NewRepeatableBatchSource(batch) + leftSource := NewRepeatableBatchSource(testAllocator, batch) rightSource := newFiniteBatchSource(batch, nBatches) joinType := sqlbase.JoinType_INNER if fullOuter { diff --git a/pkg/sql/colexec/like_ops_test.go b/pkg/sql/colexec/like_ops_test.go index 55009415e88b..f7c448a1d2b6 100644 --- a/pkg/sql/colexec/like_ops_test.go +++ b/pkg/sql/colexec/like_ops_test.go @@ -118,7 +118,7 @@ func BenchmarkLikeOps(b *testing.B) { } batch.SetLength(coldata.BatchSize()) - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() base := selConstOpBase{ diff --git a/pkg/sql/colexec/offset_test.go b/pkg/sql/colexec/offset_test.go index 1e523b6124fa..3c5377cda3f0 100644 --- a/pkg/sql/colexec/offset_test.go +++ b/pkg/sql/colexec/offset_test.go @@ -66,7 +66,7 @@ func BenchmarkOffset(b *testing.B) { ctx := context.Background() batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64}) batch.SetLength(coldata.BatchSize()) - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() o := NewOffsetOp(source, 1) diff --git a/pkg/sql/colexec/orderedsynchronizer_test.go b/pkg/sql/colexec/orderedsynchronizer_test.go index c4c772a128df..cfef4bedff73 100644 --- a/pkg/sql/colexec/orderedsynchronizer_test.go +++ b/pkg/sql/colexec/orderedsynchronizer_test.go @@ -220,7 +220,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) { inputs := make([]Operator, len(batches)) for i := range batches { - inputs[i] = NewRepeatableBatchSource(batches[i]) + inputs[i] = NewRepeatableBatchSource(testAllocator, batches[i]) } op := OrderedSynchronizer{ diff --git a/pkg/sql/colexec/ordinality_test.go b/pkg/sql/colexec/ordinality_test.go index 40a8af888cdb..8e4289d9ebe0 100644 --- a/pkg/sql/colexec/ordinality_test.go +++ b/pkg/sql/colexec/ordinality_test.go @@ -56,7 +56,7 @@ func BenchmarkOrdinality(b *testing.B) { batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64}) batch.SetLength(coldata.BatchSize()) - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) ordinality := NewOrdinalityOp(testAllocator, source, batch.Width()) ordinality.Init() diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go index d323a43083ff..008e745020a8 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer_test.go @@ -43,7 +43,10 @@ func TestParallelUnorderedSynchronizer(t *testing.T) { inputs := make([]Operator, numInputs) for i := range inputs { - source := NewRepeatableBatchSource(RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64())) + source := NewRepeatableBatchSource( + testAllocator, + RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64()), + ) source.ResetBatchesToReturn(numBatches) inputs[i] = source } @@ -132,7 +135,7 @@ func BenchmarkParallelUnorderedSynchronizer(b *testing.B) { for i := range inputs { batch := testAllocator.NewMemBatchWithSize(typs, int(coldata.BatchSize())) batch.SetLength(coldata.BatchSize()) - inputs[i] = NewRepeatableBatchSource(batch) + inputs[i] = NewRepeatableBatchSource(testAllocator, batch) } var wg sync.WaitGroup ctx, cancelFn := context.WithCancel(context.Background()) diff --git a/pkg/sql/colexec/projection_ops_test.go b/pkg/sql/colexec/projection_ops_test.go index 2f7937a42c27..3147c134d6c0 100644 --- a/pkg/sql/colexec/projection_ops_test.go +++ b/pkg/sql/colexec/projection_ops_test.go @@ -102,7 +102,7 @@ func benchmarkProjPlusInt64Int64ConstOp(b *testing.B, useSelectionVector bool, h sel[i] = uint16(i) } } - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() plusOp := &projPlusInt64Int64ConstOp{ @@ -336,7 +336,7 @@ func benchmarkProjOp( sel[i] = i } } - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() op := makeProjOp(source, intType) diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index c175eeb30c40..7e298230c641 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -536,7 +536,7 @@ func TestHashRouterCancellation(t *testing.T) { // Never-ending input of 0s. batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64}) batch.SetLength(coldata.BatchSize()) - in := NewRepeatableBatchSource(batch) + in := NewRepeatableBatchSource(testAllocator, batch) unbufferedCh := make(chan struct{}) r := newHashRouterWithOutputs(in, []coltypes.T{coltypes.Int64}, []uint32{0}, unbufferedCh, outputs) @@ -799,7 +799,7 @@ func BenchmarkHashRouter(b *testing.B) { // numbers. batch := testAllocator.NewMemBatch(types) batch.SetLength(coldata.BatchSize()) - input := NewRepeatableBatchSource(batch) + input := NewRepeatableBatchSource(testAllocator, batch) var wg sync.WaitGroup for _, numOutputs := range []int{2, 4, 8, 16} { diff --git a/pkg/sql/colexec/select_in_test.go b/pkg/sql/colexec/select_in_test.go index a829c0d59617..bd875f046642 100644 --- a/pkg/sql/colexec/select_in_test.go +++ b/pkg/sql/colexec/select_in_test.go @@ -123,7 +123,7 @@ func benchmarkSelectInInt64(b *testing.B, useSelectionVector bool, hasNulls bool } } - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() inOp := &selectInOpInt64{ OneInputNode: NewOneInputNode(source), diff --git a/pkg/sql/colexec/selection_ops_test.go b/pkg/sql/colexec/selection_ops_test.go index 73046e2aeab0..a945942cdebb 100644 --- a/pkg/sql/colexec/selection_ops_test.go +++ b/pkg/sql/colexec/selection_ops_test.go @@ -162,7 +162,7 @@ func benchmarkSelLTInt64Int64ConstOp(b *testing.B, useSelectionVector bool, hasN sel[i] = i } } - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() plusOp := &selLTInt64Int64ConstOp{ @@ -222,7 +222,7 @@ func benchmarkSelLTInt64Int64Op(b *testing.B, useSelectionVector bool, hasNulls sel[i] = i } } - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.Init() plusOp := &selLTInt64Int64Op{ diff --git a/pkg/sql/colexec/serial_unordered_synchronizer_test.go b/pkg/sql/colexec/serial_unordered_synchronizer_test.go index 5f8a18ff9484..b83ae6ae297e 100644 --- a/pkg/sql/colexec/serial_unordered_synchronizer_test.go +++ b/pkg/sql/colexec/serial_unordered_synchronizer_test.go @@ -33,7 +33,7 @@ func TestSerialUnorderedSynchronizer(t *testing.T) { inputs := make([]Operator, numInputs) for i := range inputs { batch := RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64()) - source := NewRepeatableBatchSource(batch) + source := NewRepeatableBatchSource(testAllocator, batch) source.ResetBatchesToReturn(numBatches) inputs[i] = source } diff --git a/pkg/sql/colexec/testutils.go b/pkg/sql/colexec/testutils.go index 2220ce4b9dd7..950981061741 100644 --- a/pkg/sql/colexec/testutils.go +++ b/pkg/sql/colexec/testutils.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/col/coltypes" ) // BatchBuffer exposes a buffer of coldata.Batches through an Operator @@ -50,10 +51,16 @@ func (b *BatchBuffer) Next(context.Context) coldata.Batch { // RepeatableBatchSource is an Operator that returns the same batch forever. type RepeatableBatchSource struct { ZeroInputNode - internalBatch coldata.Batch - batchLen uint16 - // sel specifies the desired selection vector for the batch. - sel []uint16 + + colVecs []coldata.Vec + typs []coltypes.T + sel []uint16 + batchLen uint16 + // numToCopy indicates the number of tuples that needs to be copied. It is + // equal to batchLen when sel is nil and is equal to maxSelIdx+1 when sel is + // non-nil. + numToCopy uint16 + output coldata.Batch batchesToReturn int batchesReturned int @@ -62,34 +69,62 @@ type RepeatableBatchSource struct { var _ Operator = &RepeatableBatchSource{} // NewRepeatableBatchSource returns a new Operator initialized to return its -// input batch forever (including the selection vector if batch comes with it). -func NewRepeatableBatchSource(batch coldata.Batch) *RepeatableBatchSource { - src := &RepeatableBatchSource{ - internalBatch: batch, - batchLen: batch.Length(), +// input batch forever. Note that it stores the contents of the input batch and +// copies them into a separate output batch. The output batch is allowed to be +// modified whereas the input batch is *not*. +func NewRepeatableBatchSource(allocator *Allocator, batch coldata.Batch) *RepeatableBatchSource { + typs := make([]coltypes.T, batch.Width()) + for i, vec := range batch.ColVecs() { + typs[i] = vec.Type() } - if batch.Selection() != nil { - src.sel = make([]uint16, batch.Length()) - copy(src.sel, batch.Selection()) + sel := batch.Selection() + batchLen := batch.Length() + numToCopy := batchLen + if sel != nil { + maxIdx := uint16(0) + for _, selIdx := range sel[:batchLen] { + if selIdx > maxIdx { + maxIdx = selIdx + } + } + numToCopy = maxIdx + 1 + } + output := allocator.NewMemBatchWithSize(typs, int(numToCopy)) + src := &RepeatableBatchSource{ + colVecs: batch.ColVecs(), + typs: typs, + sel: sel, + batchLen: batchLen, + numToCopy: numToCopy, + output: output, } return src } // Next is part of the Operator interface. func (s *RepeatableBatchSource) Next(context.Context) coldata.Batch { - s.internalBatch.SetSelection(s.sel != nil) s.batchesReturned++ if s.batchesToReturn != 0 && s.batchesReturned > s.batchesToReturn { return coldata.ZeroBatch } - s.internalBatch.SetLength(s.batchLen) + s.output.SetSelection(s.sel != nil) if s.sel != nil { - // Since selection vectors are mutable, to make sure that we return the - // batch with the given selection vector, we need to reset - // s.internalBatch.Selection() to s.sel on every iteration. - copy(s.internalBatch.Selection(), s.sel) + copy(s.output.Selection()[:s.batchLen], s.sel[:s.batchLen]) + } + for i, typ := range s.typs { + // This Copy is outside of the allocator since the RepeatableBatchSource is + // a test utility which is often used in the benchmarks, and we want to + // reduce the performance impact of this operator. + s.output.ColVec(i).Copy(coldata.CopySliceArgs{ + SliceArgs: coldata.SliceArgs{ + ColType: typ, + Src: s.colVecs[i], + SrcEndIdx: uint64(s.numToCopy), + }, + }) } - return s.internalBatch + s.output.SetLength(s.batchLen) + return s.output } // Init is part of the Operator interface. diff --git a/pkg/sql/colexec/utils_test.go b/pkg/sql/colexec/utils_test.go index f3564c858ef9..7cb57c92d755 100644 --- a/pkg/sql/colexec/utils_test.go +++ b/pkg/sql/colexec/utils_test.go @@ -942,7 +942,7 @@ var _ Operator = &finiteBatchSource{} // batch a specified number of times. func newFiniteBatchSource(batch coldata.Batch, usableCount int) *finiteBatchSource { return &finiteBatchSource{ - repeatableBatch: NewRepeatableBatchSource(batch), + repeatableBatch: NewRepeatableBatchSource(testAllocator, batch), usableCount: usableCount, } } @@ -959,6 +959,10 @@ func (f *finiteBatchSource) Next(ctx context.Context) coldata.Batch { return coldata.ZeroBatch } +func (f *finiteBatchSource) reset(usableCount int) { + f.usableCount = usableCount +} + // finiteChunksSource is an Operator that returns a batch specified number of // times. The first matchLen columns of the batch are incremented every time // (except for the first) the batch is returned to emulate source that is @@ -976,7 +980,7 @@ var _ Operator = &finiteChunksSource{} func newFiniteChunksSource(batch coldata.Batch, usableCount int, matchLen int) *finiteChunksSource { return &finiteChunksSource{ - repeatableBatch: NewRepeatableBatchSource(batch), + repeatableBatch: NewRepeatableBatchSource(testAllocator, batch), usableCount: usableCount, matchLen: matchLen, } @@ -1038,7 +1042,7 @@ func TestRepeatableBatchSource(t *testing.T) { batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64}) batchLen := uint16(10) batch.SetLength(batchLen) - input := NewRepeatableBatchSource(batch) + input := NewRepeatableBatchSource(testAllocator, batch) b := input.Next(context.Background()) b.SetLength(0) @@ -1066,7 +1070,7 @@ func TestRepeatableBatchSourceWithFixedSel(t *testing.T) { batch.SetLength(batchLen) batch.SetSelection(true) copy(batch.Selection(), sel) - input := NewRepeatableBatchSource(batch) + input := NewRepeatableBatchSource(testAllocator, batch) b := input.Next(context.Background()) b.SetLength(0) diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 77b02cc748d8..9f6974ec2fdf 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -535,7 +535,7 @@ func BenchmarkOutboxInbox(b *testing.B) { batch := testAllocator.NewMemBatch(typs) batch.SetLength(coldata.BatchSize()) - input := colexec.NewRepeatableBatchSource(batch) + input := colexec.NewRepeatableBatchSource(testAllocator, batch) outboxMemAcc := testMemMonitor.MakeBoundAccount() defer outboxMemAcc.Close(ctx) diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index b6417fe19d59..9189090c159d 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -234,9 +234,10 @@ func TestVectorizedFlowShutdown(t *testing.T) { } else { sourceMemAccount := testMemMonitor.MakeBoundAccount() defer sourceMemAccount.Close(ctxRemote) - batch := colexec.NewAllocator(ctxRemote, &sourceMemAccount).NewMemBatch(typs) + remoteAllocator := colexec.NewAllocator(ctxRemote, &sourceMemAccount) + batch := remoteAllocator.NewMemBatch(typs) batch.SetLength(coldata.BatchSize()) - runOutboxInbox(ctxRemote, cancelRemote, &outboxMemAccount, colexec.NewRepeatableBatchSource(batch), inboxes[i], streamID, outboxMetadataSources) + runOutboxInbox(ctxRemote, cancelRemote, &outboxMemAccount, colexec.NewRepeatableBatchSource(remoteAllocator, batch), inboxes[i], streamID, outboxMetadataSources) } streamID++ } diff --git a/pkg/sql/colflow/vectorized_flow_space_test.go b/pkg/sql/colflow/vectorized_flow_space_test.go index e455549c4f27..0ab30f4a83d3 100644 --- a/pkg/sql/colflow/vectorized_flow_space_test.go +++ b/pkg/sql/colflow/vectorized_flow_space_test.go @@ -194,9 +194,9 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) { for _, success := range []bool{true, false} { expected := success || tc.spillingSupported t.Run(fmt.Sprintf("%s-success-expected-%t", tc.desc, expected), func(t *testing.T) { - inputs := []colexec.Operator{colexec.NewRepeatableBatchSource(batch)} + inputs := []colexec.Operator{colexec.NewRepeatableBatchSource(testAllocator, batch)} if len(tc.spec.Input) > 1 { - inputs = append(inputs, colexec.NewRepeatableBatchSource(batch)) + inputs = append(inputs, colexec.NewRepeatableBatchSource(testAllocator, batch)) } memMon := mon.MakeMonitor("MemoryMonitor", mon.MemoryResource, nil, nil, 0, math.MaxInt64, st) if success {