Skip to content

Commit

Permalink
backup: advanced prepare implementation (pingcap#48439) (pingcap#50520)…
Browse files Browse the repository at this point in the history
… (pingcap#39)

close pingcap#50359

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
2 people authored and GitHub Enterprise committed Jan 23, 2024
1 parent 1569531 commit ede3585
Show file tree
Hide file tree
Showing 22 changed files with 1,594 additions and 309 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3061,8 +3061,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=",
version = "v0.0.0-20230928035022-1bdcc25ed63c",
sum = "h1:ZWFeZNN+6poqqEQ3XU6M/Gw6oiNexbDD3yqIZ05GxlM=",
version = "v0.0.0-20240112060601-a0e3fbb1eeee",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down
53 changes: 53 additions & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the Go package imported as
# github.com/pingcap/tidb/br/pkg/backup/prepare_snap.
# It is built from the four source files listed in `srcs` and is visible to
# every package in the workspace.
go_library(
name = "prepare_snap",
srcs = [
"env.go",
"errors.go",
"prepare.go",
"stream.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/backup/prepare_snap",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//br/pkg/utils",
"//util/engine",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

# Unit tests for the :prepare_snap library, built from prepare_test.go.
# The target uses the "short" timeout class, is split into 7 shards, and is
# marked `flaky`.
# NOTE(review): `flaky = True` makes Bazel retry failing runs, which can hide
# real intermittent failures — confirm this is still needed.
go_test(
name = "prepare_snap_test",
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 7,
deps = [
":prepare_snap",
"//br/pkg/utils",
"//store/mockstore/unistore",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_zap//zapcore",
],
)
190 changes: 190 additions & 0 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package preparesnap

import (
"context"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/util/engine"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// deleteFunc removes, in place, every element of s for which del reports
// true and returns the shortened slice. It mirrors slices.DeleteFunc from
// go1.21.
//
// The relative order of the kept elements is preserved and del is invoked
// exactly once per element, in index order. The result shares its backing
// array with s; slots past the returned length are left untouched (they are
// not zeroed).
func deleteFunc[S ~[]E, E any](s S, del func(E) bool) S {
	kept := 0
	for _, elem := range s {
		if del(elem) {
			continue
		}
		// Compact the survivor towards the front. Until the first deletion
		// this writes an element onto itself, which is a no-op.
		s[kept] = elem
		kept++
	}
	return s[:kept]
}

const (
// maxRequestSize is the request size threshold handed to SplitRequestClient
// by RetryAndSplitRequestEnv.ConnectToStore.
//
// The default max gRPC message size is 10MiB. Splitting requests into
// chunks of about 1MiB reduces the chance of a request being rejected
// for exceeding the max gRPC message size.
maxRequestSize = units.MiB
)

// Env abstracts the cluster-facing operations this package needs: opening a
// prepare stream to a store, listing stores and loading regions.
type Env interface {
// ConnectToStore opens a PrepareClient to the store identified by storeID.
ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error)
// GetAllLiveStores returns the stores of the cluster that are considered live.
// NOTE(review): the exact meaning of "live" is up to the implementation;
// CliEnv excludes tombstone stores and TiFlash stores.
GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error)

// LoadRegionsInKeyRange returns the regions in the key range bounded by
// startKey and endKey.
// NOTE(review): CliEnv treats an empty endKey as "unbounded" — confirm other
// implementations agree.
LoadRegionsInKeyRange(ctx context.Context, startKey, endKey []byte) (regions []Region, err error)
}

// PrepareClient is the pair of stream operations used to exchange
// PrepareSnapshotBackup messages with a store. CliEnv.ConnectToStore returns
// a brpb.Backup_PrepareSnapshotBackupClient as its implementation.
type PrepareClient interface {
// Send sends one request to the store.
Send(*brpb.PrepareSnapshotBackupRequest) error
// Recv receives one response from the store.
Recv() (*brpb.PrepareSnapshotBackupResponse, error)
}

// SplitRequestClient wraps a PrepareClient and overrides Send so that
// oversized WaitApply requests are split into several smaller requests.
// Recv is inherited unchanged from the embedded client.
type SplitRequestClient struct {
PrepareClient
// MaxRequestSize is the threshold compared against the request's Size()
// to decide whether (and where) a request is split.
MaxRequestSize int
}

// Send forwards req to the wrapped client, splitting it first when it is too
// large.
//
// Only WaitApply requests whose Size() exceeds MaxRequestSize and that carry
// at least one region are split; every other request is forwarded untouched.
// A split request is sent as consecutive WaitApply requests, each carrying a
// contiguous run of the original regions in their original order. Every
// chunk holds at least one region, and further regions are added only while
// the accumulated region size stays below MaxRequestSize.
//
// The first error returned by the wrapped client aborts the operation; chunks
// already sent are not rolled back.
func (s SplitRequestClient) Send(req *brpb.PrepareSnapshotBackupRequest) error {
	// Try our best to keep the request untouched. A request without regions
	// cannot be made smaller by splitting, so it is forwarded as-is instead
	// of being silently dropped.
	needSplit := req.Ty == brpb.PrepareSnapshotBackupRequestType_WaitApply &&
		req.Size() > s.MaxRequestSize &&
		len(req.Regions) > 0
	if !needSplit {
		return s.PrepareClient.Send(req)
	}

	rs := req.Regions
	// nextChunkLen reports how many leading regions of rs belong to the next
	// chunk. It must only be called while rs is non-empty.
	nextChunkLen := func() int {
		// Always select at least one region, so we won't get stuck if there
		// is a single really huge (!) region.
		n, collected := 1, rs[0].Size()
		// Count exactly the regions that end up in the chunk: rs[n] is added
		// only if the chunk stays below the limit with it included.
		for n < len(rs) && collected+rs[n].Size() < s.MaxRequestSize {
			collected += rs[n].Size()
			n++
		}
		return n
	}

	for len(rs) > 0 {
		n := nextChunkLen()
		split := &brpb.PrepareSnapshotBackupRequest{
			Ty:      brpb.PrepareSnapshotBackupRequestType_WaitApply,
			Regions: rs[:n],
		}
		rs = rs[n:]
		if err := s.PrepareClient.Send(split); err != nil {
			return err
		}
	}
	return nil
}

// Region is the minimal view of a region this package relies on.
// CliEnv.LoadRegionsInKeyRange fills it with the values returned by
// tikv.RegionCache.LoadRegionsInKeyRange.
type Region interface {
// GetMeta returns the region's metadata.
GetMeta() *metapb.Region
// GetLeaderStoreID returns the ID of the store holding the region's leader.
GetLeaderStoreID() uint64
}

// CliEnv is an Env implementation backed by a real cluster.
type CliEnv struct {
// Cache provides the PD client used to list stores and is used to load
// regions by key range.
Cache *tikv.RegionCache
// Mgr provides the gRPC connections to individual stores.
Mgr *utils.StoreManager
}

// GetAllLiveStores asks PD for all stores, excluding tombstone ones, and
// returns them with every TiFlash store filtered out.
//
// The filtering happens in place, so the returned slice reuses the backing
// array of the slice returned by the PD client.
func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
	pdCli := c.Cache.PDClient()
	allStores, err := pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
	if err != nil {
		return nil, err
	}
	return deleteFunc(allStores, engine.IsTiFlash), nil
}

// ConnectToStore opens a PrepareSnapshotBackup stream to the store storeID
// over a connection obtained from the store manager.
//
// The stream is created with ctx. A failure to create it is annotated and
// returned through TryWithConn; any error from TryWithConn is returned as-is
// together with a nil client.
func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
	var stream brpb.Backup_PrepareSnapshotBackupClient
	openStream := func(conn *grpc.ClientConn) error {
		opened, err := brpb.NewBackupClient(conn).PrepareSnapshotBackup(ctx)
		if err != nil {
			return errors.Annotatef(err, "failed to create prepare backup stream")
		}
		// Publish the stream only once it has been created successfully.
		stream = opened
		return nil
	}
	if err := c.Mgr.TryWithConn(ctx, storeID, openStream); err != nil {
		return nil, err
	}
	return stream, nil
}

// LoadRegionsInKeyRange loads the regions between startKey and endKey through
// the region cache and returns them as Region values.
//
// An empty endKey is replaced by nine 0xff bytes before querying the cache;
// the caller's slice is not modified. Any error from the cache is returned
// unchanged with a nil result.
func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
	if len(endKey) == 0 {
		// This is encoded [0xff; 8].
		// Workaround for https://github.com/tikv/client-go/issues/1051.
		endKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
	}
	backoffer := tikv.NewBackoffer(ctx, regionCacheMaxBackoffMs)
	loaded, err := c.Cache.LoadRegionsInKeyRange(backoffer, startKey, endKey)
	if err != nil {
		return nil, err
	}
	// Convert each concrete region to the Region interface one by one.
	converted := make([]Region, len(loaded))
	for i := range loaded {
		converted[i] = loaded[i]
	}
	return converted, nil
}

