Skip to content

Commit

Permalink
Merge branch 'release-5.3' into release-5.3-15748a4996b1
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored Feb 22, 2022
2 parents 4fe64d7 + 27ffd11 commit 62a9e2f
Show file tree
Hide file tree
Showing 70 changed files with 1,236 additions and 420 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
run:
timeout: 7m
timeout: 10m
linters:
disable-all: true
enable:
Expand Down
7 changes: 5 additions & 2 deletions br/cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (

func main() {
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", globalCfg.App.Config.File)
logToFile := globalCfg.App.File != "" && globalCfg.App.File != "-"
if logToFile {
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", globalCfg.App.Config.File)
}

app := lightning.New(globalCfg)

Expand Down Expand Up @@ -95,7 +98,7 @@ func main() {
}

// call Sync() with log to stdout may return error in some case, so just skip it
if globalCfg.App.File != "" {
if logToFile {
syncErr := logger.Sync()
if syncErr != nil {
fmt.Fprintln(os.Stderr, "sync log failed", syncErr)
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand All @@ -277,7 +276,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ const (
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
regionMaxKeyCount = 1_440_000
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB

propRangeIndex = "tikv.range_index"
Expand Down Expand Up @@ -1513,7 +1514,12 @@ func (local *local) WriteToTiKV(
size := int64(0)
totalCount := int64(0)
firstLoop := true
regionMaxSize := regionSplitSize * 4 / 3
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
regionMaxSize = regionSplitSize * 4 / 3
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
Expand Down
19 changes: 13 additions & 6 deletions br/pkg/lightning/backend/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (b noopBackend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) erro

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
func (b noopBackend) LocalWriter(context.Context, *backend.LocalWriterConfig, uuid.UUID) (backend.EngineWriter, error) {
return noopWriter{}, nil
return Writer{}, nil
}

func (b noopBackend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
Expand Down Expand Up @@ -174,16 +174,23 @@ func (r noopRow) Size() uint64 {
func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) {
}

type noopWriter struct{}
// Writer define a local writer that do nothing.
type Writer struct{}

func (w noopWriter) AppendRows(context.Context, string, []string, kv.Rows) error {
func (w Writer) AppendRows(context.Context, string, []string, kv.Rows) error {
return nil
}

func (w noopWriter) IsSynced() bool {
func (w Writer) IsSynced() bool {
return true
}

func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
func (w Writer) Close(context.Context) (backend.ChunkFlushStatus, error) {
return trueStatus{}, nil
}

type trueStatus struct{}

func (s trueStatus) Flushed() bool {
return true
}
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
}

// EncodeRowForRecord encodes a row to a string compatible with INSERT statements.
func EncodeRowForRecord(encTable table.Table, sqlMode mysql.SQLMode, row []types.Datum) string {
func EncodeRowForRecord(encTable table.Table, sqlMode mysql.SQLMode, row []types.Datum, columnPermutation []int) string {
enc := tidbEncoder{
tbl: encTable,
mode: sqlMode,
}
resRow, err := enc.Encode(log.L(), row, 0, nil, "", 0)
resRow, err := enc.Encode(log.L(), row, 0, columnPermutation, "", 0)
if err != nil {
return fmt.Sprintf("/* ERROR: %s */", err)
}
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/common/storage_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@ import (
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"golang.org/x/sys/unix"
)

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
var stat unix.Statfs_t
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})

var stat unix.Statfs_t
err = unix.Statfs(dir, &stat)
if err != nil {
return size, errors.Annotatef(err, "cannot get disk capacity at %s", dir)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/common/storage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)

var (
Expand All @@ -32,6 +33,10 @@ var (

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})
r, _, e := getDiskFreeSpaceExW.Call(
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))),
uintptr(unsafe.Pointer(&size.Available)),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ const (
)

var (
supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs"}
supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs", "gs"}

DefaultFilter = []string{
"*.*",
Expand Down
34 changes: 34 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,33 @@ func (s *configTestSuite) TestAdjustInvalidBackend(c *C) {
c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend` \\(no_such_backend\\)")
}

func (s *configTestSuite) TestCheckAndAdjustFilePath(c *C) {
tmpDir := c.MkDir()
// use slashPath in url to be compatible with windows
slashPath := filepath.ToSlash(tmpDir)

cfg := config.NewConfig()
cases := []string{
tmpDir,
".",
"file://" + slashPath,
"local://" + slashPath,
"s3://bucket_name",
"s3://bucket_name/path/to/dir",
"gcs://bucketname/path/to/dir",
"gs://bucketname/path/to/dir",
"noop:///",
}

for _, testCase := range cases {
cfg.Mydumper.SourceDir = testCase

err := cfg.CheckAndAdjustFilePath()
c.Assert(err, IsNil)
}

}

func (s *configTestSuite) TestAdjustFileRoutePath(c *C) {
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
Expand Down Expand Up @@ -581,6 +608,13 @@ func (s *configTestSuite) TestLoadConfig(c *C) {

result := taskCfg.String()
c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`)

cfg, err = config.LoadGlobalConfig([]string{}, nil)
c.Assert(err, IsNil)
c.Assert(cfg.App.Config.File, Matches, ".*lightning.log.*")
cfg, err = config.LoadGlobalConfig([]string{"--log-file", "-"}, nil)
c.Assert(err, IsNil)
c.Assert(cfg.App.Config.File, Equals, "-")
}

func (s *configTestSuite) TestDefaultImporterBackendValue(c *C) {
Expand Down
5 changes: 1 addition & 4 deletions br/pkg/lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
if *logFilePath != "" {
cfg.App.Config.File = *logFilePath
}
// "-" is a special config for log to stdout
if cfg.App.Config.File == "-" {
cfg.App.Config.File = ""
} else if cfg.App.Config.File == "" {
if cfg.App.Config.File == "" {
cfg.App.Config.File = timestampLogFileName()
}
if *tidbHost != "" {
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
return errors.Annotate(err, "create storage failed")
}

// return expectedErr means at least meet one file
expectedErr := errors.New("Stop Iter")
walkErr := s.WalkDir(ctx, &storage.WalkOption{ListCount: 1}, func(string, int64) error {
// return an error when meet the first regular file to break the walk loop
return expectedErr
})
if !errors.ErrorEqual(walkErr, expectedErr) {
if walkErr == nil {
return errors.Errorf("data-source-dir '%s' doesn't exist or contains no files", taskCfg.Mydumper.SourceDir)
}
return errors.Annotatef(walkErr, "visit data-source-dir '%s' failed", taskCfg.Mydumper.SourceDir)
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
var mdl *mydump.MDLoader
mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func InitLogger(cfg *Config, tidbLoglevel string) error {
// Filter logs from TiDB and PD.
return NewFilterCore(core, "github.com/tikv/pd/")
})

if len(cfg.File) > 0 {
// "-" is a special config for log to stdout.
if len(cfg.File) > 0 && cfg.File != "-" {
logCfg.File = pclog.FileLogConfig{
Filename: cfg.File,
MaxSize: cfg.FileMaxSize,
Expand Down
43 changes: 43 additions & 0 deletions br/pkg/lightning/log/log_serial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package log_test

import (
"io"
"os"
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/stretchr/testify/require"
)

func TestInitStdoutLogger(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
oldStdout := os.Stdout
os.Stdout = w

msg := "logger is initialized to stdout"
outputC := make(chan string, 1)
go func() {
buf := make([]byte, 4096)
n := 0
for {
nn, err := r.Read(buf[n:])
if nn == 0 || err == io.EOF {
break
}
require.NoError(t, err)
n += nn
}
outputC <- string(buf[:n])
}()

logCfg := &log.Config{File: "-"}
log.InitLogger(logCfg, "info")
log.L().Info(msg)

os.Stdout = oldStdout
require.NoError(t, w.Close())
output := <-outputC
require.NoError(t, r.Close())
require.Contains(t, output, msg)
}
Loading

0 comments on commit 62a9e2f

Please sign in to comment.