Skip to content

Commit

Permalink
[release2.1] reparo: add unit test && add safe mode (#662)
Browse files Browse the repository at this point in the history
* reparo/syncer: add unit test (#540)
* update log and add safe mode config (#652)
  • Loading branch information
WangXiangUSTC authored and july2993 committed Jul 3, 2019
1 parent 5ba5f7b commit d3361c7
Show file tree
Hide file tree
Showing 14 changed files with 495 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180820150422-93bf4626fba7 // indirect
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/kr/pretty v0.1.0 // indirect
github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect
github.com/ngaut/log v0.0.0-20160810023011-cec23d3e10b0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d h1:cVtBfNW5XTHiKQe7jDaDBSh/EVM4XLPutLAGboIXuM0=
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
15 changes: 14 additions & 1 deletion reparo/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reparo

import (
"encoding/json"
"flag"
"fmt"
"os"
Expand All @@ -23,7 +24,7 @@ const (

// Config is the main configuration for the retore tool.
type Config struct {
*flag.FlagSet
*flag.FlagSet `toml:"-" json:"-"`
Dir string `toml:"data-dir" json:"data-dir"`
StartDatetime string `toml:"start-datetime" json:"start-datetime"`
StopDatetime string `toml:"stop-datetime" json:"stop-datetime"`
Expand All @@ -43,6 +44,8 @@ type Config struct {
LogRotate string `toml:"log-rotate" json:"log-rotate"`
LogLevel string `toml:"log-level" json:"log-level"`

SafeMode bool `toml:"safe-mode" json:"safe-mode"`

configFile string
printVersion bool
}
Expand All @@ -67,9 +70,19 @@ func NewConfig() *Config {
fs.StringVar(&c.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&c.configFile, "config", "", "[REQUIRED] path to configuration file")
fs.BoolVar(&c.printVersion, "V", false, "print reparo version info")
fs.BoolVar(&c.SafeMode, "safe-mode", false, "enable safe mode to support reentrant")
return c
}

func (c *Config) String() string {
cfgBytes, err := json.Marshal(c)
if err != nil {
log.Errorf("marshal config failed %v", err)
}

return string(cfgBytes)
}

// Parse parses keys/values from command line flags and toml configuration file.
func (c *Config) Parse(args []string) (err error) {
// Parse first to get config file
Expand Down
2 changes: 1 addition & 1 deletion reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Infof("cfg %+v", cfg)

syncer, err := syncer.New(cfg.DestType, cfg.DestDB)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
22 changes: 22 additions & 0 deletions reparo/syncer/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package syncer

import (
"github.com/pingcap/check"
)

type testMemorySuite struct{}

var _ = check.Suite(&testMemorySuite{})

func (s *testMemorySuite) TestMemorySyncer(c *check.C) {
syncer, err := newMemSyncer()
c.Assert(err, check.IsNil)

syncTest(c, Syncer(syncer))

binlog := syncer.GetBinlogs()
c.Assert(binlog, check.HasLen, 2)

err = syncer.Close()
c.Assert(err, check.IsNil)
}
20 changes: 16 additions & 4 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,31 @@ type mysqlSyncer struct {
loaderErr error
}

var _ Syncer = &mysqlSyncer{}
var (
_ Syncer = &mysqlSyncer{}
defaultWorkerCount = 16
defaultBatchSize = 20
)

// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) {
db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
if err != nil {
return nil, errors.Trace(err)
}

loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20))
return newMysqlSyncerFromSQLDB(db, safemode)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize))
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}

loader.SetSafeMode(safemode)
syncer := &mysqlSyncer{db: db, loader: loader}
syncer.runLoader()

Expand Down
100 changes: 100 additions & 0 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package syncer

import (
"database/sql"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

type testMysqlSuite struct{}

var _ = check.Suite(&testMysqlSuite{})

func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) {
s.testMysqlSyncer(c, true)
s.testMysqlSyncer(c, false)
}

func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
var (
mock sqlmock.Sqlmock
)
originWorkerCount := defaultWorkerCount
defaultWorkerCount = 1
defer func() {
defaultWorkerCount = originWorkerCount
}()

oldCreateDB := createDB
createDB = func(string, string, string, int) (db *sql.DB, err error) {
db, mock, err = sqlmock.New()
return
}
defer func() {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, safemode)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()

mock.ExpectQuery("show columns from `test`.`t1`").WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}).AddRow("a", "int", "YES", "", "NULL", "").AddRow("b", "varchar(24)", "YES", "", "NULL", "").AddRow("c", "varchar(24)", "YES", "", "NULL", ""))

