Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

backend: fix auto random default value for primary key #457

Merged
merged 9 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"fmt"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -104,7 +106,7 @@ type AbstractBackend interface {
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, options *SessionOptions) Encoder
NewEncoder(tbl table.Table, options *SessionOptions, chunk *checkpoints.ChunkCheckpoint) Encoder
kennytm marked this conversation as resolved.
Show resolved Hide resolved

OpenEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -196,8 +198,8 @@ func (be Backend) MakeEmptyRows() Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(tbl table.Table, options *SessionOptions) Encoder {
return be.abstract.NewEncoder(tbl, options)
func (be Backend) NewEncoder(tbl table.Table, options *SessionOptions, chunk *checkpoints.ChunkCheckpoint) Encoder {
return be.abstract.NewEncoder(tbl, options, chunk)
}

func (be Backend) ShouldPostProcess() bool {
Expand Down
6 changes: 4 additions & 2 deletions lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/golang/mock/gomock"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -322,7 +324,7 @@ func (s *backendSuite) TestNewEncoder(c *C) {

encoder := mock.NewMockEncoder(s.controller)
options := &kv.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890, RowFormatVersion: "1"}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder)
s.mockBackend.EXPECT().NewEncoder(nil, options, &checkpoints.ChunkCheckpoint{}).Return(encoder)

c.Assert(s.mockBackend.NewEncoder(nil, options), Equals, encoder)
c.Assert(s.mockBackend.NewEncoder(nil, options, &checkpoints.ChunkCheckpoint{}), Equals, encoder)
}
6 changes: 4 additions & 2 deletions lightning/backend/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
kv "github.com/pingcap/kvproto/pkg/import_kvpb"
Expand Down Expand Up @@ -236,8 +238,8 @@ func (*importer) MakeEmptyRows() Rows {
return kvPairs(nil)
}

