Skip to content

Commit

Permalink
Add more log and compress option (pingcap#202)
Browse files Browse the repository at this point in the history
* add percentage log
* add summary
* redirect log to dumpling's logger
* add --compress option and relative integration test
* add prometheus metrics in dumpling
  • Loading branch information
lichunzhu authored Nov 20, 2020
1 parent efdde37 commit 68d86e9
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 75 deletions.
6 changes: 6 additions & 0 deletions dumpling/cmd/dumpling/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "net/http/pprof"
"os"

"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

Expand Down Expand Up @@ -56,6 +57,11 @@ func main() {
os.Exit(1)
}

registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
export.RegisterMetrics(registry)
prometheus.DefaultGatherer = registry
err = export.Dump(context.Background(), conf)
if err != nil {
log.Error("dump failed error stack info", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion dumpling/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pingcap/br v0.0.0-20201027124415-c2ed897feada
github.com/pingcap/br v0.0.0-20201119111016-600102357a27
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.8-0.20200927084250-e47e0e12c7f3+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
Expand Down
78 changes: 76 additions & 2 deletions dumpling/go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions dumpling/tests/naughty_strings/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.t.sql"
run_dumpling --escape-backslash=false
# FIXME should compare the schemas too, but they differ too much among MySQL versions.
diff "$DUMPLING_BASE_NAME/expect/naughty_strings.t.sql" "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"

# run with compress option
rm "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"
run_dumpling --escape-backslash=false --compress "gzip"
file_should_exist "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql.gz"
gzip "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql.gz" -d
diff "$DUMPLING_BASE_NAME/expect/naughty_strings.t.sql" "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"

run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.escape-schema.sql"
run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.escape.sql"
run_dumpling --escape-backslash=true
Expand Down
25 changes: 25 additions & 0 deletions dumpling/v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

Expand Down Expand Up @@ -64,6 +65,7 @@ const (
flagParams = "params"
flagReadTimeout = "read-timeout"
flagTransactionalConsistency = "transactional-consistency"
flagCompress = "compress"

FlagHelp = "help"
)
Expand Down Expand Up @@ -108,6 +110,7 @@ type Config struct {
CsvSeparator string
CsvDelimiter string
ReadTimeout time.Duration
CompressType storage.CompressType

TableFilter filter.Filter `json:"-"`
Rows uint64
Expand All @@ -121,6 +124,7 @@ type Config struct {
SessionParams map[string]interface{}

PosAfterConnect bool
Labels prometheus.Labels `json:"-"`

ExternalStorage storage.ExternalStorage `json:"-"`
}
Expand Down Expand Up @@ -233,6 +237,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
flags.MarkHidden(flagReadTimeout)
flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency")
flags.StringP(flagCompress, "c", "", "Compress output file type, support 'gzip', 'no-compression' now")
}

// GetDSN generates DSN from Config
Expand Down Expand Up @@ -437,6 +442,15 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}
conf.OutputFileTemplate = tmpl

compressType, err := flags.GetString(flagCompress)
if err != nil {
return errors.Trace(err)
}
conf.CompressType, err = ParseCompressType(compressType)
if err != nil {
return errors.Trace(err)
}

for k, v := range params {
conf.SessionParams[k] = v
}
Expand Down Expand Up @@ -483,6 +497,17 @@ func ParseTableFilter(tablesList, filters []string) (filter.Filter, error) {
return filter.NewTablesFilter(tableNames...), nil
}

func ParseCompressType(compressType string) (storage.CompressType, error) {
switch compressType {
case "", "no-compression":
return storage.NoCompression, nil
case "gzip", "gz":
return storage.Gzip, nil
default:
return storage.NoCompression, errors.Errorf("unknown compress type %s", compressType)
}
}

func (config *Config) createExternalStorage(ctx context.Context) (storage.ExternalStorage, error) {
b, err := storage.ParseBackend(config.OutputDirPath, &config.BackendOptions)
if err != nil {
Expand Down
100 changes: 68 additions & 32 deletions dumpling/v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -43,6 +44,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return errors.Trace(err)
}
resolveAutoConsistency(conf)
log.Info("finish config adjustment", zap.String("config", conf.String()))

ctx, cancel := context.WithCancel(pCtx)
defer cancel()
Expand Down Expand Up @@ -226,6 +228,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return errors.Errorf("unsupported filetype %s", conf.FileType)
}

summary.SetLogCollector(summary.NewLogCollector(log.Info))
summary.SetUnit(summary.BackupUnit)
defer summary.Summary(summary.BackupUnit)
if conf.Sql == "" {
if err = dumpDatabases(ctx, conf, connectPool, writer, func(conn *sql.Conn) (*sql.Conn, error) {
// make sure that the lock connection is still alive
Expand Down Expand Up @@ -257,13 +262,16 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}
}

summary.SetSuccessStatus(true)
m.recordFinishTime(time.Now())
return nil
}

func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer, rebuildConnFunc func(*sql.Conn) (*sql.Conn, error)) error {
allTables := conf.Tables
g, ctx := errgroup.WithContext(pCtx)
tableDataIRTotal := make([]TableDataIR, 0, len(allTables))
splitChunkStart := time.Now()
for dbName, tables := range allTables {
createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
if err != nil {
Expand All @@ -282,39 +290,55 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
if err != nil {
return err
}
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer func() {
connectPool.releaseConn(conn)
}()
retryTime := 0
var lastErr error
return utils.WithRetry(ctx, func() (err error) {
defer func() {
lastErr = err
}()
retryTime += 1
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr))
if retryTime > 1 {
conn, err = rebuildConnFunc(conn)
if err != nil {
return
}
}
err = tableIR.Start(ctx, conn)
if err != nil {
return
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency)))
})
}
tableDataIRTotal = append(tableDataIRTotal, tableDataIRArray...)
}
}
return g.Wait()
summary.CollectDuration("split chunks", time.Since(splitChunkStart))
progressPrinter := utils.StartProgress(ctx, "dumpling", int64(len(tableDataIRTotal)), shouldRedirectLog(conf), log.Info)
defer progressPrinter.Close()
tableDataStartTime := time.Now()
for _, tableIR := range tableDataIRTotal {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer func() {
connectPool.releaseConn(conn)
}()
retryTime := 0
var lastErr error
return utils.WithRetry(ctx, func() (err error) {
defer func() {
lastErr = err
if err == nil {
progressPrinter.Inc()
} else {
errorCount.With(conf.Labels).Inc()
}
}()
retryTime += 1
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr))
if retryTime > 1 {
conn, err = rebuildConnFunc(conn)
if err != nil {
return
}
}
err = tableIR.Start(ctx, conn)
if err != nil {
return
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency)))
})
}
if err := g.Wait(); err != nil {
summary.CollectFailureUnit("dump", err)
return err
} else {
summary.CollectSuccessUnit("dump cost", len(tableDataIRTotal), time.Since(tableDataStartTime))
}
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
Expand Down Expand Up @@ -346,7 +370,15 @@ func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, wr
return err
}

