diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go index 95f881e6fb3fe..2d2685ffd06b2 100644 --- a/ddl/attributes_sql_test.go +++ b/ddl/attributes_sql_test.go @@ -269,7 +269,7 @@ PARTITION BY RANGE (c) ( func TestFlashbackTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -327,7 +327,7 @@ PARTITION BY RANGE (c) ( func TestDropTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -380,7 +380,7 @@ PARTITION BY RANGE (c) ( func TestCreateWithSameName(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -444,7 +444,7 @@ PARTITION BY RANGE (c) ( func TestPartition(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -504,7 +504,7 @@ PARTITION BY RANGE (c) ( func TestDropSchema(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -530,7 +530,7 @@ PARTITION BY RANGE (c) ( func TestDefaultKeyword(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/ddl/main_test.go b/ddl/main_test.go index 6a8642ae34380..a10374b04f0f2 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -52,7 +52,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/domain/db_test.go b/domain/db_test.go index ff6e6b625788a..9b122664f8397 100644 --- a/domain/db_test.go +++ b/domain/db_test.go @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" diff --git a/domain/domain.go b/domain/domain.go index 0d06fdc5a7a13..3f1c833b148c0 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -98,20 +98,26 @@ func NewMockDomain() *Domain { // Domain represents a storage space. Different domains can use the same database name. // Multiple domains can be used in parallel without synchronization. type Domain struct { - store kv.Storage - infoCache *infoschema.InfoCache - privHandle *privileges.Handle - bindHandle atomic.Pointer[bindinfo.BindHandle] - statsHandle unsafe.Pointer - statsLease time.Duration - ddl ddl.DDL - info *infosync.InfoSyncer - globalCfgSyncer *globalconfigsync.GlobalConfigSyncer - m sync.Mutex - SchemaValidator SchemaValidator - sysSessionPool *sessionPool - exit chan struct{} - etcdClient *clientv3.Client + store kv.Storage + infoCache *infoschema.InfoCache + privHandle *privileges.Handle + bindHandle atomic.Pointer[bindinfo.BindHandle] + statsHandle unsafe.Pointer + statsLease time.Duration + ddl ddl.DDL + info *infosync.InfoSyncer + globalCfgSyncer *globalconfigsync.GlobalConfigSyncer + m sync.Mutex + SchemaValidator SchemaValidator + sysSessionPool *sessionPool + exit chan struct{} + // `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace. + etcdClient *clientv3.Client + // `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace. + // It is only used in storeMinStartTS and RemoveMinStartTS now. + // It must be used when the etcd path isn't needed to separate by keyspace. + // See keyspace RFC: https://github.com/pingcap/tidb/pull/39685 + unprefixedEtcdCli *clientv3.Client sysVarCache sysVarCache // replaces GlobalVariableCache slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle @@ -881,6 +887,10 @@ func (do *Domain) Close() { terror.Log(errors.Trace(do.etcdClient.Close())) } + if do.unprefixedEtcdCli != nil { + terror.Log(errors.Trace(do.unprefixedEtcdCli.Close())) + } + do.slowQuery.Close() if do.cancel != nil { do.cancel() @@ -927,6 +937,27 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio const serverIDForStandalone = 1 // serverID for standalone deployment. +func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { + cfg := config.GetGlobalConfig() + etcdLogCfg := zap.NewProductionConfig() + etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) + cli, err := clientv3.New(clientv3.Config{ + LogConfig: &etcdLogCfg, + Endpoints: addrs, + AutoSyncInterval: 30 * time.Second, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpc.WithBackoffMaxDelay(time.Second * 3), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, + Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, + }), + }, + TLS: ebd.TLSConfig(), + }) + return cli, err +} + // Init initializes a domain. func (do *Domain) Init( ddlLease time.Duration, @@ -942,25 +973,7 @@ func (do *Domain) Init( return err } if addrs != nil { - cfg := config.GetGlobalConfig() - // silence etcd warn log, when domain closed, it won't randomly print warn log - // see details at the issue https://github.com/pingcap/tidb/issues/15479 - etcdLogCfg := zap.NewProductionConfig() - etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel) - cli, err := clientv3.New(clientv3.Config{ - LogConfig: &etcdLogCfg, - Endpoints: addrs, - AutoSyncInterval: 30 * time.Second, - DialTimeout: 5 * time.Second, - DialOptions: []grpc.DialOption{ - grpc.WithBackoffMaxDelay(time.Second * 3), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, - Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, - }), - }, - TLS: ebd.TLSConfig(), - }) + cli, err := newEtcdCli(addrs, ebd) if err != nil { return errors.Trace(err) } @@ -968,6 +981,12 @@ func (do *Domain) Init( etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) do.etcdClient = cli + + unprefixedEtcdCli, err := newEtcdCli(addrs, ebd) + if err != nil { + return errors.Trace(err) + } + do.unprefixedEtcdCli = unprefixedEtcdCli } } @@ -1029,7 +1048,7 @@ func (do *Domain) Init( // step 1: prepare the info/schema syncer which domain reload needed. skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard) + do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard) if err != nil { return err } diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 7732a831b057e..6c9e721959cf9 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -95,12 +95,18 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA // InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down. type InfoSyncer struct { - etcdCli *clientv3.Client - info *ServerInfo - serverInfoPath string - minStartTS uint64 - minStartTSPath string - managerMu struct { + // `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace. + etcdCli *clientv3.Client + // `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace. + // It is only used in storeMinStartTS and RemoveMinStartTS now. + // It must be used when the etcd path isn't needed to separate by keyspace. + // See keyspace RFC: https://github.com/pingcap/tidb/pull/39685 + unprefixedEtcdCli *clientv3.Client + info *ServerInfo + serverInfoPath string + minStartTS uint64 + minStartTSPath string + managerMu struct { mu sync.RWMutex util2.SessionManager } @@ -180,12 +186,13 @@ func setGlobalInfoSyncer(is *InfoSyncer) { } // GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing. -func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) { +func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) { is := &InfoSyncer{ - etcdCli: etcdCli, - info: getServerInfo(id, serverIDGetter), - serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), - minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), + etcdCli: etcdCli, + unprefixedEtcdCli: unprefixedEtcdCli, + info: getServerInfo(id, serverIDGetter), + serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), + minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), } err := is.init(ctx, skipRegisterToDashBoard) if err != nil { @@ -721,20 +728,20 @@ func (is *InfoSyncer) GetMinStartTS() uint64 { // storeMinStartTS stores self server min start timestamp to etcd. func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error { - if is.etcdCli == nil { + if is.unprefixedEtcdCli == nil { return nil } - return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, + return util.PutKVToEtcd(ctx, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, is.minStartTSPath, strconv.FormatUint(is.minStartTS, 10), clientv3.WithLease(is.session.Lease())) } // RemoveMinStartTS removes self server min start timestamp from etcd. func (is *InfoSyncer) RemoveMinStartTS() { - if is.etcdCli == nil { + if is.unprefixedEtcdCli == nil { return } - err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) if err != nil { logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err)) } diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 90a30d8f1f161..3264c0adca3c6 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) }() - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false) + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false) require.NoError(t, err) err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { } func TestPutBundlesRetry(t *testing.T) { - _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false) + _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false) require.NoError(t, err) bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}) @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) { func TestTiFlashManager(t *testing.T) { ctx := context.Background() - _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false) + _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false) tiflash := NewMockTiFlash() SetMockTiFlash(tiflash) diff --git a/server/stat_test.go b/server/stat_test.go index 66c974a3deeea..4484823f6dc83 100644 --- a/server/stat_test.go +++ b/server/stat_test.go @@ -46,7 +46,7 @@ func TestUptime(t *testing.T) { }() require.NoError(t, err) - _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true) + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) require.NoError(t, err) tidbdrv := NewTiDBDriver(store) diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 8a28b91b6cb2c..21557f09bec2c 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -196,10 +196,6 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage, if err != nil { return nil, errors.Trace(err) } - // If there's setting keyspace-name, then skipped GC worker logic. - // It needs a group of special tidb nodes to execute GC worker logic. - // TODO: remove this restriction while merged keyspace GC worker logic. - disableGC = true } codec := pdClient.GetCodec() diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index f9b53e55988d4..474096fbca7ec 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -301,6 +301,85 @@ func (w *GCWorker) tick(ctx context.Context) { } } +// getGCSafePoint returns the current gc safe point. +func getGCSafePoint(ctx context.Context, pdClient pd.Client) (uint64, error) { + // If there is try to set gc safepoint is 0, the interface will not set gc safepoint to 0, + // it will return current gc safepoint. + safePoint, err := pdClient.UpdateGCSafePoint(ctx, 0) + if err != nil { + return 0, errors.Trace(err) + } + return safePoint, nil +} + +func (w *GCWorker) logIsGCSafePointTooEarly(ctx context.Context, safePoint uint64) error { + now, err := w.getOracleTime() + if err != nil { + return errors.Trace(err) + } + + checkTs := oracle.GoTimeToTS(now.Add(-gcDefaultLifeTime * 2)) + if checkTs > safePoint { + logutil.Logger(ctx).Info("[gc worker] gc safepoint is too early. " + + "Maybe there is a bit BR/Lightning/CDC task, " + + "or a long transaction is running" + + "or need a tidb without setting keyspace-name to calculate and update gc safe point.") + } + return nil +} + +func (w *GCWorker) runKeyspaceDeleteRange(ctx context.Context, concurrency int) error { + // Get safe point from PD. + // The GC safe point is updated only after the global GC have done resolveLocks phase globally. + // So, in the following code, resolveLocks must have been done by the global GC on the ranges to be deleted, + // so its safe to delete the ranges. + safePoint, err := getGCSafePoint(ctx, w.pdClient) + if err != nil { + logutil.Logger(ctx).Info("[gc worker] get gc safe point error", zap.Error(errors.Trace(err))) + return nil + } + + if safePoint == 0 { + logutil.Logger(ctx).Info("[gc worker] skip keyspace delete range, because gc safe point is 0") + return nil + } + + err = w.logIsGCSafePointTooEarly(ctx, safePoint) + if err != nil { + logutil.Logger(ctx).Info("[gc worker] log is gc safe point is too early error", zap.Error(errors.Trace(err))) + return nil + } + + keyspaceID := w.store.GetCodec().GetKeyspaceID() + logutil.Logger(ctx).Info("[gc worker] start keyspace delete range", + zap.String("uuid", w.uuid), + zap.Int("concurrency", concurrency), + zap.Uint32("keyspaceID", uint32(keyspaceID)), + zap.Uint64("GCSafepoint", safePoint)) + + // Do deleteRanges. + err = w.deleteRanges(ctx, safePoint, concurrency) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] delete range returns an error", + zap.String("uuid", w.uuid), + zap.Error(err)) + metrics.GCJobFailureCounter.WithLabelValues("delete_range").Inc() + return errors.Trace(err) + } + + // Do redoDeleteRanges. + err = w.redoDeleteRanges(ctx, safePoint, concurrency) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] redo-delete range returns an error", + zap.String("uuid", w.uuid), + zap.Error(err)) + metrics.GCJobFailureCounter.WithLabelValues("redo_delete_range").Inc() + return errors.Trace(err) + } + + return nil +} + // leaderTick of GC worker checks if it should start a GC job every tick. func (w *GCWorker) leaderTick(ctx context.Context) error { if w.gcIsRunning { @@ -317,6 +396,19 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { return errors.Trace(err) } + // Gc safe point is not separated by keyspace now. The whole cluster has only one global gc safe point. + // So at least one TiDB with `keyspace-name` not set is required in the whole cluster to calculate and update gc safe point. + // If `keyspace-name` is set, the TiDB node will only do its own delete range, and will not calculate gc safe point and resolve locks. + // Note that when `keyspace-name` is set, `checkLeader` will be done within the key space. + // Therefore only one TiDB node in each key space will be responsible to do delete range. + if w.store.GetCodec().GetKeyspace() != nil { + err = w.runKeyspaceGCJob(ctx, concurrency) + if err != nil { + return errors.Trace(err) + } + return nil + } + ok, safePoint, err := w.prepare(ctx) if err != nil { metrics.GCJobFailureCounter.WithLabelValues("prepare").Inc() @@ -354,6 +446,36 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { return nil } +func (w *GCWorker) runKeyspaceGCJob(ctx context.Context, concurrency int) error { + // When the worker is just started, or an old GC job has just finished, + // wait a while before starting a new job. + if time.Since(w.lastFinish) < gcWaitTime { + logutil.Logger(ctx).Info("[gc worker] another keyspace gc job has just finished, skipped.", + zap.String("leaderTick on ", w.uuid)) + return nil + } + + now, err := w.getOracleTime() + if err != nil { + return errors.Trace(err) + } + ok, err := w.checkGCInterval(now) + if err != nil || !ok { + return errors.Trace(err) + } + + go func() { + w.done <- w.runKeyspaceDeleteRange(ctx, concurrency) + }() + + err = w.saveTime(gcLastRunTimeKey, now) + if err != nil { + return errors.Trace(err) + } + + return nil +} + // prepare checks preconditions for starting a GC job. It returns a bool // that indicates whether the GC job should start and the new safePoint. func (w *GCWorker) prepare(ctx context.Context) (bool, uint64, error) { @@ -1974,6 +2096,11 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error { // Placement rules cannot be removed immediately after drop table / truncate table, // because the tables can be flashed back or recovered. func (w *GCWorker) doGCPlacementRules(se session.Session, safePoint uint64, dr util.DelRangeTask, gcPlacementRuleCache map[int64]interface{}) (err error) { + if w.store.GetCodec().GetKeyspace() != nil { + logutil.BgLogger().Info("[gc worker] skip doGCPlacementRules when keyspace_name is set.", zap.String("uuid", w.uuid)) + return nil + } + // Get the job from the job history var historyJob *model.Job failpoint.Inject("mockHistoryJobForGC", func(v failpoint.Value) {