func (*importer) NewEncoder(tbl table.Table, options *SessionOptions) Encoder {
return NewTableKVEncoder(tbl, options)
func (*importer) NewEncoder(tbl table.Table, options *SessionOptions, chunk *checkpoints.ChunkCheckpoint) Encoder {
return NewTableKVEncoder(tbl, options, chunk)
}

func (importer *importer) CheckRequirements() error {
Expand Down
6 changes: 4 additions & 2 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/cockroachdb/pebble"
"github.com/coreos/go-semver/semver"
"github.com/google/uuid"
Expand Down Expand Up @@ -1212,8 +1214,8 @@ func (local *local) MakeEmptyRows() Rows {
return kvPairs(nil)
}

func (local *local) NewEncoder(tbl table.Table, options *SessionOptions) Encoder {
return NewTableKVEncoder(tbl, options)
func (local *local) NewEncoder(tbl table.Table, options *SessionOptions, chunk *checkpoints.ChunkCheckpoint) Encoder {
return NewTableKVEncoder(tbl, options, chunk)
}

func (local *local) isIngestRetryable(
Expand Down
44 changes: 41 additions & 3 deletions lightning/backend/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
package backend

import (
"math/rand"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -37,17 +41,37 @@ type tableKVEncoder struct {
tbl table.Table
se *session
recordCache []types.Datum
// auto random bits value for this chunk
autoRandomHeaderBits int64
}

func NewTableKVEncoder(tbl table.Table, options *SessionOptions) Encoder {
func NewTableKVEncoder(tbl table.Table, options *SessionOptions, chunk *checkpoints.ChunkCheckpoint) Encoder {
metric.KvEncoderCounter.WithLabelValues("open").Inc()
se := newSession(options)
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(tbl.Cols()))
tables.SetAddRecordCtx(se, recordCtx)

var autoRandomBits int64
if tbl.Meta().PKIsHandle && tbl.Meta().ContainsAutoRandomBits() {
for _, col := range tbl.Cols() {
if mysql.HasPriKeyFlag(col.Flag) {
typeBitsLength := uint64(mysql.DefaultLengthOfMysqlTypes[col.Tp] * 8)
incrementalBits := typeBitsLength - tbl.Meta().AutoRandomBits
hasSignBit := !mysql.HasUnsignedFlag(col.Flag)
if hasSignBit {
incrementalBits -= 1
}
autoRandomBits = rand.New(rand.NewSource(chunk.Chunk.Offset)).Int63n(1<<tbl.Meta().AutoRandomBits) << incrementalBits
kennytm marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
}

return &tableKVEncoder{
tbl: tbl,
se: se,
tbl: tbl,
se: se,
autoRandomHeaderBits: autoRandomBits,
}
}

Expand Down Expand Up @@ -184,6 +208,20 @@ func (kvcodec *tableKVEncoder) Encode(
} else if isAutoIncCol {
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
} else if isAutoRandom && isPk {
var val types.Datum
if mysql.HasUnsignedFlag(col.Flag) {
val = types.NewUintDatum(uint64(kvcodec.autoRandomHeaderBits | rowID))
} else {
val = types.NewIntDatum(kvcodec.autoRandomHeaderBits | rowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
typeBitsLength := uint64(mysql.DefaultLengthOfMysqlTypes[col.Tp] * 8)
incrementalBits := typeBitsLength - kvcodec.tbl.Meta().AutoRandomBits
hasSignBit := !mysql.HasUnsignedFlag(col.Flag)
if hasSignBit {
incrementalBits -= 1
}
} else {
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
Expand Down
63 changes: 57 additions & 6 deletions lightning/backend/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ package backend
import (
"errors"

"github.com/pingcap/tidb-lightning/lightning/mydump"

"github.com/pingcap/tidb/meta/autoid"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -86,7 +92,7 @@ func (s *kvSuite) TestEncode(c *C) {
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1})
c.Assert(err, ErrorMatches, "failed to cast `10000000` as tinyint\\(4\\) for column `c1` \\(#1\\):.*overflows tinyint")
c.Assert(pairs, IsNil)
Expand Down Expand Up @@ -117,7 +123,7 @@ func (s *kvSuite) TestEncode(c *C) {
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567891,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})
pairs, err = mockMode.Encode(logger, rowsWithPk2, 2, []int{0, 1})
c.Assert(err, ErrorMatches, "mock error")

Expand All @@ -126,7 +132,7 @@ func (s *kvSuite) TestEncode(c *C) {
SQLMode: mysql.ModeNone,
Timestamp: 1234567892,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})
pairs, err = noneMode.Encode(logger, rows, 1, []int{0, 1})
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, kvPairs([]common.KvPair{
Expand Down Expand Up @@ -155,7 +161,7 @@ func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
SQLMode: mysql.ModeNone,
Timestamp: 1234567892,
RowFormatVersion: "2",
})
}, &checkpoints.ChunkCheckpoint{})
pairs, err := noneMode.Encode(logger, rows, 1, []int{0, 1})
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, kvPairs([]common.KvPair{
Expand Down Expand Up @@ -199,7 +205,8 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567893,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})

pairs, err := encoder.Encode(logger, nil, 70, []int{-1, 1})
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, kvPairs([]common.KvPair{
Expand All @@ -210,6 +217,50 @@ func (s *kvSuite) TestEncodeTimestamp(c *C) {
}))
}

func mockTableInfo(c *C, createSql string) *model.TableInfo {
parser := parser.New()
node, err := parser.ParseOneStmt(createSql, "", "")
c.Assert(err, IsNil)
sctx := mock.NewContext()
info, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1)
c.Assert(err, IsNil)
info.State = model.StatePublic
return info
}

func (s *kvSuite) TestDefaultAutoRandoms(c *C) {
tblInfo := mockTableInfo(c, "create table t (id bigint unsigned NOT NULL auto_random primary key, a varchar(100));")
// seems parser can't parse auto_random properly.
tblInfo.AutoRandomBits = 5
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
c.Assert(err, IsNil)
encoder := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567893,
RowFormatVersion: "2",
}, &checkpoints.ChunkCheckpoint{Chunk: mydump.Chunk{Offset: 456}})
logger := log.Logger{Logger: zap.NewNop()}
pairs, err := encoder.Encode(logger, []types.Datum{types.NewStringDatum("")}, 70, []int{-1, 0})
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, kvPairs([]common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
},
}))
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(70))

