Skip to content

Commit

Permalink
lightning: add back table empty check and add a switch config (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Dec 22, 2021
1 parent f586afa commit 3d6b207
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 13 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
91 changes: 88 additions & 3 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -38,15 +44,13 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table/tables"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"

"go.uber.org/zap"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -868,3 +872,84 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

// checkTableEmpty verifies that the target tables of this import do not
// already contain rows, and records a Critical failure in rc.checkTemplate
// listing every non-empty table.
//
// The check is skipped entirely for the TiDB backend and when
// TikvImporter.IncrementalImport is enabled. When checkpoints are enabled,
// tables that already have a checkpoint are skipped as well.
//
// Tables are probed concurrently by up to App.RegionConcurrency workers.
// It returns an error if the DB handle cannot be obtained or if a checkpoint
// lookup / probe query fails; a cancelled context is not reported as an error.
func (rc *Controller) checkTableEmpty(ctx context.Context) error {
	if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
		return nil
	}
	db, err := rc.tidbGlue.GetDB()
	if err != nil {
		// Surface the failure instead of querying through an unusable handle.
		return errors.Trace(err)
	}

	tableCount := 0
	for _, dbMeta := range rc.dbMetas {
		tableCount += len(dbMeta.Tables)
	}
	if tableCount == 0 {
		return nil
	}

	concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
	if concurrency < 1 {
		// At least one worker is required, otherwise nobody drains the channel.
		concurrency = 1
	}

	var (
		lock       sync.Mutex
		tableNames []string
	)
	ch := make(chan string, concurrency)
	eg, gCtx := errgroup.WithContext(ctx)
	for i := 0; i < concurrency; i++ {
		eg.Go(func() error {
			for tblName := range ch {
				// skip tables that have checkpoint
				if rc.cfg.Checkpoint.Enable {
					_, cpErr := rc.checkpointsDB.Get(gCtx, tblName)
					switch {
					case cpErr == nil:
						continue
					case errors.IsNotFound(cpErr):
						// no checkpoint for this table: fall through and probe it
					default:
						return errors.Trace(cpErr)
					}
				}

				hasData, queryErr := tableContainsData(gCtx, db, tblName)
				if queryErr != nil {
					return queryErr
				}
				if hasData {
					lock.Lock()
					tableNames = append(tableNames, tblName)
					lock.Unlock()
				}
			}
			return nil
		})
	}

	// Feed the workers, but stop as soon as the group context is done: once a
	// worker fails, the remaining workers exit too and an unconditional send
	// would block forever on the full channel.
feed:
	for _, dbMeta := range rc.dbMetas {
		for _, tblMeta := range dbMeta.Tables {
			select {
			case ch <- common.UniqueTable(tblMeta.DB, tblMeta.Name):
			case <-gCtx.Done():
				break feed
			}
		}
	}
	close(ch)

	if err := eg.Wait(); err != nil {
		if common.IsContextCanceledError(err) {
			return nil
		}
		return errors.Trace(err)
	}

	if len(tableNames) > 0 {
		// sort the failed names so the message is deterministic
		sort.Strings(tableNames)
		msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
		rc.checkTemplate.Collect(Critical, false, msg)
	}
	return nil
}

// tableContainsData reports whether tableName holds at least one row by
// running a "select 1 ... limit 1" probe through db.
//
// tableName is concatenated into the statement verbatim, so it must already
// be a properly quoted identifier (checkTableEmpty passes the result of
// common.UniqueTable). A sql.ErrNoRows result means the table is empty; any
// other error is returned traced.
func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
	var placeholder int
	stmt := "select 1 from " + tableName + " limit 1"
	row := db.QueryRowContext(ctx, stmt)
	if err := row.Scan(&placeholder); err != nil {
		if err == sql.ErrNoRows {
			return false, nil
		}
		return false, errors.Trace(err)
	}
	return true, nil
}
143 changes: 142 additions & 1 deletion br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ package restore

import (
"context"
"database/sql"
"path/filepath"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/glue"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
)
Expand All @@ -29,7 +36,141 @@ var _ = Suite(&checkInfoSuite{})

type checkInfoSuite struct{}

const passed CheckType = "pass"
// TestCheckTableEmpty exercises Controller.checkTableEmpty against a mocked
// SQL connection. It covers: the early-return paths (TiDB backend and
// incremental import), all tables empty, one table with data, several tables
// with data, and the case where tables that already have a checkpoint are
// skipped.
func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
dir := c.MkDir()
cfg := config.NewConfig()
cfg.Checkpoint.Enable = false
// Three tables spread over two databases: test1.tbl1, test1.tbl2, test2.tbl1.
dbMetas := []*mydump.MDDatabaseMeta{
{
Name: "test1",
Tables: []*mydump.MDTableMeta{
{
DB: "test1",
Name: "tbl1",
},
{
DB: "test1",
Name: "tbl2",
},
},
},
{
Name: "test2",
Tables: []*mydump.MDTableMeta{
{
DB: "test2",
Name: "tbl1",
},
},
},
}

rc := &Controller{
cfg: cfg,
dbMetas: dbMetas,
checkpointsDB: checkpoints.NewNullCheckpointsDB(),
}

ctx := context.Background()

// With the TiDB backend the check returns before touching rc.tidbGlue,
// which is still unset here.
rc.cfg.TikvImporter.Backend = config.BackendTiDB
err := rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)

// With incremental import enabled the check is skipped as well.
rc.cfg.TikvImporter.Backend = config.BackendLocal
rc.cfg.TikvImporter.IncrementalImport = true
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)

