avoid getting blocked in dumping when the MySQL connection is broken (#190)
* add an extra connection to avoid getting blocked during concurrent dumping

* add readTimeout/writeTimeout to avoid getting blocked when fetching data from the database server

* avoid wasting time finishing metadata when dumping has already failed

* add a read-timeout parameter and mark it hidden
lichunzhu authored Nov 13, 2020
1 parent 05f1c5d commit 552470f
Showing 10 changed files with 82 additions and 45 deletions.
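
For context, a minimal, self-contained sketch of the timeout behavior this commit relies on (host, credentials, and database name are placeholders): with go-sql-driver/mysql, readTimeout and writeTimeout are DSN parameters that bound each network read/write, so a dead or unresponsive server surfaces as an i/o timeout error instead of a permanent hang.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// readTimeout/writeTimeout bound each network read/write on the
	// connection; they are not per-query deadlines. "15m" mirrors the
	// default this commit picks for --read-timeout.
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=%s&writeTimeout=30s",
		"root", "", "127.0.0.1", 3306, "test", "15m")
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// If the server goes silent mid-result-set, Scan fails with an i/o
	// timeout once readTimeout elapses instead of blocking forever.
	var version string
	if err := db.QueryRow("SELECT VERSION()").Scan(&version); err != nil {
		log.Fatal(err)
	}
	fmt.Println("connected to", version)
}
```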
16 changes: 14 additions & 2 deletions v4/export/config.go
@@ -62,7 +62,9 @@ const (
 	flagOutputFilenameTemplate = "output-filename-template"
 	flagCompleteInsert         = "complete-insert"
 	flagParams                 = "params"
-	FlagHelp                   = "help"
+	flagReadTimeout            = "read-timeout"
+
+	FlagHelp = "help"
 )
 
 type Config struct {
@@ -104,6 +106,7 @@ type Config struct {
 	Sql          string
 	CsvSeparator string
 	CsvDelimiter string
+	ReadTimeout  time.Duration
 
 	TableFilter filter.Filter `json:"-"`
 	Rows        uint64
@@ -166,7 +169,10 @@ func (config *Config) String() string {
 
 // GetDSN generates DSN from Config
 func (conf *Config) GetDSN(db string) string {
-	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", conf.User, conf.Password, conf.Host, conf.Port, db)
+	// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
+	// https://github.com/go-sql-driver/mysql#maxallowedpacket
+	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
+		conf.User, conf.Password, conf.Host, conf.Port, db, conf.ReadTimeout)
 	if len(conf.Security.CAPath) > 0 {
 		dsn += "&tls=dumpling-tls-target"
 	}
@@ -222,6 +228,8 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
 	flags.Bool(flagCompleteInsert, false, "Use complete INSERT statements that include column names")
 	flags.StringToString(flagParams, nil, `Extra session variables used while dumping, accepted format: --params "character_set_client=latin1,character_set_connection=latin1"`)
 	flags.Bool(FlagHelp, false, "Print help message and quit")
+	flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
+	flags.MarkHidden(flagReadTimeout)
 }
 
 // GetDSN generates DSN from Config
@@ -355,6 +363,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	conf.ReadTimeout, err = flags.GetDuration(flagReadTimeout)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
 	if conf.Threads <= 0 {
 		return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
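
The --read-timeout flag above is registered and then hidden. A small sketch of that spf13/pflag pattern (the flag-set name and parsed values here are illustrative): a hidden flag is parsed normally but omitted from --help output, which suits an escape-hatch knob like this one.

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	flags := pflag.NewFlagSet("dump", pflag.ExitOnError)
	flags.Duration("read-timeout", 15*time.Minute, "I/O read timeout for db connection.")
	// Hidden flags are parsed normally but omitted from usage text.
	_ = flags.MarkHidden("read-timeout")

	_ = flags.Parse([]string{"--read-timeout", "30s"})
	d, _ := flags.GetDuration("read-timeout")
	fmt.Println(d) // 30s
}
```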
12 changes: 9 additions & 3 deletions v4/export/connectionsPool.go
@@ -13,15 +13,17 @@ type connectionsPool struct {
 func newConnectionsPool(ctx context.Context, n int, pool *sql.DB) (*connectionsPool, error) {
 	connectPool := &connectionsPool{
 		conns: make(chan *sql.Conn, n),
-		createdConns: make([]*sql.Conn, 0, n),
+		createdConns: make([]*sql.Conn, 0, n+1),
 	}
-	for i := 0; i < n; i++ {
+	for i := 0; i < n+1; i++ {
 		conn, err := createConnWithConsistency(ctx, pool)
 		if err != nil {
 			connectPool.Close()
 			return connectPool, err
 		}
-		connectPool.releaseConn(conn)
+		if i != n {
+			connectPool.releaseConn(conn)
+		}
 		connectPool.createdConns = append(connectPool.createdConns, conn)
 	}
 	return connectPool, nil
@@ -31,6 +33,10 @@ func (r *connectionsPool) getConn() *sql.Conn {
 	return <-r.conns
 }
 
+func (r *connectionsPool) extraConn() *sql.Conn {
+	return r.createdConns[len(r.createdConns)-1]
+}
+
 func (r *connectionsPool) Close() error {
 	var err error
 	for _, conn := range r.createdConns {
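
Why n+1 connections: all n channel-backed connections can be checked out simultaneously by concurrent chunk-dump goroutines, so any serial query routed through getConn() could block indefinitely. The extra connection never enters the channel and is handed out via extraConn() for serial work. A condensed sketch of the resulting shape (names abbreviated from the diff):

```go
package export

import "database/sql"

// connPool is a condensed stand-in for connectionsPool above.
type connPool struct {
	conns        chan *sql.Conn // n rotating worker connections
	createdConns []*sql.Conn    // all n+1 connections, closed together
}

// getConn blocks until a worker connection is free, so it must never be
// called from the serial control path while all workers are busy.
func (p *connPool) getConn() *sql.Conn      { return <-p.conns }
func (p *connPool) releaseConn(c *sql.Conn) { p.conns <- c }

// extraConn bypasses the channel entirely: the last created connection is
// never put into conns, so it stays available for serial queries such as
// SHOW CREATE DATABASE even when every worker connection is checked out.
func (p *connPool) extraConn() *sql.Conn {
	return p.createdConns[len(p.createdConns)-1]
}
```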
31 changes: 12 additions & 19 deletions v4/export/dump.go
@@ -131,7 +131,11 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
 
 	m := newGlobalMetadata(conf.ExternalStorage)
 	// write metadata even if dump failed
-	defer m.writeGlobalMetaData(ctx)
+	defer func() {
+		if err == nil {
+			m.writeGlobalMetaData(ctx)
+		}
+	}()
 
 	// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
 	// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
@@ -184,22 +188,17 @@
 	defer connectPool.Close()
 
 	if conf.PosAfterConnect {
-		conn := connectPool.getConn()
 		// record again, to provide a location to exit safe mode for DM
-		err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true, snapshot)
+		err = m.recordGlobalMetaData(connectPool.extraConn(), conf.ServerInfo.ServerType, true, snapshot)
 		if err != nil {
 			log.Info("get global metadata (after connection pool established) failed", zap.Error(err))
 		}
-		connectPool.releaseConn(conn)
 	}
 
 	if conf.Consistency != "lock" {
-		conn := connectPool.getConn()
-		if err = prepareTableListToDump(conf, conn); err != nil {
-			connectPool.releaseConn(conn)
+		if err = prepareTableListToDump(conf, connectPool.extraConn()); err != nil {
 			return err
 		}
-		connectPool.releaseConn(conn)
 	}
 
 	if err = conCtrl.TearDown(ctx); err != nil {
@@ -240,9 +239,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
 	allTables := conf.Tables
 	g, ctx := errgroup.WithContext(pCtx)
 	for dbName, tables := range allTables {
-		conn := connectPool.getConn()
-		createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
-		connectPool.releaseConn(conn)
+		createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
 		if err != nil {
 			return err
 		}
Expand All @@ -255,21 +252,19 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
}
for _, table := range tables {
table := table
conn := connectPool.getConn()
tableDataIRArray, err := dumpTable(ctx, conf, conn, dbName, table, writer)
connectPool.releaseConn(conn)
tableDataIRArray, err := dumpTable(ctx, conf, connectPool.extraConn(), dbName, table, writer)
if err != nil {
return err
}
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer connectPool.releaseConn(conn)
retryTime := 1
return utils.WithRetry(ctx, func() error {
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()))
conn := connectPool.getConn()
defer connectPool.releaseConn(conn)
retryTime += 1
err := tableIR.Start(ctx, conn)
if err != nil {
@@ -308,9 +303,7 @@ func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
 }
 
 func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
-	conn := connectPool.getConn()
-	tableIR, err := SelectFromSql(conf, conn)
-	connectPool.releaseConn(conn)
+	tableIR, err := SelectFromSql(conf, connectPool.extraConn())
 	if err != nil {
 		return err
 	}
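
The metadata change above leans on Go's named-return/deferred-closure idiom: because Dump declares (err error), the deferred function observes the final error value and can skip the metadata write when the dump has already failed. A minimal sketch of the pattern (writeGlobalMetaData is a stand-in for the real method on the metadata object):

```go
package export

import "context"

func writeGlobalMetaData(ctx context.Context) { /* stand-in for m.writeGlobalMetaData */ }

// Dump must use a named return value: the deferred closure reads err after
// the function body has assigned its final value, so the metadata write is
// skipped exactly when the dump failed.
func Dump(ctx context.Context) (err error) {
	defer func() {
		if err == nil {
			writeGlobalMetaData(ctx)
		}
	}()
	// ... dumping work that may assign err ...
	return err
}
```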
1 change: 1 addition & 0 deletions v4/export/dump_test.go
@@ -36,6 +36,7 @@ func newMockConnectPool(c *C, db *sql.DB) *connectionsPool {
 	c.Assert(err, IsNil)
 	connectPool := &connectionsPool{conns: make(chan *sql.Conn, 1)}
 	connectPool.releaseConn(conn)
+	connectPool.createdConns = []*sql.Conn{conn}
 	return connectPool
 }

1 change: 1 addition & 0 deletions v4/export/ir.go
@@ -22,6 +22,7 @@ type TableDataIR interface {
 
 	SpecialComments() StringIter
 	Rows() SQLRowIter
+	Close() error
 }
 
 // SQLRowIter is the iterator on a collection of sql.Row.
14 changes: 12 additions & 2 deletions v4/export/ir_impl.go
@@ -85,10 +85,13 @@ type tableData struct {
 	selectedField   string
 	specCmts        []string
 	escapeBackslash bool
+	cancel          context.CancelFunc
 	SQLRowIter
 }
 
-func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
+func (td *tableData) Start(pCtx context.Context, conn *sql.Conn) error {
+	var ctx context.Context
+	ctx, td.cancel = context.WithCancel(pCtx)
 	rows, err := conn.QueryContext(ctx, td.query)
 	if err != nil {
 		return err
@@ -137,6 +140,13 @@ func (td *tableData) Rows() SQLRowIter {
 	return td.SQLRowIter
 }
 
+func (td *tableData) Close() error {
+	if td.cancel != nil {
+		td.cancel()
+	}
+	return td.Rows().Close()
+}
+
 func (td *tableData) SelectedField() string {
 	if td.selectedField == "*" || td.selectedField == "" {
 		return td.selectedField
@@ -187,7 +197,7 @@ func splitTableDataIntoChunks(
 		return
 	}
 	if !smax.Valid || !smin.Valid {
-	// smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic.
+		// smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic.
 		log.Debug("skip concurrent dump due to no valid smax or smin", zap.String("schema", dbName), zap.String("table", tableName))
 		linear <- struct{}{}
 		return
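
The cancel field threaded through tableData is what lets Close unblock a fetch stuck on a broken connection: Start derives a cancellable context for QueryContext, and Close fires that cancel before closing the rows. A reduced sketch of the wiring (type and field names abbreviated from the diff):

```go
package export

import (
	"context"
	"database/sql"
)

type tableChunk struct {
	query  string
	rows   *sql.Rows
	cancel context.CancelFunc
}

// Start keeps the CancelFunc so the query can be aborted later.
func (t *tableChunk) Start(pCtx context.Context, conn *sql.Conn) error {
	var ctx context.Context
	ctx, t.cancel = context.WithCancel(pCtx)
	rows, err := conn.QueryContext(ctx, t.query)
	if err != nil {
		return err
	}
	t.rows = rows
	return nil
}

// Close cancels first: rows.Close alone would try to drain the result set,
// which can hang on a dead connection, while cancelling the context makes
// the driver abandon the in-flight fetch.
func (t *tableChunk) Close() error {
	if t.cancel != nil {
		t.cancel()
	}
	return t.rows.Close()
}
```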
4 changes: 4 additions & 0 deletions v4/export/retry.go
@@ -33,6 +33,10 @@ func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration {
 	if _, ok := err.(*mysql.MySQLError); ok && !dbutil.IsRetryableError(err) {
 		b.attempt = 0
 		return 0
+	} else if _, ok := err.(*writerError); ok {
+		// the uploader writer's retry logic is already done in aws client. needn't retry here
+		b.attempt = 0
+		return 0
 	}
 	b.delayTime = 2 * b.delayTime
 	b.attempt--
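
A reduced sketch of the decision NextBackoff now makes (struct fields abbreviated, and the retry-loop plumbing elided): wrapping upload failures in the distinct *writerError type lets the backoffer recognize them and stop retrying, since the AWS client already retries uploads internally.

```go
package export

import "time"

type writerError struct{ error }

type dumpChunkBackoffer struct {
	attempt   int
	delayTime time.Duration
}

func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration {
	// Upload failures arrive wrapped as *writerError; the AWS client has
	// already retried them internally, so retrying the whole chunk here
	// would only duplicate work. Zeroing attempt stops the retry loop.
	if _, ok := err.(*writerError); ok {
		b.attempt = 0
		return 0
	}
	b.delayTime *= 2 // exponential backoff for everything else
	b.attempt--
	return b.delayTime
}
```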
4 changes: 4 additions & 0 deletions v4/export/test_util.go
@@ -146,6 +146,10 @@ func (m *mockTableIR) Rows() SQLRowIter {
 	return m.SQLRowIter
 }
 
+func (m *mockTableIR) Close() error {
+	return nil
+}
+
 func (m *mockTableIR) EscapeBackSlash() bool {
 	return m.escapeBackSlash
 }
22 changes: 6 additions & 16 deletions v4/export/writer.go
@@ -63,29 +63,20 @@ func (f SimpleWriter) WriteViewMeta(ctx context.Context, db, view, createTableSQ
 
 type SQLWriter struct{ SimpleWriter }
 
-func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) {
 	log.Debug("start dumping table...", zap.String("table", ir.TableName()))
 
-	// just let `database.table.sql` be `database.table.0.sql`
-	/*if fileName == "" {
-		// set initial file name
-		fileName = fmt.Sprintf("%s.%s.sql", ir.DatabaseName(), ir.TableName())
-		if f.cfg.FileSize != UnspecifiedSize {
-			fileName = fmt.Sprintf("%s.%s.%d.sql", ir.DatabaseName(), ir.TableName(), 0)
-		}
-	}*/
+	defer ir.Close()
 	namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize)
 	fileType := strings.ToLower(f.cfg.FileType)
 	fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType)
 	if err != nil {
 		return err
 	}
-	chunksIter := ir
-	defer chunksIter.Rows().Close()
 
 	for {
 		fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName)
-		err = WriteInsert(ctx, chunksIter, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
+		err = WriteInsert(ctx, ir, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
 		tearDown(ctx)
 		if err != nil {
 			return err
@@ -175,17 +166,16 @@ func (namer *outputFileNamer) NextName(tmpl *template.Template, fileType string)
 	return res + "." + fileType, err
 }
 
-func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) {
 	log.Debug("start dumping table in csv format...", zap.String("table", ir.TableName()))
 
+	defer ir.Close()
 	namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize)
 	fileType := strings.ToLower(f.cfg.FileType)
 	fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType)
 	if err != nil {
 		return err
 	}
-	chunksIter := ir
-	defer chunksIter.Rows().Close()
 
 	opt := &csvOption{
 		nullValue: f.cfg.CsvNullValue,
@@ -195,7 +185,7 @@ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
 
 	for {
 		fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName)
-		err = WriteInsertInCsv(ctx, chunksIter, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
+		err = WriteInsertInCsv(ctx, ir, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
 		tearDown(ctx)
 		if err != nil {
 			return err
22 changes: 19 additions & 3 deletions v4/export/writer_util.go
@@ -389,13 +389,13 @@ func buildInterceptFileWriter(s storage.ExternalStorage, path string) (storage.W
 			log.Error("open file failed",
 				zap.String("path", fullPath),
 				zap.Error(err))
-			return err
+			return newWriterError(err)
 		}
 		w := storage.NewUploaderWriter(uploader, hardcodedS3ChunkSize)
 		writer = w
 		log.Debug("opened file", zap.String("path", fullPath))
 		fileWriter.Writer = writer
-		return err
+		return nil
 	}
 	fileWriter.initRoutine = initRoutine
 
@@ -427,6 +427,21 @@ func (l *LazyStringWriter) WriteString(str string) (int, error) {
 	return l.StringWriter.WriteString(str)
 }
 
+type writerError struct {
+	error
+}
+
+func (e *writerError) Error() string {
+	return e.error.Error()
+}
+
+func newWriterError(err error) error {
+	if err == nil {
+		return nil
+	}
+	return &writerError{error: err}
+}
+
 // InterceptFileWriter is an interceptor of os.File,
 // tracking whether a StringWriter has written something.
 type InterceptFileWriter struct {
@@ -446,7 +461,8 @@ func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error)
 	if w.err != nil {
 		return 0, errors.Annotate(w.err, "open file error")
 	}
-	return w.Writer.Write(ctx, p)
+	n, err := w.Writer.Write(ctx, p)
+	return n, newWriterError(err)
 }
 
 func (w *InterceptFileWriter) Close(ctx context.Context) error {
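
One detail worth noting in newWriterError above: it returns nil for a nil input instead of always wrapping. A small illustration of the "typed nil" pitfall this avoids (badWrap and goodWrap are hypothetical names, not part of the commit):

```go
package main

import "fmt"

type writerError struct{ error }

// badWrap always wraps, so a nil error becomes a non-nil interface value
// holding a useless *writerError.
func badWrap(err error) error { return &writerError{error: err} }

// goodWrap preserves nil, exactly as newWriterError does above.
func goodWrap(err error) error {
	if err == nil {
		return nil
	}
	return &writerError{error: err}
}

func main() {
	fmt.Println(badWrap(nil) != nil)  // true  — success would be treated as failure
	fmt.Println(goodWrap(nil) != nil) // false — nil stays nil
}
```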
