Skip to content

Commit

Permalink
Merge branch 'master' into fix-cast-str-to-time
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jun 22, 2022
2 parents 16accff + 007d64c commit dbc91dc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ The [community repository](https://github.com/pingcap/community) hosts all infor

[<img src="docs/contribution-map.png" alt="contribution-map" width="180">](https://github.com/pingcap/tidb-map/blob/master/maps/contribution-map.md#tidb-is-an-open-source-distributed-htap-database-compatible-with-the-mysql-protocol)

Contributions are welcomed and greatly appreciated. See [Contribution to TiDB](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/introduction.html) for details on typical contribution workflows. For more contributing information, click on the contributor icon above.
Contributions are welcomed and greatly appreciated. All the contributors are welcomed to claim your reward by filing this [form](https://forms.pingcap.com/f/tidb-contribution-swag). See [Contribution to TiDB](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/introduction.html) for details on typical contribution workflows. For more contributing information, click on the contributor icon above.

## Adopters

Expand Down
24 changes: 17 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4509,6 +4509,7 @@ func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleE
concurrency: v.Concurrency,
}

// 1. initialize the splitters
splitters := make([]partitionSplitter, len(v.ByItemArrays))
switch v.SplitterType {
case plannercore.PartitionHashSplitterType:
Expand All @@ -4524,6 +4525,7 @@ func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleE
}
shuffle.splitters = splitters

// 2. initialize the data sources (build the data sources from physical plan to executors)
shuffle.dataSources = make([]Executor, len(v.DataSources))
for i, dataSource := range v.DataSources {
shuffle.dataSources[i] = b.build(dataSource)
Expand All @@ -4532,26 +4534,34 @@ func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleE
}
}

// 3. initialize the workers
head := v.Children()[0]
// A `PhysicalShuffleReceiverStub` for every worker have the same `DataSource` but different `Receiver`.
// We preallocate `PhysicalShuffleReceiverStub`s here and reuse them below.
stubs := make([]*plannercore.PhysicalShuffleReceiverStub, 0, len(v.DataSources))
for _, dataSource := range v.DataSources {
stub := plannercore.PhysicalShuffleReceiverStub{
DataSource: dataSource,
}.Init(b.ctx, dataSource.Stats(), dataSource.SelectBlockOffset(), nil)
stub.SetSchema(dataSource.Schema())
stubs = append(stubs, stub)
}
shuffle.workers = make([]*shuffleWorker, shuffle.concurrency)
for i := range shuffle.workers {
receivers := make([]*shuffleReceiver, len(v.DataSources))
for j, dataSource := range v.DataSources {
receivers[j] = &shuffleReceiver{
baseExecutor: newBaseExecutor(b.ctx, dataSource.Schema(), dataSource.ID()),
baseExecutor: newBaseExecutor(b.ctx, dataSource.Schema(), stubs[j].ID()),
}
}

w := &shuffleWorker{
receivers: receivers,
}

for j, dataSource := range v.DataSources {
stub := plannercore.PhysicalShuffleReceiverStub{
Receiver: (unsafe.Pointer)(receivers[j]),
DataSource: dataSource,
}.Init(b.ctx, dataSource.Stats(), dataSource.SelectBlockOffset(), nil)
stub.SetSchema(dataSource.Schema())
for j := range v.DataSources {
stub := stubs[j]
stub.Receiver = (unsafe.Pointer)(receivers[j])
v.Tails[j].SetChildren(stub)
}

Expand Down
9 changes: 9 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func TestCheckActRowsWithUnistore(t *testing.T) {
tk.MustExec("create table t_unistore_act_rows(a int, b int, index(a, b))")
tk.MustExec("insert into t_unistore_act_rows values (1, 0), (1, 0), (2, 0), (2, 1)")
tk.MustExec("analyze table t_unistore_act_rows")
tk.MustExec("set @@tidb_merge_join_concurrency= 5;")

type testStruct struct {
sql string
Expand Down Expand Up @@ -353,6 +354,14 @@ func TestCheckActRowsWithUnistore(t *testing.T) {
sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;",
expected: []string{"4", "4", "4", "4", "4"},
},
{
sql: "select a, row_number() over (partition by b) from t_unistore_act_rows;",
expected: []string{"4", "4", "4", "4", "4", "4", "4"},
},
{
sql: "select /*+ merge_join(t1, t2) */ * from t_unistore_act_rows t1 join t_unistore_act_rows t2 on t1.b = t2.b;",
expected: []string{"10", "10", "4", "4", "4", "4", "4", "4", "4", "4", "4", "4"},
},
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
Expand Down

0 comments on commit dbc91dc

Please sign in to comment.