Optimize mysql consistency (pingcap/dumpling#121)

* refine conn pool

lichunzhu authored and tisonkun committed Oct 20, 2021
1 parent c8d07b8 commit 1406d9b
Showing 14 changed files with 281 additions and 171 deletions.
2 changes: 1 addition & 1 deletion dumpling/cmd/dumpling/main.go
@@ -27,7 +27,7 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/dumpling/v4/cli"
"github.com/pingcap/dumpling/v4/export"
"github.com/pingcap/log"
"github.com/pingcap/dumpling/v4/log"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/spf13/pflag"
"go.uber.org/zap"
36 changes: 36 additions & 0 deletions dumpling/v4/export/connectionsPool.go
@@ -0,0 +1,36 @@
package export

import (
"context"
"database/sql"
)

type connectionsPool struct {
conns chan *sql.Conn
}

func newConnectionsPool(ctx context.Context, n int, pool *sql.DB) (*connectionsPool, error) {
connectPool := &connectionsPool{
conns: make(chan *sql.Conn, n),
}
for i := 0; i < n; i++ {
conn, err := createConnWithConsistency(ctx, pool)
if err != nil {
return nil, err
}
connectPool.releaseConn(conn)
}
return connectPool, nil
}

func (r *connectionsPool) getConn() *sql.Conn {
return <-r.conns
}

func (r *connectionsPool) releaseConn(conn *sql.Conn) {
select {
case r.conns <- conn:
default:
panic("put a redundant conn")
}
}
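
For context, here is a rough sketch of how such a pool is consumed, the same borrow/return pattern dumpDatabases adopts below. It is not part of the commit: the DSN, the runQuery helper, and the SELECT 1 statement are placeholders, the driver import is an assumption, and the pool type is repeated only to keep the sketch self-contained (the panic-on-redundant-release check is omitted).

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // driver choice is an assumption
	"golang.org/x/sync/errgroup"
)

// Mirrors the connectionsPool added above: a fixed set of *sql.Conn
// handed out over a buffered channel.
type connectionsPool struct {
	conns chan *sql.Conn
}

func (r *connectionsPool) getConn() *sql.Conn      { return <-r.conns }
func (r *connectionsPool) releaseConn(c *sql.Conn) { r.conns <- c }

// runQuery is a hypothetical helper: borrow a connection, use it for one
// statement, then return it so another goroutine can reuse the same session.
func runQuery(ctx context.Context, pool *connectionsPool, query string) error {
	conn := pool.getConn()
	defer pool.releaseConn(conn)
	_, err := conn.ExecContext(ctx, query)
	return err
}

func main() {
	ctx := context.Background()
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:3306)/test") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Build a pool of 4 connections, analogous to newConnectionsPool(ctx, conf.Threads, pool).
	pool := &connectionsPool{conns: make(chan *sql.Conn, 4)}
	for i := 0; i < 4; i++ {
		conn, err := db.Conn(ctx)
		if err != nil {
			log.Fatal(err)
		}
		pool.releaseConn(conn)
	}

	// Concurrent consumers block on getConn while all connections are busy.
	var g errgroup.Group
	for i := 0; i < 8; i++ {
		g.Go(func() error { return runQuery(ctx, pool, "SELECT 1") })
	}
	fmt.Println(g.Wait())
}
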
128 changes: 82 additions & 46 deletions dumpling/v4/export/dump.go
@@ -66,10 +66,15 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}

if conf.Snapshot == "" && (doPdGC || conf.Consistency == "snapshot") {
conf.Snapshot, err = getSnapshot(pool)
conn, err := pool.Conn(ctx)
if err != nil {
return withStack(err)
}
conf.Snapshot, err = getSnapshot(conn)
if err != nil {
return err
}
conn.Close()
}

if conf.Snapshot != "" {
@@ -100,9 +105,10 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
"After dumping: run sql `update mysql.tidb set VARIABLE_VALUE = '10m' where VARIABLE_NAME = 'tikv_gc_life_time';` in tidb.\n")
}

pool, err = resetDBWithSessionParams(pool, conf.getDSN(""), conf.SessionParams)
if err != nil {
return err
if newPool, err := resetDBWithSessionParams(pool, conf.getDSN(""), conf.SessionParams); err != nil {
return withStack(err)
} else {
pool = newPool
}

m := newGlobalMetadata(conf.OutputDirPath)
@@ -112,14 +118,19 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
if conf.Consistency == "lock" {
conn, err := createConnWithConsistency(ctx, pool)
if err != nil {
return err
}
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
if err = prepareTableListToDump(conf, conn); err != nil {
return err
}
conn.Close()
}

conCtrl, err := NewConsistencyController(conf, pool)
@@ -130,17 +141,28 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

connectPool, err := newConnectionsPool(ctx, conf.Threads, pool)
if err != nil {
return err
}

if err = conCtrl.TearDown(); err != nil {
return err
}

// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
// for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot.
if conf.Consistency != "lock" {
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
conn := connectPool.getConn()
err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
if err = prepareTableListToDump(conf, conn); err != nil {
return err
}
connectPool.releaseConn(conn)
}

var writer Writer
@@ -155,24 +177,26 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}

if conf.Sql == "" {
if err = dumpDatabases(ctx, conf, pool, writer); err != nil {
if err = dumpDatabases(ctx, conf, connectPool, writer); err != nil {
return err
}
} else {
if err = dumpSql(ctx, conf, pool, writer); err != nil {
if err = dumpSql(ctx, conf, connectPool, writer); err != nil {
return err
}
}

m.recordFinishTime(time.Now())

return conCtrl.TearDown()
return nil
}

func dumpDatabases(ctx context.Context, conf *Config, db *sql.DB, writer Writer) error {
func dumpDatabases(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
allTables := conf.Tables
var g errgroup.Group
for dbName, tables := range allTables {
createDatabaseSQL, err := ShowCreateDatabase(db, dbName)
conn := connectPool.getConn()
createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
connectPool.releaseConn(conn)
if err != nil {
return err
}
@@ -183,24 +207,35 @@ func dumpDatabases(ctx context.Context, conf *Config, db *sql.DB, writer Writer)
if len(tables) == 0 {
continue
}
rateLimit := newRateLimit(conf.Threads)
var g errgroup.Group
for _, table := range tables {
table := table
g.Go(func() error {
rateLimit.getToken()
defer rateLimit.putToken()
return dumpTable(ctx, conf, db, dbName, table, writer)
})
}
if err := g.Wait(); err != nil {
return err
conn := connectPool.getConn()
tableDataIRArray, err := dumpTable(ctx, conf, conn, dbName, table, writer)
if err != nil {
return err
}
connectPool.releaseConn(conn)
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer connectPool.releaseConn(conn)
err := tableIR.Start(ctx, conn)
if err != nil {
return err
}
return writer.WriteTableData(ctx, tableIR)
})
}
}
}
if err := g.Wait(); err != nil {
return err
}
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.DB) error {
func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
@@ -223,54 +258,56 @@ func prepareTableListToDump(conf *Config, pool *sql.DB) error {
return nil
}

func dumpSql(ctx context.Context, conf *Config, db *sql.DB, writer Writer) error {
tableIR, err := SelectFromSql(conf, db)
func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
conn := connectPool.getConn()
tableIR, err := SelectFromSql(conf, conn)
connectPool.releaseConn(conn)
if err != nil {
return err
}

return writer.WriteTableData(ctx, tableIR)
}

func dumpTable(ctx context.Context, conf *Config, db *sql.DB, dbName string, table *TableInfo, writer Writer) error {
func dumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, table *TableInfo, writer Writer) ([]TableDataIR, error) {
tableName := table.Name
if !conf.NoSchemas {
if table.Type == TableTypeView {
viewName := table.Name
createViewSQL, err := ShowCreateView(db, dbName, viewName)
if err != nil {
return err
return nil, err
}
return writer.WriteTableMeta(ctx, dbName, viewName, createViewSQL)
return nil, writer.WriteTableMeta(ctx, dbName, viewName, createViewSQL)
}
createTableSQL, err := ShowCreateTable(db, dbName, tableName)
if err != nil {
return err
return nil, err
}
if err := writer.WriteTableMeta(ctx, dbName, tableName, createTableSQL); err != nil {
return err
return nil, err
}
}
// Do not dump table data and return nil
if conf.NoData {
return nil
return nil, nil
}

if conf.Rows != UnspecifiedSize {
finished, err := concurrentDumpTable(ctx, writer, conf, db, dbName, tableName)
finished, chunksIterArray, err := concurrentDumpTable(ctx, conf, db, dbName, tableName)
if err != nil || finished {
return err
return chunksIterArray, err
}
}
tableIR, err := SelectAllFromTable(conf, db, dbName, tableName)
if err != nil {
return err
return nil, err
}

return writer.WriteTableData(ctx, tableIR)
return []TableDataIR{tableIR}, nil
}

func concurrentDumpTable(ctx context.Context, writer Writer, conf *Config, db *sql.DB, dbName string, tableName string) (bool, error) {
func concurrentDumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, tableName string) (bool, []TableDataIR, error) {
// try dump table concurrently by split table to chunks
chunksIterCh := make(chan TableDataIR, defaultDumpThreads)
errCh := make(chan error, defaultDumpThreads)
@@ -279,6 +316,7 @@ func concurrentDumpTable(ctx context.Context, writer Writer, conf *Config, db *s
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
var g errgroup.Group
chunksIterArray := make([]TableDataIR, 0)
g.Go(func() error {
splitTableDataIntoChunks(ctx1, chunksIterCh, errCh, linear, dbName, tableName, db, conf)
return nil
@@ -288,24 +326,22 @@ Loop:
for {
select {
case <-ctx.Done():
return true, nil
return true, chunksIterArray, nil
case <-linear:
return false, nil
return false, chunksIterArray, nil
case chunksIter, ok := <-chunksIterCh:
if !ok {
break Loop
}
g.Go(func() error {
return writer.WriteTableData(ctx, chunksIter)
})
chunksIterArray = append(chunksIterArray, chunksIter)
case err := <-errCh:
return false, err
return false, chunksIterArray, err
}
}
if err := g.Wait(); err != nil {
return true, err
return true, chunksIterArray, err
}
return true, nil
return true, chunksIterArray, nil
}

func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, snapshotTS uint64) {
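
Taken together, the dump.go changes split chunk production from chunk writing: dumpTable and concurrentDumpTable now only return TableDataIR iterators built on one borrowed connection, and dumpDatabases starts and writes each iterator on its own pooled connection inside an errgroup. A stripped-down sketch of that fan-out pattern follows; fakeConn, chunk, and the printed output are stand-ins, not dumpling APIs.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fakeConn and chunk stand in for *sql.Conn and TableDataIR; nothing here
// touches a real database.
type fakeConn struct{ id int }
type chunk struct{ name string }

func main() {
	// A small pool of reusable connections, mirroring connectionsPool's
	// buffered channel.
	pool := make(chan *fakeConn, 2)
	pool <- &fakeConn{id: 1}
	pool <- &fakeConn{id: 2}

	// Producer step: build the chunk list up front, writing nothing yet
	// (the role dumpTable/concurrentDumpTable now play).
	chunks := []chunk{{"t.0"}, {"t.1"}, {"t.2"}, {"t.3"}}

	// Consumer step: each chunk borrows a connection, does its work, and
	// returns the connection, the same shape as the tableIR.Start +
	// WriteTableData loop in dumpDatabases.
	var g errgroup.Group
	for _, c := range chunks {
		c := c // capture, as the diff does with tableIR := tableIR
		g.Go(func() error {
			conn := <-pool                  // getConn
			defer func() { pool <- conn }() // releaseConn
			fmt.Printf("chunk %s written on conn %d\n", c.name, conn.id)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("dump failed:", err)
	}
}
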
30 changes: 27 additions & 3 deletions dumpling/v4/export/dump_test.go
@@ -2,6 +2,7 @@ package export

import (
"context"
"database/sql"
"fmt"
"strconv"

@@ -27,6 +28,14 @@ func newMockWriter() *mockWriter {
}
}

func newMockConnectPool(c *C, db *sql.DB) *connectionsPool {
conn, err := db.Conn(context.Background())
c.Assert(err, IsNil)
connectPool := &connectionsPool{conns: make(chan *sql.Conn, 1)}
connectPool.releaseConn(conn)
return connectPool
}

func (m *mockWriter) WriteDatabaseMeta(ctx context.Context, db, createSQL string) error {
m.databaseMeta[db] = createSQL
return nil
@@ -64,7 +73,8 @@ func (s *testDumpSuite) TestDumpDatabase(c *C) {
mock.ExpectQuery("SELECT (.) FROM `test`.`t`").WillReturnRows(rows)

mockWriter := newMockWriter()
err = dumpDatabases(context.Background(), mockConfig, db, mockWriter)
connectPool := newMockConnectPool(c, db)
err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter)
c.Assert(err, IsNil)

c.Assert(len(mockWriter.databaseMeta), Equals, 1)
@@ -78,6 +88,8 @@ func (s *testDumpSuite) TestDumpTable(c *C) {
mockConfig.SortByPk = false
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
conn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

showCreateTableResult := "CREATE TABLE t (a INT)"
rows := mock.NewRows([]string{"Table", "Create Table"}).AddRow("t", showCreateTableResult)
@@ -90,8 +102,13 @@ func (s *testDumpSuite) TestDumpTable(c *C) {
mock.ExpectQuery("SELECT (.) FROM `test`.`t`").WillReturnRows(rows)

mockWriter := newMockWriter()
err = dumpTable(context.Background(), mockConfig, db, "test", &TableInfo{Name: "t"}, mockWriter)
ctx := context.Background()
tableIRArray, err := dumpTable(ctx, mockConfig, conn, "test", &TableInfo{Name: "t"}, mockWriter)
c.Assert(err, IsNil)
for _, tableIR := range tableIRArray {
c.Assert(tableIR.Start(ctx, conn), IsNil)
c.Assert(mockWriter.WriteTableData(ctx, tableIR), IsNil)
}

c.Assert(mockWriter.tableMeta["test.t"], Equals, showCreateTableResult)
c.Assert(len(mockWriter.tableData), Equals, 1)
@@ -121,6 +138,8 @@ func (s *testDumpSuite) TestDumpTableWhereClause(c *C) {
mockConfig.Where = "a > 3 and a < 9"
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
conn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

showCreateTableResult := "CREATE TABLE t (a INT)"
rows := mock.NewRows([]string{"Table", "Create Table"}).AddRow("t", showCreateTableResult)
@@ -137,8 +156,13 @@ func (s *testDumpSuite) TestDumpTableWhereClause(c *C) {
mock.ExpectQuery("SELECT (.) FROM `test`.`t` WHERE a > 3 and a < 9").WillReturnRows(rows)

mockWriter := newMockWriter()
err = dumpTable(context.Background(), mockConfig, db, "test", &TableInfo{Name: "t"}, mockWriter)
ctx := context.Background()
tableIRArray, err := dumpTable(ctx, mockConfig, conn, "test", &TableInfo{Name: "t"}, mockWriter)
c.Assert(err, IsNil)
for _, tableIR := range tableIRArray {
c.Assert(tableIR.Start(ctx, conn), IsNil)
c.Assert(mockWriter.WriteTableData(ctx, tableIR), IsNil)
}

c.Assert(mockWriter.tableMeta["test.t"], Equals, showCreateTableResult)
c.Assert(len(mockWriter.tableData), Equals, 1)