Skip to content

Commit

Permalink
*: fix bugs that creating index on clustered index table or table wit…
Browse files Browse the repository at this point in the history
…h the virtual generated column (#18114)
  • Loading branch information
wjhuang2016 authored Jun 19, 2020
1 parent df6898d commit 7daa4a7
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 74 deletions.
41 changes: 35 additions & 6 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,44 @@ type testDBSuite6 struct{ *testDBSuite }
type testDBSuite7 struct{ *testDBSuite }
type testSerialDBSuite struct{ *testDBSuite }

func (s *testDBSuite4) TestAddIndexWithPK(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)

func testAddIndexWithPK(s *testSerialDBSuite, c *C) {
s.tk.MustExec("drop table if exists test_add_index_with_pk")
s.tk.MustExec("create table test_add_index_with_pk(a int not null, b int not null default '0', primary key(a))")
s.tk.MustExec("insert into test_add_index_with_pk values(1, 2)")
s.tk.MustExec("alter table test_add_index_with_pk add index idx (a)")
s.tk.MustQuery("select a from test_add_index_with_pk").Check(testkit.Rows("1"))
s.tk.MustExec("insert into test_add_index_with_pk values(2, 2)")
s.tk.MustExec("alter table test_add_index_with_pk add index idx1 (a, b)")
s.tk.MustQuery("select * from test_add_index_with_pk").Check(testkit.Rows("1 2", "2 2"))
s.tk.MustExec("drop table if exists test_add_index_with_pk1")
s.tk.MustExec("create table test_add_index_with_pk1(a int not null, b int not null default '0', c int, d int, primary key(c))")
s.tk.MustExec("insert into test_add_index_with_pk1 values(1, 1, 1, 1)")
s.tk.MustExec("alter table test_add_index_with_pk1 add index idx (c)")
s.tk.MustExec("insert into test_add_index_with_pk1 values(2, 2, 2, 2)")
s.tk.MustQuery("select * from test_add_index_with_pk1").Check(testkit.Rows("1 1 1 1", "2 2 2 2"))
s.tk.MustExec("drop table if exists test_add_index_with_pk2")
s.tk.MustExec("create table test_add_index_with_pk2(a int not null, b int not null default '0', c int unsigned, d int, primary key(c))")
s.tk.MustExec("insert into test_add_index_with_pk2 values(1, 1, 1, 1)")
s.tk.MustExec("alter table test_add_index_with_pk2 add index idx (c)")
s.tk.MustExec("insert into test_add_index_with_pk2 values(2, 2, 2, 2)")
s.tk.MustQuery("select * from test_add_index_with_pk2").Check(testkit.Rows("1 1 1 1", "2 2 2 2"))
s.tk.MustExec("drop table if exists t")
s.tk.MustExec("create table t (a int, b int, c int, primary key(a, b));")
s.tk.MustExec("insert into t values (1, 2, 3);")
s.tk.MustExec("create index idx on t (a, b);")
}

func (s *testSerialDBSuite) TestAddIndexWithPK(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
config.UpdateGlobal(func(conf *config.Config) {
conf.AlterPrimaryKey = false
})
defer config.RestoreFunc()()

testAddIndexWithPK(s, c)
s.tk.MustExec("set @@tidb_enable_clustered_index = 1;")
testAddIndexWithPK(s, c)
}

func (s *testDBSuite1) TestRenameIndex(c *C) {
Expand Down Expand Up @@ -3713,9 +3730,9 @@ func (s *testDBSuite4) TestIfExists(c *C) {
s.tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Note|1507|Error in list of partitions to p1"))
}

func (s *testDBSuite5) TestAddIndexForGeneratedColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
func testAddIndexForGeneratedColumn(s *testSerialDBSuite, c *C) {
s.tk.MustExec("use test_db")
s.tk.MustExec("drop table if exists t")
s.tk.MustExec("create table t(y year NOT NULL DEFAULT '2155')")
defer s.mustExec(c, "drop table t;")
for i := 0; i < 50; i++ {
Expand All @@ -3737,6 +3754,7 @@ func (s *testDBSuite5) TestAddIndexForGeneratedColumn(c *C) {
//s.mustExec(c, "alter table t drop index idx_y")

// Fix issue 9311.
s.tk.MustExec("drop table if exists gcai_table")
s.tk.MustExec("create table gcai_table (id int primary key);")
s.tk.MustExec("insert into gcai_table values(1);")
s.tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d date DEFAULT '9999-12-31';")
Expand All @@ -3752,6 +3770,17 @@ func (s *testDBSuite5) TestAddIndexForGeneratedColumn(c *C) {
s.tk.MustQuery("select id1 from gcai_table use index(idx1)").Check(testkit.Rows("6"))
s.tk.MustExec("admin check table gcai_table")
}
func (s *testSerialDBSuite) TestAddIndexForGeneratedColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
config.UpdateGlobal(func(conf *config.Config) {
conf.AlterPrimaryKey = false
})
defer config.RestoreFunc()()

testAddIndexForGeneratedColumn(s, c)
s.tk.MustExec("set @@tidb_enable_clustered_index = 1;")
testAddIndexForGeneratedColumn(s, c)
}

func (s *testDBSuite5) TestModifyGeneratedColumn(c *C) {
tk := testkit.NewTestKit(c, s.store)
Expand Down
8 changes: 0 additions & 8 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,14 +882,6 @@ func (w *addIndexWorker) getIndexRecord(handle kv.Handle, recordKey []byte, rawR
idxVal := make([]types.Datum, len(idxInfo.Columns))
for j, v := range idxInfo.Columns {
col := cols[v.Offset]
if col.IsPKHandleColumn(t.Meta()) {
if mysql.HasUnsignedFlag(col.Flag) {
idxVal[j].SetUint64(uint64(handle.IntValue()))
} else {
idxVal[j].SetInt64(handle.IntValue())
}
continue
}
idxColumnVal, ok := w.rowMap[col.ID]
if ok {
idxVal[j] = idxColumnVal
Expand Down
9 changes: 2 additions & 7 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,8 @@ func getKeysNeedCheck(ctx context.Context, sctx sessionctx.Context, t table.Tabl
break
}
}
}
if t.Meta().IsCommonHandle {
pkIdx := tables.FindPrimaryIndex(t.Meta())
cols := t.Cols()
for _, idxCol := range pkIdx.Columns {
handleCols = append(handleCols, cols[idxCol.Offset])
}
} else {
handleCols = tables.TryGetCommonPkColumns(t)
}

var err error
Expand Down
17 changes: 4 additions & 13 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,13 +1845,8 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
CmsketchDepth: &depth,
CmsketchWidth: &width,
}
if task.TblInfo != nil && task.TblInfo.IsCommonHandle {
pkIdx := tables.FindPrimaryIndex(task.TblInfo)
var pkColIds []int64
for _, idxCol := range pkIdx.Columns {
pkColIds = append(pkColIds, task.TblInfo.Columns[idxCol.Offset].ID)
}
e.analyzePB.ColReq.PrimaryColumnIds = pkColIds
if task.TblInfo != nil {
e.analyzePB.ColReq.PrimaryColumnIds = tables.TryGetCommonPkColumnIds(task.TblInfo)
}
b.err = plannercore.SetPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, cols)
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze columns"}
Expand Down Expand Up @@ -3017,12 +3012,8 @@ func newRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}
}
if len(pkCols) == 0 {
if tbl.IsCommonHandle {
pkIdx := tables.FindPrimaryIndex(tbl)
for _, idxCol := range pkIdx.Columns {
pkCols = append(pkCols, tbl.Columns[idxCol.Offset].ID)
}
} else {
pkCols = tables.TryGetCommonPkColumnIds(tbl)
if len(pkCols) == 0 {
pkCols = []int64{0}
}
}
Expand Down
33 changes: 22 additions & 11 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -354,6 +355,7 @@ func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle kv.Ha
}

func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle kv.Handle, rowVal []byte, chk *chunk.Chunk) error {
pkCols := tables.TryGetCommonPkColumnIds(tblInfo)
colID2CutPos := make(map[int64]int, e.schema.Len())
for _, col := range e.schema.Columns {
if _, ok := colID2CutPos[col.ID]; !ok {
Expand All @@ -374,7 +376,11 @@ func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle kv
chk.AppendNull(i)
continue
}
if tryDecodeFromHandle(tblInfo, i, col, handle, chk, decoder) {
ok, err := tryDecodeFromHandle(tblInfo, i, col, handle, chk, decoder, pkCols)
if err != nil {
return err
}
if ok {
continue
}
cutPos := colID2CutPos[col.ID]
Expand All @@ -395,23 +401,28 @@ func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle kv
return nil
}

func tryDecodeFromHandle(tblInfo *model.TableInfo, i int, col *expression.Column, handle kv.Handle, chk *chunk.Chunk, decoder *codec.Decoder) bool {
func tryDecodeFromHandle(tblInfo *model.TableInfo, i int, col *expression.Column, handle kv.Handle, chk *chunk.Chunk, decoder *codec.Decoder, pkCols []int64) (bool, error) {
if tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.RetType.Flag) {
chk.AppendInt64(i, handle.IntValue())
return true
return true, nil
}
if col.ID == model.ExtraHandleID {
chk.AppendInt64(i, handle.IntValue())
return true
}
if tblInfo.IsCommonHandle && mysql.HasPriKeyFlag(col.RetType.Flag) {
_, err := decoder.DecodeOne(handle.EncodedCol(i), i, col.RetType)
if err != nil {
return false
return true, nil
}
// Try to decode common handle.
if mysql.HasPriKeyFlag(col.RetType.Flag) {
for i, hid := range pkCols {
if col.ID == hid {
_, err := decoder.DecodeOne(handle.EncodedCol(i), i, col.RetType)
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}
}
return true
}
return false
return false, nil
}

func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
Expand Down
13 changes: 2 additions & 11 deletions planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,7 @@ func (p *PhysicalLimit) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) {

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) {
var pkColIds []int64
if p.Table.IsCommonHandle {
pkIdx := tables.FindPrimaryIndex(p.Table)
for _, idxCol := range pkIdx.Columns {
pkColIds = append(pkColIds, p.Table.Columns[idxCol.Offset].ID)
}
}
pkColIds := tables.TryGetCommonPkColumnIds(p.Table)
tsExec := &tipb.TableScan{
TableId: p.Table.ID,
Columns: util.ColumnsToProto(p.Columns, p.Table.PKIsHandle),
Expand Down Expand Up @@ -161,10 +155,7 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
}
var pkColIds []int64
if p.NeedCommonHandle {
pkIdx := tables.FindPrimaryIndex(p.Table)
for _, idxCol := range pkIdx.Columns {
pkColIds = append(pkColIds, p.Table.Columns[idxCol.Offset].ID)
}
pkColIds = tables.TryGetCommonPkColumnIds(p.Table)
}
idxExec := &tipb.IndexScan{
TableId: p.Table.ID,
Expand Down
27 changes: 27 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,33 @@ func FindPrimaryIndex(tblInfo *model.TableInfo) *model.IndexInfo {
return pkIdx
}

// TryGetCommonPkColumnIds get the IDs of primary key column if the table has common handle.
func TryGetCommonPkColumnIds(tbl *model.TableInfo) []int64 {
var pkColIds []int64
if !tbl.IsCommonHandle {
return nil
}
pkIdx := FindPrimaryIndex(tbl)
for _, idxCol := range pkIdx.Columns {
pkColIds = append(pkColIds, tbl.Columns[idxCol.Offset].ID)
}
return pkColIds
}

// TryGetCommonPkColumns get the primary key columns if the table has common handle.
func TryGetCommonPkColumns(tbl table.Table) []*table.Column {
var pkCols []*table.Column
if !tbl.Meta().IsCommonHandle {
return nil
}
pkIdx := FindPrimaryIndex(tbl.Meta())
cols := tbl.Cols()
for _, idxCol := range pkIdx.Columns {
pkCols = append(pkCols, cols[idxCol.Offset])
}
return pkCols
}

// AddRecord implements table.Table AddRecord interface.
func (t *TableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
var opt table.AddRecordOpt
Expand Down
10 changes: 1 addition & 9 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,7 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
}
data := make([]types.Datum, 0, len(cols))
for _, col := range cols {
if col.IsPKHandleColumn(t.Meta()) {
if mysql.HasUnsignedFlag(col.Flag) {
data = append(data, types.NewUintDatum(uint64(handle.IntValue())))
} else {
data = append(data, types.NewIntDatum(handle.IntValue()))
}
} else {
data = append(data, rowMap[col.ID])
}
data = append(data, rowMap[col.ID])
}
more, err := fn(handle.IntValue(), data, cols)
if !more || err != nil {
Expand Down
54 changes: 45 additions & 9 deletions util/rowDecoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/rowcodec"
)

Expand All @@ -44,6 +45,7 @@ type RowDecoder struct {
colTypes map[int64]*types.FieldType
haveGenColumn bool
defaultVals []types.Datum
pkCols []int64
}

// NewRowDecoder returns a new RowDecoder.
Expand All @@ -56,11 +58,6 @@ func NewRowDecoder(tbl table.Table, decodeColMap map[int64]Column) *RowDecoder {
haveGenCol = true
}
}
if !haveGenCol {
return &RowDecoder{
colTypes: colFieldMap,
}
}

cols := tbl.Cols()
tps := make([]*types.FieldType, len(cols))
Expand All @@ -74,9 +71,45 @@ func NewRowDecoder(tbl table.Table, decodeColMap map[int64]Column) *RowDecoder {
colTypes: colFieldMap,
haveGenColumn: haveGenCol,
defaultVals: make([]types.Datum, len(cols)),
pkCols: tables.TryGetCommonPkColumnIds(tbl.Meta()),
}
}

func (rd *RowDecoder) tryDecodeFromHandleAndSetRow(dCol Column, handle kv.Handle, row map[int64]types.Datum, decodeLoc *time.Location) (bool, error) {
if handle == nil {
return false, nil
}
colInfo := dCol.Col.ColumnInfo
if dCol.Col.IsPKHandleColumn(rd.tbl.Meta()) {
if mysql.HasUnsignedFlag(colInfo.Flag) {
row[colInfo.ID] = types.NewUintDatum(uint64(handle.IntValue()))
rd.mutRow.SetValue(colInfo.Offset, uint64(handle.IntValue()))
} else {
row[colInfo.ID] = types.NewIntDatum(handle.IntValue())
rd.mutRow.SetValue(colInfo.Offset, handle.IntValue())
}
return true, nil
}
// Try to decode common handle.
if mysql.HasPriKeyFlag(dCol.Col.Flag) {
for i, hid := range rd.pkCols {
if dCol.Col.ID == hid {
_, d, err := codec.DecodeOne(handle.EncodedCol(i))
if err != nil {
return false, errors.Trace(err)
}
if d, err = tablecodec.Unflatten(d, &dCol.Col.FieldType, decodeLoc); err != nil {
return false, err
}
row[colInfo.ID] = d
rd.mutRow.SetValue(colInfo.Offset, d)
return true, nil
}
}
}
return false, nil
}

// DecodeAndEvalRowWithMap decodes a byte slice into datums and evaluates the generated column value.
func (rd *RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, handle kv.Handle, b []byte, decodeLoc, sysLoc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
var err error
Expand All @@ -88,17 +121,20 @@ func (rd *RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, handle kv.
if err != nil {
return nil, err
}
if !rd.haveGenColumn {
return row, nil
}

for _, dCol := range rd.columns {
colInfo := dCol.Col.ColumnInfo
val, ok := row[colInfo.ID]
if ok || dCol.GenExpr != nil {
rd.mutRow.SetValue(colInfo.Offset, val.GetValue())
continue
}
ok, err := rd.tryDecodeFromHandleAndSetRow(dCol, handle, row, decodeLoc)
if err != nil {
return nil, err
}
if ok {
continue
}
// Get the default value of the column in the generated column expression.
val, err = tables.GetColDefaultValue(ctx, dCol.Col, rd.defaultVals)
if err != nil {
Expand Down

0 comments on commit 7daa4a7

Please sign in to comment.