Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Add session variable for hash join v2 #56023

Merged
2 changes: 1 addition & 1 deletion pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ func (b *executorBuilder) buildHashJoinV2(v *plannercore.PhysicalHashJoin) exec.
}

func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) exec.Executor {
if join.IsHashJoinV2Enabled() && v.CanUseHashJoinV2() {
if b.ctx.GetSessionVars().UseHashJoinV2 && join.IsHashJoinV2Supported() && v.CanUseHashJoinV2() {
return b.buildHashJoinV2(v)
}
leftExec := b.build(v.Children()[0])
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/executor/internal/applycache",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/vecgroupchecker",
"//pkg/executor/join/joinversion",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/parser/mysql",
Expand Down
24 changes: 10 additions & 14 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/join/joinversion"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
Expand All @@ -40,25 +41,20 @@ import (

var (
_ exec.Executor = &HashJoinV2Exec{}
// enableHashJoinV2 is a variable used only in test
enableHashJoinV2 = atomic.Bool{}
// EnableHashJoinV2 enable hash join v2, used for test
EnableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionOptimized
// DisableHashJoinV2 disable hash join v2, used for test
DisableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionLegacy
// HashJoinV2Strings is used for test
HashJoinV2Strings = []string{DisableHashJoinV2, EnableHashJoinV2}
)

func init() {
enableHashJoinV2.Store(true)
}

// IsHashJoinV2Enabled return true if hash join v2 is enabled
func IsHashJoinV2Enabled() bool {
// IsHashJoinV2Supported return true if hash join v2 is supported in current env
func IsHashJoinV2Supported() bool {
// sizeOfUintptr should always equal to sizeOfUnsafePointer, because according to golang's doc,
// a Pointer can be converted to an uintptr. Add this check here in case in the future go runtime
// change this
return !heapObjectsCanMove() && enableHashJoinV2.Load() && sizeOfUintptr >= sizeOfUnsafePointer
}

// SetEnableHashJoinV2 enable/disable hash join v2
func SetEnableHashJoinV2(enable bool) {
enableHashJoinV2.Store(enable)
return !heapObjectsCanMove() && sizeOfUintptr >= sizeOfUnsafePointer
}

type hashTableContext struct {
Expand Down
8 changes: 8 additions & 0 deletions pkg/executor/join/joinversion/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "joinversion",
srcs = ["join_version.go"],
importpath = "github.com/pingcap/tidb/pkg/executor/join/joinversion",
visibility = ["//visibility:public"],
)
31 changes: 31 additions & 0 deletions pkg/executor/join/joinversion/join_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package joinversion

import (
"strings"
)

const (
// HashJoinVersionLegacy means hash join v1
HashJoinVersionLegacy = "legacy"
// HashJoinVersionOptimized means hash join v2
HashJoinVersionOptimized = "optimized"
)

// IsOptimizedVersion returns true if hashJoinVersion equals to HashJoinVersionOptimized
func IsOptimizedVersion(hashJoinVersion string) bool {
return strings.ToLower(hashJoinVersion) == HashJoinVersionOptimized
}
27 changes: 6 additions & 21 deletions pkg/executor/test/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,8 @@ func TestIssue30289(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
require.NoError(t, failpoint.Disable(fpName))
}()
useHashJoinV2 := []bool{true, false}
for _, hashJoinV2 := range useHashJoinV2 {
join.SetEnableHashJoinV2(hashJoinV2)
for _, hashJoinV2 := range join.HashJoinV2Strings {
tk.MustExec(hashJoinV2)
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
require.EqualError(t, err, "issue30289 build return error")
}
Expand All @@ -199,14 +193,8 @@ func TestIssue51998(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
require.NoError(t, failpoint.Enable(fpName, `return(true)`))
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
require.NoError(t, failpoint.Disable(fpName))
}()
useHashJoinV2 := []bool{true, false}
for _, hashJoinV2 := range useHashJoinV2 {
join.SetEnableHashJoinV2(hashJoinV2)
for _, hashJoinV2 := range join.HashJoinV2Strings {
tk.MustExec(hashJoinV2)
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
require.EqualError(t, err, "issue51998 build return error")
}
Expand Down Expand Up @@ -621,11 +609,8 @@ func TestIssue42662(t *testing.T) {
tk.MustExec("set global tidb_server_memory_limit='1600MB'")
tk.MustExec("set global tidb_server_memory_limit_sess_min_size=128*1024*1024")
tk.MustExec("set global tidb_mem_oom_action = 'cancel'")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
defer join.SetEnableHashJoinV2(isHashJoinV2Enabled)
useHashJoinV2 := []bool{true, false}
for _, hashJoinV2 := range useHashJoinV2 {
join.SetEnableHashJoinV2(hashJoinV2)
for _, hashJoinV2 := range join.HashJoinV2Strings {
tk.MustExec(hashJoinV2)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/issue42662_1", `return(true)`))
// tk.Session() should be marked as MemoryTop1Tracker but not killed.
tk.MustQuery("select /*+ hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a and t1.b = t2.b")
Expand Down
60 changes: 11 additions & 49 deletions pkg/executor/test/jointest/hashjoin/hash_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,7 @@ func TestIssue20270(t *testing.T) {
tk.MustExec("create table t1(c1 int, c2 int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("insert into t1 values(2,3),(4,4)")
enableHashJoinV2 := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(false)
defer join.SetEnableHashJoinV2(enableHashJoinV2)
tk.MustExec(join.DisableHashJoinV2)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/killedInJoin2Chunk", "return(true)"))
err := tk.QueryToErr("select /*+ HASH_JOIN(t, t1) */ * from t left join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
require.Equal(t, exeerrors.ErrQueryInterrupted, err)
Expand Down Expand Up @@ -487,11 +485,7 @@ func TestFinalizeCurrentSegPanic(t *testing.T) {
tk.MustExec("create table t2 (a int, b int, c int)")
tk.MustExec("insert into t1 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
tk.MustExec("insert into t2 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
fpName := "github.com/pingcap/tidb/pkg/executor/join/finalizeCurrentSegPanic"
require.NoError(t, failpoint.Enable(fpName, "panic(\"finalizeCurrentSegPanic\")"))
defer func() {
Expand All @@ -511,11 +505,7 @@ func TestSplitPartitionPanic(t *testing.T) {
tk.MustExec("create table t2 (a int, b int, c int)")
tk.MustExec("insert into t1 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
tk.MustExec("insert into t2 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
fpName := "github.com/pingcap/tidb/pkg/executor/join/splitPartitionPanic"
require.NoError(t, failpoint.Enable(fpName, "panic(\"splitPartitionPanic\")"))
defer func() {
Expand All @@ -535,11 +525,7 @@ func TestProcessOneProbeChunkPanic(t *testing.T) {
tk.MustExec("create table t2 (a int, b int, c int)")
tk.MustExec("insert into t1 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
tk.MustExec("insert into t2 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
fpName := "github.com/pingcap/tidb/pkg/executor/join/processOneProbeChunkPanic"
require.NoError(t, failpoint.Enable(fpName, "panic(\"processOneProbeChunkPanic\")"))
defer func() {
Expand All @@ -559,11 +545,7 @@ func TestCreateTasksPanic(t *testing.T) {
tk.MustExec("create table t2 (a int, b int, c int)")
tk.MustExec("insert into t1 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
tk.MustExec("insert into t2 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
fpName := "github.com/pingcap/tidb/pkg/executor/join/createTasksPanic"
require.NoError(t, failpoint.Enable(fpName, "panic(\"createTasksPanic\")"))
defer func() {
Expand All @@ -583,11 +565,7 @@ func TestBuildHashTablePanic(t *testing.T) {
tk.MustExec("create table t2 (a int, b int, c int)")
tk.MustExec("insert into t1 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
tk.MustExec("insert into t2 values (1, 1, 1), (1, 2, 2), (2, 1, 3), (2, 2, 4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
fpName := "github.com/pingcap/tidb/pkg/executor/join/buildHashTablePanic"
require.NoError(t, failpoint.Enable(fpName, "panic(\"buildHashTablePanic\")"))
defer func() {
Expand All @@ -607,11 +585,7 @@ func TestKillDuringProbe(t *testing.T) {
tk.MustExec("create table t1(c1 int, c2 int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("insert into t1 values(2,3),(4,4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/killedDuringProbe", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/killedDuringProbe"))
Expand Down Expand Up @@ -641,11 +615,7 @@ func TestKillDuringBuild(t *testing.T) {
tk.MustExec("create table t1(c1 int, c2 int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("insert into t1 values(2,3),(4,4)")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
tk.MustExec(join.EnableHashJoinV2)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/killedDuringBuild", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/killedDuringBuild"))
Expand All @@ -671,10 +641,7 @@ func TestIssue54755(t *testing.T) {
tk.MustExec("create table t2(pk INTEGER AUTO_INCREMENT, col_int_nokey INTEGER, col_int_key INTEGER, col_varchar_key VARCHAR(1), col_varchar_nokey VARCHAR(1), PRIMARY KEY (pk), KEY (col_int_key), KEY (col_varchar_key, col_int_key))")
tk.MustExec("insert into t1(col_int_key, col_int_nokey,col_varchar_key, col_varchar_nokey) values(4,2,'v','v'),(62,150,'v','v')")
tk.MustExec("insert into t2(col_int_key, col_int_nokey,col_varchar_key, col_varchar_nokey) values(8,null,'x','x'),(7,8,'d','d')")
join.SetEnableHashJoinV2(true)
defer func() {
join.SetEnableHashJoinV2(false)
}()
tk.MustExec(join.EnableHashJoinV2)
// right join
tk.MustQuery("select max(SQ1_alias2.col_int_nokey) as SQ1_field1 from ( t2 as SQ1_alias1 right join t1 as SQ1_alias2 on ( SQ1_alias2.col_varchar_key = SQ1_alias1.col_varchar_nokey ))").Check(testkit.Rows("150"))
// left join
Expand All @@ -688,13 +655,8 @@ func TestIssue55016(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a varchar(10), b char(10))")
tk.MustExec("insert into t values('aa','a')")
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
}()
hashJoinV2Enable := []bool{true, false}
for _, enableHashJoinV2 := range hashJoinV2Enable {
join.SetEnableHashJoinV2(enableHashJoinV2)
for _, hashJoinV2 := range join.HashJoinV2Strings {
tk.MustExec(hashJoinV2)
tk.MustQuery("select count(*) from t t1 join t t2 on t1.a = t2.b and t2.a = t1.b").Check(testkit.Rows("0"))
}
}
10 changes: 2 additions & 8 deletions pkg/executor/test/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,14 +1297,8 @@ func TestOOMPanicInHashJoinWhenFetchBuildRows(t *testing.T) {
tk.MustExec("insert into t values(1,1),(2,2)")
fpName := "github.com/pingcap/tidb/pkg/executor/join/errorFetchBuildSideRowsMockOOMPanic"
require.NoError(t, failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn=1]")`))
isHashJoinV2Enabled := join.IsHashJoinV2Enabled()
defer func() {
join.SetEnableHashJoinV2(isHashJoinV2Enabled)
require.NoError(t, failpoint.Disable(fpName))
}()
useHashJoinV2 := []bool{true, false}
for _, hashJoinV2 := range useHashJoinV2 {
join.SetEnableHashJoinV2(hashJoinV2)
for _, hashJoinV2 := range join.HashJoinV2Strings {
tk.MustExec(hashJoinV2)
err := tk.QueryToErr("select * from t as t2 join t as t1 where t1.c1=t2.c1")
require.EqualError(t, err, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn=1]")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
deps = [
"//pkg/config",
"//pkg/errno",
"//pkg/executor/join/joinversion",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/meta/model",
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,9 @@ type SessionVars struct {
// DisableHashJoin indicates whether to disable hash join.
DisableHashJoin bool

// UseHashJoinV2 indicates whether to use hash join v2.
UseHashJoinV2 bool

// EnableHistoricalStats indicates whether to enable historical statistics.
EnableHistoricalStats bool

Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/setvar_affect.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ var isHintUpdatableVerified = map[string]struct{}{
"tiflash_fastscan": {},
"tiflash_fine_grained_shuffle_batch_size": {},
"tiflash_fine_grained_shuffle_stream_count": {},
"tidb_hash_join_version": {},
// Variables that is compatible with MySQL.
"cte_max_recursion_depth": {},
"sql_mode": {},
Expand Down
15 changes: 15 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/executor/join/joinversion"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
Expand Down Expand Up @@ -2392,6 +2393,20 @@ var defaultSysVars = []*SysVar{
s.AnalyzeVersion = tidbOptPositiveInt32(val, DefTiDBAnalyzeVersion)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashJoinVersion, Value: DefTiDBHashJoinVersion, Type: TypeStr,
Validation: func(_ *SessionVars, normalizedValue string, originalValue string, _ ScopeFlag) (string, error) {
lowerValue := strings.ToLower(normalizedValue)
if lowerValue != joinversion.HashJoinVersionLegacy && lowerValue != joinversion.HashJoinVersionOptimized {
err := fmt.Errorf("incorrect value: `%s`. %s options: %s", originalValue, TiDBHashJoinVersion, joinversion.HashJoinVersionLegacy+", "+joinversion.HashJoinVersionOptimized)
return normalizedValue, err
}
return normalizedValue, nil
},
SetSession: func(s *SessionVars, val string) error {
s.UseHashJoinV2 = joinversion.IsOptimizedVersion(val)
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptEnableHashJoin, Value: BoolToOnOff(DefTiDBOptEnableHashJoin), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.DisableHashJoin = !TiDBOptOn(val)
return nil
Expand Down
21 changes: 21 additions & 0 deletions pkg/sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,27 @@ func TestEnableWindowFunction(t *testing.T) {
require.Equal(t, vars.EnableWindowFunction, true)
}

func TestTiDBHashJoinVersion(t *testing.T) {
vars := NewSessionVars(nil)
sv := GetSysVar(TiDBHashJoinVersion)
// set error value
_, err := sv.Validation(vars, "invalid", "invalid", ScopeSession)
require.NotNil(t, err)
// set valid value
_, err = sv.Validation(vars, "legacy", "legacy", ScopeSession)
require.NoError(t, err)
_, err = sv.Validation(vars, "optimized", "optimized", ScopeSession)
require.NoError(t, err)
_, err = sv.Validation(vars, "Legacy", "Legacy", ScopeSession)
require.NoError(t, err)
_, err = sv.Validation(vars, "Optimized", "Optimized", ScopeSession)
require.NoError(t, err)
_, err = sv.Validation(vars, "LegaCy", "LegaCy", ScopeSession)
require.NoError(t, err)
_, err = sv.Validation(vars, "OptimiZed", "OptimiZed", ScopeSession)
require.NoError(t, err)
}

func TestTiDBAutoAnalyzeConcurrencyValidation(t *testing.T) {
vars := NewSessionVars(nil)

Expand Down
Loading