Skip to content

Commit

Permalink
*: support auto_random on composite clustered primary key (#38617) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 13, 2023
1 parent ac6560f commit 3450c19
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 24 deletions.
56 changes: 50 additions & 6 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,20 @@ func containsColumnOption(colDef *ast.ColumnDef, opTp ast.ColumnOptionType) bool

// IsAutoRandomColumnID returns true if the given column ID belongs to an auto_random column.
func IsAutoRandomColumnID(tblInfo *model.TableInfo, colID int64) bool {
return tblInfo.PKIsHandle && tblInfo.ContainsAutoRandomBits() && tblInfo.GetPkColInfo().ID == colID
if !tblInfo.ContainsAutoRandomBits() {
return false
}
if tblInfo.PKIsHandle {
return tblInfo.GetPkColInfo().ID == colID
} else if tblInfo.IsCommonHandle {
pk := tables.FindPrimaryIndex(tblInfo)
if pk == nil {
return false
}
offset := pk.Columns[0].Offset
return tblInfo.Columns[offset].ID == colID
}
return false
}

func checkGeneratedColumn(ctx sessionctx.Context, colDefs []*ast.ColumnDef) error {
Expand Down Expand Up @@ -1686,17 +1699,31 @@ func getPrimaryKey(tblInfo *model.TableInfo) *model.IndexInfo {
}

func setTableAutoRandomBits(ctx sessionctx.Context, tbInfo *model.TableInfo, colDefs []*ast.ColumnDef) error {
pkColName := tbInfo.GetPkName()
for _, col := range colDefs {
if containsColumnOption(col, ast.ColumnOptionAutoRandom) {
if col.Tp.GetType() != mysql.TypeLonglong {
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(
fmt.Sprintf(autoid.AutoRandomOnNonBigIntColumn, types.TypeStr(col.Tp.GetType())))
}
if !tbInfo.PKIsHandle || col.Name.Name.L != pkColName.L {
errMsg := fmt.Sprintf(autoid.AutoRandomPKisNotHandleErrMsg, col.Name.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
switch {
case tbInfo.PKIsHandle:
if tbInfo.GetPkName().L != col.Name.Name.L {
errMsg := fmt.Sprintf(autoid.AutoRandomMustFirstColumnInPK, col.Name.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
}
case tbInfo.IsCommonHandle:
pk := tables.FindPrimaryIndex(tbInfo)
if pk == nil {
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomNoClusteredPKErrMsg)
}
if col.Name.Name.L != pk.Columns[0].Name.L {
errMsg := fmt.Sprintf(autoid.AutoRandomMustFirstColumnInPK, col.Name.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
}
default:
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomNoClusteredPKErrMsg)
}

if containsColumnOption(col, ast.ColumnOptionAutoIncrement) {
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomIncompatibleWithAutoIncErrMsg)
}
Expand Down Expand Up @@ -4642,9 +4669,26 @@ func checkIndexInModifiableColumns(columns []*model.ColumnInfo, idxColumns []*mo
return nil
}

func isClusteredPKColumn(col *table.Column, tblInfo *model.TableInfo) bool {
switch {
case tblInfo.PKIsHandle:
return mysql.HasPriKeyFlag(col.GetFlag())
case tblInfo.IsCommonHandle:
pk := tables.FindPrimaryIndex(tblInfo)
for _, c := range pk.Columns {
if c.Name.L == col.Name.L {
return true
}
}
return false
default:
return false
}
}

func checkAutoRandom(tableInfo *model.TableInfo, originCol *table.Column, specNewColumn *ast.ColumnDef) (uint64, error) {
var oldRandBits uint64
if originCol.IsPKHandleColumn(tableInfo) {
if isClusteredPKColumn(originCol, tableInfo) {
oldRandBits = tableInfo.AutoRandomBits
}
newRandBits, err := extractAutoRandomBitsFromColDef(specNewColumn)
Expand Down
52 changes: 36 additions & 16 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,11 @@ func TestAutoRandom(t *testing.T) {
require.EqualError(t, err, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(fmt.Sprintf(errMsg, args...)).Error())
}

assertPKIsNotHandle := func(sql, errCol string) {
assertInvalidAutoRandomErr(sql, autoid.AutoRandomPKisNotHandleErrMsg, errCol)
assertNotFirstColPK := func(sql, errCol string) {
assertInvalidAutoRandomErr(sql, autoid.AutoRandomMustFirstColumnInPK, errCol)
}
assertNoClusteredPK := func(sql string) {
assertInvalidAutoRandomErr(sql, autoid.AutoRandomNoClusteredPKErrMsg)
}
assertAlterValue := func(sql string) {
assertInvalidAutoRandomErr(sql, autoid.AutoRandomAlterErrMsg)
Expand Down Expand Up @@ -882,36 +885,36 @@ func TestAutoRandom(t *testing.T) {
tk.MustExec("drop table t")
}

// Only bigint column can set auto_random
// Only bigint column can set auto_random.
assertBigIntOnly("create table t (a char primary key auto_random(3), b int)", "char")
assertBigIntOnly("create table t (a varchar(255) primary key auto_random(3), b int)", "varchar")
assertBigIntOnly("create table t (a timestamp primary key auto_random(3), b int)", "timestamp")
assertBigIntOnly("create table t (a timestamp auto_random(3), b int, primary key (a, b) clustered)", "timestamp")

// PKIsHandle, but auto_random is defined on non-primary key.
assertPKIsNotHandle("create table t (a bigint auto_random (3) primary key, b bigint auto_random (3))", "b")
assertPKIsNotHandle("create table t (a bigint auto_random (3), b bigint auto_random(3), primary key(a))", "b")
assertPKIsNotHandle("create table t (a bigint auto_random (3), b bigint auto_random(3) primary key)", "a")
// Clustered, but auto_random is defined on non-primary key.
assertNotFirstColPK("create table t (a bigint auto_random (3) primary key, b bigint auto_random (3))", "b")
assertNotFirstColPK("create table t (a bigint auto_random (3), b bigint auto_random(3), primary key(a))", "b")
assertNotFirstColPK("create table t (a bigint auto_random (3), b bigint auto_random(3) primary key)", "a")
assertNotFirstColPK("create table t (a bigint auto_random, b bigint, primary key (b, a) clustered);", "a")

// PKIsNotHandle: no primary key.
assertPKIsNotHandle("create table t (a bigint auto_random(3), b int)", "a")
// PKIsNotHandle: primary key is not a single column.
assertPKIsNotHandle("create table t (a bigint auto_random(3), b bigint, primary key (a, b))", "a")
assertPKIsNotHandle("create table t (a bigint auto_random(3), b int, c char, primary key (a, c))", "a")
// No primary key.
assertNoClusteredPK("create table t (a bigint auto_random(3), b int)")

// PKIsNotHandle: nonclustered integer primary key.
assertPKIsNotHandle("create table t (a bigint auto_random(3) primary key nonclustered, b int)", "a")
assertPKIsNotHandle("create table t (a bigint auto_random(3) primary key nonclustered, b int)", "a")
assertPKIsNotHandle("create table t (a int, b bigint auto_random(3) primary key nonclustered)", "b")
// No clustered primary key.
assertNoClusteredPK("create table t (a bigint auto_random(3) primary key nonclustered, b int)")
assertNoClusteredPK("create table t (a int, b bigint auto_random(3) primary key nonclustered)")

// Can not set auto_random along with auto_increment.
assertWithAutoInc("create table t (a bigint auto_random(3) primary key auto_increment)")
assertWithAutoInc("create table t (a bigint primary key auto_increment auto_random(3))")
assertWithAutoInc("create table t (a bigint auto_increment primary key auto_random(3))")
assertWithAutoInc("create table t (a bigint auto_random(3) auto_increment, primary key (a))")
assertWithAutoInc("create table t (a bigint auto_random(3) auto_increment, b int, primary key (a, b) clustered)")

// Can not set auto_random along with default.
assertDefault("create table t (a bigint auto_random primary key default 3)")
assertDefault("create table t (a bigint auto_random(2) primary key default 5)")
assertDefault("create table t (a bigint auto_random(2) default 5, b int, primary key (a, b) clustered)")
mustExecAndDrop("create table t (a bigint auto_random primary key)", func() {
assertDefault("alter table t modify column a bigint auto_random default 3")
assertDefault("alter table t alter column a set default 3")
Expand All @@ -920,12 +923,14 @@ func TestAutoRandom(t *testing.T) {
// Overflow data type max length.
assertMaxOverflow("create table t (a bigint auto_random(64) primary key)", "a", 64)
assertMaxOverflow("create table t (a bigint auto_random(16) primary key)", "a", 16)
assertMaxOverflow("create table t (a bigint auto_random(16), b int, primary key (a, b) clustered)", "a", 16)
mustExecAndDrop("create table t (a bigint auto_random(5) primary key)", func() {
assertMaxOverflow("alter table t modify a bigint auto_random(64)", "a", 64)
assertMaxOverflow("alter table t modify a bigint auto_random(16)", "a", 16)
})

assertNonPositive("create table t (a bigint auto_random(0) primary key)")
assertNonPositive("create table t (a bigint auto_random(0), b int, primary key (a, b) clustered)")
tk.MustGetErrMsg("create table t (a bigint auto_random(-1) primary key)",
`[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 38 near "-1) primary key)" `)

Expand All @@ -935,18 +940,26 @@ func TestAutoRandom(t *testing.T) {
mustExecAndDrop("create table t (a bigint auto_random(15) primary key)")
mustExecAndDrop("create table t (a bigint primary key auto_random(4))")
mustExecAndDrop("create table t (a bigint auto_random(4), primary key (a))")
mustExecAndDrop("create table t (a bigint auto_random(3), b bigint, primary key (a, b) clustered)")
mustExecAndDrop("create table t (a bigint auto_random(3), b int, c char, primary key (a, c) clustered)")

// Increase auto_random bits.
mustExecAndDrop("create table t (a bigint auto_random(5) primary key)", func() {
tk.MustExec("alter table t modify a bigint auto_random(8)")
tk.MustExec("alter table t modify a bigint auto_random(10)")
tk.MustExec("alter table t modify a bigint auto_random(12)")
})
mustExecAndDrop("create table t (a bigint auto_random(5), b char(255), primary key (a, b) clustered)", func() {
tk.MustExec("alter table t modify a bigint auto_random(8)")
tk.MustExec("alter table t modify a bigint auto_random(10)")
tk.MustExec("alter table t modify a bigint auto_random(12)")
})

// Auto_random can occur multiple times like other column attributes.
mustExecAndDrop("create table t (a bigint auto_random(3) auto_random(2) primary key)")
mustExecAndDrop("create table t (a bigint, b bigint auto_random(3) primary key auto_random(2))")
mustExecAndDrop("create table t (a bigint auto_random(1) auto_random(2) auto_random(3), primary key (a))")
mustExecAndDrop("create table t (a bigint auto_random(1) auto_random(2) auto_random(3), b int, primary key (a, b) clustered)")

// Add/drop the auto_random attribute is not allowed.
mustExecAndDrop("create table t (a bigint auto_random(3) primary key)", func() {
Expand All @@ -958,6 +971,10 @@ func TestAutoRandom(t *testing.T) {
assertAlterValue("alter table t modify column c bigint")
assertAlterValue("alter table t change column c d bigint")
})
mustExecAndDrop("create table t (a bigint, b char, c bigint auto_random(3), primary key(c, a) clustered)", func() {
assertAlterValue("alter table t modify column c bigint")
assertAlterValue("alter table t change column c d bigint")
})
mustExecAndDrop("create table t (a bigint primary key)", func() {
assertOnlyChangeFromAutoIncPK("alter table t modify column a bigint auto_random(3)")
})
Expand Down Expand Up @@ -985,6 +1002,9 @@ func TestAutoRandom(t *testing.T) {
mustExecAndDrop("create table t (a bigint auto_random(10) primary key)", func() {
assertDecreaseBitErr("alter table t modify column a bigint auto_random(1)")
})
mustExecAndDrop("create table t (a bigint auto_random(10), b int, primary key (a, b) clustered)", func() {
assertDecreaseBitErr("alter table t modify column a bigint auto_random(6)")
})

originStep := autoid.GetStep()
autoid.SetStep(1)
Expand Down
15 changes: 15 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,21 @@ func TestAutoRandomTableOption(t *testing.T) {
require.Contains(t, err.Error(), autoid.AutoRandomRebaseNotApplicable)
}

func TestAutoRandomClusteredPrimaryKey(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a bigint auto_random(5), b int, primary key (a, b) clustered);")
tk.MustExec("insert into t (b) values (1);")
tk.MustExec("set @@allow_auto_random_explicit_insert = 0;")
tk.MustGetErrCode("insert into t values (100, 2);", errno.ErrInvalidAutoRandom)
tk.MustExec("set @@allow_auto_random_explicit_insert = 1;")
tk.MustExec("insert into t values (100, 2);")
tk.MustQuery("select b from t order by b;").Check(testkit.Rows("1", "2"))
tk.MustExec("alter table t modify column a bigint auto_random(6);")
}

// Test filter different kind of allocators.
// In special ddl type, for example:
// 1: ActionRenameTable : it will abandon all the old allocators.
Expand Down
6 changes: 4 additions & 2 deletions meta/autoid/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ var (
)

const (
// AutoRandomPKisNotHandleErrMsg indicates the auto_random column attribute is defined on a non-primary key column, or the primary key is nonclustered.
AutoRandomPKisNotHandleErrMsg = "column %s is not the integer primary key, or the primary key is nonclustered"
// AutoRandomMustFirstColumnInPK is reported when auto_random is not the first column in primary key.
AutoRandomMustFirstColumnInPK = "column '%s' must be the first column in primary key"
// AutoRandomNoClusteredPKErrMsg indicates the primary key is not clustered.
AutoRandomNoClusteredPKErrMsg = "auto_random is only supported on the tables with clustered primary key"
// AutoRandomIncompatibleWithAutoIncErrMsg is reported when auto_random and auto_increment are specified on the same column.
AutoRandomIncompatibleWithAutoIncErrMsg = "auto_random is incompatible with auto_increment"
// AutoRandomIncompatibleWithDefaultValueErrMsg is reported when auto_random and default are specified on the same column.
Expand Down

0 comments on commit 3450c19

Please sign in to comment.