Skip to content

Commit

Permalink
ddl, table: allow using SHARD_ROW_ID_BITS with auto_incremental colum…
Browse files Browse the repository at this point in the history
…ns (#10759)
  • Loading branch information
bb7133 authored and tiancaiamao committed Jun 12, 2019
1 parent cd8c4e6 commit 7d27fa6
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 66 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ var (
"unsupported drop integer primary key")
errUnsupportedCharset = terror.ClassDDL.New(codeUnsupportedCharset, "unsupported charset %s collate %s")

errUnsupportedShardRowIDBits = terror.ClassDDL.New(codeUnsupportedShardRowIDBits, "unsupported shard_row_id_bits for table with auto_increment column.")
errUnsupportedShardRowIDBits = terror.ClassDDL.New(codeUnsupportedShardRowIDBits, "unsupported shard_row_id_bits for table with primary key as row id.")
errBlobKeyWithoutLength = terror.ClassDDL.New(codeBlobKeyWithoutLength, "index for BLOB/TEXT column must specify a key length")
errIncorrectPrefixKey = terror.ClassDDL.New(codeIncorrectPrefixKey, "Incorrect prefix key; the used key part isn't a string, the used length is longer than the key part, or the storage engine doesn't support unique prefix keys")
errTooLongKey = terror.ClassDDL.New(codeTooLongKey,
Expand Down
10 changes: 4 additions & 6 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1647,8 +1647,7 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
case ast.TableOptionCompression:
tbInfo.Compression = op.StrValue
case ast.TableOptionShardRowID:
ok, _ := hasAutoIncrementColumn(tbInfo)
if ok && op.UintValue != 0 {
if op.UintValue > 0 && tbInfo.PKIsHandle {
return errUnsupportedShardRowIDBits
}
tbInfo.ShardRowIDBits = op.UintValue
Expand Down Expand Up @@ -1907,14 +1906,13 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
if err != nil {
return errors.Trace(err)
}
ok, _ := hasAutoIncrementColumn(t.Meta())
if ok && uVal != 0 {
return errUnsupportedShardRowIDBits
}
if uVal == t.Meta().ShardRowIDBits {
// Nothing need to do.
return nil
}
if uVal > 0 && t.Meta().PKIsHandle {
return errUnsupportedShardRowIDBits
}
err = verifyNoOverflowShardBits(d.sessPool, t, uVal)
if err != nil {
return err
Expand Down
105 changes: 84 additions & 21 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"time"

Expand All @@ -27,6 +28,8 @@ import (
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -522,38 +525,98 @@ func (s *testSuite3) TestShardRowIDBits(c *C) {
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert t values (%d)", i))
}
tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
dom := domain.GetDomain(tk.Se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
var hasShardedID bool
var count int
c.Assert(tk.Se.NewTxn(context.Background()), IsNil)
err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, rec []types.Datum, cols []*table.Column) (more bool, err error) {
c.Assert(h, GreaterEqual, int64(0))
first8bits := h >> 56
if first8bits > 0 {
hasShardedID = true
}
count++
return true, nil

assertCountAndShard := func(t table.Table, expectCount int) {
var hasShardedID bool
var count int
c.Assert(tk.Se.NewTxn(context.Background()), IsNil)
err = t.IterRecords(tk.Se, t.FirstKey(), nil, func(h int64, rec []types.Datum, cols []*table.Column) (more bool, err error) {
c.Assert(h, GreaterEqual, int64(0))
first8bits := h >> 56
if first8bits > 0 {
hasShardedID = true
}
count++
return true, nil
})
c.Assert(err, IsNil)
c.Assert(count, Equals, expectCount)
c.Assert(hasShardedID, IsTrue)
}

assertCountAndShard(tbl, 100)

// After PR 10759, shard_row_id_bits is supported with tables with auto_increment column.
tk.MustExec("create table auto (id int not null auto_increment unique) shard_row_id_bits = 4")
tk.MustExec("alter table auto shard_row_id_bits = 5")
tk.MustExec("drop table auto")
tk.MustExec("create table auto (id int not null auto_increment unique) shard_row_id_bits = 0")
tk.MustExec("alter table auto shard_row_id_bits = 5")
tk.MustExec("drop table auto")
tk.MustExec("create table auto (id int not null auto_increment unique)")
tk.MustExec("alter table auto shard_row_id_bits = 5")
tk.MustExec("drop table auto")
tk.MustExec("create table auto (id int not null auto_increment unique) shard_row_id_bits = 4")
tk.MustExec("alter table auto shard_row_id_bits = 0")
tk.MustExec("drop table auto")

// After PR 10759, shard_row_id_bits is not supported with pk_is_handle tables.
err = tk.ExecToErr("create table auto (id int not null auto_increment primary key, b int) shard_row_id_bits = 4")
c.Assert(err.Error(), Equals, "[ddl:207]unsupported shard_row_id_bits for table with primary key as row id.")
tk.MustExec("create table auto (id int not null auto_increment primary key, b int) shard_row_id_bits = 0")
err = tk.ExecToErr("alter table auto shard_row_id_bits = 5")
c.Assert(err.Error(), Equals, "[ddl:207]unsupported shard_row_id_bits for table with primary key as row id.")
tk.MustExec("alter table auto shard_row_id_bits = 0")

// Hack an existing table with shard_row_id_bits and primary key as handle
db, ok := dom.InfoSchema().SchemaByName(model.NewCIStr("test"))
c.Assert(ok, IsTrue)
tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("auto"))
tblInfo := tbl.Meta()
tblInfo.ShardRowIDBits = 5
tblInfo.MaxShardRowIDBits = 5

kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
_, err = m.GenSchemaVersion()
c.Assert(err, IsNil)
c.Assert(m.UpdateTable(db.ID, tblInfo), IsNil)
return nil
})
err = dom.Reload()
c.Assert(err, IsNil)
c.Assert(count, Equals, 100)
c.Assert(hasShardedID, IsTrue)

// Test that audo_increment column can not use shard_row_id_bits.
_, err = tk.Exec("create table auto (id int not null auto_increment primary key) shard_row_id_bits = 4")
c.Assert(err, NotNil)
tk.MustExec("create table auto (id int not null auto_increment primary key) shard_row_id_bits = 0")
_, err = tk.Exec("alter table auto shard_row_id_bits = 4")
c.Assert(err, NotNil)
tk.MustExec("insert auto(b) values (1), (3), (5)")
tk.MustQuery("select id from auto order by id").Check(testkit.Rows("1", "2", "3"))

tk.MustExec("alter table auto shard_row_id_bits = 0")
tk.MustExec("drop table auto")

// Test shard_row_id_bits with auto_increment column
tk.MustExec("create table auto (a int, b int auto_increment unique) shard_row_id_bits = 15")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert auto(a) values (%d)", i))
}
tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("auto"))
assertCountAndShard(tbl, 100)
prevB, err := strconv.Atoi(tk.MustQuery("select b from auto where a=0").Rows()[0][0].(string))
c.Assert(err, IsNil)
for i := 1; i < 100; i++ {
b, err := strconv.Atoi(tk.MustQuery(fmt.Sprintf("select b from auto where a=%d", i)).Rows()[0][0].(string))
c.Assert(err, IsNil)
c.Assert(b, Greater, prevB)
prevB = b
}

// Test overflow
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int) shard_row_id_bits = 15")
defer tk.MustExec("drop table if exists t1")

tbl, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
maxID := 1<<(64-15-1) - 1
err = tbl.RebaseAutoID(tk.Se, int64(maxID)-1, false)
Expand Down
2 changes: 1 addition & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
recordID, err = e.Table.AllocAutoID(e.ctx)
recordID, err = e.Table.AllocAutoIncrementValue(e.ctx)
if e.filterErr(err) != nil {
return types.Datum{}, err
}
Expand Down
Loading

0 comments on commit 7d27fa6

Please sign in to comment.