Skip to content

Commit

Permalink
admin: refine admin check decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Oct 10, 2018
1 parent 3104c87 commit 09ee890
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 101 deletions.
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ type CheckTableExec struct {
done bool
is infoschema.InfoSchema

genExprs map[string]expression.Expression
genExprs map[model.TableColumnID]expression.Expression
}

// Open implements the Executor Open interface.
Expand Down
8 changes: 4 additions & 4 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package model

import (
"encoding/json"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -557,7 +556,8 @@ func collationToProto(c string) int32 {
return int32(mysql.DefaultCollationID)
}

// GetTableColumnID gets a ID of a column with table ID
func GetTableColumnID(tableInfo *TableInfo, col *ColumnInfo) string {
return fmt.Sprintf("%d_%d", tableInfo.ID, col.ID)
// TableColumnID is composed by table ID and column ID.
type TableColumnID struct {
TableID int64
ColumnID int64
}
2 changes: 1 addition & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type CheckTable struct {

Tables []*ast.TableName

GenExprs map[string]expression.Expression
GenExprs map[model.TableColumnID]expression.Expression
}

// RecoverIndex is used for backfilling corrupted index data.
Expand Down
5 changes: 2 additions & 3 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (b *planBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) {

func (b *planBuilder) buildAdminCheckTable(as *ast.AdminStmt) (*CheckTable, error) {
p := &CheckTable{Tables: as.Tables}
p.GenExprs = make(map[string]expression.Expression)
p.GenExprs = make(map[model.TableColumnID]expression.Expression, len(p.Tables))

mockTablePlan := LogicalTableDual{}.init(b.ctx)
for _, tbl := range p.Tables {
Expand Down Expand Up @@ -548,8 +548,7 @@ func (b *planBuilder) buildAdminCheckTable(as *ast.AdminStmt) (*CheckTable, erro
return nil, errors.Trace(err)
}
expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType())
genColumnName := model.GetTableColumnID(tableInfo, column.ColumnInfo)
p.GenExprs[genColumnName] = expr
p.GenExprs[model.TableColumnID{TableID: tableInfo.ID, ColumnID: column.ColumnInfo.ID}] = expr
}
}
return p, nil
Expand Down
125 changes: 36 additions & 89 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"io"
"reflect"
"sort"
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -295,7 +296,7 @@ func ScanIndexData(sc *stmtctx.StatementContext, txn kv.Transaction, kvIndex tab
// It returns nil if the data from the index is equal to the data from the table columns,
// otherwise it returns an error with a different set of records.
// genExprs is use to calculate the virtual generate column.
func CompareIndexData(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[string]expression.Expression) error {
func CompareIndexData(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[model.TableColumnID]expression.Expression) error {
err := checkIndexAndRecord(sessCtx, txn, t, idx, genExprs)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -337,7 +338,7 @@ func adjustDatumKind(vals1, vals2 []types.Datum) {
}
}

func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[string]expression.Expression) error {
func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[model.TableColumnID]expression.Expression) error {
it, err := idx.SeekFirst(txn)
if err != nil {
return errors.Trace(err)
Expand All @@ -354,6 +355,7 @@ func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table
return errors.Trace(err)
}
sc := new(stmtctx.StatementContext)
rowDecoder := makeRowDecoder(t, cols, genExprs)
for {
vals1, h, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
Expand All @@ -366,7 +368,7 @@ func checkIndexAndRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table
if err != nil {
return errors.Trace(err)
}
vals2, err := rowWithCols(sessCtx, txn, t, h, cols, genExprs)
vals2, err := rowWithCols(sessCtx, txn, t, h, cols, rowDecoder)
vals2 = tables.TruncateIndexValuesIfNeeded(t.Meta(), idx.Meta(), vals2)
if kv.ErrNotExist.Equal(err) {
record := &RecordData{Handle: h, Values: vals1}
Expand Down Expand Up @@ -406,7 +408,7 @@ func compareDatumSlice(sc *stmtctx.StatementContext, val1s, val2s []types.Datum)
}

// CheckRecordAndIndex is exported for testing.
func CheckRecordAndIndex(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[string]expression.Expression) error {
func CheckRecordAndIndex(sessCtx sessionctx.Context, txn kv.Transaction, t table.Table, idx table.Index, genExprs map[model.TableColumnID]expression.Expression) error {
sc := sessCtx.GetSessionVars().StmtCtx
cols := make([]*table.Column, len(idx.Meta().Columns))
for i, col := range idx.Meta().Columns {
Expand Down Expand Up @@ -557,16 +559,38 @@ func CompareTableRecord(sessCtx sessionctx.Context, txn kv.Transaction, t table.
return nil
}

func makeRowDecoder(t table.Table, decodeCol []*table.Column, genExpr map[model.TableColumnID]expression.Expression) decoder.RowDecoder {
cols := t.Cols()
tblInfo := t.Meta()
decodeColsMap := make(map[int64]decoder.Column, len(decodeCol))
for _, v := range decodeCol {
col := cols[v.Offset]
tpExpr := decoder.Column{
Info: col.ToInfo(),
}
if col.IsGenerated() && !col.GeneratedStored {
for _, c := range cols {
if _, ok := col.Dependences[c.Name.L]; ok {
decodeColsMap[c.ID] = decoder.Column{
Info: c.ToInfo(),
}
}
}
tpExpr.GenExpr = genExpr[model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID}]
}
decodeColsMap[col.ID] = tpExpr
}
return decoder.NewRowDecoder(cols, decodeColsMap)
}

// genExprs use to calculate generated column value.
func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h int64, cols []*table.Column, genExprs map[string]expression.Expression) ([]types.Datum, error) {
func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h int64, cols []*table.Column, rowDecoder decoder.RowDecoder) ([]types.Datum, error) {
key := t.RecordKey(h)
value, err := txn.Get(key)
genColFlag := false
if err != nil {
return nil, errors.Trace(err)
}
v := make([]types.Datum, len(cols))
colTps := make(map[int64]*types.FieldType, len(cols))
for i, col := range cols {
if col == nil {
continue
Expand All @@ -582,34 +606,13 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h
}
continue
}
// If have virtual generate column , decode all columns.
if col.IsGenerated() && col.GeneratedStored == false {
genColFlag = true
}
colTps[col.ID] = &col.FieldType
}
// if have virtual generate column, decode all columns
if genColFlag {
for _, c := range t.Cols() {
if c.State != model.StatePublic {
continue
}
colTps[c.ID] = &c.FieldType
}
}

rowMap, err := tablecodec.DecodeRow(value, colTps, sessCtx.GetSessionVars().Location())
rowMap, err := rowDecoder.DecodeAndEvalRowWithMap(sessCtx, value, sessCtx.GetSessionVars().Location(), time.UTC, nil)
if err != nil {
return nil, errors.Trace(err)
}

if genColFlag && genExprs != nil {
err = fillGenColData(sessCtx, rowMap, t, cols, genExprs)
if err != nil {
return v, errors.Trace(err)
}
}

for i, col := range cols {
if col == nil {
continue
Expand Down Expand Up @@ -641,7 +644,7 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h

// genExprs use to calculate generated column value.
func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc, genExprs map[string]expression.Expression) error {
fn table.RecordIterFunc, genExprs map[model.TableColumnID]expression.Expression) error {
it, err := retriever.Seek(startKey)
if err != nil {
return errors.Trace(err)
Expand All @@ -653,22 +656,7 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
}

log.Debugf("startKey:%q, key:%q, value:%q", startKey, it.Key(), it.Value())

genColFlag := false
colMap := make(map[int64]*types.FieldType, len(cols))
for _, col := range cols {
if col.IsGenerated() && col.GeneratedStored == false {
genColFlag = true
break
}
colMap[col.ID] = &col.FieldType
}
if genColFlag {
for _, col := range t.Cols() {
colMap[col.ID] = &col.FieldType
}
}

rowDecoder := makeRowDecoder(t, cols, genExprs)
prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
Expand All @@ -679,18 +667,10 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
return errors.Trace(err)
}

rowMap, err := tablecodec.DecodeRow(it.Value(), colMap, sessCtx.GetSessionVars().Location())
rowMap, err := rowDecoder.DecodeAndEvalRowWithMap(sessCtx, it.Value(), sessCtx.GetSessionVars().Location(), time.UTC, nil)
if err != nil {
return errors.Trace(err)
}

if genColFlag && genExprs != nil {
err = fillGenColData(sessCtx, rowMap, t, cols, genExprs)
if err != nil {
return errors.Trace(err)
}
}

data := make([]types.Datum, 0, len(cols))
for _, col := range cols {
if col.IsPKHandleColumn(t.Meta()) {
Expand Down Expand Up @@ -718,39 +698,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
return nil
}

// genExprs use to calculate generated column value.
func fillGenColData(sessCtx sessionctx.Context, rowMap map[int64]types.Datum, t table.Table, cols []*table.Column, genExprs map[string]expression.Expression) error {
tableInfo := t.Meta()
row := make([]types.Datum, len(t.Cols()))
for _, col := range t.Cols() {
ri, ok := rowMap[col.ID]
if ok {
row[col.Offset] = ri
}
}

var err error
for _, col := range cols {
if !col.IsGenerated() || col.GeneratedStored == true {
continue
}
genColumnName := model.GetTableColumnID(tableInfo, col.ColumnInfo)
if expr, ok := genExprs[genColumnName]; ok {
var val types.Datum
val, err = expr.Eval(chunk.MutRowFromDatums(row).ToRow())
if err != nil {
return errors.Trace(err)
}
val, err = table.CastValue(sessCtx, val, col.ToInfo())
if err != nil {
return errors.Trace(err)
}
rowMap[col.ID] = val
}
}
return nil
}

// admin error codes.
const (
codeDataNotEqual terror.ErrCode = 1
Expand Down
6 changes: 3 additions & 3 deletions util/rowDecoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewRowDecoder(cols []*table.Column, decodeColMap map[int64]Column) RowDecod

// DecodeAndEvalRowWithMap decodes a byte slice into datums and evaluates the generated column value.
func (rd RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, decodeLoc, sysLoc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
_, err := tablecodec.DecodeRowWithMap(b, rd.colTypes, decodeLoc, row)
row, err := tablecodec.DecodeRowWithMap(b, rd.colTypes, decodeLoc, row)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -96,9 +96,9 @@ func (rd RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, d
return nil, errors.Trace(err)
}

if val.Kind() == types.KindMysqlTime {
if val.Kind() == types.KindMysqlTime && sysLoc != time.UTC {
t := val.GetMysqlTime()
if t.Type == mysql.TypeTimestamp && sysLoc != time.UTC {
if t.Type == mysql.TypeTimestamp {
err := t.ConvertTimeZone(sysLoc, time.UTC)
if err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit 09ee890

Please sign in to comment.