Merge #44523
44523: colexec: add external hash joiner r=yuzefovich a=yuzefovich

**colexec: clean up hashing utility functions**

We have three hashing utility functions that previously had hashTable as
the receiver. However, this is not necessary since they do not rely on
anything in the hash table. This commit makes them standalone. The only
change needed to support that was the introduction of `cancelChecker` as
an argument to `rehash`. This allowed removing the reference to "not fully
initialized hash table" from the hash router.

The `bucketSize` variable has been renamed to `numBuckets` (since that's
what it actually means). This commit also adds support for `numBuckets`
values that are not a power of 2 to the `finalizeHash` function.
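
As a rough illustration of what supporting non-power-of-2 bucket counts can look like, here is a minimal sketch. It is not the actual `finalizeHash` code: `bucketFor` is a hypothetical helper, and the choice of a bit mask for powers of 2 with a modulo fallback otherwise is an assumption.

```go
package main

import "fmt"

// bucketFor is a hypothetical helper (not the actual finalizeHash) that maps
// a hash value to a bucket index in [0, numBuckets). It assumes that
// numBuckets is greater than zero.
func bucketFor(hash, numBuckets uint64) uint64 {
	if numBuckets&(numBuckets-1) == 0 {
		// numBuckets is a power of 2: a bit mask gives the same result as
		// the modulo operation and is cheaper.
		return hash & (numBuckets - 1)
	}
	// General case: fall back to the modulo operation.
	return hash % numBuckets
}

func main() {
	fmt.Println(bucketFor(13, 8)) // takes the bit mask path
	fmt.Println(bucketFor(13, 6)) // takes the modulo path
}
```

Both paths return an index smaller than `numBuckets`, so callers do not need to know which one was taken.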

Release note: None

**colexec: miscellaneous cleanups and additions**

This commit removes `Eq` from the name of the hash join operator. It also
adds several interface implementation assertions and removes a stale
entry from the .gitignore file. A new function is introduced into the hash
table to help with the reuse of some allocations in the follow-up PRs.

Next, this commit removes `hashJoinProber` entirely and moves its
contents into `hashJoiner` as an embedded struct. The reason for this
change is that some fields were duplicated in both structs and there
would be no clear ownership of who should be resetting the hash table.

A `reset` method has been added to the hash table and the hash join
operator. The `ExportBuffered` method of `bufferingInMemoryOperator` has
been modified to take an argument to support the case of a buffering
operator with several inputs (hash join, for example). The argument will
be used to distinguish which input to "export the buffered tuples from".
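
To make the role of the new argument concrete, here is a minimal sketch under simplifying assumptions: `source` and `twoInputBuffer` are hypothetical stand-ins for the real operator types, and buffered tuples are modeled as plain `int64` slices instead of batches.

```go
package main

import "fmt"

// source is a hypothetical stand-in for an input Operator.
type source struct{ name string }

// twoInputBuffer is a hypothetical buffering operator that keeps the tuples
// it has buffered from each of its inputs separately.
type twoInputBuffer struct {
	buffered map[*source][]int64
}

// exportBuffered returns the tuples buffered from the given input and drops
// them from the buffer. Once that input's buffer has been emptied, it returns
// a zero-length slice.
func (b *twoInputBuffer) exportBuffered(input *source) []int64 {
	out := b.buffered[input]
	delete(b.buffered, input)
	return out
}

func main() {
	left, right := &source{name: "left"}, &source{name: "right"}
	b := &twoInputBuffer{buffered: map[*source][]int64{
		left:  {1, 2},
		right: {3},
	}}
	fmt.Println(b.exportBuffered(right)) // tuples buffered from the right input
	fmt.Println(b.exportBuffered(right)) // empty: the right buffer is drained
	fmt.Println(b.exportBuffered(left))  // tuples buffered from the left input
}
```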

**Important change:** the probing code has been changed to not put
`Vec.Copy` calls through the allocator (i.e. when we're probing and
populating the output batch, `Copy` methods are not accounted for). This
is done for several reasons:
1. we have already built the hash table, so it would be a shame to fall
back to the disk-backed algorithm at this point
2. actually falling back in such a scenario is non-trivial because we
might have emitted partial matches from the current probing batch
3. the deviation from the memory limit should not be significant because
the output batch has already been allocated, so the only concern is
about variable-width types - if our estimation significantly
underestimates the memory footprint of variable-width type vectors, we
will actually be using more memory than what we accounted for.

At this point I think it is an acceptable compromise, but it should be
addressed before the release.

Release note: None

**colexec: add two input disk spiller**

This commit adds a disk spilling operator for an in-memory operator that
operates on two inputs. Most of the logic is shared with
oneInputDiskSpiller; only the execinfra.OpNode methods are slightly
different.

Release note: None

**colexec: factor out some hash utilities**

This commit extracts the "tuple distribution" logic from the hash
router into a helper struct, which will be reused by the external hash
join. It also moves three hashing utility functions out of the
`hashtable` file into `hash_utils`.

Additionally, the hash router now stores `hashCols` as `[]uint32` since
that's what is used in the protos.

Release note: None

**colexec: add external hash joiner**

This commit adds an external hash joiner that implements the Grace hash
join algorithm. It is done in two phases:
1. we read the batches from both inputs and partition them according to
the vectorized hash function initialized with 2's (unlike the default
value of 1)
2. once we have exhausted both inputs, we proceed to the join phase, in
which we reuse the in-memory hash joiner to actually perform a join
operation on the tuples from the left and right sides that hashed to the
same partition. Here, the in-memory hash joiner uses the default value
of 1 when initializing the hash values, so essentially it uses a
different hash function.
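
The two phases above can be sketched as follows. This is a simplified, in-memory illustration and not the operator added by this commit: `row`, `hashKey`, `partition`, `joinPartition` and `graceHashJoin` are hypothetical names, the partitions are kept in slices rather than spilled to disk, the seed byte only mimics "a hash function initialized with a different value", and Go's built-in map plays the role of the in-memory hash joiner with its own hash function.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// row is a hypothetical tuple with a single join key and a payload.
type row struct {
	key int64
	val string
}

// hashKey hashes the key together with a seed, so that different seeds behave
// like different hash functions (an assumption made for this sketch).
func hashKey(seed byte, key int64) uint64 {
	var buf [9]byte
	buf[0] = seed
	binary.LittleEndian.PutUint64(buf[1:], uint64(key))
	h := fnv.New64a()
	h.Write(buf[:])
	return h.Sum64()
}

// partition implements phase 1: it distributes rows into numPartitions
// partitions according to the hash function seeded with 2.
func partition(rows []row, numPartitions uint64) [][]row {
	parts := make([][]row, numPartitions)
	for _, r := range rows {
		p := hashKey(2, r.key) % numPartitions
		parts[p] = append(parts[p], r)
	}
	return parts
}

// joinPartition implements phase 2 for one partition: it builds an in-memory
// table on the right side and probes it with the left side.
func joinPartition(left, right []row) [][2]string {
	table := make(map[int64][]string)
	for _, r := range right {
		table[r.key] = append(table[r.key], r.val)
	}
	var out [][2]string
	for _, l := range left {
		for _, rv := range table[l.key] {
			out = append(out, [2]string{l.val, rv})
		}
	}
	return out
}

// graceHashJoin partitions both inputs and then joins the partitions with the
// same index, since equal keys always land in the same partition.
func graceHashJoin(left, right []row, numPartitions uint64) [][2]string {
	leftParts := partition(left, numPartitions)
	rightParts := partition(right, numPartitions)
	var out [][2]string
	for i := range leftParts {
		out = append(out, joinPartition(leftParts[i], rightParts[i])...)
	}
	return out
}

func main() {
	left := []row{{1, "a"}, {2, "b"}, {3, "c"}}
	right := []row{{2, "x"}, {3, "y"}, {3, "z"}, {4, "w"}}
	// The order of the output depends on how the keys are partitioned.
	fmt.Println(graceHashJoin(left, right, 4))
}
```

Using a different hash function in the second phase matters because all tuples in one partition share the same partitioning hash modulo the number of partitions; reusing that function would tend to cluster them in few buckets of the in-memory table.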

Current limitations:
- we use a fixed number of buckets (4096) in the partitioning phase
- we do not support recursive partitioning, i.e. if we have a partition
that will not fit into memory, we will just bail on the query.

Fixes: #43790.

Release note: None

**colexec: fix repeatable batch source**

RepeatableBatchSource is a utility operator that returns the batch it's
initialized with forever. Previously it would only store the length and
the selection vector of the input batch separately, but this is not
sufficient - downstream operators are allowed to modify the batch which
is output by the repeatable batch source in any way, including
overwriting the actual data. Now this is fixed by storing the contents
of the input batch and populating a separate output batch on every call
to Next.
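
The idea behind the fix can be sketched as below. This is a simplified model rather than the real operator: `repeatableSource` is a hypothetical type and a batch is reduced to a single `int64` column.

```go
package main

import "fmt"

// repeatableSource is a hypothetical, simplified model of the fixed operator:
// it keeps a private copy of the input data and repopulates a separate output
// buffer on every call to Next.
type repeatableSource struct {
	stored []int64 // private copy, never handed out to callers
	output []int64 // buffer that is returned and may be modified downstream
}

func newRepeatableSource(input []int64) *repeatableSource {
	stored := make([]int64, len(input))
	copy(stored, input)
	return &repeatableSource{
		stored: stored,
		output: make([]int64, len(input)),
	}
}

// Next restores the output buffer from the stored contents, so any changes
// made by the caller to the previously returned slice are undone.
func (s *repeatableSource) Next() []int64 {
	copy(s.output, s.stored)
	return s.output
}

func main() {
	src := newRepeatableSource([]int64{1, 2, 3})
	batch := src.Next()
	batch[0] = 99           // a downstream operator overwrites the data
	fmt.Println(src.Next()) // the original contents are returned again
}
```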

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Feb 5, 2020
2 parents 3dcca6d + a2c697e commit b798c6a
Showing 48 changed files with 1,927 additions and 954 deletions.
4 changes: 3 additions & 1 deletion Makefile
@@ -807,6 +807,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/distinct.eg.go \
pkg/sql/colexec/hashjoiner.eg.go \
pkg/sql/colexec/hashtable.eg.go \
pkg/sql/colexec/hash_utils.eg.go \
pkg/sql/colexec/like_ops.eg.go \
pkg/sql/colexec/mergejoinbase.eg.go \
pkg/sql/colexec/mergejoiner_fullouter.eg.go \
@@ -1493,12 +1494,14 @@ pkg/col/coldata/vec.eg.go: pkg/col/coldata/vec_tmpl.go
pkg/sql/colexec/and_or_projection.eg.go: pkg/sql/colexec/and_or_projection_tmpl.go
pkg/sql/colexec/any_not_null_agg.eg.go: pkg/sql/colexec/any_not_null_agg_tmpl.go
pkg/sql/colexec/avg_agg.eg.go: pkg/sql/colexec/avg_agg_tmpl.go
pkg/sql/colexec/boolean_agg.eg.go: pkg/sql/colexec/boolean_agg_tmpl.go
pkg/sql/colexec/cast.eg.go: pkg/sql/colexec/cast_tmpl.go
pkg/sql/colexec/const.eg.go: pkg/sql/colexec/const_tmpl.go
pkg/sql/colexec/count_agg.eg.go: pkg/sql/colexec/count_agg_tmpl.go
pkg/sql/colexec/distinct.eg.go: pkg/sql/colexec/distinct_tmpl.go
pkg/sql/colexec/hashjoiner.eg.go: pkg/sql/colexec/hashjoiner_tmpl.go
pkg/sql/colexec/hashtable.eg.go: pkg/sql/colexec/hashtable_tmpl.go
pkg/sql/colexec/hash_utils.eg.go: pkg/sql/colexec/hash_utils_tmpl.go
pkg/sql/colexec/mergejoinbase.eg.go: pkg/sql/colexec/mergejoinbase_tmpl.go
pkg/sql/colexec/mergejoiner_fullouter.eg.go: pkg/sql/colexec/mergejoiner_tmpl.go
pkg/sql/colexec/mergejoiner_inner.eg.go: pkg/sql/colexec/mergejoiner_tmpl.go
@@ -1521,7 +1524,6 @@ pkg/sql/colexec/sort.eg.go: pkg/sql/colexec/sort_tmpl.go
pkg/sql/colexec/sum_agg.eg.go: pkg/sql/colexec/sum_agg_tmpl.go
pkg/sql/colexec/tuples_differ.eg.go: pkg/sql/colexec/tuples_differ_tmpl.go
pkg/sql/colexec/vec_comparators.eg.go: pkg/sql/colexec/vec_comparators_tmpl.go
pkg/sql/colexec/boolean_agg.eg.go: pkg/sql/colexec/boolean_agg_tmpl.go

$(EXECGEN_TARGETS): bin/execgen
execgen $@
2 changes: 1 addition & 1 deletion pkg/sql/colcontainer/diskqueue_test.go
@@ -154,7 +154,7 @@ func BenchmarkDiskQueue(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
typs := []coltypes.T{coltypes.Int64}
batch := colexec.RandomBatch(testAllocator, rng, typs, int(coldata.BatchSize()), 0, 0)
op := colexec.NewRepeatableBatchSource(batch)
op := colexec.NewRepeatableBatchSource(testAllocator, batch)
ctx := context.Background()
for i := 0; i < b.N; i++ {
op.ResetBatchesToReturn(numBatches)
2 changes: 1 addition & 1 deletion pkg/sql/colexec/.gitignore
@@ -8,6 +8,7 @@ count_agg.eg.go
distinct.eg.go
hashjoiner.eg.go
hashtable.eg.go
hash_utils.eg.go
like_ops.eg.go
mergejoinbase.eg.go
mergejoiner_fullouter.eg.go
@@ -32,4 +33,3 @@ sort.eg.go
sum_agg.eg.go
tuples_differ.eg.go
vec_comparators.eg.go
zerocolumns.eg.go
4 changes: 2 additions & 2 deletions pkg/sql/colexec/aggregator_test.go
@@ -827,9 +827,9 @@ func TestHashAggregator(t *testing.T) {
input: tuples{
{0, 3},
{0, 4},
{hashTableBucketSize, 6},
{hashTableNumBuckets, 6},
{0, 5},
{hashTableBucketSize, 7},
{hashTableNumBuckets, 7},
},
colTypes: []coltypes.T{coltypes.Int64, coltypes.Int64},
groupCols: []uint32{0},
2 changes: 1 addition & 1 deletion pkg/sql/colexec/and_or_projection_test.go
@@ -266,7 +266,7 @@ func benchmarkLogicalProjOp(
sel[i] = uint16(i)
}
}
input := NewRepeatableBatchSource(batch)
input := NewRepeatableBatchSource(testAllocator, batch)

spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: []types.T{*types.Bool, *types.Bool}}},
15 changes: 7 additions & 8 deletions pkg/sql/colexec/builtin_funcs_test.go
@@ -136,7 +136,7 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b
}
}

source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

expr, err := parser.ParseExpr("abs(@1)")
@@ -175,18 +175,17 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) {
ctx := context.Background()
tctx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())

batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Bytes, coltypes.Int64, coltypes.Int64, coltypes.Bytes})
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Bytes, coltypes.Int64, coltypes.Int64})
bCol := batch.ColVec(0).Bytes()
sCol := batch.ColVec(1).Int64()
eCol := batch.ColVec(2).Int64()
outCol := batch.ColVec(3).Bytes()
for i := 0; i < int(coldata.BatchSize()); i++ {
bCol.Set(i, []byte("hello there"))
sCol[i] = 1
eCol[i] = 4
}
batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

// Set up the default operator.
@@ -229,21 +228,21 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) {
b.SetBytes(int64(len("hello there") * int(coldata.BatchSize())))
b.ResetTimer()
for i := 0; i < b.N; i++ {
defaultOp.Next(ctx)
b := defaultOp.Next(ctx)
// Due to the flat byte updates, we have to reset the output
// bytes col after each next call.
outCol.Reset()
b.ColVec(3).Bytes().Reset()
}
})

b.Run("SpecializedSubstringOperator", func(b *testing.B) {
b.SetBytes(int64(len("hello there") * int(coldata.BatchSize())))
b.ResetTimer()
for i := 0; i < b.N; i++ {
specOp.Next(ctx)
b := specOp.Next(ctx)
// Due to the flat byte updates, we have to reset the output
// bytes col after each next call.
outCol.Reset()
b.ColVec(3).Bytes().Reset()
}
})
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/cancel_checker_test.go
@@ -27,7 +27,7 @@ func TestCancelChecker(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx, cancel := context.WithCancel(context.Background())
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64})
op := NewCancelChecker(NewNoop(NewRepeatableBatchSource(batch)))
op := NewCancelChecker(NewNoop(NewRepeatableBatchSource(testAllocator, batch)))
cancel()
err := execerror.CatchVectorizedRuntimeError(func() {
op.Next(ctx)
2 changes: 1 addition & 1 deletion pkg/sql/colexec/deselector_test.go
@@ -103,7 +103,7 @@ func BenchmarkDeselector(b *testing.B) {
batch.SetSelection(true)
copy(batch.Selection(), sel)
batch.SetLength(batchLen)
input := NewRepeatableBatchSource(batch)
input := NewRepeatableBatchSource(testAllocator, batch)
op := NewDeselectorOp(testAllocator, input, inputTypes)
op.Init()
b.ResetTimer()
150 changes: 108 additions & 42 deletions pkg/sql/colexec/disk_spiller.go
@@ -27,20 +27,20 @@ import (
type bufferingInMemoryOperator interface {
Operator

// ExportBuffered returns all the batches that have been buffered up and have
// not yet been processed by the operator. It needs to be called once the
// memory limit has been reached in order to "dump" the buffered tuples into
// a disk-backed operator. It will return a zero-length batch once the buffer
// has been emptied.
// ExportBuffered returns all the batches that have been buffered up from the
// input and have not yet been processed by the operator. It needs to be
// called once the memory limit has been reached in order to "dump" the
// buffered tuples into a disk-backed operator. It will return a zero-length
// batch once the buffer has been emptied.
//
// Calling ExportBuffered may invalidate the contents of the last batch
// returned by ExportBuffered.
ExportBuffered() coldata.Batch
ExportBuffered(input Operator) coldata.Batch
}

// oneInputDiskSpiller is an Operator that manages the fallback from an
// in-memory buffering operator to a disk-backed one when the former hits the
// memory limit.
// oneInputDiskSpiller is an Operator that manages the fallback from a one
// input in-memory buffering operator to a disk-backed one when the former hits
// the memory limit.
//
// NOTE: if an out of memory error occurs during initialization, this operator
// simply propagates the error further.
@@ -63,29 +63,15 @@ type bufferingInMemoryOperator interface {
// output
//
// Here is the explanation:
// - the main chain of Operators is input -> disk spiller -> output
// - the dist spiller will first try running everything through the left side
// - the main chain of Operators is input -> disk spiller -> output.
// - the disk spiller will first try running everything through the left side
// chain of input -> inMemoryOp. If that succeeds, great! The disk spiller
// will simply propagate the batch to the output. If that fails with an OOM
// error, the disk spiller will then initialize the right side chain and will
// proceed to emit from there
// proceed to emit from there.
// - the right side chain is bufferExportingOperator -> diskBackedOp. The
// former will first export all the buffered tuples from inMemoryOp and then
// will proceed on emitting from input.
type oneInputDiskSpiller struct {
NonExplainable

initialized bool
spilled bool

input Operator
inMemoryOp bufferingInMemoryOperator
inMemoryMemMonitorName string
diskBackedOp Operator
spillingCallbackFn func()
}

var _ Operator = &oneInputDiskSpiller{}

// newOneInputDiskSpiller returns a new oneInputDiskSpiller. It takes the
// following arguments:
@@ -109,16 +95,99 @@ func newOneInputDiskSpiller(
spillingCallbackFn func(),
) Operator {
diskBackedOpInput := newBufferExportingOperator(inMemoryOp, input)
return &oneInputDiskSpiller{
input: input,
return &diskSpillerBase{
inputs: []Operator{input},
inMemoryOp: inMemoryOp,
inMemoryMemMonitorName: inMemoryMemMonitorName,
diskBackedOp: diskBackedOpConstructor(diskBackedOpInput),
spillingCallbackFn: spillingCallbackFn,
}
}

func (d *oneInputDiskSpiller) Init() {
// twoInputDiskSpiller is an Operator that manages the fallback from a two
// input in-memory buffering operator to a disk-backed one when the former hits
// the memory limit.
//
// NOTE: if an out of memory error occurs during initialization, this operator
// simply propagates the error further.
//
// The diagram of the components involved is as follows:
//
// ----- input1 input2 ----------
// || / | _____________________________________________| | ||
// || / ↓ / | ||
// || | inMemoryOp ------------------------------ | ||
// || | / | | | ||
// || | / ------------------ | | ||
// || |/ (2nd src) ↓ (1st src) ↓ (1st src) ↓ (2nd src) ||
// || / ----------> bufferExportingOperator1 bufferExportingOperator2 ||
// || / | | ||
// || | | | ||
// || | -----> diskBackedOp <----- ||
// || | | ||
// || ------------------------------ | ||
// || ↓ ↓ ||
// ----------------------------> disk spiller <-------------------------------
//
// Here is the explanation:
// - the main chain of Operators is inputs -> disk spiller -> output.
// - the disk spiller will first try running everything through the left side
// chain of inputs -> inMemoryOp. If that succeeds, great! The disk spiller
// will simply propagate the batch to the output. If that fails with an OOM
// error, the disk spiller will then initialize the right side chain and will
// proceed to emit from there.
// - the right side chain is bufferExportingOperators -> diskBackedOp. The
// former will first export all the buffered tuples from inMemoryOp and then
// will proceed on emitting from input.

// newTwoInputDiskSpiller returns a new twoInputDiskSpiller. It takes the
// following arguments:
// - inMemoryOp - the in-memory operator that will be consuming inputs and
// doing computations until it either successfully processes the whole inputs
// or reaches its memory limit.
// - inMemoryMemMonitorName - the name of the memory monitor of the in-memory
// operator. diskSpiller will catch an OOM error only if this name is
// contained within the error message.
// - diskBackedOpConstructor - the function to construct the disk-backed
// operator when given two input operators. We take in a constructor rather
// than an already created operator in order to hide the complexity of buffer
// exporting operators that serves as inputs to the disk-backed operator.
// - spillingCallbackFn will be called when the spilling from in-memory to disk
// backed operator occurs. It should only be set in tests.
func newTwoInputDiskSpiller(
inputOne, inputTwo Operator,
inMemoryOp bufferingInMemoryOperator,
inMemoryMemMonitorName string,
diskBackedOpConstructor func(inputOne, inputTwo Operator) Operator,
spillingCallbackFn func(),
) Operator {
diskBackedOpInputOne := newBufferExportingOperator(inMemoryOp, inputOne)
diskBackedOpInputTwo := newBufferExportingOperator(inMemoryOp, inputTwo)
return &diskSpillerBase{
inputs: []Operator{inputOne, inputTwo},
inMemoryOp: inMemoryOp,
inMemoryMemMonitorName: inMemoryMemMonitorName,
diskBackedOp: diskBackedOpConstructor(diskBackedOpInputOne, diskBackedOpInputTwo),
spillingCallbackFn: spillingCallbackFn,
}
}

// diskSpillerBase is the common base for the one-input and two-input disk
// spillers.
type diskSpillerBase struct {
NonExplainable

inputs []Operator
initialized bool
spilled bool

inMemoryOp bufferingInMemoryOperator
inMemoryMemMonitorName string
diskBackedOp Operator
spillingCallbackFn func()
}

func (d *diskSpillerBase) Init() {
if d.initialized {
return
}
@@ -131,7 +200,7 @@ func (d *oneInputDiskSpiller) Init() {
d.inMemoryOp.Init()
}

func (d *oneInputDiskSpiller) Next(ctx context.Context) coldata.Batch {
func (d *diskSpillerBase) Next(ctx context.Context) coldata.Batch {
if d.spilled {
return d.diskBackedOp.Next(ctx)
}
@@ -157,30 +226,26 @@ func (d *oneInputDiskSpiller) Next(ctx context.Context) coldata.Batch {
return batch
}

func (d *oneInputDiskSpiller) ChildCount(verbose bool) int {
func (d *diskSpillerBase) ChildCount(verbose bool) int {
if verbose {
return 3
return len(d.inputs) + 2
}
return 1
}

func (d *oneInputDiskSpiller) Child(nth int, verbose bool) execinfra.OpNode {
// Note: although the main chain is d.input -> diskSpiller -> output (and the
// main chain should be under nth == 0), in order to make the output of
func (d *diskSpillerBase) Child(nth int, verbose bool) execinfra.OpNode {
// Note: although the main chain is d.inputs -> diskSpiller -> output (and
// the main chain should be under nth == 0), in order to make the output of
// EXPLAIN (VEC) less confusing we return the in-memory operator as being on
// the main chain.
if verbose {
switch nth {
case 0:
return d.inMemoryOp
case 1:
return d.input
case 2:
case len(d.inputs) + 1:
return d.diskBackedOp
default:
execerror.VectorizedInternalPanic(fmt.Sprintf("invalid index %d", nth))
// This code is unreachable, but the compiler cannot infer that.
return nil
return d.inputs[nth-1]
}
}
switch nth {
@@ -199,6 +264,7 @@ func (d *oneInputDiskSpiller) Child(nth int, verbose bool) execinfra.OpNode {
//
// NOTE: bufferExportingOperator assumes that both sources will have been
// initialized when bufferExportingOperator.Init() is called.
// NOTE: it is assumed that secondSource is the input to firstSource.
type bufferExportingOperator struct {
ZeroInputNode
NonExplainable
@@ -228,7 +294,7 @@ func (b *bufferExportingOperator) Next(ctx context.Context) coldata.Batch {
if b.firstSourceDone {
return b.secondSource.Next(ctx)
}
batch := b.firstSource.ExportBuffered()
batch := b.firstSource.ExportBuffered(b.secondSource)
if batch.Length() == 0 {
b.firstSourceDone = true
return b.Next(ctx)
2 changes: 1 addition & 1 deletion pkg/sql/colexec/distinct_test.go
@@ -133,7 +133,7 @@ func BenchmarkSortedDistinct(b *testing.B) {
bCol[i] = lastB
}
batch.SetLength(coldata.BatchSize())
source := NewRepeatableBatchSource(batch)
source := NewRepeatableBatchSource(testAllocator, batch)
source.Init()

distinct, err := NewOrderedDistinct(source, []uint32{1, 2}, []coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64})