rows := sqlmock.NewRows([]string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"})
mock.ExpectQuery("show index from `test`.`t1`").WillReturnRows(rows)

mock.ExpectBegin()
insertPattern := "INSERT INTO"
if safemode {
insertPattern = "REPLACE INTO"
}
mock.ExpectExec(insertPattern).WithArgs(1, "test", nil).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1))
if safemode {
mock.ExpectExec("DELETE FROM").WithArgs().WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(insertPattern).WithArgs(nil, nil, "abc").WillReturnResult(sqlmock.NewResult(0, 1))
} else {
mock.ExpectExec("UPDATE").WithArgs("abc", "test").WillReturnResult(sqlmock.NewResult(0, 1))
}
mock.ExpectCommit()

syncTest(c, Syncer(syncer))

err = syncer.Close()
c.Assert(err, check.IsNil)
}

func syncTest(c *check.C, syncer Syncer) {
ddlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DDL,
DdlQuery: []byte("create database test;"),
}
dmlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DML,
DmlData: &pb.DMLData{
Events: generateDMLEvents(c),
},
}

binlogs := make([]*pb.Binlog, 0, 2)
err := syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {
c.Log(binlog)
binlogs = append(binlogs, binlog)
})
c.Assert(err, check.IsNil)

err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {
c.Log(binlog)
binlogs = append(binlogs, binlog)
})
c.Assert(err, check.IsNil)

time.Sleep(100 * time.Millisecond)
c.Assert(binlogs, check.HasLen, 2)
}
2 changes: 1 addition & 1 deletion reparo/syncer/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ func printInsertOrDeleteEvent(row [][]byte) {
}

tp := col.Tp[0]
fmt.Printf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp))
fmt.Printf("%s(%s): %s\n", col.Name, col.MysqlType, formatValueToString(val, tp))
}
}
110 changes: 110 additions & 0 deletions reparo/syncer/print_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package syncer

import (
"strings"

capturer "github.com/kami-zh/go-capturer"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

type testPrintSuite struct{}

var _ = check.Suite(&testPrintSuite{})

func (s *testPrintSuite) TestPrintSyncer(c *check.C) {
syncer, err := newPrintSyncer()
c.Assert(err, check.IsNil)

out := capturer.CaptureStdout(func() {
syncTest(c, Syncer(syncer))
})

c.Assert(out, check.Equals,
"DDL query: create database test;\n"+
"schema: test; table: t1; type: Insert\n"+
"a(int): 1\n"+
"b(varchar): test\n"+
"schema: test; table: t1; type: Delete\n"+
"a(int): 1\n"+
"b(varchar): test\n"+
"schema: test; table: t1; type: Update\n"+
"c(varchar): test => abc\n")

err = syncer.Close()
c.Assert(err, check.IsNil)
}

func (s *testPrintSuite) TestPrintEventHeader(c *check.C) {
schema := "test"
table := "t1"
event := &pb.Event{
Tp: pb.EventType_Insert,
SchemaName: &schema,
TableName: &table,
}

out := capturer.CaptureStdout(func() {
printEventHeader(event)
})
lines := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 1)
c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*")
}

func (s *testPrintSuite) TestPrintDDL(c *check.C) {
ddlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DDL,
DdlQuery: []byte("create database test;"),
}

out := capturer.CaptureStdout(func() {
printDDL(ddlBinlog)
})
lines := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 1)
c.Assert(lines[0], check.Matches, ".*DDL query: create database test;.*")
}

func (s *testPrintSuite) TestPrintRow(c *check.C) {
cols := generateColumns(c)

insertEvent := &pb.Event{
Tp: pb.EventType_Insert,
Row: [][]byte{cols[0], cols[1]},
}

out := capturer.CaptureStdout(func() {
printEvent(insertEvent)
})
lines := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 3)
c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Insert")
c.Assert(lines[1], check.Equals, "a(int): 1")
c.Assert(lines[2], check.Equals, "b(varchar): test")

deleteEvent := &pb.Event{
Tp: pb.EventType_Delete,
Row: [][]byte{cols[0], cols[1]},
}
out = capturer.CaptureStdout(func() {
printEvent(deleteEvent)
})
lines = strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 3)
c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Delete")
c.Assert(lines[1], check.Equals, "a(int): 1")
c.Assert(lines[2], check.Equals, "b(varchar): test")

updateEvent := &pb.Event{
Tp: pb.EventType_Update,
Row: [][]byte{cols[2]},
}
out = capturer.CaptureStdout(func() {
printEvent(updateEvent)
})
lines = strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 2)
c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Update")
c.Assert(lines[1], check.Equals, "c(varchar): test => abc")
}
4 changes: 2 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig) (Syncer, error) {
func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg)
return newMysqlSyncer(cfg, safemode)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
Loading

0 comments on commit d3361c7

Please sign in to comment.