// RetryAndSplitRequestEnv decorates an Env: its ConnectToStore retries the
// wrapped Env's ConnectToStore and wraps the resulting client in a
// SplitRequestClient. All other Env methods are inherited unchanged.
type RetryAndSplitRequestEnv struct {
Env
// GetBackoffer, when non-nil, supplies the backoffer used for retrying
// connections in place of the built-in default.
GetBackoffer func() utils.Backoffer
}

// ConnectToStore connects to the store storeID through the wrapped Env,
// retrying on failure, and returns the client wrapped in a SplitRequestClient
// limited to maxRequestSize.
//
// The retry policy comes from GetBackoffer when it is set; otherwise a
// default policy that retries for about 2 minutes is used. Each failed
// attempt is logged as a warning. If the retries are exhausted the error from
// WithRetryV2 is returned with a nil client.
func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
	// Retry for about 2 minutes.
	defaultState := utils.InitialRetryState(12, 10*time.Second, 10*time.Second)
	var backoffer utils.Backoffer = &defaultState
	if r.GetBackoffer != nil {
		backoffer = r.GetBackoffer()
	}

	connectOnce := func(ctx context.Context) (PrepareClient, error) {
		inner, err := r.Env.ConnectToStore(ctx, storeID)
		if err == nil {
			return inner, nil
		}
		log.Warn("Failed to connect to store, will retry.", zap.Uint64("store", storeID), logutil.ShortError(err))
		return nil, err
	}

	inner, err := utils.WithRetryV2(ctx, backoffer, connectOnce)
	if err != nil {
		return nil, err
	}
	return SplitRequestClient{PrepareClient: inner, MaxRequestSize: maxRequestSize}, nil
}
39 changes: 39 additions & 0 deletions br/pkg/backup/prepare_snap/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package preparesnap

import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
)

// convertErr turns a region error reported by a store into a Go error
// carrying the error's Message. A nil input yields a nil error.
func convertErr(err *errorpb.Error) error {
	if err != nil {
		return errors.New(err.Message)
	}
	return nil
}

// leaseExpired returns a new error stating that the lease has expired.
func leaseExpired() error {
	const msg = "the lease has expired"
	return errors.New(msg)
}

// unsupported returns a new error stating that the operation is not supported.
func unsupported() error {
	const msg = "unsupported operation"
	return errors.New(msg)
}

// retryLimitExceeded returns a new error stating that the retry limit has
// been exceeded.
func retryLimitExceeded() error {
	const msg = "the limit of retrying exceeded"
	return errors.New(msg)
}
Loading

0 comments on commit ede3585

Please sign in to comment.