pairs, err = encoder.Encode(logger, []types.Datum{types.NewStringDatum("")}, 71, []int{-1, 0})
c.Assert(err, IsNil)
c.Assert(pairs, DeepEquals, kvPairs([]common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x47},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0},
},
}))
c.Assert(tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoRandomType).Base(), Equals, int64(71))
}

func (s *kvSuite) TestSplitIntoChunks(c *C) {
pairs := []common.KvPair{
{
Expand Down Expand Up @@ -347,7 +398,7 @@ func (s *benchSQL2KVSuite) SetUpTest(c *C) {
// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tableInfo)
c.Assert(err, IsNil)
s.encoder = NewTableKVEncoder(tbl, &SessionOptions{RowFormatVersion: "2"})
s.encoder = NewTableKVEncoder(tbl, &SessionOptions{RowFormatVersion: "2"}, &checkpoints.ChunkCheckpoint{})
s.logger = log.Logger{Logger: zap.NewNop()}

// Prepare the row to insert.
Expand Down
4 changes: 3 additions & 1 deletion lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -290,7 +292,7 @@ func (be *tidbBackend) CheckRequirements() error {
return nil
}

func (be *tidbBackend) NewEncoder(tbl table.Table, options *SessionOptions) Encoder {
func (be *tidbBackend) NewEncoder(tbl table.Table, options *SessionOptions, chunk *checkpoints.ChunkCheckpoint) Encoder {
se := newSession(options)
if options.SQLMode.HasStrictMode() {
se.vars.SkipUTF8Check = false
Expand Down
10 changes: 6 additions & 4 deletions lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"database/sql"
"fmt"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/parser/charset"

"github.com/DATA-DOG/go-sqlmock"
Expand Down Expand Up @@ -91,7 +93,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
perms = append(perms, i)
}
perms = append(perms, -1)
encoder := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890, RowFormatVersion: "1"})
encoder := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890, RowFormatVersion: "1"}, &checkpoints.ChunkCheckpoint{})
row, err := encoder.Encode(logger, []types.Datum{
types.NewUintDatum(18446744073709551615),
types.NewIntDatum(-9223372036854775808),
Expand Down Expand Up @@ -131,7 +133,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
indexRows := ignoreBackend.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)

encoder := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
encoder := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{}, &checkpoints.ChunkCheckpoint{})
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1})
Expand Down Expand Up @@ -159,7 +161,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
indexRows := ignoreBackend.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)

encoder := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
encoder := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{}, &checkpoints.ChunkCheckpoint{})
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1})
Expand All @@ -183,7 +185,7 @@ func (s *mysqlSuite) TestStrictMode(c *C) {
c.Assert(err, IsNil)

bk := kv.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
encoder := bk.NewEncoder(tbl, &kv.SessionOptions{SQLMode: mysql.ModeStrictAllTables})
encoder := bk.NewEncoder(tbl, &kv.SessionOptions{SQLMode: mysql.ModeStrictAllTables}, &checkpoints.ChunkCheckpoint{})

logger := log.L()
_, err = encoder.Encode(logger, []types.Datum{
Expand Down
2 changes: 1 addition & 1 deletion lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,7 @@ func (cr *chunkRestore) restore(
SQLMode: rc.cfg.TiDB.SQLMode,
Timestamp: cr.chunk.Timestamp,
RowFormatVersion: rc.rowFormatVer,
})
}, cr.chunk)
kvsCh := make(chan []deliveredKVs, maxKVQueueSize)
deliverCompleteCh := make(chan deliverResult)

Expand Down
8 changes: 4 additions & 4 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ func (s *chunkRestoreSuite) TestEncodeLoop(c *C) {
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567895,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})
cfg := config.NewConfig()
rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
Expand All @@ -952,7 +952,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopCanceled(c *C) {
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567896,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})

go cancel()
cfg := config.NewConfig()
Expand All @@ -970,7 +970,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) {
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567897,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})

// close the chunk so reading it will result in the "file already closed" error.
s.cr.parser.Close()
Expand All @@ -990,7 +990,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567898,
RowFormatVersion: "1",
})
}, &checkpoints.ChunkCheckpoint{})

go func() {
deliverCompleteCh <- deliverResult{
Expand Down
Loading