Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Add more log and compress option #202

Merged
merged 8 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/dumpling/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "net/http/pprof"
"os"

"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

Expand Down Expand Up @@ -56,6 +57,11 @@ func main() {
os.Exit(1)
}

registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
export.RegisterMetrics(registry)
prometheus.DefaultGatherer = registry
err = export.Dump(context.Background(), conf)
if err != nil {
log.Error("dump failed error stack info", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pingcap/br v0.0.0-20201027124415-c2ed897feada
github.com/pingcap/br v0.0.0-20201119111016-600102357a27
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.8-0.20200927084250-e47e0e12c7f3+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
Expand Down
78 changes: 76 additions & 2 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions tests/naughty_strings/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.t.sql"
run_dumpling --escape-backslash=false
# FIXME should compare the schemas too, but they differ too much among MySQL versions.
diff "$DUMPLING_BASE_NAME/expect/naughty_strings.t.sql" "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"

# Run the same dump again with the gzip compress option and verify that the
# decompressed output still matches the expected file.
# Remove the uncompressed file left by the previous run so that decompressing
# the new .gz does not collide with it.
rm "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"
run_dumpling --escape-backslash=false --compress "gzip"
file_should_exist "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql.gz"
# Options go before operands so this does not rely on argument permutation.
gzip -d "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql.gz"
diff "$DUMPLING_BASE_NAME/expect/naughty_strings.t.sql" "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"

run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.escape-schema.sql"
run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.escape.sql"
run_dumpling --escape-backslash=true
Expand Down
25 changes: 25 additions & 0 deletions v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

Expand Down Expand Up @@ -64,6 +65,7 @@ const (
flagParams = "params"
flagReadTimeout = "read-timeout"
flagTransactionalConsistency = "transactional-consistency"
flagCompress = "compress"

FlagHelp = "help"
)
Expand Down Expand Up @@ -108,6 +110,7 @@ type Config struct {
CsvSeparator string
CsvDelimiter string
ReadTimeout time.Duration
CompressType storage.CompressType

TableFilter filter.Filter `json:"-"`
Rows uint64
Expand All @@ -121,6 +124,7 @@ type Config struct {
SessionParams map[string]interface{}

PosAfterConnect bool
Labels prometheus.Labels `json:"-"`

ExternalStorage storage.ExternalStorage `json:"-"`
}
Expand Down Expand Up @@ -233,6 +237,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
flags.MarkHidden(flagReadTimeout)
flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency")
flags.StringP(flagCompress, "c", "", "Compress output file type, support 'gzip', 'no-compression' now")
}

// GetDSN generates DSN from Config
Expand Down Expand Up @@ -437,6 +442,15 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}
conf.OutputFileTemplate = tmpl

compressType, err := flags.GetString(flagCompress)
if err != nil {
return errors.Trace(err)
}
conf.CompressType, err = ParseCompressType(compressType)
if err != nil {
return errors.Trace(err)
}

for k, v := range params {
conf.SessionParams[k] = v
}
Expand Down Expand Up @@ -483,6 +497,17 @@ func ParseTableFilter(tablesList, filters []string) (filter.Filter, error) {
return filter.NewTablesFilter(tableNames...), nil
}

// ParseCompressType converts the textual compress option into a
// storage.CompressType. The empty string and "no-compression" select
// storage.NoCompression; "gzip" and "gz" select storage.Gzip. Any other value
// yields storage.NoCompression together with a non-nil error.
func ParseCompressType(compressType string) (storage.CompressType, error) {
	if compressType == "" || compressType == "no-compression" {
		return storage.NoCompression, nil
	}
	if compressType == "gzip" || compressType == "gz" {
		return storage.Gzip, nil
	}
	return storage.NoCompression, errors.Errorf("unknown compress type %s", compressType)
}

func (config *Config) createExternalStorage(ctx context.Context) (storage.ExternalStorage, error) {
b, err := storage.ParseBackend(config.OutputDirPath, &config.BackendOptions)
if err != nil {
Expand Down
100 changes: 68 additions & 32 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -43,6 +44,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return errors.Trace(err)
}
resolveAutoConsistency(conf)
log.Info("finish config adjustment", zap.String("config", conf.String()))

ctx, cancel := context.WithCancel(pCtx)
defer cancel()
Expand Down Expand Up @@ -226,6 +228,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return errors.Errorf("unsupported filetype %s", conf.FileType)
}

summary.SetLogCollector(summary.NewLogCollector(log.Info))
summary.SetUnit(summary.BackupUnit)
defer summary.Summary(summary.BackupUnit)
if conf.Sql == "" {
if err = dumpDatabases(ctx, conf, connectPool, writer, func(conn *sql.Conn) (*sql.Conn, error) {
// make sure that the lock connection is still alive
Expand Down Expand Up @@ -257,13 +262,16 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}
}

summary.SetSuccessStatus(true)
m.recordFinishTime(time.Now())
return nil
}

func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer, rebuildConnFunc func(*sql.Conn) (*sql.Conn, error)) error {
allTables := conf.Tables
g, ctx := errgroup.WithContext(pCtx)
tableDataIRTotal := make([]TableDataIR, 0, len(allTables))
splitChunkStart := time.Now()
for dbName, tables := range allTables {
createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
if err != nil {
Expand All @@ -282,39 +290,55 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
if err != nil {
return err
}
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer func() {
connectPool.releaseConn(conn)
}()
retryTime := 0
var lastErr error
return utils.WithRetry(ctx, func() (err error) {
defer func() {
lastErr = err
}()
retryTime += 1
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr))
if retryTime > 1 {
conn, err = rebuildConnFunc(conn)
if err != nil {
return
}
}
err = tableIR.Start(ctx, conn)
if err != nil {
return
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency)))
})
}
tableDataIRTotal = append(tableDataIRTotal, tableDataIRArray...)
}
}
return g.Wait()
summary.CollectDuration("split chunks", time.Since(splitChunkStart))
progressPrinter := utils.StartProgress(ctx, "dumpling", int64(len(tableDataIRTotal)), shouldRedirectLog(conf), log.Info)
defer progressPrinter.Close()
tableDataStartTime := time.Now()
for _, tableIR := range tableDataIRTotal {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer func() {
connectPool.releaseConn(conn)
}()
retryTime := 0
var lastErr error
return utils.WithRetry(ctx, func() (err error) {
defer func() {
lastErr = err
if err == nil {
progressPrinter.Inc()
} else {
errorCount.With(conf.Labels).Inc()
}
}()
retryTime += 1
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr))
if retryTime > 1 {
conn, err = rebuildConnFunc(conn)
if err != nil {
return
}
}
err = tableIR.Start(ctx, conn)
if err != nil {
return
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency)))
})
}
if err := g.Wait(); err != nil {
summary.CollectFailureUnit("dump", err)
return err
} else {
summary.CollectSuccessUnit("dump cost", len(tableDataIRTotal), time.Since(tableDataStartTime))
}
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
Expand Down Expand Up @@ -346,7 +370,15 @@ func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, wr
return err
}

