Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add some tests for concurrent DDL #36378

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1701,3 +1702,263 @@ func TestBuildMaxLengthIndexWithNonRestrictedSqlMode(t *testing.T) {
}
}
}

// DDLs related to the same table block each other.
func TestConcurrentBasicSpec1(t *testing.T) {
defer config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})
Comment on lines +1708 to +1712
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been set by ddl/main_test.go, so we can remove this.


store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
defer clean()
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t1(a int, b int)")

concurrentDDLs := []string{
"alter table t1 add index idx(a)",
"alter table t1 add column tcol int",
}

var wg sync.WaitGroup
hook := &ddl.TestDDLCallback{Do: dom}
//var checkErr error
concurrentDDLsOffset := 0
once := false
hook.OnJobRunBeforeExported = func(job *model.Job) {
if once {
return
}
if job.SchemaState != model.StateDeleteOnly {
return
}
once = true
startTs := time.Now()
wg.Add(1)
go func() {
tk2.MustExec(concurrentDDLs[concurrentDDLsOffset])
endTs := time.Since(startTs)
require.Greater(t, endTs.Milliseconds(), int64(1000))
wg.Done()
}()
time.Sleep(time.Second)
}
dom.DDL().SetHook(hook)

tk.MustExec("alter table t1 add column c int")
wg.Wait()

concurrentDDLsOffset = 1
once = false
tk.MustExec("alter table t1 add column d int")
wg.Wait()
}

// Drop database and DDL of all objects in Database block each other.
func TestConcurrentBasicSpec2(t *testing.T) {
defer config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
defer clean()
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t1(a int, b int)")

concurrentDDLs := []string{
"alter table t1 add index idx(a)",
"alter table t1 add column c int",
"drop database test",
}

var wg sync.WaitGroup
hook := &ddl.TestDDLCallback{Do: dom}
//var checkErr error
concurrentDDLsOffset := 0
once := false
checkState := model.StatePublic
hook.OnJobRunBeforeExported = func(job *model.Job) {
if once {
return
}
if job.SchemaState != checkState {
return
}
once = true
startTs := time.Now()
wg.Add(1)
go func() {
_, err := tk2.Exec(concurrentDDLs[concurrentDDLsOffset])
if concurrentDDLsOffset < 2 {
require.Errorf(t, err, "concurrentDDLsOffset %d", concurrentDDLsOffset)
} else {
require.NoError(t, err)
}
endTs := time.Since(startTs)
require.Greater(t, endTs.Milliseconds(), int64(1000))
wg.Done()
}()
time.Sleep(time.Second)
}
dom.DDL().SetHook(hook)

// drop database blocks other DDLs.
tk.MustExec("drop database test")
wg.Wait()

tk.MustExec("create database test")
tk.MustExec("use test")
tk2.MustExec("use test")
tk.MustExec("create table t1(a int, b int)")
concurrentDDLsOffset = 1
once = false
tk.MustExec("drop database test")
wg.Wait()

// Other DDLs block drop database.
checkState = model.StateDeleteOnly
tk.MustExec("create database test")
tk.MustExec("use test")
tk2.MustExec("use test")
tk.MustExec("create table t1(a int, b int)")
concurrentDDLsOffset = 2
once = false
tk.MustExec("alter table t1 add column c int")
wg.Wait()

// Other DDLs block drop database.
tk.MustExec("create database test")
tk.MustExec("use test")
tk2.MustExec("use test")
tk.MustExec("create table t1(a int, b int)")
concurrentDDLsOffset = 2
once = false
tk.MustExec("alter table t1 add index idx(a)")
wg.Wait()
}

// Add Index and column type changes involving different tables can be executed concurrently, with a maximum concurrency of 10.
func TestConcurrentBasicSpec3(t *testing.T) {
defer config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
defer clean()
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

for i := 1; i <= 20; i++ {
tk.MustExec(fmt.Sprintf("create table t%d(a int, b int)", i))
}

var wg sync.WaitGroup
hook := &ddl.TestDDLCallback{Do: dom}
//var checkErr error
skip := false
checkState := model.StateDeleteOnly
hook.OnJobUpdatedExported = func(job *model.Job) {
if skip {
return
}
if job.SchemaState != checkState {
return
}
time.Sleep(5 * time.Second)
}
dom.DDL().SetHook(hook)

for i := 1; i <= 20; i++ {
ii := i
wg.Add(1)
go func() {
newTk := testkit.NewTestKit(t, store)
newTk.MustExec("use test")
startTs := time.Now()
if ii%2 == 0 {
newTk.MustExec(fmt.Sprintf("alter table t%d add index idx(a)", ii))
} else {
newTk.MustExec(fmt.Sprintf("alter table t%d change column a a tinyint", ii))
}
spendTime := time.Since(startTs).Milliseconds()
if ii <= 10 {
require.Lessf(t, spendTime, int64(9000), "ii %d spend time %d ms", ii, spendTime)
} else {
require.Greaterf(t, spendTime, int64(5000), "ii %d spend time %d ms", ii, spendTime)
require.Less(t, spendTime, int64(14000), "ii %d spend time %d ms", ii, spendTime)
}
wg.Done()
}()
if i == 10 {
time.Sleep(100 * time.Millisecond)
}
}

wg.Wait()
}

// Non-(add index/column type change) DDL needs to wait for the non-blocked ( non-(add index/column type change) ) DDL to complete before executing.
func TestConcurrentBasicSpec4(t *testing.T) {
defer config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
defer clean()
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t1(a int, b int)")
tk.MustExec("create table t2(a int, b int)")

var wg sync.WaitGroup
hook := &ddl.TestDDLCallback{Do: dom}
//var checkErr error
once := false
checkState := model.StateDeleteOnly
hook.OnJobRunBeforeExported = func(job *model.Job) {
if once {
return
}
if job.SchemaState != checkState {
return
}
once = true
wg.Add(1)
go func() {
startTs := time.Now()
tk2.MustExec("alter table t2 add column c int")
endTs := time.Since(startTs)
require.Greater(t, endTs.Milliseconds(), int64(1000))
wg.Done()
}()
time.Sleep(time.Second)
}
dom.DDL().SetHook(hook)

tk.MustExec("alter table t1 add column c int")
wg.Wait()
}