// All tables are empty: every probe query yields sql.ErrNoRows.
rc.cfg.TikvImporter.IncrementalImport = false
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
// The tables are probed concurrently, so the query order is not fixed.
mock.MatchExpectationsInOrder(false)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
// No table contains data, so nothing is collected and rc.checkTemplate
// does not need to be initialized for this scenario.
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

// A single table contains data.
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
// The non-empty table is reported as exactly one critical message.
tmpl := rc.checkTemplate.(*SimpleTemplate)
c.Assert(len(tmpl.criticalMsgs), Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test2`.`tbl1`\\] are not empty")

// Multiple tables contain data.
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1))
rc.checkTemplate = NewSimpleTemplate()
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
// Both names appear in one message, in sorted order.
tmpl = rc.checkTemplate.(*SimpleTemplate)
c.Assert(len(tmpl.criticalMsgs), Equals, 1)
c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test1`.`tbl1`, `test2`.`tbl1`\\] are not empty")

// Initialize the checkpoint with only two of the three tables
// (test1.tbl1 and test2.tbl1); test1.tbl2 has no checkpoint.
dbInfos := map[string]*checkpoints.TidbDBInfo{
"test1": {
Name: "test1",
Tables: map[string]*checkpoints.TidbTableInfo{
"tbl1": {
Name: "tbl1",
},
},
},
"test2": {
Name: "test2",
Tables: map[string]*checkpoints.TidbTableInfo{
"tbl1": {
Name: "tbl1",
},
},
},
}
rc.cfg.Checkpoint.Enable = true
rc.checkpointsDB = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
err = rc.checkpointsDB.Initialize(ctx, cfg, dbInfos)
c.Check(err, IsNil)
db, mock, err = sqlmock.New()
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
// Only the table without a checkpoint is expected to be queried.
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *checkInfoSuite) TestLocalResource(c *C) {
dir := c.MkDir()
Expand Down
58 changes: 57 additions & 1 deletion br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,9 +1027,65 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum
}

// CheckAndUpdateLocalChecksum is the no-op implementation: it ignores all of
// its arguments, performs no bookkeeping, and always returns true for both
// boolean results together with a non-nil, zero-value checksum and a nil
// error.
//
// NOTE(review): the meaning of the two boolean results is defined by the
// tableMetaMgr interface, which is not visible here — confirm against it.
func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) {
	return true, true, &verify.KVChecksum{}, nil
}

// FinishTable is a no-op for noopTableMetaMgr: it ignores ctx and always
// returns nil.
func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

// singleMgrBuilder is a stateless builder that hands out a singleTaskMetaMgr
// for task-level metadata and a noopTableMetaMgr for table-level metadata.
type singleMgrBuilder struct{}

// Init has nothing to prepare and always returns nil.
func (b singleMgrBuilder) Init(context.Context) error {
	return nil
}

// TaskMetaMgr returns a singleTaskMetaMgr bound to the given PD controller.
func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
	mgr := &singleTaskMetaMgr{pd: pd}
	return mgr
}

// TableMetaMgr returns a noopTableMetaMgr; the table restore argument is
// not used.
func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
	return noopTableMetaMgr{}
}

// singleTaskMetaMgr is a task meta manager that keeps no persistent task
// metadata. Apart from pausing PD schedulers through pd, every method either
// does nothing or returns a fixed answer.
type singleTaskMetaMgr struct {
	pd *pdutil.PdController
}

// InitTask does nothing and always returns nil.
func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
	return nil
}

// CheckTasksExclusively invokes action with a nil task list and returns the
// error it reports; the task slice returned by action is discarded.
func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
	if _, err := action(nil); err != nil {
		return err
	}
	return nil
}

// CheckAndPausePdSchedulers delegates to the PD controller's
// RemoveSchedulers and returns its undo function and error unchanged.
func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
	undo, err := m.pd.RemoveSchedulers(ctx)
	return undo, err
}

// CheckTaskExist always reports that the task exists.
func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
	return true, nil
}

// CheckAndFinishRestore always reports true for both shouldSwitchBack and
// shouldCleanupMeta, with a nil error.
func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
	shouldSwitchBack, shouldCleanupMeta = true, true
	return shouldSwitchBack, shouldCleanupMeta, nil
}

// Cleanup does nothing and always returns nil.
func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error {
	return nil
}

// CleanupTask does nothing and always returns nil.
func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error {
	return nil
}

// CleanupAllMetas does nothing and always returns nil.
func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
	return nil
}

// Close releases nothing; the manager holds no resources of its own.
func (m *singleTaskMetaMgr) Close() {}
15 changes: 13 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,17 @@ func NewRestoreControllerWithPauser(
}

var metaBuilder metaMgrBuilder
switch cfg.TikvImporter.Backend {
case config.BackendLocal, config.BackendImporter:
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter
switch {
case isSSTImport && cfg.TikvImporter.IncrementalImport:
metaBuilder = &dbMetaMgrBuilder{
db: db,
taskID: cfg.TaskID,
schema: cfg.App.MetaSchemaName,
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
metaBuilder = singleMgrBuilder{}
default:
metaBuilder = noopMetaMgrBuilder{}
}
Expand Down Expand Up @@ -1977,6 +1980,14 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
} else {
rc.checkTemplate.Collect(Critical, true, "table schemas are valid")
}

if err := rc.checkTableEmpty(ctx); err != nil {
return errors.Trace(err)
}
if err = rc.checkCSVHeader(ctx, rc.dbMetas); err != nil {
return err
}

return nil
}

Expand Down
Loading

0 comments on commit 3d6b207

Please sign in to comment.