return writer.WriteTableData(ctx, tableIR)
tableDataStartTime := time.Now()
err = writer.WriteTableData(ctx, tableIR)
if err != nil {
summary.CollectFailureUnit("dump", err)
return err
} else {
summary.CollectSuccessUnit("dump cost", 1, time.Since(tableDataStartTime))
}
return nil
}

func dumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, table *TableInfo, writer Writer) ([]TableDataIR, error) {
Expand Down Expand Up @@ -462,3 +494,7 @@ func canRebuildConn(consistency string, trxConsistencyOnly bool) bool {
return false
}
}

// shouldRedirectLog reports whether conf has a custom Logger set or a
// non-empty LogFile.
func shouldRedirectLog(conf *Config) bool {
	if conf.Logger != nil {
		return true
	}
	return conf.LogFile != ""
}
2 changes: 2 additions & 0 deletions v4/export/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)
Expand All @@ -20,6 +21,7 @@ var (

func startHTTPServer(lis net.Listener) {
router := http.NewServeMux()
router.Handle("/metrics", promhttp.Handler())

router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down
2 changes: 1 addition & 1 deletion v4/export/ir_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *testIRImplSuite) TestChunkRowIter(c *C) {
sqlRowIter := newRowIter(rows, 2)

res := newSimpleRowReceiver(2)
wp := newWriterPipe(nil, testFileSize, testStatementSize)
wp := newWriterPipe(nil, testFileSize, testStatementSize, nil)

var resSize [][]uint64
for sqlRowIter.HasNext() {
Expand Down
3 changes: 2 additions & 1 deletion v4/export/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func recordGlobalMetaData(db *sql.Conn, buffer *bytes.Buffer, serverType ServerT
}

func (m *globalMetadata) writeGlobalMetaData(ctx context.Context) error {
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath)
// keep consistent with mydumper. Never compress metadata
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath, storage.NoCompression)
if err != nil {
return err
}
Expand Down
62 changes: 62 additions & 0 deletions v4/export/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package export

import (
"github.com/prometheus/client_golang/prometheus"
)

// Metric vectors exported by dumpling. Every vector is declared with an empty
// list of variable label names.
var (
	// finishedSizeCounter counts the size of finished dump files.
	finishedSizeCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "dumpling",
			Subsystem: "dump",
			Name:      "finished_size",
			Help:      "counter for dumpling finished file size",
		}, []string{})
	// finishedRowsCounter counts the rows that have been dumped.
	finishedRowsCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "dumpling",
			Subsystem: "dump",
			Name:      "finished_rows",
			Help:      "counter for dumpling finished rows",
		}, []string{})
	// writeTimeHistogram observes file write durations in seconds, using 20
	// exponential buckets that start at 0.00005 and double each step.
	writeTimeHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "dumpling",
			Subsystem: "write",
			Name:      "write_duration_time",
			Help:      "Bucketed histogram of write time (s) of files",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 20),
		}, []string{})
	// receiveWriteChunkTimeHistogram observes chunk receive durations in
	// seconds with the same bucket layout as writeTimeHistogram. Its Help text
	// is distinct from writeTimeHistogram's so the two series can be told
	// apart by description.
	receiveWriteChunkTimeHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "dumpling",
			Subsystem: "write",
			Name:      "receive_chunk_duration_time",
			Help:      "Bucketed histogram of receiving time (s) of chunks",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 20),
		}, []string{})
	// errorCount counts errors that occur while dumping.
	errorCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "dumpling",
			Subsystem: "dump",
			Name:      "error_count",
			Help:      "Total error count during dumping progress",
		}, []string{})
)

// RegisterMetrics registers every dumpling metric vector declared in this
// package with the given registry, in declaration order.
func RegisterMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		finishedSizeCounter,
		finishedRowsCounter,
		writeTimeHistogram,
		receiveWriteChunkTimeHistogram,
		errorCount,
	)
}

// RemoveLabelValuesWithTaskInMetrics deletes the series identified by labels
// from every dumpling metric vector. The result of each Delete call is
// ignored.
func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
	vectors := []interface {
		Delete(prometheus.Labels) bool
	}{
		finishedSizeCounter,
		finishedRowsCounter,
		writeTimeHistogram,
		receiveWriteChunkTimeHistogram,
		errorCount,
	}
	for _, vec := range vectors {
		vec.Delete(labels)
	}
}
2 changes: 1 addition & 1 deletion v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (
}

func estimateCount(dbName, tableName string, db *sql.Conn, field string, conf *Config) uint64 {
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", field, escapeString(dbName), escapeString(tableName))
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", escapeString(field), escapeString(dbName), escapeString(tableName))

if conf.Where != "" {
query += " WHERE "
Expand Down
Loading