Skip to content

Commit

Permalink
update log and add safe mode config (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC committed Jul 2, 2019
1 parent 76433a7 commit 4650b1b
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 5 deletions.
15 changes: 14 additions & 1 deletion reparo/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reparo

import (
"encoding/json"
"flag"
"fmt"
"os"
Expand All @@ -23,7 +24,7 @@ const (

// Config is the main configuration for the retore tool.
type Config struct {
*flag.FlagSet
*flag.FlagSet `toml:"-" json:"-"`
Dir string `toml:"data-dir" json:"data-dir"`
StartDatetime string `toml:"start-datetime" json:"start-datetime"`
StopDatetime string `toml:"stop-datetime" json:"stop-datetime"`
Expand All @@ -43,6 +44,8 @@ type Config struct {
LogRotate string `toml:"log-rotate" json:"log-rotate"`
LogLevel string `toml:"log-level" json:"log-level"`

SafeMode bool `toml:"safe-mode" json:"safe-mode"`

configFile string
printVersion bool
}
Expand All @@ -67,9 +70,19 @@ func NewConfig() *Config {
fs.StringVar(&c.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&c.configFile, "config", "", "[REQUIRED] path to configuration file")
fs.BoolVar(&c.printVersion, "V", false, "print reparo version info")
fs.BoolVar(&c.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
return c
}

func (c *Config) String() string {
cfgBytes, err := json.Marshal(c)
if err != nil {
log.Errorf("marshal config failed %v", err)
}

return string(cfgBytes)
}

// Parse parses keys/values from command line flags and toml configuration file.
func (c *Config) Parse(args []string) (err error) {
// Parse first to get config file
Expand Down
2 changes: 1 addition & 1 deletion reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Infof("cfg %+v", cfg)

syncer, err := syncer.New(cfg.DestType, cfg.DestDB)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type mysqlSyncer struct {

var _ Syncer = &mysqlSyncer{}

func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) {
func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) {
db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -42,6 +42,7 @@ func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) {
return nil, errors.Annotate(err, "new loader failed")
}

loader.SetSafeMode(safemode)
syncer := &mysqlSyncer{db: db, loader: loader}
syncer.runLoader()

Expand Down
102 changes: 102 additions & 0 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package syncer

import (
"database/sql"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

type testMysqlSuite struct{}

var _ = check.Suite(&testMysqlSuite{})

func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) {
s.testMysqlSyncer(c, true)
s.testMysqlSyncer(c, false)
}

func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
var (
mock sqlmock.Sqlmock
)
originWorkerCount := defaultWorkerCount
defaultWorkerCount = 1
defer func() {
defaultWorkerCount = originWorkerCount
}()

oldCreateDB := createDB
createDB = func(string, string, string, int) (db *sql.DB, err error) {
db, mock, err = sqlmock.New()
return
}
defer func() {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, safemode)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()

mock.ExpectQuery("SELECT column_name, extra FROM information_schema.columns").WithArgs("test", "t1").WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("a", "").AddRow("b", "").AddRow("c", ""))

rows := sqlmock.NewRows([]string{"non_unique", "index_name", "seq_in_index", "column_name"})
mock.ExpectQuery("SELECT non_unique, index_name, seq_in_index, column_name FROM information_schema.statistics").
WithArgs("test", "t1").
WillReturnRows(rows)

mock.ExpectBegin()
insertPattern := "INSERT INTO"
if safemode {
insertPattern = "REPLACE INTO"
}
mock.ExpectExec(insertPattern).WithArgs(1, "test", nil).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1))
if safemode {
mock.ExpectExec("DELETE FROM").WithArgs().WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(insertPattern).WithArgs(nil, nil, "abc").WillReturnResult(sqlmock.NewResult(0, 1))
} else {
mock.ExpectExec("UPDATE").WithArgs("abc", "test").WillReturnResult(sqlmock.NewResult(0, 1))
}
mock.ExpectCommit()

syncTest(c, Syncer(syncer))

err = syncer.Close()
c.Assert(err, check.IsNil)
}

func syncTest(c *check.C, syncer Syncer) {
ddlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DDL,
DdlQuery: []byte("create database test;"),
}
dmlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DML,
DmlData: &pb.DMLData{
Events: generateDMLEvents(c),
},
}

binlogs := make([]*pb.Binlog, 0, 2)
err := syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {
c.Log(binlog)
binlogs = append(binlogs, binlog)
})
c.Assert(err, check.IsNil)

err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {
c.Log(binlog)
binlogs = append(binlogs, binlog)
})
c.Assert(err, check.IsNil)

time.Sleep(100 * time.Millisecond)
c.Assert(binlogs, check.HasLen, 2)
}
4 changes: 2 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig) (Syncer, error) {
func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg)
return newMysqlSyncer(cfg, safemode)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
41 changes: 41 additions & 0 deletions reparo/syncer/syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package syncer

import (
"reflect"

"github.com/pingcap/check"
)

type testSyncerSuite struct{}

var _ = check.Suite(&testSyncerSuite{})

func (s *testSyncerSuite) TestNewSyncer(c *check.C) {
cfg := new(DBConfig)

testCases := []struct {
typeStr string
tp reflect.Type
checker check.Checker
}{
{
"print",
reflect.TypeOf(new(printSyncer)),
check.Equals,
}, {
"memory",
reflect.TypeOf(new(MemSyncer)),
check.Equals,
}, {
"print",
reflect.TypeOf(new(MemSyncer)),
check.Not(check.Equals),
},
}

for _, testCase := range testCases {
syncer, err := New(testCase.typeStr, cfg, false)
c.Assert(err, check.IsNil)
c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp)
}
}

0 comments on commit 4650b1b

Please sign in to comment.