Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support backup to S3 with br #1280

Merged
merged 15 commits into from
Dec 30, 2019
Merged
236 changes: 80 additions & 156 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,200 +14,124 @@
package backup
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the backup file for BR.


import (
"database/sql"
"context"
"fmt"
"io/ioutil"
"io"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/mholt/archiver"
"github.com/gogo/protobuf/proto"
glog "k8s.io/klog"

kvbackup "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
glog "k8s.io/klog"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
)

// Options contains the input arguments to the backup command.
type Options struct {
	// Namespace is the kubernetes namespace of the Backup CR.
	Namespace string
	// BackupName is the name of the Backup CR to process.
	BackupName string
}

// String returns the namespaced identity of the backup, e.g. "ns/backup-1",
// used as the cluster label in log and error messages.
func (bo *Options) String() string {
	return fmt.Sprintf("%s/%s", bo.Namespace, bo.BackupName)
}

// getBackupFullPath joins the backup root path with the bucket-relative
// backup path to form the local output directory.
// NOTE(review): part of the pre-BR (mydumper/rclone) implementation that
// this diff removes.
func (bo *BackupOpts) getBackupFullPath() string {
return filepath.Join(constants.BackupRootPath, bo.getBackupRelativePath())
}

// getBackupRelativePath builds "<bucket>/backup-<RFC3339 UTC timestamp>",
// a per-invocation unique path under the bucket.
// NOTE(review): pre-BR implementation removed by this diff.
func (bo *BackupOpts) getBackupRelativePath() string {
backupName := fmt.Sprintf("backup-%s", time.Now().UTC().Format(time.RFC3339))
return fmt.Sprintf("%s/%s", bo.Bucket, backupName)
}

// getDestBucketURI prefixes the remote path with the storage scheme,
// e.g. "s3://<remotePath>".
// NOTE(review): pre-BR implementation removed by this diff.
func (bo *BackupOpts) getDestBucketURI(remotePath string) string {
return fmt.Sprintf("%s://%s", bo.StorageType, remotePath)
}

func (bo *BackupOpts) getTikvGCLifeTime(db *sql.DB) (string, error) {
var tikvGCTime string
sql := fmt.Sprintf("select variable_value from %s where variable_name= ?", constants.TidbMetaTable)
row := db.QueryRow(sql, constants.TikvGCVariable)
err := row.Scan(&tikvGCTime)
func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
args, path, err := constructOptions(backup)
if err != nil {
return tikvGCTime, fmt.Errorf("query cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, sql, err)
return "", err
}
return tikvGCTime, nil
}

func (bo *BackupOpts) setTikvGCLifeTime(db *sql.DB, gcTime string) error {
sql := fmt.Sprintf("update %s set variable_value = ? where variable_name = ?", constants.TidbMetaTable)
_, err := db.Exec(sql, gcTime, constants.TikvGCVariable)
var btype string
if backup.Spec.Type == "" {
btype = string(v1alpha1.BackupTypeFull)
} else {
btype = string(backup.Spec.Type)
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}
fullArgs := []string{
"backup",
btype,
}
fullArgs = append(fullArgs, args...)
output, err := exec.Command("br", fullArgs...).CombinedOutput()
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("set cluster %s %s failed, sql: %s, err: %v", bo, constants.TikvGCVariable, sql, err)
return path, fmt.Errorf("cluster %s, execute br command %v failed, output: %s, err: %v", bo, args, string(output), err)
}
return nil
glog.Infof("Backup data for cluster %s successfully, output: %s", bo, string(output))
return path, nil
}

