Skip to content

Commit

Permalink
cherry pick pingcap#30887 to release-5.2
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
glorv authored and ti-srebot committed Apr 12, 2022
1 parent ba8e845 commit cf28319
Show file tree
Hide file tree
Showing 12 changed files with 830 additions and 6 deletions.
14 changes: 14 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ type FileRouteRule struct {
}

type TikvImporter struct {
<<<<<<< HEAD
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
Expand All @@ -321,6 +322,19 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateDetection bool `toml:"duplicate-detection" json:"duplicate-detection"`
=======
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`
>>>>>>> 393415782... lightning: add back table empty check and add a switch config (#30887)

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
101 changes: 101 additions & 0 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
<<<<<<< HEAD
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
=======
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
>>>>>>> 393415782... lightning: add back table empty check and add a switch config (#30887)
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand All @@ -36,11 +46,21 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
<<<<<<< HEAD
=======
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
>>>>>>> 393415782... lightning: add back table empty check and add a switch config (#30887)
"github.com/pingcap/tidb/table/tables"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"
<<<<<<< HEAD
"go.uber.org/zap"
=======
>>>>>>> 393415782... lightning: add back table empty check and add a switch config (#30887)
)

const (
Expand Down Expand Up @@ -651,3 +671,84 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
return nil
}
db, _ := rc.tidbGlue.GetDB()

tableCount := 0
for _, db := range rc.dbMetas {
tableCount += len(db.Tables)
}

var lock sync.Mutex
tableNames := make([]string, 0)
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
// skip tables that have checkpoint
if rc.cfg.Checkpoint.Enable {
_, err := rc.checkpointsDB.Get(gCtx, tblName)
switch {
case err == nil:
continue
case errors.IsNotFound(err):
default:
return errors.Trace(err)
}
}

hasData, err1 := tableContainsData(gCtx, db, tblName)
if err1 != nil {
return err1
}
if hasData {
lock.Lock()
tableNames = append(tableNames, tblName)
lock.Unlock()
}
}
return nil
})
}
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
}

if len(tableNames) > 0 {
// sort the failed names
sort.Strings(tableNames)
msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
rc.checkTemplate.Collect(Critical, false, msg)
}
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
query := "select 1 from " + tableName + " limit 1"
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)

switch {
case err == sql.ErrNoRows:
return false, nil
case err != nil:
return false, errors.Trace(err)
default:
return true, nil
}
}
Loading

0 comments on commit cf28319

Please sign in to comment.