return writer.WriteTableData(ctx, tableIR)
tableDataStartTime := time.Now()
err = writer.WriteTableData(ctx, tableIR)
if err != nil {
summary.CollectFailureUnit("dump", err)
return err
} else {
summary.CollectSuccessUnit("dump cost", 1, time.Since(tableDataStartTime))
}
return nil
}

func dumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, table *TableInfo, writer Writer) ([]TableDataIR, error) {
Expand Down Expand Up @@ -462,3 +494,7 @@ func canRebuildConn(consistency string, trxConsistencyOnly bool) bool {
return false
}
}

func shouldRedirectLog(conf *Config) bool {
return conf.Logger != nil || conf.LogFile != ""
}
2 changes: 2 additions & 0 deletions dumpling/v4/export/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)
Expand All @@ -20,6 +21,7 @@ var (

func startHTTPServer(lis net.Listener) {
router := http.NewServeMux()
router.Handle("/metrics", promhttp.Handler())

router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down
2 changes: 1 addition & 1 deletion dumpling/v4/export/ir_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *testIRImplSuite) TestChunkRowIter(c *C) {
sqlRowIter := newRowIter(rows, 2)

res := newSimpleRowReceiver(2)
wp := newWriterPipe(nil, testFileSize, testStatementSize)
wp := newWriterPipe(nil, testFileSize, testStatementSize, nil)

var resSize [][]uint64
for sqlRowIter.HasNext() {
Expand Down
3 changes: 2 additions & 1 deletion dumpling/v4/export/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func recordGlobalMetaData(db *sql.Conn, buffer *bytes.Buffer, serverType ServerT
}

func (m *globalMetadata) writeGlobalMetaData(ctx context.Context) error {
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath)
// keep consistent with mydumper. Never compress metadata
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath, storage.NoCompression)
if err != nil {
return err
}
Expand Down
62 changes: 62 additions & 0 deletions dumpling/v4/export/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package export

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
finishedSizeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_size",
Help: "counter for dumpling finished file size",
}, []string{})
finishedRowsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_rows",
Help: "counter for dumpling finished rows",
}, []string{})
writeTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Subsystem: "write",
Name: "write_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, []string{})
receiveWriteChunkTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Subsystem: "write",
Name: "receive_chunk_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, []string{})
errorCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "error_count",
Help: "Total error count during dumping progress",
}, []string{})
)

// RegisterMetrics registers metrics.
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(finishedSizeCounter)
registry.MustRegister(finishedRowsCounter)
registry.MustRegister(writeTimeHistogram)
registry.MustRegister(receiveWriteChunkTimeHistogram)
registry.MustRegister(errorCount)
}

func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
finishedSizeCounter.Delete(labels)
finishedRowsCounter.Delete(labels)
writeTimeHistogram.Delete(labels)
receiveWriteChunkTimeHistogram.Delete(labels)
errorCount.Delete(labels)
}
2 changes: 1 addition & 1 deletion dumpling/v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (
}

func estimateCount(dbName, tableName string, db *sql.Conn, field string, conf *Config) uint64 {
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", field, escapeString(dbName), escapeString(tableName))
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", escapeString(field), escapeString(dbName), escapeString(tableName))

if conf.Where != "" {
query += " WHERE "
Expand Down
Loading

0 comments on commit 68d86e9

Please sign in to comment.