diff --git a/dumpling/v4/export/config.go b/dumpling/v4/export/config.go
index bfa9bdf5..dc186837 100644
--- a/dumpling/v4/export/config.go
+++ b/dumpling/v4/export/config.go
@@ -62,7 +62,9 @@ const (
 	flagOutputFilenameTemplate = "output-filename-template"
 	flagCompleteInsert         = "complete-insert"
 	flagParams                 = "params"
-	FlagHelp                   = "help"
+	flagReadTimeout            = "read-timeout"
+
+	FlagHelp = "help"
 )

 type Config struct {
@@ -104,6 +106,7 @@ type Config struct {
 	Sql          string
 	CsvSeparator string
 	CsvDelimiter string
+	ReadTimeout  time.Duration

 	TableFilter filter.Filter `json:"-"`
 	Rows        uint64
@@ -166,7 +169,10 @@ func (config *Config) String() string {

 // GetDSN generates DSN from Config
 func (conf *Config) GetDSN(db string) string {
-	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", conf.User, conf.Password, conf.Host, conf.Port, db)
+	// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from the server on every connection.
+	// https://github.com/go-sql-driver/mysql#maxallowedpacket
+	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
+		conf.User, conf.Password, conf.Host, conf.Port, db, conf.ReadTimeout)
 	if len(conf.Security.CAPath) > 0 {
 		dsn += "&tls=dumpling-tls-target"
 	}
@@ -222,6 +228,8 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
 	flags.Bool(flagCompleteInsert, false, "Use complete INSERT statements that include column names")
 	flags.StringToString(flagParams, nil, `Extra session variables used while dumping, accepted format: --params "character_set_client=latin1,character_set_connection=latin1"`)
 	flags.Bool(FlagHelp, false, "Print help message and quit")
+	flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
+	flags.MarkHidden(flagReadTimeout)
 }

 // GetDSN generates DSN from Config
@@ -355,6 +363,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	conf.ReadTimeout, err = flags.GetDuration(flagReadTimeout)
+	if err != nil {
+		return errors.Trace(err)
+	}
 	if conf.Threads <= 0 {
 		return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
 	}
diff --git a/dumpling/v4/export/connectionsPool.go b/dumpling/v4/export/connectionsPool.go
index 764d9fe7..c803533f 100644
--- a/dumpling/v4/export/connectionsPool.go
+++ b/dumpling/v4/export/connectionsPool.go
@@ -13,15 +13,17 @@ type connectionsPool struct {
 func newConnectionsPool(ctx context.Context, n int, pool *sql.DB) (*connectionsPool, error) {
 	connectPool := &connectionsPool{
 		conns:        make(chan *sql.Conn, n),
-		createdConns: make([]*sql.Conn, 0, n),
+		createdConns: make([]*sql.Conn, 0, n+1),
 	}
-	for i := 0; i < n; i++ {
+	for i := 0; i < n+1; i++ {
 		conn, err := createConnWithConsistency(ctx, pool)
 		if err != nil {
 			connectPool.Close()
 			return connectPool, err
 		}
-		connectPool.releaseConn(conn)
+		if i != n {
+			connectPool.releaseConn(conn)
+		}
 		connectPool.createdConns = append(connectPool.createdConns, conn)
 	}
 	return connectPool, nil
@@ -31,6 +33,10 @@ func (r *connectionsPool) getConn() *sql.Conn {
 	return <-r.conns
 }

+func (r *connectionsPool) extraConn() *sql.Conn {
+	return r.createdConns[len(r.createdConns)-1]
+}
+
 func (r *connectionsPool) Close() error {
 	var err error
 	for _, conn := range r.createdConns {
diff --git a/dumpling/v4/export/dump.go b/dumpling/v4/export/dump.go
index 1d9eb73c..a5d18042 100755
--- a/dumpling/v4/export/dump.go
+++ b/dumpling/v4/export/dump.go
@@ -131,7 +131,11 @@ func Dump(pCtx context.Context, conf *Config) (err error) {

 	m := newGlobalMetadata(conf.ExternalStorage)
 	// write metadata even if dump failed
-	defer m.writeGlobalMetaData(ctx)
+	defer func() {
+		if err == nil {
+			m.writeGlobalMetaData(ctx)
+		}
+	}()

 	// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
 	// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
@@ -184,22 +188,17 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
 	defer connectPool.Close()

 	if conf.PosAfterConnect {
-		conn := connectPool.getConn()
 		// record again, to provide a location to exit safe mode for DM
-		err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true, snapshot)
+		err = m.recordGlobalMetaData(connectPool.extraConn(), conf.ServerInfo.ServerType, true, snapshot)
 		if err != nil {
 			log.Info("get global metadata (after connection pool established) failed", zap.Error(err))
 		}
-		connectPool.releaseConn(conn)
 	}

 	if conf.Consistency != "lock" {
-		conn := connectPool.getConn()
-		if err = prepareTableListToDump(conf, conn); err != nil {
-			connectPool.releaseConn(conn)
+		if err = prepareTableListToDump(conf, connectPool.extraConn()); err != nil {
 			return err
 		}
-		connectPool.releaseConn(conn)
 	}

 	if err = conCtrl.TearDown(ctx); err != nil {
@@ -240,9 +239,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
 	allTables := conf.Tables
 	g, ctx := errgroup.WithContext(pCtx)
 	for dbName, tables := range allTables {
-		conn := connectPool.getConn()
-		createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
-		connectPool.releaseConn(conn)
+		createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
 		if err != nil {
 			return err
 		}
@@ -255,21 +252,19 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
 		}
 		for _, table := range tables {
 			table := table
-			conn := connectPool.getConn()
-			tableDataIRArray, err := dumpTable(ctx, conf, conn, dbName, table, writer)
-			connectPool.releaseConn(conn)
+			tableDataIRArray, err := dumpTable(ctx, conf, connectPool.extraConn(), dbName, table, writer)
 			if err != nil {
 				return err
 			}
 			for _, tableIR := range tableDataIRArray {
 				tableIR := tableIR
 				g.Go(func() error {
+					conn := connectPool.getConn()
+					defer connectPool.releaseConn(conn)
 					retryTime := 1
 					return utils.WithRetry(ctx, func() error {
 						log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
 							zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()))
-						conn := connectPool.getConn()
-						defer connectPool.releaseConn(conn)
 						retryTime += 1
 						err := tableIR.Start(ctx, conn)
 						if err != nil {
@@ -308,9 +303,7 @@ func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
 }

 func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
-	conn := connectPool.getConn()
-	tableIR, err := SelectFromSql(conf, conn)
-	connectPool.releaseConn(conn)
+	tableIR, err := SelectFromSql(conf, connectPool.extraConn())
 	if err != nil {
 		return err
 	}
diff --git a/dumpling/v4/export/dump_test.go b/dumpling/v4/export/dump_test.go
index 8f9e542e..f80bae8b 100644
--- a/dumpling/v4/export/dump_test.go
+++ b/dumpling/v4/export/dump_test.go
@@ -36,6 +36,7 @@ func newMockConnectPool(c *C, db *sql.DB) *connectionsPool {
 	c.Assert(err, IsNil)
 	connectPool := &connectionsPool{conns: make(chan *sql.Conn, 1)}
 	connectPool.releaseConn(conn)
+	connectPool.createdConns = []*sql.Conn{conn}
 	return connectPool
 }

diff --git a/dumpling/v4/export/ir.go b/dumpling/v4/export/ir.go
index b2d44d95..a740a2b8 100644
--- a/dumpling/v4/export/ir.go
+++ b/dumpling/v4/export/ir.go
@@ -22,6 +22,7 @@ type TableDataIR interface {

 	SpecialComments() StringIter
 	Rows() SQLRowIter
+	Close() error
 }

 // SQLRowIter is the iterator on a collection of sql.Row.
diff --git a/dumpling/v4/export/ir_impl.go b/dumpling/v4/export/ir_impl.go
index e9937875..1eec6283 100644
--- a/dumpling/v4/export/ir_impl.go
+++ b/dumpling/v4/export/ir_impl.go
@@ -85,10 +85,13 @@ type tableData struct {
 	selectedField   string
 	specCmts        []string
 	escapeBackslash bool
+	cancel          context.CancelFunc
 	SQLRowIter
 }
-func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
+func (td *tableData) Start(pCtx context.Context, conn *sql.Conn) error {
+	var ctx context.Context
+	ctx, td.cancel = context.WithCancel(pCtx)
 	rows, err := conn.QueryContext(ctx, td.query)
 	if err != nil {
 		return err
 	}
@@ -137,6 +140,13 @@ func (td *tableData) Rows() SQLRowIter {
 	return td.SQLRowIter
 }

+func (td *tableData) Close() error {
+	if td.cancel != nil {
+		td.cancel()
+	}
+	return td.Rows().Close()
+}
+
 func (td *tableData) SelectedField() string {
 	if td.selectedField == "*" || td.selectedField == "" {
 		return td.selectedField
@@ -187,7 +197,7 @@ func splitTableDataIntoChunks(
 		return
 	}
 	if !smax.Valid || !smin.Valid {
-		// smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic. 
+		// smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic.
log.Debug("skip concurrent dump due to no valid smax or smin", zap.String("schema", dbName), zap.String("table", tableName)) linear <- struct{}{} return diff --git a/dumpling/v4/export/retry.go b/dumpling/v4/export/retry.go index 894d7042..e5b7a877 100644 --- a/dumpling/v4/export/retry.go +++ b/dumpling/v4/export/retry.go @@ -33,6 +33,10 @@ func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration { if _, ok := err.(*mysql.MySQLError); ok && !dbutil.IsRetryableError(err) { b.attempt = 0 return 0 + } else if _, ok := err.(*writerError); ok { + // the uploader writer's retry logic is already done in aws client. needn't retry here + b.attempt = 0 + return 0 } b.delayTime = 2 * b.delayTime b.attempt-- diff --git a/dumpling/v4/export/test_util.go b/dumpling/v4/export/test_util.go index d7300aa4..cc997283 100644 --- a/dumpling/v4/export/test_util.go +++ b/dumpling/v4/export/test_util.go @@ -146,6 +146,10 @@ func (m *mockTableIR) Rows() SQLRowIter { return m.SQLRowIter } +func (m *mockTableIR) Close() error { + return nil +} + func (m *mockTableIR) EscapeBackSlash() bool { return m.escapeBackSlash } diff --git a/dumpling/v4/export/writer.go b/dumpling/v4/export/writer.go index 4977cde2..267c0d17 100644 --- a/dumpling/v4/export/writer.go +++ b/dumpling/v4/export/writer.go @@ -63,29 +63,20 @@ func (f SimpleWriter) WriteViewMeta(ctx context.Context, db, view, createTableSQ type SQLWriter struct{ SimpleWriter } -func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) error { +func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) { log.Debug("start dumping table...", zap.String("table", ir.TableName())) - // just let `database.table.sql` be `database.table.0.sql` - /*if fileName == "" { - // set initial file name - fileName = fmt.Sprintf("%s.%s.sql", ir.DatabaseName(), ir.TableName()) - if f.cfg.FileSize != UnspecifiedSize { - fileName = fmt.Sprintf("%s.%s.%d.sql", ir.DatabaseName(), ir.TableName(), 0) - } - }*/ + defer ir.Close() namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize) fileType := strings.ToLower(f.cfg.FileType) fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType) if err != nil { return err } - chunksIter := ir - defer chunksIter.Rows().Close() for { fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName) - err = WriteInsert(ctx, chunksIter, fileWriter, f.cfg.FileSize, f.cfg.StatementSize) + err = WriteInsert(ctx, ir, fileWriter, f.cfg.FileSize, f.cfg.StatementSize) tearDown(ctx) if err != nil { return err @@ -175,17 +166,16 @@ func (namer *outputFileNamer) NextName(tmpl *template.Template, fileType string) return res + "." 
+ fileType, err } -func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error { +func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) { log.Debug("start dumping table in csv format...", zap.String("table", ir.TableName())) + defer ir.Close() namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize) fileType := strings.ToLower(f.cfg.FileType) fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType) if err != nil { return err } - chunksIter := ir - defer chunksIter.Rows().Close() opt := &csvOption{ nullValue: f.cfg.CsvNullValue, @@ -195,7 +185,7 @@ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error { for { fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName) - err = WriteInsertInCsv(ctx, chunksIter, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize) + err = WriteInsertInCsv(ctx, ir, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize) tearDown(ctx) if err != nil { return err diff --git a/dumpling/v4/export/writer_util.go b/dumpling/v4/export/writer_util.go index 348d5023..4a24c6e1 100644 --- a/dumpling/v4/export/writer_util.go +++ b/dumpling/v4/export/writer_util.go @@ -389,13 +389,13 @@ func buildInterceptFileWriter(s storage.ExternalStorage, path string) (storage.W log.Error("open file failed", zap.String("path", fullPath), zap.Error(err)) - return err + return newWriterError(err) } w := storage.NewUploaderWriter(uploader, hardcodedS3ChunkSize) writer = w log.Debug("opened file", zap.String("path", fullPath)) fileWriter.Writer = writer - return err + return nil } fileWriter.initRoutine = initRoutine @@ -427,6 +427,21 @@ func (l *LazyStringWriter) WriteString(str string) (int, error) { return l.StringWriter.WriteString(str) } +type writerError struct { + error +} + +func (e *writerError) Error() string { + return e.error.Error() +} + +func newWriterError(err error) error { + if err == nil { + return nil + } + return &writerError{error: err} +} + // InterceptFileWriter is an interceptor of os.File, // tracking whether a StringWriter has written something. type InterceptFileWriter struct { @@ -446,7 +461,8 @@ func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error) if w.err != nil { return 0, errors.Annotate(w.err, "open file error") } - return w.Writer.Write(ctx, p) + n, err := w.Writer.Write(ctx, p) + return n, newWriterError(err) } func (w *InterceptFileWriter) Close(ctx context.Context) error {
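
Note on the connection pool change above: newConnectionsPool now opens n+1 connections but only circulates n of them through the channel; the last one is reserved and exposed via extraConn(), so serial work (recording metadata, SHOW CREATE DATABASE, preparing the table list, dumpSql) no longer competes with the n concurrent dump workers for a pooled connection. A minimal standalone sketch of the same idea, using hypothetical names (demoPool, newDemoPool) that are not part of this patch:

	package pooldemo

	import (
		"context"
		"database/sql"
	)

	// demoPool mirrors the patched connectionsPool: n connections circulate
	// through a channel for concurrent chunk dumps, while one extra
	// connection is reserved for serial queries.
	type demoPool struct {
		conns    chan *sql.Conn
		reserved *sql.Conn
	}

	func newDemoPool(ctx context.Context, n int, db *sql.DB) (*demoPool, error) {
		p := &demoPool{conns: make(chan *sql.Conn, n)}
		for i := 0; i < n+1; i++ {
			// error handling simplified here; the real pool closes the
			// already-created connections before returning an error
			conn, err := db.Conn(ctx)
			if err != nil {
				return nil, err
			}
			if i == n {
				// the (n+1)-th connection never enters the channel, so
				// callers of extraConn cannot starve the dump workers
				p.reserved = conn
			} else {
				p.conns <- conn
			}
		}
		return p, nil
	}

	func (p *demoPool) getConn() *sql.Conn      { return <-p.conns }
	func (p *demoPool) releaseConn(c *sql.Conn) { p.conns <- c }
	func (p *demoPool) extraConn() *sql.Conn    { return p.reserved }

The real pool also records every connection in createdConns so that Close can tear all of them down, including the reserved one; the sketch omits that bookkeeping.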