diff --git a/br/pkg/restore/import_retry.go b/br/pkg/restore/import_retry.go index 3882512a1f5bd..17c706c9e4444 100644 --- a/br/pkg/restore/import_retry.go +++ b/br/pkg/restore/import_retry.go @@ -234,7 +234,7 @@ func (r *RPCResult) StrategyForRetryGoError() RetryStrategy { if r.Err == nil { return StrategyGiveUp } - + // we should unwrap the error or we cannot get the write gRPC status. if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok { switch gRPCErr.Code() { diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 68fff468a5d29..c940759300496 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -74,7 +74,9 @@ type CheckpointAdvancer struct { type advancerState interface { // Note: // Go doesn't support sealed classes or ADTs currently. + // (it can only be used at generic constraints...) // Leave it empty for now. + // ~*fullScan | ~*updateSmallTree } @@ -92,6 +94,7 @@ type updateSmallTree struct { consistencyCheckTick int } +// NewCheckpointAdvancer creates a checkpoint advancer with the env. func NewCheckpointAdvancer(env Env) *CheckpointAdvancer { return &CheckpointAdvancer{ env: env, @@ -337,6 +340,8 @@ func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskE } } +// beginListenTaskChange bootstraps the initial task set, +// and returns a channel respecting the change of tasks. func (c *CheckpointAdvancer) beginListenTaskChange(ctx context.Context) (<-chan TaskEvent, error) { ch := make(chan TaskEvent, 1024) if err := c.env.Begin(ctx, ch); err != nil { @@ -349,6 +354,8 @@ func (c *CheckpointAdvancer) beginListenTaskChange(ctx context.Context) (<-chan return ch, nil } +// StartTaskListener starts the task listener for the advancer. +// When no task detected, advancer would do nothing, please call this before begin the tick loop. func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) { cx, cancel := context.WithCancel(ctx) var ch <-chan TaskEvent diff --git a/br/pkg/streamhelper/collector.go b/br/pkg/streamhelper/collector.go index fdaf525dbaa89..bafa5964ae7fa 100644 --- a/br/pkg/streamhelper/collector.go +++ b/br/pkg/streamhelper/collector.go @@ -239,7 +239,7 @@ type clusterCollector struct { } // NewClusterCollector creates a new cluster collector. -// collectors are the structure transform region information to checkpoint information, +// collectors are the structure transform region information to checkpoint information, // by requesting the checkpoint of regions in the store. func NewClusterCollector(ctx context.Context, srv LogBackupService) *clusterCollector { cx, cancel := context.WithCancel(ctx) diff --git a/br/pkg/streamhelper/tsheap.go b/br/pkg/streamhelper/tsheap.go index 74770e3e5fa54..434f00dd2d707 100644 --- a/br/pkg/streamhelper/tsheap.go +++ b/br/pkg/streamhelper/tsheap.go @@ -97,8 +97,8 @@ func (rst *RangesSharesTS) Less(other btree.Item) bool { return rst.TS < other.(*RangesSharesTS).TS } -// Checkpoints is a heap that collectes all checkpoints of -// regions, it supports query the latest checkpoint fastly. +// Checkpoints is a heap that collects all checkpoints of +// regions, it supports query the latest checkpoint fast. // This structure is thread safe. type Checkpoints struct { tree *btree.BTree @@ -210,7 +210,7 @@ func (h *Checkpoints) ConsistencyCheck() error { r := CollapseRanges(len(ranges), func(i int) kv.KeyRange { return ranges[i] }) if len(r) != 1 || len(r[0].StartKey) != 0 || len(r[0].EndKey) != 0 { return errors.Annotatef(berrors.ErrPiTRMalformedMetadata, - "the region tree cannot cover the key space, collpased: %s", logutil.StringifyKeys(r)) + "the region tree cannot cover the key space, collapsed: %s", logutil.StringifyKeys(r)) } return nil } diff --git a/config/config.go b/config/config.go index 83cc1604d4678..3cdf8775539b7 100644 --- a/config/config.go +++ b/config/config.go @@ -414,6 +414,8 @@ func (b *AtomicBool) UnmarshalText(text []byte) error { return nil } +// LogBackup is the config for log backup service. +// For now, it includes the embed advancer. type LogBackup struct { Advancer logbackupconf.Config `toml:"advancer" json:"advancer"` Enabled bool `toml:"enabled" json:"enabled"` diff --git a/domain/domain.go b/domain/domain.go index b3aa1722fa0c8..b278b06ded09f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -881,7 +881,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) do.wg.Add(1) go do.topologySyncerKeeper() } - err = do.InitLogBackup(ctx, pdClient) + err = do.initLogBackup(ctx, pdClient) if err != nil { return err } @@ -889,7 +889,7 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain) return nil } -func (do *Domain) InitLogBackup(ctx context.Context, pdClient pd.Client) error { +func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { cfg := config.GetGlobalConfig() if cfg.LogBackup.Enabled { env, err := streamhelper.TiDBEnv(pdClient, do.etcdClient, cfg) diff --git a/go.mod b/go.mod index 1ab50a370ee6b..7de5c6db9ffa5 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 + github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33 github.com/pingcap/log v1.1.0 github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index 86e3125807621..3b0289e1efd3d 100644 --- a/go.sum +++ b/go.sum @@ -637,8 +637,9 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 h1:dsMpneacHyuVslSVndgUfJKrXFNG7VPdXip2ulG6glo= github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33 h1:VKMmvYhtG28j1sCCBdq4s+V9UOYqNgQ6CQviQwOgTeg= +github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/metrics/log_backup.go b/metrics/log_backup.go index 619f2ebea59b7..b477f447c2dbb 100644 --- a/metrics/log_backup.go +++ b/metrics/log_backup.go @@ -18,6 +18,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// log backup metrics. +// see the `Help` field for details. var ( LastCheckpoint = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "tidb", diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index fd0e2b81ecf54..2b37cffcf1cef 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -432,7 +432,7 @@ func buildRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExe } } resp.ExecDetails = &kvrpcpb.ExecDetails{ - TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: int64(dur / time.Millisecond)}, + TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: uint64(dur / time.Millisecond)}, } resp.ExecDetailsV2 = &kvrpcpb.ExecDetailsV2{ TimeDetail: resp.ExecDetails.TimeDetail,