From ede35854d5b4ef165d90ac927b2ec89f7509b226 Mon Sep 17 00:00:00 2001 From: Olivia Chen Date: Tue, 23 Jan 2024 09:55:37 -0800 Subject: [PATCH] backup: advacned prepare implementation (#48439) (#50520) (#39) close pingcap/tidb#50359 Co-authored-by: Ti Chi Robot --- DEPS.bzl | 4 +- br/pkg/backup/prepare_snap/BUILD.bazel | 53 +++ br/pkg/backup/prepare_snap/env.go | 190 ++++++++ br/pkg/backup/prepare_snap/errors.go | 39 ++ br/pkg/backup/prepare_snap/prepare.go | 430 +++++++++++++++++ br/pkg/backup/prepare_snap/prepare_test.go | 458 +++++++++++++++++++ br/pkg/backup/prepare_snap/stream.go | 212 +++++++++ br/pkg/logutil/logging.go | 5 + br/pkg/task/BUILD.bazel | 2 + br/pkg/task/backup_ebs.go | 62 ++- br/pkg/task/operator/BUILD.bazel | 2 + br/pkg/task/operator/cmd.go | 89 ++-- br/pkg/utils/BUILD.bazel | 7 +- br/pkg/utils/misc.go | 24 + br/pkg/utils/store_manager.go | 12 +- br/pkg/utils/suspend_importing_test.go | 210 --------- br/pkg/utils/worker.go | 29 ++ go.mod | 2 +- go.sum | 4 +- store/mockstore/unistore/tikv/mock_region.go | 13 + tests/realtikvtest/brietest/BUILD.bazel | 2 + tests/realtikvtest/brietest/operator_test.go | 54 ++- 22 files changed, 1594 insertions(+), 309 deletions(-) create mode 100644 br/pkg/backup/prepare_snap/BUILD.bazel create mode 100644 br/pkg/backup/prepare_snap/env.go create mode 100644 br/pkg/backup/prepare_snap/errors.go create mode 100644 br/pkg/backup/prepare_snap/prepare.go create mode 100644 br/pkg/backup/prepare_snap/prepare_test.go create mode 100644 br/pkg/backup/prepare_snap/stream.go delete mode 100644 br/pkg/utils/suspend_importing_test.go diff --git a/DEPS.bzl b/DEPS.bzl index 1ca6af86b095a..def29eb9fef4e 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3061,8 +3061,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=", - version = "v0.0.0-20230928035022-1bdcc25ed63c", + sum = "h1:ZWFeZNN+6poqqEQ3XU6M/Gw6oiNexbDD3yqIZ05GxlM=", + version = "v0.0.0-20240112060601-a0e3fbb1eeee", ) go_repository( name = "com_github_pingcap_log", diff --git a/br/pkg/backup/prepare_snap/BUILD.bazel b/br/pkg/backup/prepare_snap/BUILD.bazel new file mode 100644 index 0000000000000..43ef1e611bc3a --- /dev/null +++ b/br/pkg/backup/prepare_snap/BUILD.bazel @@ -0,0 +1,53 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "prepare_snap", + srcs = [ + "env.go", + "errors.go", + "prepare.go", + "stream.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/backup/prepare_snap", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/logutil", + "//br/pkg/utils", + "//util/engine", + "@com_github_docker_go_units//:go-units", + "@com_github_google_btree//:btree", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_x_sync//errgroup", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) + +go_test( + name = "prepare_snap_test", + timeout = "short", + srcs = ["prepare_test.go"], + flaky = True, + shard_count = 7, + deps = [ + ":prepare_snap", + "//br/pkg/utils", + "//store/mockstore/unistore", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/brpb", + 
"@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//:client", + "@org_uber_go_zap//zapcore", + ], +) diff --git a/br/pkg/backup/prepare_snap/env.go b/br/pkg/backup/prepare_snap/env.go new file mode 100644 index 0000000000000..8d4cdaccead01 --- /dev/null +++ b/br/pkg/backup/prepare_snap/env.go @@ -0,0 +1,190 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package preparesnap + +import ( + "context" + "time" + + "github.com/docker/go-units" + "github.com/pingcap/errors" + brpb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/util/engine" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// deleteFunc is copied from the "slices" package from go1.21. +func deleteFunc[S ~[]E, E any](s S, del func(E) bool) S { + // Don't start copying elements until we find one to delete. + for i, v := range s { + if del(v) { + j := i + for i++; i < len(s); i++ { + v = s[i] + if !del(v) { + s[j] = v + j++ + } + } + return s[:j] + } + } + return s +} + +const ( + // default max gRPC message size is 10MiB. + // split requests to chunks of 1MiB will reduce the possibility of being rejected + // due to max gRPC message size. + maxRequestSize = units.MiB +) + +type Env interface { + ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) + GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) + + LoadRegionsInKeyRange(ctx context.Context, startKey, endKey []byte) (regions []Region, err error) +} + +type PrepareClient interface { + Send(*brpb.PrepareSnapshotBackupRequest) error + Recv() (*brpb.PrepareSnapshotBackupResponse, error) +} + +type SplitRequestClient struct { + PrepareClient + MaxRequestSize int +} + +func (s SplitRequestClient) Send(req *brpb.PrepareSnapshotBackupRequest) error { + // Try best to keeping the request untouched. + if req.Ty == brpb.PrepareSnapshotBackupRequestType_WaitApply && req.Size() > s.MaxRequestSize { + rs := req.Regions + findSplitIndex := func() int { + if len(rs) == 0 { + return -1 + } + + // Select at least one request. + // So we won't get sutck if there were a really huge (!) request. 
+ collected := 0 + lastI := 1 + for i := 2; i < len(rs) && collected+rs[i].Size() < s.MaxRequestSize; i++ { + lastI = i + collected += rs[i].Size() + } + return lastI + } + for splitIdx := findSplitIndex(); splitIdx > 0; splitIdx = findSplitIndex() { + split := &brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply, + Regions: rs[:splitIdx], + } + rs = rs[splitIdx:] + if err := s.PrepareClient.Send(split); err != nil { + return err + } + } + return nil + } + return s.PrepareClient.Send(req) +} + +type Region interface { + GetMeta() *metapb.Region + GetLeaderStoreID() uint64 +} + +type CliEnv struct { + Cache *tikv.RegionCache + Mgr *utils.StoreManager +} + +func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { + stores, err := c.Cache.PDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) + if err != nil { + return nil, err + } + withoutTiFlash := deleteFunc(stores, engine.IsTiFlash) + return withoutTiFlash, err +} + +func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { + var cli brpb.Backup_PrepareSnapshotBackupClient + err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error { + bcli := brpb.NewBackupClient(cc) + c, err := bcli.PrepareSnapshotBackup(ctx) + if err != nil { + return errors.Annotatef(err, "failed to create prepare backup stream") + } + cli = c + return nil + }) + if err != nil { + return nil, err + } + return cli, nil +} + +func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { + bo := tikv.NewBackoffer(ctx, regionCacheMaxBackoffMs) + if len(endKey) == 0 { + // This is encoded [0xff; 8]. + // Workaround for https://github.com/tikv/client-go/issues/1051. + endKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + } + rs, err := c.Cache.LoadRegionsInKeyRange(bo, startKey, endKey) + if err != nil { + return nil, err + } + rrs := make([]Region, 0, len(rs)) + for _, r := range rs { + rrs = append(rrs, r) + } + return rrs, nil +} + +type RetryAndSplitRequestEnv struct { + Env + GetBackoffer func() utils.Backoffer +} + +func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { + // Retry for about 2 minutes. + rs := utils.InitialRetryState(12, 10*time.Second, 10*time.Second) + bo := utils.Backoffer(&rs) + if r.GetBackoffer != nil { + bo = r.GetBackoffer() + } + cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) { + cli, err := r.Env.ConnectToStore(ctx, storeID) + if err != nil { + log.Warn("Failed to connect to store, will retry.", zap.Uint64("store", storeID), logutil.ShortError(err)) + return nil, err + } + return cli, nil + }) + if err != nil { + return nil, err + } + return SplitRequestClient{PrepareClient: cli, MaxRequestSize: maxRequestSize}, nil +} diff --git a/br/pkg/backup/prepare_snap/errors.go b/br/pkg/backup/prepare_snap/errors.go new file mode 100644 index 0000000000000..eb2ca151ec514 --- /dev/null +++ b/br/pkg/backup/prepare_snap/errors.go @@ -0,0 +1,39 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package preparesnap + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/errorpb" +) + +func convertErr(err *errorpb.Error) error { + if err == nil { + return nil + } + return errors.New(err.Message) +} + +func leaseExpired() error { + return errors.New("the lease has expired") +} + +func unsupported() error { + return errors.New("unsupported operation") +} + +func retryLimitExceeded() error { + return errors.New("the limit of retrying exceeded") +} diff --git a/br/pkg/backup/prepare_snap/prepare.go b/br/pkg/backup/prepare_snap/prepare.go new file mode 100644 index 0000000000000..46f1916873831 --- /dev/null +++ b/br/pkg/backup/prepare_snap/prepare.go @@ -0,0 +1,430 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package preparesnap + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/google/btree" + "github.com/pingcap/errors" + brpb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "golang.org/x/sync/errgroup" +) + +const ( + /* The combination of defaultMaxRetry and defaultRetryBackoff limits + the whole procedure to about 5 min if there is a region always fail. + Also note that we are batching during retrying. Retrying many region + costs only one chance of retrying if they are batched. */ + + defaultMaxRetry = 60 + defaultRetryBackoff = 5 * time.Second + defaultLeaseDur = 120 * time.Second + + /* Give pd enough time to find the region. If we aren't able to fetch + the region, the whole procedure might be aborted. */ + + regionCacheMaxBackoffMs = 60000 +) + +type pendingRequests map[uint64]*brpb.PrepareSnapshotBackupRequest + +type rangeOrRegion struct { + // If it is a range, this should be zero. + id uint64 + startKey []byte + endKey []byte +} + +func (r rangeOrRegion) String() string { + rng := logutil.StringifyRangeOf(r.startKey, r.endKey) + if r.id == 0 { + return fmt.Sprintf("range%s", rng) + } + return fmt.Sprintf("region(id=%d, range=%s)", r.id, rng) +} + +func (r rangeOrRegion) compareWith(than rangeOrRegion) bool { + return bytes.Compare(r.startKey, than.startKey) < 0 +} + +type Preparer struct { + /* Environments. */ + env Env + + /* Internal Status. */ + inflightReqs map[uint64]metapb.Region + failed []rangeOrRegion + waitApplyDoneRegions btree.BTreeG[rangeOrRegion] + retryTime int + nextRetry *time.Timer + + /* Internal I/O. */ + eventChan chan event + clients map[uint64]*prepareStream + + /* Interface for caller. 
*/ + waitApplyFinished bool + + /* Some configurations. They aren't thread safe. + You may need to configure them before starting the Preparer. */ + RetryBackoff time.Duration + RetryLimit int + LeaseDuration time.Duration +} + +func New(env Env) *Preparer { + prep := &Preparer{ + env: env, + + inflightReqs: make(map[uint64]metapb.Region), + waitApplyDoneRegions: *btree.NewG(16, rangeOrRegion.compareWith), + eventChan: make(chan event, 128), + clients: make(map[uint64]*prepareStream), + + RetryBackoff: defaultRetryBackoff, + RetryLimit: defaultMaxRetry, + LeaseDuration: defaultLeaseDur, + } + return prep +} + +func (p *Preparer) MarshalLogObject(om zapcore.ObjectEncoder) error { + om.AddInt("inflight_requests", len(p.inflightReqs)) + reqs := 0 + for _, r := range p.inflightReqs { + om.AddString("simple_inflight_region", rangeOrRegion{id: r.Id, startKey: r.StartKey, endKey: r.EndKey}.String()) + reqs += 1 + if reqs > 3 { + break + } + } + om.AddInt("failed_requests", len(p.failed)) + failed := 0 + for _, r := range p.failed { + om.AddString("simple_failed_region", r.String()) + failed += 1 + if failed > 5 { + break + } + } + err := om.AddArray("connected_stores", zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error { + for id := range p.clients { + ae.AppendUint64(id) + } + return nil + })) + if err != nil { + return err + } + om.AddInt("retry_time", p.retryTime) + om.AddBool("wait_apply_finished", p.waitApplyFinished) + return nil +} + +// DriveLoopAndWaitPrepare drives the state machine and block the +// current goroutine until we are safe to start taking snapshot. +// +// After this invoked, you shouldn't share this `Preparer` with any other goroutines. +// +// After this the cluster will enter the land between normal and taking snapshot. +// This state will continue even this function returns, until `Finalize` invoked. +// Splitting, ingesting and conf changing will all be blocked. +func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error { + logutil.CL(ctx).Info("Start drive the loop.", zap.Duration("retry_backoff", p.RetryBackoff), + zap.Int("retry_limit", p.RetryLimit), + zap.Duration("lease_duration", p.LeaseDuration)) + p.retryTime = 0 + if err := p.prepareConnections(ctx); err != nil { + log.Error("failed to prepare connections", logutil.ShortError(err)) + return errors.Annotate(err, "failed to prepare connections") + } + if err := p.AdvanceState(ctx); err != nil { + log.Error("failed to check the progress of our work", logutil.ShortError(err)) + return errors.Annotate(err, "failed to begin step") + } + for !p.waitApplyFinished { + if err := p.WaitAndHandleNextEvent(ctx); err != nil { + log.Error("failed to wait and handle next event", logutil.ShortError(err)) + return errors.Annotate(err, "failed to step") + } + } + return nil +} + +// Finalize notify the cluster to go back to the normal mode. +// This will return an error if the cluster has already entered the normal mode when this is called. 
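+// After all streams are closed, it also drains the events remaining in the
+// internal event channel, so late failures are still surfaced to the caller.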
+func (p *Preparer) Finalize(ctx context.Context) error { + eg := new(errgroup.Group) + for id, cli := range p.clients { + cli := cli + id := id + eg.Go(func() error { + if err := cli.Finalize(ctx); err != nil { + return errors.Annotatef(err, "failed to finalize the prepare stream for %d", id) + } + return nil + }) + } + if err := eg.Wait(); err != nil { + logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err)) + return err + } + logutil.CL(ctx).Info("all connections to store have shuted down.") + for { + select { + case event := <-p.eventChan: + if err := p.onEvent(ctx, event); err != nil { + return err + } + default: + return nil + } + } +} + +func (p *Preparer) batchEvents(evts *[]event) { + for { + select { + case evt := <-p.eventChan: + *evts = append(*evts, evt) + default: + return + } + } +} + +// WaitAndHandleNextEvent is exported for test usage. +// This waits the next event (wait apply done, errors, etc..) of preparing. +// Generally `DriveLoopAndWaitPrepare` is all you need. +func (p *Preparer) WaitAndHandleNextEvent(ctx context.Context) error { + select { + case <-ctx.Done(): + logutil.CL(ctx).Warn("User canceled.", logutil.ShortError(ctx.Err())) + return ctx.Err() + case evt := <-p.eventChan: + logutil.CL(ctx).Debug("received event", zap.Stringer("event", evt)) + events := []event{evt} + p.batchEvents(&events) + for _, evt := range events { + err := p.onEvent(ctx, evt) + if err != nil { + return errors.Annotatef(err, "failed to handle event %v", evt) + } + } + return p.AdvanceState(ctx) + case <-p.retryChan(): + return p.workOnPendingRanges(ctx) + } +} + +func (p *Preparer) removePendingRequest(r *metapb.Region) bool { + r2, ok := p.inflightReqs[r.GetId()] + if !ok { + return false + } + matches := r2.GetRegionEpoch().GetVersion() == r.GetRegionEpoch().GetVersion() && + r2.GetRegionEpoch().GetConfVer() == r.GetRegionEpoch().GetConfVer() + if !matches { + return false + } + delete(p.inflightReqs, r.GetId()) + return true +} + +func (p *Preparer) onEvent(ctx context.Context, e event) error { + switch e.ty { + case eventMiscErr: + // Note: some of errors might be able to be retry. + // But for now it seems there isn't one. + return errors.Annotatef(e.err, "unrecoverable error at store %d", e.storeID) + case eventWaitApplyDone: + if !p.removePendingRequest(e.region) { + logutil.CL(ctx).Warn("received unmatched response, perhaps stale, drop it", zap.Stringer("region", e.region)) + return nil + } + r := rangeOrRegion{ + id: e.region.GetId(), + startKey: e.region.GetStartKey(), + endKey: e.region.GetEndKey(), + } + if e.err != nil { + logutil.CL(ctx).Warn("requesting a region failed.", zap.Uint64("store", e.storeID), logutil.ShortError(e.err)) + p.failed = append(p.failed, r) + if p.nextRetry != nil { + p.nextRetry.Stop() + } + // Reset the timer so we can collect more regions. + // Note: perhaps it is better to make a deadline heap or something + // so every region backoffs the same time. + p.nextRetry = time.NewTimer(p.RetryBackoff) + return nil + } + if item, ok := p.waitApplyDoneRegions.ReplaceOrInsert(r); ok { + logutil.CL(ctx).Warn("overlapping in success region", + zap.Stringer("old_region", item), + zap.Stringer("new_region", r)) + } + default: + return errors.Annotatef(unsupported(), "unsupported event type %d", e.ty) + } + + return nil +} + +func (p *Preparer) retryChan() <-chan time.Time { + if p.nextRetry == nil { + return nil + } + return p.nextRetry.C +} + +// AdvanceState is exported for test usage. 
+// This call will check whether now we are safe to forward the whole procedure. +// If we can, this will set `p.waitApplyFinished` to true. +// Generally `DriveLoopAndWaitPrepare` is all you need, you may not want to call this. +func (p *Preparer) AdvanceState(ctx context.Context) error { + logutil.CL(ctx).Info("Checking the progress of our work.", zap.Object("current", p)) + if len(p.inflightReqs) == 0 && len(p.failed) == 0 { + holes := p.checkHole() + if len(holes) == 0 { + p.waitApplyFinished = true + return nil + } + logutil.CL(ctx).Warn("It seems there are still some works to be done.", zap.Stringers("regions", holes)) + p.failed = holes + return p.workOnPendingRanges(ctx) + } + + return nil +} + +func (p *Preparer) checkHole() []rangeOrRegion { + log.Info("Start checking the hole.", zap.Int("len", p.waitApplyDoneRegions.Len())) + if p.waitApplyDoneRegions.Len() == 0 { + return []rangeOrRegion{{}} + } + + last := []byte("") + failed := []rangeOrRegion{} + p.waitApplyDoneRegions.Ascend(func(item rangeOrRegion) bool { + if bytes.Compare(last, item.startKey) < 0 { + failed = append(failed, rangeOrRegion{startKey: last, endKey: item.startKey}) + } + last = item.endKey + return true + }) + // Not the end key of key space. + if len(last) > 0 { + failed = append(failed, rangeOrRegion{ + startKey: last, + }) + } + return failed +} + +func (p *Preparer) workOnPendingRanges(ctx context.Context) error { + p.nextRetry = nil + if len(p.failed) == 0 { + return nil + } + p.retryTime += 1 + if p.retryTime > p.RetryLimit { + return retryLimitExceeded() + } + + logutil.CL(ctx).Info("retrying some ranges incomplete.", zap.Int("ranges", len(p.failed))) + preqs := pendingRequests{} + for _, r := range p.failed { + rs, err := p.env.LoadRegionsInKeyRange(ctx, r.startKey, r.endKey) + if err != nil { + return errors.Annotatef(err, "retrying range of %s: get region", logutil.StringifyRangeOf(r.startKey, r.endKey)) + } + logutil.CL(ctx).Info("loaded regions in range for retry.", zap.Int("regions", len(rs))) + for _, region := range rs { + p.pushWaitApply(preqs, region) + } + } + p.failed = nil + return p.sendWaitApply(ctx, preqs) +} + +func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) error { + for store, req := range reqs { + stream, err := p.streamOf(ctx, store) + if err != nil { + return errors.Annotatef(err, "failed to dial the store %d", store) + } + err = stream.cli.Send(req) + if err != nil { + return errors.Annotatef(err, "failed to send message to the store %d", store) + } + logutil.CL(ctx).Info("sent wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions))) + } + return nil +} + +func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream, error) { + s, ok := p.clients[storeID] + if !ok { + cli, err := p.env.ConnectToStore(ctx, storeID) + if err != nil { + return nil, errors.Annotatef(err, "failed to dial store %d", storeID) + } + s = new(prepareStream) + s.storeID = storeID + s.output = p.eventChan + s.leaseDuration = p.LeaseDuration + err = s.InitConn(ctx, cli) + if err != nil { + return nil, err + } + p.clients[storeID] = s + } + return s, nil +} + +func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) { + leader := region.GetLeaderStoreID() + if _, ok := reqs[leader]; !ok { + reqs[leader] = new(brpb.PrepareSnapshotBackupRequest) + reqs[leader].Ty = brpb.PrepareSnapshotBackupRequestType_WaitApply + } + reqs[leader].Regions = append(reqs[leader].Regions, region.GetMeta()) + 
p.inflightReqs[region.GetMeta().Id] = *region.GetMeta() +} + +func (p *Preparer) prepareConnections(ctx context.Context) error { + log.Info("Preparing connections to stores.") + stores, err := p.env.GetAllLiveStores(ctx) + if err != nil { + return errors.Annotate(err, "failed to get all live stores") + } + for _, store := range stores { + _, err := p.streamOf(ctx, store.Id) + if err != nil { + return errors.Annotatef(err, "failed to prepare connection to store %d", store.Id) + } + } + return nil +} diff --git a/br/pkg/backup/prepare_snap/prepare_test.go b/br/pkg/backup/prepare_snap/prepare_test.go new file mode 100644 index 0000000000000..052ce62c7a491 --- /dev/null +++ b/br/pkg/backup/prepare_snap/prepare_test.go @@ -0,0 +1,458 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package preparesnap_test + +import ( + "bytes" + "context" + "encoding/hex" + "io" + "sort" + "sync" + "testing" + "time" + + "github.com/pingcap/errors" + brpb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + . "github.com/pingcap/tidb/br/pkg/backup/prepare_snap" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" + "go.uber.org/zap/zapcore" +) + +type mockStore struct { + mu sync.Mutex + + output chan brpb.PrepareSnapshotBackupResponse + leaseUntil time.Time + + successRegions []metapb.Region + onWaitApply func(*metapb.Region) error + now func() time.Time +} + +func (s *mockStore) Send(req *brpb.PrepareSnapshotBackupRequest) error { + switch req.Ty { + case brpb.PrepareSnapshotBackupRequestType_WaitApply: + s.mu.Lock() + defer s.mu.Unlock() + for _, region := range req.Regions { + resp := brpb.PrepareSnapshotBackupResponse{ + Ty: brpb.PrepareSnapshotBackupEventType_WaitApplyDone, + Region: region, + } + if s.onWaitApply != nil { + if err := s.onWaitApply(region); err != nil { + resp.Error = &errorpb.Error{ + Message: err.Error(), + } + } + } + s.sendResp(resp) + if resp.Error == nil { + s.successRegions = append(s.successRegions, *region) + } + } + case brpb.PrepareSnapshotBackupRequestType_UpdateLease: + s.mu.Lock() + defer s.mu.Unlock() + expired := s.leaseUntil.Before(s.now()) + s.leaseUntil = s.now().Add(time.Duration(req.LeaseInSeconds) * time.Second) + s.sendResp(brpb.PrepareSnapshotBackupResponse{ + Ty: brpb.PrepareSnapshotBackupEventType_UpdateLeaseResult, + LastLeaseIsValid: !expired, + }) + case brpb.PrepareSnapshotBackupRequestType_Finish: + s.mu.Lock() + defer s.mu.Unlock() + expired := s.leaseUntil.Before(s.now()) + s.leaseUntil = time.Time{} + s.sendResp(brpb.PrepareSnapshotBackupResponse{ + Ty: brpb.PrepareSnapshotBackupEventType_UpdateLeaseResult, + LastLeaseIsValid: !expired, + }) + close(s.output) + } + return nil +} + +func (s *mockStore) sendResp(resp brpb.PrepareSnapshotBackupResponse) { + s.output 
<- resp +} + +func (s *mockStore) Recv() (*brpb.PrepareSnapshotBackupResponse, error) { + out, ok := <-s.output + if !ok { + return nil, io.EOF + } + return &out, nil +} + +type mockStores struct { + mu sync.Mutex + stores map[uint64]*mockStore + onCreateStore func(*mockStore) + onConnectToStore func(uint64) error + + pdc *tikv.RegionCache +} + +func newTestEnv(pdc pd.Client) *mockStores { + r := tikv.NewRegionCache(pdc) + ms := &mockStores{ + stores: map[uint64]*mockStore{}, + pdc: r, + onCreateStore: func(ms *mockStore) {}, + } + return ms +} + +func (m *mockStores) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { + m.mu.Lock() + defer m.mu.Unlock() + + res := []*metapb.Store{} + for id := range m.stores { + res = append(res, &metapb.Store{Id: id}) + } + return res, nil +} + +func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.onConnectToStore != nil { + err := m.onConnectToStore(storeID) + if err != nil { + return nil, err + } + } + + _, ok := m.stores[storeID] + if !ok { + m.stores[storeID] = &mockStore{ + output: make(chan brpb.PrepareSnapshotBackupResponse, 16), + successRegions: []metapb.Region{}, + onWaitApply: func(r *metapb.Region) error { + return nil + }, + now: func() time.Time { + return time.Now() + }, + } + m.onCreateStore(m.stores[storeID]) + } + return m.stores[storeID], nil +} + +func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { + if len(endKey) == 0 { + // This is encoded [0xff; 8]. + // Workaround for https://github.com/tikv/client-go/issues/1051. + endKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + } + rs, err := m.pdc.LoadRegionsInKeyRange(tikv.NewBackoffer(ctx, 100), startKey, endKey) + if err != nil { + return nil, err + } + rrs := make([]Region, 0, len(rs)) + for _, r := range rs { + rrs = append(rrs, r) + } + return rrs, nil +} + +type rng [2][]byte + +func (m *mockStores) AssertSafeForBackup(t *testing.T) { + m.mu.Lock() + defer m.mu.Unlock() + + res := []rng{} + for _, store := range m.stores { + store.mu.Lock() + for _, region := range store.successRegions { + res = append(res, rng{region.StartKey, region.EndKey}) + } + now := store.now() + if store.leaseUntil.Before(now) { + t.Fatalf("lease has expired: at %s, now is %s", store.leaseUntil, now) + } + store.mu.Unlock() + } + sort.Slice(res, func(a, b int) bool { + return bytes.Compare(res[a][0], res[b][0]) < 0 + }) + for i := 1; i < len(res); i++ { + if bytes.Compare(res[i-1][1], res[i][0]) < 0 { + t.Fatalf("hole: %s %s", hex.EncodeToString(res[i-1][1]), hex.EncodeToString(res[i][0])) + } + } +} + +func (m *mockStores) AssertIsNormalMode(t *testing.T) { + m.mu.Lock() + defer m.mu.Unlock() + + for id, store := range m.stores { + store.mu.Lock() + if !store.leaseUntil.Before(store.now()) { + t.Fatalf("lease in store %d doesn't expire, the store may not work as normal", id) + } + store.mu.Unlock() + } +} + +func fakeCluster(t *testing.T, nodes int, keys ...[]byte) pd.Client { + tmp := t.TempDir() + _, pdc, cluster, err := unistore.New(tmp) + unistore.BootstrapWithMultiStores(cluster, nodes) + require.NoError(t, err) + cluster.SplitArbitrary(keys...) 
+ return pdc +} + +func dummyRegions(size int) [][]byte { + // Generate regions like "a", "b", ..., "z", "aa", "ba", ..., "zz", "aaa" + res := [][]byte{} + for i := 0; i < size; i++ { + s := make([]byte, 0, i/26) + for j := i; j > 0; j /= 26 { + s = append(s, byte('a')+byte(j%26)) + } + res = append(res, s) + } + sort.Slice(res, func(i, j int) bool { return bytes.Compare(res[i], res[j]) < 0 }) + return res +} + +func TestBasic(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + ms := newTestEnv(pdc) + + ctx := context.Background() + prep := New(ms) + req.NoError(prep.DriveLoopAndWaitPrepare(ctx)) + ms.AssertSafeForBackup(t) + req.NoError(prep.Finalize(ctx)) + ms.AssertIsNormalMode(t) +} + +func TestFailDueToErr(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + ms := newTestEnv(pdc) + + ms.onCreateStore = func(ms *mockStore) { + ms.onWaitApply = func(r *metapb.Region) error { + return errors.New("failed meow") + } + } + + ctx := context.Background() + prep := New(ms) + prep.RetryBackoff = 100 * time.Millisecond + prep.RetryLimit = 3 + now := time.Now() + req.Error(prep.DriveLoopAndWaitPrepare(ctx)) + req.Greater(time.Since(now), 300*time.Millisecond) + req.NoError(prep.Finalize(ctx)) + ms.AssertIsNormalMode(t) +} + +func TestError(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + ms := newTestEnv(pdc) + + ms.onCreateStore = func(ms *mockStore) { + failed := false + ms.onWaitApply = func(r *metapb.Region) error { + if !failed { + failed = true + return errors.New("failed") + } + return nil + } + } + + ctx := context.Background() + prep := New(ms) + prep.RetryBackoff = 0 + req.NoError(prep.DriveLoopAndWaitPrepare(ctx)) + ms.AssertSafeForBackup(t) + req.NoError(prep.Finalize(ctx)) + ms.AssertIsNormalMode(t) +} + +func TestLeaseTimeout(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + ms := newTestEnv(pdc) + tt := struct { + now time.Time + mu sync.Mutex + }{now: time.Now()} + + ms.onCreateStore = func(ms *mockStore) { + ms.now = func() time.Time { + tt.mu.Lock() + defer tt.mu.Unlock() + return tt.now + } + } + + ctx := context.Background() + prep := New(ms) + req.NoError(prep.DriveLoopAndWaitPrepare(ctx)) + ms.AssertSafeForBackup(t) + tt.mu.Lock() + tt.now = tt.now.Add(100 * time.Minute) + tt.mu.Unlock() + req.Error(prep.Finalize(ctx)) +} + +func TestLeaseTimeoutWhileTakingSnapshot(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + ms := newTestEnv(pdc) + tt := struct { + now time.Time + mu sync.Mutex + }{now: time.Now()} + + ms.onCreateStore = func(ms *mockStore) { + ms.now = func() time.Time { + tt.mu.Lock() + defer tt.mu.Unlock() + return tt.now + } + } + + ctx := context.Background() + prep := New(ms) + prep.LeaseDuration = 4 * time.Second + req.NoError(prep.AdvanceState(ctx)) + tt.mu.Lock() + tt.now = tt.now.Add(100 * time.Minute) + tt.mu.Unlock() + time.Sleep(2 * time.Second) + cx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + for { + err := prep.WaitAndHandleNextEvent(cx) + if err != nil { + req.ErrorContains(err, "the lease has expired") + break + } + } +} + +func TestRetryEnv(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) 
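+ // The env below fails the very first ConnectToStore call; the
+ // RetryAndSplitRequestEnv wrapper is expected to retry it transparently.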
+ tms := newTestEnv(pdc) + failed := new(sync.Once) + tms.onConnectToStore = func(u uint64) error { + shouldFail := false + failed.Do(func() { + shouldFail = true + }) + if shouldFail { + return errors.New("nya?") + } + return nil + } + ms := RetryAndSplitRequestEnv{Env: tms} + ms.GetBackoffer = func() utils.Backoffer { + o := utils.InitialRetryState(2, 0, 0) + return &o + } + prep := New(ms) + ctx := context.Background() + req.NoError(prep.DriveLoopAndWaitPrepare(ctx)) + req.NoError(prep.Finalize(ctx)) +} + +type counterClient struct { + send int + regions []*metapb.Region +} + +func (c *counterClient) Send(req *brpb.PrepareSnapshotBackupRequest) error { + c.send += 1 + c.regions = append(c.regions, req.Regions...) + return nil +} + +func (c *counterClient) Recv() (*brpb.PrepareSnapshotBackupResponse, error) { + panic("not implemented") +} + +func TestSplitEnv(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + cc := SplitRequestClient{PrepareClient: &counterClient{}, MaxRequestSize: 1024} + reset := func() { + cc.PrepareClient.(*counterClient).send = 0 + cc.PrepareClient.(*counterClient).regions = nil + } + makeHugeRequestRegions := func(n int, eachSize int) []*metapb.Region { + regions := []*metapb.Region{} + for i := 0; i < n; i++ { + regions = append(regions, &metapb.Region{ + StartKey: append(make([]byte, eachSize-1), byte(i)), + EndKey: append(make([]byte, eachSize-1), byte(i+1)), + }) + } + return regions + } + + hugeRequest := brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply, + Regions: makeHugeRequestRegions(100, 128), + } + require.NoError(t, cc.Send(&hugeRequest)) + require.GreaterOrEqual(t, cc.PrepareClient.(*counterClient).send, 20) + require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, hugeRequest.Regions) + + reset() + reallyHugeRequest := brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply, + Regions: makeHugeRequestRegions(10, 2048), + } + require.NoError(t, cc.Send(&reallyHugeRequest)) + require.Equal(t, cc.PrepareClient.(*counterClient).send, 10) + require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, reallyHugeRequest.Regions) + + reset() + tinyRequest := brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_WaitApply, + Regions: makeHugeRequestRegions(10, 10), + } + require.NoError(t, cc.Send(&tinyRequest)) + require.Equal(t, cc.PrepareClient.(*counterClient).send, 1) + require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, tinyRequest.Regions) +} diff --git a/br/pkg/backup/prepare_snap/stream.go b/br/pkg/backup/prepare_snap/stream.go new file mode 100644 index 0000000000000..9e253fc4a4d37 --- /dev/null +++ b/br/pkg/backup/prepare_snap/stream.go @@ -0,0 +1,212 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package preparesnap + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/pingcap/errors" + brpb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/utils" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type eventType int + +const ( + eventMiscErr eventType = iota + eventWaitApplyDone +) + +type event struct { + ty eventType + + storeID uint64 + err error + region *metapb.Region +} + +func (e event) String() string { + return fmt.Sprintf("Event(Type: %v, StoreID: %v, Error: %v, Region: %v)", e.ty, e.storeID, e.err, e.region) +} + +type prepareStream struct { + storeID uint64 + cli PrepareClient + leaseDuration time.Duration + + output chan<- event + serverStream <-chan utils.Result[*brpb.PrepareSnapshotBackupResponse] + + clientLoopHandle *errgroup.Group + stopBgTasks context.CancelFunc +} + +// InitConn initializes the connection to the stream (i.e. "active" the stream). +// +// Before calling this, make sure you have filled the store ID and output channel. +// +// Once this has been called, them **should not be changed** any more. +func (p *prepareStream) InitConn(ctx context.Context, cli PrepareClient) error { + p.cli = cli + p.clientLoopHandle, ctx = errgroup.WithContext(ctx) + ctx, p.stopBgTasks = context.WithCancel(ctx) + return p.GoLeaseLoop(ctx, p.leaseDuration) +} + +func (p *prepareStream) Finalize(ctx context.Context) error { + log.Info("shutting down", zap.Uint64("store", p.storeID)) + return p.stopClientLoop(ctx) +} + +func (p *prepareStream) GoLeaseLoop(ctx context.Context, dur time.Duration) error { + err := p.cli.Send(&brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_UpdateLease, + LeaseInSeconds: uint64(dur.Seconds()), + }) + if err != nil { + return errors.Annotate(err, "failed to initialize the lease") + } + msg, err := p.cli.Recv() + if err != nil { + return errors.Annotate(err, "failed to recv the initialize lease result") + } + if msg.Ty != brpb.PrepareSnapshotBackupEventType_UpdateLeaseResult { + return errors.Errorf("unexpected type of response during creating lease loop: it is %s", msg.Ty) + } + p.serverStream = utils.AsyncStreamBy(p.cli.Recv) + p.clientLoopHandle.Go(func() error { return p.clientLoop(ctx, dur) }) + return nil +} + +func (p *prepareStream) onResponse(ctx context.Context, res utils.Result[*brpb.PrepareSnapshotBackupResponse]) error { + if err := res.Err; err != nil { + return err + } + resp := res.Item + logutil.CL(ctx).Debug("received response", zap.Stringer("resp", resp)) + evt, needDeliver := p.convertToEvent(resp) + if needDeliver { + logutil.CL(ctx).Debug("generating internal event", zap.Stringer("event", evt)) + p.output <- evt + } + return nil +} + +func (p *prepareStream) stopClientLoop(ctx context.Context) error { + p.stopBgTasks() + err := p.cli.Send(&brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_Finish, + }) + if err != nil { + return errors.Annotate(err, "failed to send finish request") + } +recv_loop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + case res, ok := <-p.serverStream: + err := p.onResponse(ctx, res) + if err == io.EOF || !ok { + logutil.CL(ctx).Info("close loop done.", zap.Uint64("store", p.storeID), zap.Bool("is-chan-closed", !ok)) + break recv_loop + } + if err != nil { + return err + } + } + } + return p.clientLoopHandle.Wait() +} + +func (p *prepareStream) clientLoop(ctx context.Context, dur 
time.Duration) error { + ticker := time.NewTicker(dur / 4) + lastSuccess := time.Unix(0, 0) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + logutil.CL(ctx).Info("client loop exits.", zap.Uint64("store", p.storeID)) + return nil + case res := <-p.serverStream: + if err := p.onResponse(ctx, res); err != nil { + p.sendErr(errors.Annotate(err, "failed to recv from the stream")) + return err + } + case <-ticker.C: + err := p.cli.Send(&brpb.PrepareSnapshotBackupRequest{ + Ty: brpb.PrepareSnapshotBackupRequestType_UpdateLease, + LeaseInSeconds: uint64(dur.Seconds()), + }) + if err != nil { + log.Warn("failed to update the lease loop", logutil.ShortError(err)) + if time.Since(lastSuccess) > dur { + err := errors.Annotate(err, "too many times failed to update the lease, it is probably expired") + p.output <- event{ + ty: eventMiscErr, + storeID: p.storeID, + err: err, + } + return err + } + } else { + lastSuccess = time.Now() + } + } + } +} + +func (p *prepareStream) sendErr(err error) { + p.output <- event{ + ty: eventMiscErr, + storeID: p.storeID, + err: err, + } +} + +func (p *prepareStream) convertToEvent(resp *brpb.PrepareSnapshotBackupResponse) (event, bool) { + switch resp.Ty { + case brpb.PrepareSnapshotBackupEventType_WaitApplyDone: + return event{ + ty: eventWaitApplyDone, + storeID: p.storeID, + region: resp.Region, + err: convertErr(resp.Error), + }, true + case brpb.PrepareSnapshotBackupEventType_UpdateLeaseResult: + if !resp.LastLeaseIsValid { + return event{ + ty: eventMiscErr, + storeID: p.storeID, + err: leaseExpired(), + }, true + } + return event{}, false + } + return event{ + ty: eventMiscErr, + storeID: p.storeID, + err: errors.Annotatef(unsupported(), "unknown response type %v (%d)", + resp.Ty, resp.Ty), + }, true +} diff --git a/br/pkg/logutil/logging.go b/br/pkg/logutil/logging.go index 41b8e135c220f..c8cf01e4fc517 100644 --- a/br/pkg/logutil/logging.go +++ b/br/pkg/logutil/logging.go @@ -272,6 +272,11 @@ func Redact(field zap.Field) zap.Field { return field } +// StringifyRangeOf returns a stringer for the key range. +func StringifyRangeOf(start, end []byte) StringifyRange { + return StringifyRange{StartKey: start, EndKey: end} +} + // StringifyKeys wraps the key range into a stringer. 
type StringifyKeys []kv.KeyRange diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index e947d84cbe5c4..3f36ace461f4c 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -18,6 +18,7 @@ go_library( deps = [ "//br/pkg/aws", "//br/pkg/backup", + "//br/pkg/backup/prepare_snap", "//br/pkg/checksum", "//br/pkg/common", "//br/pkg/config", @@ -67,6 +68,7 @@ go_library( "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", "@com_google_cloud_go_storage//:storage", "@io_etcd_go_etcd_client_pkg_v3//transport", diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index 0b59cf6f061a8..faf8e050d19dc 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -21,12 +21,12 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/aws" "github.com/pingcap/tidb/br/pkg/backup" + preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap" "github.com/pingcap/tidb/br/pkg/common" "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" @@ -35,6 +35,8 @@ import ( "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/pflag" + "github.com/tikv/client-go/v2/tikv" + "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -137,43 +139,38 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { // Step.1.1 stop scheduler as much as possible. log.Info("starting to remove some PD schedulers and pausing GC", zap.Bool("already-paused-by-operator", cfg.SkipPauseGCAndScheduler)) - var restoreFunc pdutil.UndoFunc + var ( + restoreFunc pdutil.UndoFunc + finalizeOnce sync.Once + preparer = preparesnap.New(preparesnap.CliEnv{ + Cache: tikv.NewRegionCache(mgr.PDClient()), + Mgr: mgr.StoreManager, + }) + goBackupToNormal = func(ctx context.Context) error { + var err error + finalizeOnce.Do(func() { + var restoreE error + if restoreFunc != nil { + restoreE = restoreFunc(ctx) + } + err = multierr.Combine(preparer.Finalize(ctx), restoreE) + }) + return err + } + ) + // NOTE: we need to use the same technique as the `operator` command did. + // But it is impossible for now due to importing cycle. 
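+ // DriveLoopAndWaitPrepare below blocks until every region leader has finished
+ // wait-apply; from then on splitting, ingesting and conf changes stay suspended
+ // until goBackupToNormal finalizes the preparer, keeping the EBS snapshots consistent.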
if !cfg.SkipPauseGCAndScheduler { var e error restoreFunc, e = mgr.RemoveAllPDSchedulers(ctx) if e != nil { return errors.Trace(err) } - denyLightning := utils.NewSuspendImporting("backup_ebs_command", mgr.StoreManager) - _, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL) - if err != nil { - return errors.Annotate(err, "lightning from running") + if err := preparer.DriveLoopAndWaitPrepare(ctx); err != nil { + return err } - go func() { - if err := denyLightning.Keeper(ctx, utils.DefaultBRGCSafePointTTL); err != nil { - log.Warn("cannot keep deny importing, the backup archive may not be useable if there were importing.", logutil.ShortError(err)) - } - }() - defer func() { - if ctx.Err() != nil { - log.Warn("context canceled, doing clean work with background context") - ctx = context.Background() - } - if restoreFunc == nil { - return - } - if restoreE := restoreFunc(ctx); restoreE != nil { - log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) - } - res, err := denyLightning.AllowAllStores(ctx) - if err != nil { - log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL)) - } - if err := denyLightning.ConsistentWithPrev(res); err != nil { - log.Warn("lightning hasn't been denied, the backup archive may not be usable.", logutil.ShortError(err)) - } - }() + defer utils.WithCleanUp(nil, 2*time.Minute, goBackupToNormal) } if err := waitAllScheduleStoppedAndNoRegionHole(ctx, cfg.Config, mgr); err != nil { @@ -249,11 +246,8 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { if !cfg.SkipPauseGCAndScheduler { log.Info("snapshot started, restore schedule") - if restoreE := restoreFunc(ctx); restoreE != nil { + if restoreE := goBackupToNormal(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) - } else { - // Clear the restore func, so we won't execute it many times. 
- restoreFunc = nil } } diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index 83f9f042f6a89..52c99c845b57b 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -9,6 +9,7 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/task/operator", visibility = ["//visibility:public"], deps = [ + "//br/pkg/backup/prepare_snap", "//br/pkg/errors", "//br/pkg/logutil", "//br/pkg/pdutil", @@ -17,6 +18,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", + "@com_github_tikv_client_go_v2//tikv", "@org_golang_google_grpc//keepalive", "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 1917a9acd3b1b..6ff0e01f1fe6a 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -8,17 +8,20 @@ import ( "fmt" "math/rand" "os" + "runtime/debug" "strings" "sync" "time" "github.com/pingcap/errors" "github.com/pingcap/log" + preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/tikv/client-go/v2/tikv" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -55,6 +58,18 @@ func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithRetErr(errOut *error, f f } } +func (cx *AdaptEnvForSnapshotBackupContext) run(f func() error) { + cx.rdGrp.Add(1) + buf := debug.Stack() + cx.runGrp.Go(func() error { + err := f() + if err != nil { + log.Error("A task failed.", zap.Error(err), zap.ByteString("task-created-at", buf)) + } + return err + }) +} + type AdaptEnvForSnapshotBackupContext struct { context.Context @@ -120,11 +135,9 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { } defer cx.Close() - cx.rdGrp.Add(3) - - eg.Go(func() error { return pauseGCKeeper(cx) }) - eg.Go(func() error { return pauseSchedulerKeeper(cx) }) - eg.Go(func() error { return pauseImporting(cx) }) + cx.run(func() error { return pauseGCKeeper(cx) }) + cx.run(func() error { return pauseSchedulerKeeper(cx) }) + cx.run(func() error { return pauseAdminAndWaitApply(cx) }) go func() { cx.rdGrp.Wait() if cfg.OnAllReady != nil { @@ -141,6 +154,34 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { return eg.Wait() } +func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { + env := preparesnap.CliEnv{ + Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()), + Mgr: cx.kvMgr, + } + defer env.Cache.Close() + retryEnv := preparesnap.RetryAndSplitRequestEnv{Env: env} + begin := time.Now() + prep := preparesnap.New(retryEnv) + prep.LeaseDuration = cx.cfg.TTL + + defer cx.cleanUpWith(func(ctx context.Context) { + if err := prep.Finalize(ctx); err != nil { + logutil.CL(ctx).Warn("failed to finalize the prepare stream", logutil.ShortError(err)) + } + }) + + // We must use our own context here, or once we are cleaning up the client will be invalid. 
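+ // (cx is canceled once cleanup starts, so driving the prepare loop with it
+ // would invalidate the stream before Finalize had a chance to run.)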
+ myCtx := logutil.ContextWithField(context.Background(), zap.String("category", "pause_admin_and_wait_apply")) + if err := prep.DriveLoopAndWaitPrepare(myCtx); err != nil { + return err + } + + cx.ReadyL("pause_admin_and_wait_apply", zap.Stringer("take", time.Since(begin))) + <-cx.Done() + return nil +} + func getCallerName() string { name, err := os.Hostname() if err != nil { @@ -149,44 +190,6 @@ func getCallerName() string { return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid()) } -func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error { - suspendLightning := utils.NewSuspendImporting(getCallerName(), cx.kvMgr) - _, err := utils.WithRetryV2(cx, cx.GetBackOffer("suspend_lightning"), func(_ context.Context) (map[uint64]bool, error) { - return suspendLightning.DenyAllStores(cx, cx.cfg.TTL) - }) - if err != nil { - return errors.Trace(err) - } - cx.ReadyL("pause_lightning") - cx.runGrp.Go(func() (err error) { - defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error { - if ctx.Err() != nil { - //nolint: all_revive,revive // There is a false positive on returning in `defer`. - return errors.Annotate(ctx.Err(), "cleaning up timed out") - } - res, err := utils.WithRetryV2(ctx, cx.GetBackOffer("restore_lightning"), - //nolint: all_revive,revive // There is a false positive on returning in `defer`. - func(ctx context.Context) (map[uint64]bool, error) { return suspendLightning.AllowAllStores(ctx) }) - if err != nil { - //nolint: all_revive,revive // There is a false positive on returning in `defer`. - return errors.Annotatef(err, "failed to allow all stores") - } - //nolint: all_revive,revive // There is a false positive on returning in `defer`. - return suspendLightning.ConsistentWithPrev(res) - }) - - err = suspendLightning.Keeper(cx, cx.cfg.TTL) - if errors.Cause(err) != context.Canceled { - logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err)) - return err - } - // Clean up the canceled error. - err = nil - return - }) - return nil -} - func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) { // Note: should we remove the service safepoint as soon as this exits? 
sp := utils.BRServiceSafePoint{ diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 5a41ab62597ac..62ba7c85b7f14 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -83,11 +83,10 @@ go_test( "safe_point_test.go", "schema_test.go", "sensitive_test.go", - "suspend_importing_test.go", ], embed = [":utils"], flaky = True, - shard_count = 37, + shard_count = 33, deps = [ "//br/pkg/errors", "//br/pkg/metautil", @@ -107,15 +106,11 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", - "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", - "@org_golang_x_sync//errgroup", "@org_uber_go_goleak//:goleak", "@org_uber_go_multierr//:multierr", ], diff --git a/br/pkg/utils/misc.go b/br/pkg/utils/misc.go index 8460b9b39cd82..656c8300e0299 100644 --- a/br/pkg/utils/misc.go +++ b/br/pkg/utils/misc.go @@ -20,9 +20,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" + "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -130,3 +133,24 @@ func CheckStoreLiveness(s *metapb.Store) error { } return nil } + +// WithCleanUp runs a function with a timeout, and register its error to its argument if there is one. +// This is useful while you want to run some must run but error-prone code in a defer context. +// Simple usage: +// +// func foo() (err error) { +// defer WithCleanUp(&err, time.Second, func(ctx context.Context) error { +// // do something +// return nil +// }) +// } +func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context) error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + err := fn(ctx) + if errOut != nil { + *errOut = multierr.Combine(err, *errOut) + } else if err != nil { + log.Warn("Encountered but ignored error while cleaning up.", zap.Error(err)) + } +} diff --git a/br/pkg/utils/store_manager.go b/br/pkg/utils/store_manager.go index 9cf0c3c27ee9c..3823880678395 100644 --- a/br/pkg/utils/store_manager.go +++ b/br/pkg/utils/store_manager.go @@ -163,7 +163,7 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64) return conn, nil } -func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error { +func (mgr *StoreManager) TryWithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn) error) error { if ctx.Err() != nil { return errors.Trace(ctx.Err()) } @@ -173,8 +173,7 @@ func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*g if conn, ok := mgr.grpcClis.clis[storeID]; ok { // Find a cached backup client. - f(conn) - return nil + return f(conn) } conn, err := mgr.getGrpcConnLocked(ctx, storeID) @@ -183,8 +182,11 @@ func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*g } // Cache the conn. 
mgr.grpcClis.clis[storeID] = conn - f(conn) - return nil + return f(conn) +} + +func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error { + return mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error { f(cc); return nil }) } // ResetBackupClient reset the connection for backup client. diff --git a/br/pkg/utils/suspend_importing_test.go b/br/pkg/utils/suspend_importing_test.go deleted file mode 100644 index 9ce3f271a169e..0000000000000 --- a/br/pkg/utils/suspend_importing_test.go +++ /dev/null @@ -1,210 +0,0 @@ -// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. -package utils_test - -import ( - "context" - "fmt" - "sync" - "testing" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/utils" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type ImportTargetStore struct { - mu sync.Mutex - Id uint64 - LastSuccessDenyCall time.Time - SuspendImportFor time.Duration - SuspendedImport bool - - ErrGen func() error -} - -type ImportTargetStores struct { - mu sync.Mutex - items map[uint64]*ImportTargetStore -} - -func initWithIDs(ids []int) *ImportTargetStores { - ss := &ImportTargetStores{ - items: map[uint64]*ImportTargetStore{}, - } - for _, id := range ids { - store := new(ImportTargetStore) - store.Id = uint64(id) - ss.items[uint64(id)] = store - } - return ss -} - -func (s *ImportTargetStores) GetAllStores(ctx context.Context) ([]*metapb.Store, error) { - s.mu.Lock() - defer s.mu.Unlock() - - stores := make([]*metapb.Store, 0, len(s.items)) - for _, store := range s.items { - stores = append(stores, &metapb.Store{Id: store.Id}) - } - return stores, nil -} - -func (s *ImportTargetStores) GetDenyLightningClient(ctx context.Context, storeID uint64) (utils.SuspendImportingClient, error) { - s.mu.Lock() - defer s.mu.Unlock() - - store, ok := s.items[storeID] - if !ok { - return nil, errors.Trace(fmt.Errorf("store %d not found", storeID)) - } - - return store, nil -} - -// Temporarily disable ingest / download / write for data listeners don't support catching import data. 
-func (s *ImportTargetStore) SuspendImportRPC(ctx context.Context, in *import_sstpb.SuspendImportRPCRequest, opts ...grpc.CallOption) (*import_sstpb.SuspendImportRPCResponse, error) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.ErrGen != nil { - if err := s.ErrGen(); err != nil { - return nil, s.ErrGen() - } - } - - suspended := s.SuspendedImport - if in.ShouldSuspendImports { - s.SuspendedImport = true - s.SuspendImportFor = time.Duration(in.DurationInSecs) * time.Second - s.LastSuccessDenyCall = time.Now() - } else { - s.SuspendedImport = false - } - return &import_sstpb.SuspendImportRPCResponse{ - AlreadySuspended: suspended, - }, nil -} - -func (s *ImportTargetStores) assertAllStoresDenied(t *testing.T) { - s.mu.Lock() - defer s.mu.Unlock() - - for _, store := range s.items { - func() { - store.mu.Lock() - defer store.mu.Unlock() - - require.True(t, store.SuspendedImport, "ID = %d", store.Id) - require.Less(t, time.Since(store.LastSuccessDenyCall), store.SuspendImportFor, "ID = %d", store.Id) - }() - } -} - -func TestBasic(t *testing.T) { - req := require.New(t) - - ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewSuspendImporting(t.Name(), ss) - - ctx := context.Background() - res, err := deny.DenyAllStores(ctx, 10*time.Second) - req.NoError(err) - req.Error(deny.ConsistentWithPrev(res)) - for id, inner := range ss.items { - req.True(inner.SuspendedImport, "at %d", id) - req.Equal(inner.SuspendImportFor, 10*time.Second, "at %d", id) - } - - res, err = deny.DenyAllStores(ctx, 10*time.Second) - req.NoError(err) - req.NoError(deny.ConsistentWithPrev(res)) - - res, err = deny.AllowAllStores(ctx) - req.NoError(err) - req.NoError(deny.ConsistentWithPrev(res)) -} - -func TestKeeperError(t *testing.T) { - req := require.New(t) - - ctx := context.Background() - ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewSuspendImporting(t.Name(), ss) - ttl := time.Second - - now := time.Now() - triggeredErr := uint32(0) - _, err := deny.DenyAllStores(ctx, ttl) - req.NoError(err) - - ss.items[4].ErrGen = func() error { - if time.Since(now) > 600*time.Millisecond { - return nil - } - triggeredErr += 1 - return status.Error(codes.Unavailable, "the store is slacking.") - } - - cx, cancel := context.WithCancel(ctx) - - wg := new(errgroup.Group) - wg.Go(func() error { return deny.Keeper(cx, ttl) }) - time.Sleep(ttl) - cancel() - req.ErrorIs(wg.Wait(), context.Canceled) - req.Positive(triggeredErr) -} - -func TestKeeperErrorExit(t *testing.T) { - req := require.New(t) - - ctx := context.Background() - ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewSuspendImporting(t.Name(), ss) - ttl := time.Second - - triggeredErr := uint32(0) - _, err := deny.DenyAllStores(ctx, ttl) - req.NoError(err) - - ss.items[4].ErrGen = func() error { - triggeredErr += 1 - return status.Error(codes.Unavailable, "the store is slacking.") - } - - wg := new(errgroup.Group) - wg.Go(func() error { return deny.Keeper(ctx, ttl) }) - time.Sleep(ttl) - req.Error(wg.Wait()) - req.Positive(triggeredErr) -} - -func TestKeeperCalled(t *testing.T) { - req := require.New(t) - - ctx := context.Background() - ss := initWithIDs([]int{1, 4, 5}) - deny := utils.NewSuspendImporting(t.Name(), ss) - ttl := 1 * time.Second - - _, err := deny.DenyAllStores(ctx, ttl) - req.NoError(err) - - cx, cancel := context.WithCancel(ctx) - wg := new(errgroup.Group) - wg.Go(func() error { return deny.Keeper(cx, ttl) }) - for i := 0; i < 20; i++ { - ss.assertAllStoresDenied(t) - time.Sleep(ttl / 10) - } - cancel() - req.ErrorIs(wg.Wait(), context.Canceled) -} 
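To illustrate how the utility functions introduced by this patch are meant to compose, here is a rough, non-authoritative sketch. It uses WithCleanUp and TryWithConn from the br/pkg/utils hunks above, plus AsyncStreamBy and BuildWorkerTokenChannel from the worker.go hunk below. The package name, the pauseStore/resumeStore callbacks, the token count, and the handler plumbing are hypothetical and exist only for illustration; they are not part of the patch.

package preparesketch

import (
	"context"
	"time"

	"github.com/pingcap/tidb/br/pkg/utils"
	"google.golang.org/grpc"
)

// pauseWithRollback pauses some per-store activity and schedules a best-effort
// resume when the caller returns. If the resume fails, WithCleanUp merges that
// failure into the returned error instead of silently dropping it.
func pauseWithRollback(
	ctx context.Context,
	mgr *utils.StoreManager,
	storeID uint64,
	pauseStore, resumeStore func(context.Context, *grpc.ClientConn) error, // hypothetical callbacks
) (err error) {
	defer utils.WithCleanUp(&err, 5*time.Second, func(cctx context.Context) error {
		// TryWithConn (unlike WithConn) propagates the callback's error to the caller.
		return mgr.TryWithConn(cctx, storeID, func(cc *grpc.ClientConn) error {
			return resumeStore(cctx, cc)
		})
	})
	return mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
		return pauseStore(ctx, cc)
	})
}

// consumeWithTokens drains an AsyncStreamBy stream while bounding the number of
// in-flight handlers, using BuildWorkerTokenChannel as a simple semaphore.
// The generator is expected to report a terminal error (e.g. io.EOF) to end the
// stream; for brevity this sketch does not wait for in-flight handlers.
func consumeWithTokens[T any](ctx context.Context, gen func() (T, error), handle func(T)) error {
	tokens := utils.BuildWorkerTokenChannel(4)
	for res := range utils.AsyncStreamBy(gen) {
		if res.Err != nil {
			return res.Err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tokens:
		}
		go func(item T) {
			defer func() { tokens <- struct{}{} }()
			handle(item)
		}(res.Item)
	}
	return nil
}

One design note grounded in the diff above: TryWithConn exists so that the callback's error can surface to the caller, while WithConn keeps its old error-less callback signature by delegating to TryWithConn with a wrapper that always returns nil.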
diff --git a/br/pkg/utils/worker.go b/br/pkg/utils/worker.go
index ba69752ca5d23..2adb42ee88bb2 100644
--- a/br/pkg/utils/worker.go
+++ b/br/pkg/utils/worker.go
@@ -130,3 +130,32 @@ func PanicToErr(err *error) {
 		log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
 	}
 }
+
+type Result[T any] struct {
+	Err  error
+	Item T
+}
+
+func AsyncStreamBy[T any](generator func() (T, error)) <-chan Result[T] {
+	out := make(chan Result[T])
+	go func() {
+		defer close(out)
+		for {
+			item, err := generator()
+			if err != nil {
+				out <- Result[T]{Err: err}
+				return
+			}
+			out <- Result[T]{Item: item}
+		}
+	}()
+	return out
+}
+
+func BuildWorkerTokenChannel(size uint) chan struct{} {
+	ch := make(chan struct{}, size)
+	for i := 0; i < int(size); i += 1 {
+		ch <- struct{}{}
+	}
+	return ch
+}
diff --git a/go.mod b/go.mod
index c3caa749de93c..bdb685585c384 100644
--- a/go.mod
+++ b/go.mod
@@ -71,7 +71,7 @@ require (
 	github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
 	github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
 	github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
-	github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c
+	github.com/pingcap/kvproto v0.0.0-20240112060601-a0e3fbb1eeee
 	github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
 	github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
 	github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
diff --git a/go.sum b/go.sum
index c2eacd21aa432..49c491bb84aea 100644
--- a/go.sum
+++ b/go.sum
@@ -781,8 +781,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
-github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=
-github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
+github.com/pingcap/kvproto v0.0.0-20240112060601-a0e3fbb1eeee h1:ZWFeZNN+6poqqEQ3XU6M/Gw6oiNexbDD3yqIZ05GxlM=
+github.com/pingcap/kvproto v0.0.0-20240112060601-a0e3fbb1eeee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
diff --git a/store/mockstore/unistore/tikv/mock_region.go b/store/mockstore/unistore/tikv/mock_region.go
index 2e3e4fa01ddb1..f2bdbe9e71c3e 100644
--- a/store/mockstore/unistore/tikv/mock_region.go
+++ b/store/mockstore/unistore/tikv/mock_region.go
@@ -392,6 +392,19 @@ func (rm *MockRegionManager) SplitKeys(start, end kv.Key, count int) {
 	}
 }
 
+// SplitArbitrary splits the cluster by the manually provided split points.
+// The keys provided are raw keys.
+func (rm *MockRegionManager) SplitArbitrary(keys ...[]byte) { + splitKeys := make([][]byte, 0, len(keys)) + for _, key := range keys { + encKey := codec.EncodeBytes(nil, key) + splitKeys = append(splitKeys, encKey) + } + if _, err := rm.splitKeys(splitKeys); err != nil { + panic(err) + } +} + // SplitRegion implements the RegionManager interface. func (rm *MockRegionManager) SplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse { if _, err := rm.GetRegionFromCtx(req.Context); err != nil { diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index 6bd9e8bf7740d..683dd0ca91486 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -23,8 +23,10 @@ go_test( "//testkit", "//testkit/testsetup", "//tests/realtikvtest", + "@com_github_google_uuid//:uuid", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_tipb//go-binlog", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index 3e3010132c297..99b4a382f5bd8 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -22,7 +22,9 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/task/operator" "github.com/stretchr/testify/require" @@ -83,15 +85,55 @@ func verifyLightningStopped(t *require.Assertions, cfg operator.PauseGcConfig) { pdc, err := pd.NewClient(cfg.Config.PD, pd.SecurityOption{}) t.NoError(err) defer pdc.Close() - stores, err := pdc.GetAllStores(cx, pd.WithExcludeTombstone()) t.NoError(err) - s := stores[0] - conn, err := grpc.DialContext(cx, s.Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + region, err := pdc.GetRegion(cx, []byte("a")) + t.NoError(err) + store, err := pdc.GetStore(cx, region.Leader.StoreId) + t.NoError(err) + conn, err := grpc.DialContext(cx, store.Address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) t.NoError(err) ingestCli := import_sstpb.NewImportSSTClient(conn) - res, err := ingestCli.Ingest(cx, &import_sstpb.IngestRequest{}) + wcli, err := ingestCli.Write(cx) t.NoError(err) - t.NotNil(res.GetError(), "res = %s", res) + u := uuid.New() + meta := &import_sstpb.SSTMeta{ + Uuid: u[:], + RegionId: region.Meta.GetId(), + RegionEpoch: region.Meta.GetRegionEpoch(), + Range: &import_sstpb.Range{ + Start: []byte("a"), + End: []byte("b"), + }, + } + rpcCx := kvrpcpb.Context{ + RegionId: region.Meta.GetId(), + RegionEpoch: region.Meta.GetRegionEpoch(), + Peer: region.Leader, + } + t.NoError(wcli.Send(&import_sstpb.WriteRequest{Chunk: &import_sstpb.WriteRequest_Meta{Meta: meta}})) + phy, log, err := pdc.GetTS(cx) + t.NoError(err) + wb := &import_sstpb.WriteBatch{ + CommitTs: oracle.ComposeTS(phy, log), + Pairs: []*import_sstpb.Pair{ + {Key: []byte("a1"), Value: []byte("You may wondering, why here is such a key.")}, + {Key: []byte("a2"), Value: []byte("And what if this has been really imported?")}, + {Key: []byte("a3"), Value: []byte("I dunno too. 
But we need to have a try.")}, + }, + } + t.NoError(wcli.Send(&import_sstpb.WriteRequest{Chunk: &import_sstpb.WriteRequest_Batch{Batch: wb}})) + resp, err := wcli.CloseAndRecv() + t.NoError(err) + t.Nil(resp.Error, "res = %s", resp) + realMeta := resp.Metas[0] + + res, err := ingestCli.Ingest(cx, &import_sstpb.IngestRequest{ + Context: &rpcCx, + Sst: realMeta, + }) + t.NoError(err) + t.Contains(res.GetError().GetMessage(), "Suspended", "res = %s", res) + t.NotNil(res.GetError().GetServerIsBusy(), "res = %s", res) } func verifySchedulersStopped(t *require.Assertions, cfg operator.PauseGcConfig) { @@ -182,10 +224,10 @@ func TestOperator(t *testing.T) { } }, 10*time.Second, time.Second) - cancel() verifyGCStopped(req, cfg) verifyLightningStopped(req, cfg) verifySchedulersStopped(req, cfg) + cancel() req.Eventually(func() bool { select {