diff --git a/br/pkg/gluetidb/BUILD.bazel b/br/pkg/gluetidb/BUILD.bazel new file mode 100644 index 0000000000000..eddbd41ee46d4 --- /dev/null +++ b/br/pkg/gluetidb/BUILD.bazel @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "gluetidb", + srcs = ["glue.go"], + importpath = "github.com/pingcap/tidb/br/pkg/gluetidb", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/glue", + "//br/pkg/gluetikv", + "//br/pkg/logutil", + "//config", + "//ddl", + "//domain", + "//executor", + "//kv", + "//meta/autoid", + "//parser/model", + "//parser/mysql", + "//session", + "//sessionctx", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@com_github_tikv_pd_client//:client", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "gluetidb_test", + timeout = "short", + srcs = ["glue_test.go"], + embed = [":gluetidb"], + flaky = True, + deps = [ + "//ddl", + "//kv", + "//meta", + "//parser/model", + "//sessionctx", + "//testkit", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + ], +) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index a8cc7f485599b..56c09a2efbe60 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -207,6 +207,35 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model. return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore) } +<<<<<<< HEAD +======= +// SplitBatchCreateTable creates the given tables with BatchCreateTableWithInfo and, if that fails with kv.ErrEntryTooLarge, splits the batch in half and retries each half recursively; a single table that is still too large returns the error. +// The original note says a raft entry is limited to 6 MB and a batch of CreateTables may hit this limit (NOTE(review): the 6 MB figure is not visible in this code; confirm). +// TODO: should the query string be set for each split batch? It does not seem to matter if it is set once for all of them. 
+func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + var err error + d := domain.GetDomain(gs.se).DDL() + + if err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) { + log.Info("entry too large, split batch create table", zap.Int("num table", len(infos))) + if len(infos) == 1 { + return err + } + mid := len(infos) / 2 + err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...) + if err != nil { + return err + } + err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...) + if err != nil { + return err + } + return nil + } + return err +} + +>>>>>>> 41c1250c265 (br: reused table id is disabled when restore a brand-new cluster (#41358)) // CreateTables implements glue.BatchCreateTableSession. func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { d := domain.GetDomain(gs.se).DDL() @@ -235,8 +264,12 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo cloneTables = append(cloneTables, table) } gs.se.SetValue(sessionctx.QueryString, queryBuilder.String()) +<<<<<<< HEAD err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore) if err != nil { +======= + if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil { +>>>>>>> 41c1250c265 (br: reused table id is disabled when restore a brand-new cluster (#41358)) //It is possible to failure when TiDB does not support model.ActionCreateTables. 
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob, //we fall back to old way that creating table one by one diff --git a/br/pkg/gluetidb/glue_test.go b/br/pkg/gluetidb/glue_test.go index 3aff118e47007..1d3398c37b342 100644 --- a/br/pkg/gluetidb/glue_test.go +++ b/br/pkg/gluetidb/glue_test.go @@ -16,6 +16,7 @@ package gluetidb import ( "context" +<<<<<<< HEAD "testing" "github.com/pingcap/tidb/br/pkg/glue" @@ -84,4 +85,195 @@ func TestTheSessionIsoation(t *testing.T) { req.NoError(session.(glue.BatchCreateTableSession).CreateTables(ctx, map[string][]*model.TableInfo{ "test": infos, })) +======= + "strconv" + "testing" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +// TestSplitBatchCreateTableWithTableId checks that SplitBatchCreateTable keeps the table IDs given in the TableInfo when the AllocTableIDIf predicate returns false, and allocates new IDs when it returns true. +func TestSplitBatchCreateTableWithTableId(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_id_resued1") + tk.MustExec("drop table if exists table_id_resued2") + tk.MustExec("drop table if exists table_id_new") + + d := dom.DDL() + require.NotNil(t, d) + + infos1 := []*model.TableInfo{} + infos1 = append(infos1, &model.TableInfo{ + ID: 124, + Name: model.NewCIStr("table_id_resued1"), + }) + infos1 = append(infos1, &model.TableInfo{ + ID: 125, + Name: model.NewCIStr("table_id_resued2"), + }) + + se := &tidbSession{se: tk.Session()} + + // Verify that the supplied table IDs (124 and 125) are kept when the AllocTableIDIf predicate returns false. + tk.Session().SetValue(sessionctx.QueryString, "skip") + err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos1, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) + + tk.MustQuery("select tidb_table_id from information_schema.tables where 
table_name = 'table_id_resued1'").Check(testkit.Rows("124")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").Check(testkit.Rows("125")) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + + // Verify that a new table ID is allocated when the AllocTableIDIf predicate returns true. + // Generate a global ID first; the newly allocated table ID is expected to be greater than it. + var id int64 + err = kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + var err error + id, err = m.GenGlobalID() + return err + }) + + require.NoError(t, err) + + infos2 := []*model.TableInfo{} + infos2 = append(infos2, &model.TableInfo{ + ID: 124, + Name: model.NewCIStr("table_id_new"), + }) + + tk.Session().SetValue(sessionctx.QueryString, "skip") + err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos2, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + })) + require.NoError(t, err) + + idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").Rows()[0][0].(string) + require.True(t, ok) + idGenNum, err := strconv.ParseInt(idGen, 10, 64) + require.NoError(t, err) + require.Greater(t, idGenNum, id) + + // An empty batch (len(infos3) == 0) is expected to succeed without error. + infos3 := []*model.TableInfo{} + + err = se.SplitBatchCreateTable(model.NewCIStr("test"), infos3, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + require.NoError(t, err) +} + +// TestSplitBatchCreateTable enables the RestoreBatchCreateTableEntryTooLarge failpoint with return(1) and checks that all three tables are still created with the table IDs given in the TableInfo. +func TestSplitBatchCreateTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + tk.MustExec("drop table if exists table_3") + + d := dom.DDL() + require.NotNil(t, d) + + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + ID: 1234, + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, 
&model.TableInfo{ + ID: 1235, + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + ID: 1236, + Name: model.NewCIStr("tables_3"), + }) + + se := &tidbSession{se: tk.Session()} + + // With the failpoint set to return(1), any batch of more than one table is rejected with kv.ErrEntryTooLarge, so the batch has to be split down to single tables. + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge", "return(1)")) + err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + })) + + require.NoError(t, err) + tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3")) + jobs := tk.MustQuery("admin show ddl jobs").Rows() + require.Greater(t, len(jobs), 3) + // jobs[0] is expected to refer to tables_3 in schema test. + job1 := jobs[0] + require.Equal(t, "test", job1[1]) + require.Equal(t, "tables_3", job1[2]) + require.Equal(t, "create tables", job1[3]) + require.Equal(t, "public", job1[4]) + + // jobs[1] is expected to refer to tables_2 in schema test. + job2 := jobs[1] + require.Equal(t, "test", job2[1]) + require.Equal(t, "tables_2", job2[2]) + require.Equal(t, "create tables", job2[3]) + require.Equal(t, "public", job2[4]) + + // jobs[2] is expected to refer to tables_1 in schema test. + job3 := jobs[2] + require.Equal(t, "test", job3[1]) + require.Equal(t, "tables_1", job3[2]) + require.Equal(t, "create tables", job3[3]) + require.Equal(t, "public", job3[4]) + + // Verify that the table IDs given in the TableInfo (1234, 1235 and 1236) were kept. + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").Check(testkit.Rows("1234")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").Check(testkit.Rows("1235")) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").Check(testkit.Rows("1236")) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge")) +} + +// TestSplitBatchCreateTableFailWithEntryTooLarge sets the RestoreBatchCreateTableEntryTooLarge failpoint to return(0) and checks that SplitBatchCreateTable returns kv.ErrEntryTooLarge. +func TestSplitBatchCreateTableFailWithEntryTooLarge(t 
*testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists table_1") + tk.MustExec("drop table if exists table_2") + tk.MustExec("drop table if exists table_3") + + d := dom.DDL() + require.NotNil(t, d) + + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_3"), + }) + + se := &tidbSession{se: tk.Session()} + + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge", "return(0)")) + err := se.SplitBatchCreateTable(model.NewCIStr("test"), infos, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + })) + + require.True(t, kv.ErrEntryTooLarge.Equal(err)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge")) +>>>>>>> 41c1250c265 (br: reused table id is disabled when restore a brand-new cluster (#41358)) } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index e9574e5dc0231..8b6a3ca94df3b 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2409,7 +2409,20 @@ func (d *ddl) CreateTableWithInfo( func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, dbName model.CIStr, infos []*model.TableInfo, +<<<<<<< HEAD onExist OnExist) error { +======= + cs ...CreateTableWithInfoConfigurier, +) error { + failpoint.Inject("RestoreBatchCreateTableEntryTooLarge", func(val failpoint.Value) { + injectBatchSize := val.(int) + if len(infos) > injectBatchSize { + failpoint.Return(kv.ErrEntryTooLarge) + } + }) + c := GetCreateTableWithInfoConfig(cs) + +>>>>>>> 41c1250c265 (br: reused table id is disabled when restore a brand-new cluster (#41358)) jobs := &model.Job{ BinlogInfo: &model.HistoryInfo{}, 
}