Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

fix consistency bug #128

Merged
merged 4 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitHash=$(shell git rev-parse
LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitBranch=$(shell git rev-parse --abbrev-ref HEAD)"
LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GoVersion=$(shell go version)"

FAILPOINT_ENABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl enable)
FAILPOINT_DISABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl disable)

GO = go
GOLDFLAGS = -ldflags '$(LDFLAGS)'
ifeq ("$(WITH_RACE)", "1")
Expand All @@ -17,8 +20,21 @@ build: bin/dumpling
bin/%: cmd/%/main.go $(wildcard v4/**/*.go)
$(GO) build $(GOLDFLAGS) -tags codes -o $@ $<

test:
$(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic
test: failpoint-enable
$(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic ||{ $(FAILPOINT_DISABLE); exit 1; }
@make failpoint-disable

integration_test: failpoint-enable bin/dumpling
@make failpoint-disable
./tests/run.sh ||{ $(FAILPOINT_DISABLE); exit 1; }

bin/failpoint-ctl: go.mod
$(GO) build -o $@ github.com/pingcap/failpoint/failpoint-ctl

failpoint-enable: bin/failpoint-ctl
# Converting gofail failpoints...
@$(FAILPOINT_ENABLE)

integration_test: bin/dumpling
./tests/run.sh
failpoint-disable: bin/failpoint-ctl
# Restoring gofail failpoints...
@$(FAILPOINT_DISABLE)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/pd/v4 v4.0.0
github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.5.1 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.14.0
golang.org/x/mod v0.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,12 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0hl9LcYo6cZoGBGiLtefMQMF/vo3XLgQ=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
Expand Down Expand Up @@ -270,6 +273,7 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
60 changes: 60 additions & 0 deletions tests/consistency/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/sh

set -eu
cur=$(cd `dirname $0`; pwd)

DB_NAME="mysql_consistency"
TABLE_NAME="t"

# drop database on mysql
run_sql "drop database if exists \`$DB_NAME\`;"

# build data on mysql
run_sql "create database $DB_NAME;"
run_sql "create table $DB_NAME.$TABLE_NAME (a int(255));"

# insert 100 records
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

# dumping with consistency flush
export DUMPLING_TEST_DATABASE=$DB_NAME
export GO_FAILPOINTS="github.com/pingcap/dumpling/v4/export/ConsistencyCheck=return(\"5s\")"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
export GO_FAILPOINTS="github.com/pingcap/dumpling/v4/export/ConsistencyCheck=return(\"5s\")"
export GO_FAILPOINTS="github.com/pingcap/dumpling/v4/export/ConsistencyCheck=1*sleep(5000)"

run_dumpling &
# wait dumpling process to start to sleep
sleep 2

# record metadata info
metadata=`run_sql "show master status;"`
metaLog=`echo $metadata | awk -F 'File:' '{print $2}' | awk '{print $1}'`
metaPos=`echo $metadata | awk -F 'Position:' '{print $2}' | awk '{print $1}'`
metaGTID=`echo $metadata | awk -F 'Executed_Gtid_Set:' '{print $2}' | awk '{print $1}'`
# insert 100 more records, test whether dumpling will dump these data out
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

wait

# check data record count
cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l`
echo "1st records count is ${cnt}"
[ $cnt = 100 ]

# check metadata
echo "metaLog: $metaLog"
echo "metaPos: $metaPos"
echo "metaGTID: $metaGTID"
if [ $metaLog != "" ]; then
[ `grep -o "Log: $metaLog" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi
if [ $metaPos != "" ]; then
[ `grep -o "Pos: $metaPos" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi
if [ $metaGTID != "" ]; then
[ `grep -o "GTID: $metaGTID" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi

# test dumpling normally
export GO_FAILPOINTS=""
run_dumpling
cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l`
echo "2nd records count is ${cnt}"
[ $cnt = 200 ]
37 changes: 20 additions & 17 deletions v4/export/consistency.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
package export

import (
"context"
"database/sql"
"errors"
"fmt"
)

func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyController, error) {
func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) {
resolveAutoConsistency(conf)
conn, err := session.Conn(ctx)
if err != nil {
return nil, err
}
switch conf.Consistency {
case "flush":
return &ConsistencyFlushTableWithReadLock{
ctx: ctx,
serverType: conf.ServerInfo.ServerType,
db: session,
conn: conn,
}, nil
case "lock":
return &ConsistencyLockDumpingTables{
db: session,
ctx: ctx,
conn: conn,
allTables: conf.Tables,
}, nil
case "snapshot":
Expand Down Expand Up @@ -47,34 +54,33 @@ func (c *ConsistencyNone) TearDown() error {
}

type ConsistencyFlushTableWithReadLock struct {
ctx context.Context
serverType ServerType
db *sql.DB
conn *sql.Conn
}

func (c *ConsistencyFlushTableWithReadLock) Setup() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about

Suggested change
func (c *ConsistencyFlushTableWithReadLock) Setup() error {
func (c *ConsistencyFlushTableWithReadLock) Setup(ctx context.Context) error {

if c.serverType == ServerTypeTiDB {
return withStack(errors.New("'flush table with read lock' cannot be used to ensure the consistency in TiDB"))
}
return FlushTableWithReadLock(c.db)
return FlushTableWithReadLock(c.ctx, c.conn)
}

func (c *ConsistencyFlushTableWithReadLock) TearDown() error {
err := c.db.Ping()
if err != nil {
return withStack(errors.New("ConsistencyFlushTableWithReadLock lost database connection"))
}
return UnlockTables(c.db)
defer c.conn.Close()
return UnlockTables(c.ctx, c.conn)
}

type ConsistencyLockDumpingTables struct {
db *sql.DB
ctx context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn *sql.Conn
allTables DatabaseTables
}

func (c *ConsistencyLockDumpingTables) Setup() error {
for dbName, tables := range c.allTables {
for _, table := range tables {
err := LockTables(c.db, dbName, table.Name)
err := LockTables(c.ctx, c.conn, dbName, table.Name)
if err != nil {
return err
}
Expand All @@ -84,11 +90,8 @@ func (c *ConsistencyLockDumpingTables) Setup() error {
}

func (c *ConsistencyLockDumpingTables) TearDown() error {
err := c.db.Ping()
if err != nil {
return withStack(errors.New("ConsistencyLockDumpingTables lost database connection"))
}
return UnlockTables(c.db)
defer c.conn.Close()
return UnlockTables(c.ctx, c.conn)
}

const showMasterStatusFieldNum = 5
Expand Down
21 changes: 13 additions & 8 deletions v4/export/consistency_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package export

import (
"context"
"errors"
"strings"

Expand All @@ -27,19 +28,21 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf := DefaultConfig()
resultOk := sqlmock.NewResult(0, 1)

conf.Consistency = "none"
ctrl, _ := NewConsistencyController(conf, db)
ctrl, _ := NewConsistencyController(ctx, conf, db)
_, ok := ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)

conf.Consistency = "flush"
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyFlushTableWithReadLock)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
Expand All @@ -49,7 +52,7 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {

conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
Expand All @@ -62,7 +65,7 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
mock.ExpectExec("LOCK TABLES").WillReturnResult(resultOk)
}
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyLockDumpingTables)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
Expand Down Expand Up @@ -96,31 +99,33 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf := DefaultConfig()

conf.Consistency = "invalid_str"
_, err = NewConsistencyController(conf, db)
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue)

// snapshot consistency is only available in TiDB
conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeUnknown
_, err = NewConsistencyController(conf, db)
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)

// flush consistency is unavailable in TiDB
conf.Consistency = "flush"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ := NewConsistencyController(conf, db)
ctrl, _ := NewConsistencyController(ctx, conf, db)
err = ctrl.Setup()
c.Assert(err, NotNil)

// lock table fail
conf.Consistency = "lock"
conf.Tables = NewDatabaseTables().AppendTables("db", "t")
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New(""))
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
err = ctrl.Setup()
c.Assert(err, NotNil)
}
21 changes: 16 additions & 5 deletions v4/export/dump.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
pd "github.com/pingcap/pd/v4/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -133,7 +134,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
conn.Close()
}

conCtrl, err := NewConsistencyController(conf, pool)
conCtrl, err := NewConsistencyController(ctx, conf, pool)
if err != nil {
return err
}
Expand All @@ -146,10 +147,6 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

if err = conCtrl.TearDown(); err != nil {
return err
}

// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
// for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot.
if conf.Consistency != "lock" {
Expand All @@ -165,6 +162,20 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
connectPool.releaseConn(conn)
}

if err = conCtrl.TearDown(); err != nil {
return err
}

failpoint.Inject("ConsistencyCheck", func(val failpoint.Value) {
interval, err := time.ParseDuration(val.(string))
if err != nil {
log.Warn("inject failpoint ConsistencyCheck failed", zap.Reflect("value", val), zap.Error(err))
} else {
log.Info("start to sleep for failpoint ConsistencyCheck", zap.Duration("sleepTime", interval))
time.Sleep(interval)
}
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
failpoint.Inject("ConsistencyCheck", func(val failpoint.Value) {
interval, err := time.ParseDuration(val.(string))
if err != nil {
log.Warn("inject failpoint ConsistencyCheck failed", zap.Reflect("value", val), zap.Error(err))
} else {
log.Info("start to sleep for failpoint ConsistencyCheck", zap.Duration("sleepTime", interval))
time.Sleep(interval)
}
})
failpoint.Inject("consistency-check", nil)


var writer Writer
switch strings.ToLower(conf.FileType) {
case "sql":
Expand Down
13 changes: 7 additions & 6 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func SelectAllFromTable(conf *Config, db *sql.Conn, database, table string) (Tab
}

func SelectFromSql(conf *Config, db *sql.Conn) (TableDataIR, error) {
log.Info("dump data from sql", zap.String("sql", conf.Sql))
rows, err := db.QueryContext(context.Background(), conf.Sql)
if err != nil {
return nil, withStack(errors.WithMessage(err, conf.Sql))
Expand Down Expand Up @@ -295,18 +296,18 @@ func GetUniqueIndexName(db *sql.Conn, database, table string) (string, error) {
return colName, nil
}

func FlushTableWithReadLock(db *sql.DB) error {
_, err := db.Exec("FLUSH TABLES WITH READ LOCK")
func FlushTableWithReadLock(ctx context.Context, db *sql.Conn) error {
_, err := db.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK")
return withStack(err)
}

func LockTables(db *sql.DB, database, table string) error {
_, err := db.Exec(fmt.Sprintf("LOCK TABLES `%s`.`%s` READ", escapeString(database), escapeString(table)))
func LockTables(ctx context.Context, db *sql.Conn, database, table string) error {
_, err := db.ExecContext(ctx, fmt.Sprintf("LOCK TABLES `%s`.`%s` READ", escapeString(database), escapeString(table)))
return withStack(err)
}

func UnlockTables(db *sql.DB) error {
_, err := db.Exec("UNLOCK TABLES")
func UnlockTables(ctx context.Context, db *sql.Conn) error {
_, err := db.ExecContext(ctx, "UNLOCK TABLES")
return withStack(err)
}

Expand Down