func (bo *BackupOpts) dumpTidbClusterData() (string, error) {
bfPath := bo.getBackupFullPath()
err := util.EnsureDirectoryExist(bfPath)
// getCommitTs get backup position from `EndVersion` in BR backup meta
func getCommitTs(backup *v1alpha1.Backup) (uint64, error) {
var commitTs uint64
s, err := util.NewRemoteStorage(backup)
if err != nil {
return "", err
return commitTs, err
}
args := []string{
fmt.Sprintf("--outputdir=%s", bfPath),
fmt.Sprintf("--host=%s", bo.Host),
fmt.Sprintf("--port=%d", bo.Port),
fmt.Sprintf("--user=%s", bo.User),
fmt.Sprintf("--password=%s", bo.Password),
"--long-query-guard=3600",
"--tidb-force-priority=LOW_PRIORITY",
"--verbose=3",
"--regex",
"^(?!(mysql|test|INFORMATION_SCHEMA|PERFORMANCE_SCHEMA))",
}

output, err := exec.Command("/mydumper", args...).CombinedOutput()
defer s.Close()
ctx := context.Background()
exist, err := s.Exists(ctx, constants.MetaFile)
if err != nil {
return bfPath, fmt.Errorf("cluster %s, execute mydumper command %v failed, output: %s, err: %v", bo, args, string(output), err)
return commitTs, err
}
return bfPath, nil
}
if !exist {
return commitTs, fmt.Errorf("%s not exist", constants.MetaFile)

func (bo *BackupOpts) backupDataToRemote(source, bucketURI string) error {
destBucket := util.NormalizeBucketURI(bucketURI)
tmpDestBucket := fmt.Sprintf("%s.tmp", destBucket)
// TODO: We may need to use exec.CommandContext to control timeouts.
output, err := exec.Command("rclone", constants.RcloneConfigArg, "copyto", source, tmpDestBucket).CombinedOutput()
if err != nil {
return fmt.Errorf("cluster %s, execute rclone copyto command for upload backup data %s failed, output: %s, err: %v", bo, bucketURI, string(output), err)
}

glog.Infof("upload cluster %s backup data to %s successfully, now move it to permanent URL %s", bo, tmpDestBucket, destBucket)

// the backup was a success
// remove .tmp extension
output, err = exec.Command("rclone", constants.RcloneConfigArg, "moveto", tmpDestBucket, destBucket).CombinedOutput()
metaData, err := s.ReadAll(ctx, constants.MetaFile)
if err != nil {
return fmt.Errorf("cluster %s, execute rclone moveto command failed, output: %s, err: %v", bo, string(output), err)
return commitTs, err
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func (bo *BackupOpts) cleanRemoteBackupData(bucket string) error {
destBucket := util.NormalizeBucketURI(bucket)
output, err := exec.Command("rclone", constants.RcloneConfigArg, "deletefile", destBucket).CombinedOutput()
backupMeta := &kvbackup.BackupMeta{}
err = proto.Unmarshal(metaData, backupMeta)
if err != nil {
return fmt.Errorf("cluster %s, execute rclone deletefile command failed, output: %s, err: %v", bo, string(output), err)
return commitTs, err
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}

glog.Infof("cluster %s backup %s was deleted successfully", bo, bucket)
return nil
}

func (bo *BackupOpts) getDSN(db string) string {
return fmt.Sprintf("%s:%s@(%s:%d)/%s?charset=utf8", bo.User, bo.Password, bo.Host, bo.Port, db)
return backupMeta.EndVersion, nil
}

/*
getCommitTsFromMetadata get commitTs from mydumper's metadata file

metadata file format is as follows:

Started dump at: 2019-06-13 10:00:04
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 409054741514944513
GTID:

Finished dump at: 2019-06-13 10:00:04
*/
func getCommitTsFromMetadata(backupPath string) (string, error) {
var commitTs string

metaFile := filepath.Join(backupPath, constants.MetaDataFile)
if exist := util.IsFileExist(metaFile); !exist {
return commitTs, fmt.Errorf("file %s does not exist or is not regular file", metaFile)
}
contents, err := ioutil.ReadFile(metaFile)
// constructOptions constructs options for BR and also return the remote path
func constructOptions(backup *v1alpha1.Backup) ([]string, string, error) {
args, path, err := util.ConstructBRGlobalOptions(backup)
if err != nil {
return commitTs, fmt.Errorf("read metadata file %s failed, err: %v", metaFile, err)
return args, path, err
}

for _, lineStr := range strings.Split(string(contents), "\n") {
if !strings.Contains(lineStr, "Pos") {
continue
}
lineStrSlice := strings.Split(lineStr, ":")
if len(lineStrSlice) != 2 {
return commitTs, fmt.Errorf("parse mydumper's metadata file %s failed, str: %s", metaFile, lineStr)
}
commitTs = strings.TrimSpace(lineStrSlice[1])
break
config := backup.Spec.BR
if config.Concurrency != nil {
args = append(args, fmt.Sprintf("--concurrency=%d", *config.Concurrency))
}
return commitTs, nil
}

// getBackupSize get the backup data size
func getBackupSize(backupPath string) (int64, error) {
var size int64
if exist := util.IsFileExist(backupPath); !exist {
return size, fmt.Errorf("file %s does not exist or is not regular file", backupPath)
if config.RateLimit != nil {
args = append(args, fmt.Sprintf("--ratelimit=%d", *config.RateLimit))
}
out, err := exec.Command("rclone", constants.RcloneConfigArg, "ls", backupPath).CombinedOutput()
if err != nil {
return size, fmt.Errorf("failed to get backup %s size, err: %v", backupPath, err)
if config.TimeAgo != "" {
args = append(args, fmt.Sprintf("--timeago=%s", config.TimeAgo))
}
sizeStr := strings.Fields(string(out))[0]
size, err = strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
return size, fmt.Errorf("failed to parse size string %s, err: %v", sizeStr, err)
if config.Checksum != nil {
args = append(args, fmt.Sprintf("--checksum=%t", *config.Checksum))
}
return size, nil
return args, path, nil
}

// archiveBackupData archive backup data by destFile's extension name
func archiveBackupData(backupDir, destFile string) error {
if exist := util.IsDirExist(backupDir); !exist {
return fmt.Errorf("dir %s does not exist or is not a dir", backupDir)
}
destDir := filepath.Dir(destFile)
if err := util.EnsureDirectoryExist(destDir); err != nil {
return err
}
err := archiver.Archive([]string{backupDir}, destFile)
// getBackupSize get the backup data size from remote
func getBackupSize(backup *v1alpha1.Backup) (int64, error) {
var size int64
s, err := util.NewRemoteStorage(backup)
if err != nil {
return fmt.Errorf("archive backup data %s to %s failed, err: %v", backupDir, destFile, err)
return size, err
}
defer s.Close()
ctx := context.Background()
iter := s.List(nil)
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
return size, err
}
size += obj.Size
}
return nil
return size, nil
}
Loading