diff --git a/domain/domain_sysvars.go b/domain/domain_sysvars.go index da2b3aeac3fd3..6988cedcc9b52 100644 --- a/domain/domain_sysvars.go +++ b/domain/domain_sysvars.go @@ -37,6 +37,9 @@ func (do *Domain) initDomainSysVars() { variable.SetExternalTimestamp = do.setExternalTimestamp variable.GetExternalTimestamp = do.getExternalTimestamp + + setGlobalResourceControlFunc := do.setGlobalResourceControl + variable.SetGlobalResourceControl.Store(&setGlobalResourceControlFunc) } // setStatsCacheCapacity sets statsCache cap @@ -67,6 +70,15 @@ func (do *Domain) setPDClientDynamicOption(name, sVal string) { } } +func (do *Domain) setGlobalResourceControl(enable bool) { + if enable { + variable.EnableGlobalResourceControlFunc() + } else { + variable.DisableGlobalResourceControlFunc() + } + logutil.BgLogger().Info("set resource control", zap.Bool("enable", enable)) +} + // updatePDClient is used to set the dynamic option into the PD client. func (do *Domain) updatePDClient(option pd.DynamicOption, val interface{}) error { store, ok := do.store.(interface{ GetPDClient() pd.Client }) diff --git a/go.mod b/go.mod index 7e121ca1e76e2..a11045833f165 100644 --- a/go.mod +++ b/go.mod @@ -91,6 +91,7 @@ require ( github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.5-0.20230120021435-f89383775234 + github.com/tikv/pd v1.1.0-beta.0.20230119114149-402c2bfee2f3 github.com/tikv/pd/client v0.0.0-20230119115149-5c518d079b93 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 @@ -225,7 +226,6 @@ require ( github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect - github.com/tikv/pd v1.1.0-beta.0.20230119114149-402c2bfee2f3 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go index ed73cda8e1abf..e6e48de2059cd 100644 --- a/resourcemanager/rm.go +++ b/resourcemanager/rm.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/tidb/util/cpu" ) -// GlobalResourceManager is a global resource manager -var GlobalResourceManager = NewResourceManger() +// InstanceResourceManager is a local instance resource manager +var InstanceResourceManager = NewResourceManger() // RandomName is to get a random name for register pool. It is just for test. func RandomName() string { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index fd278d5ff8c0e..909c3b7c7c415 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2258,6 +2258,7 @@ var defaultSysVars = []*SysVar{ }, {Scope: ScopeGlobal, Name: TiDBEnableResourceControl, Value: BoolToOnOff(DefTiDBEnableResourceControl), Type: TypeBool, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { EnableResourceControl.Store(TiDBOptOn(s)) + (*SetGlobalResourceControl.Load())(TiDBOptOn(s)) return nil }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { return BoolToOnOff(EnableResourceControl.Load()), nil diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 91ebc8ec996a0..cf879d3ec4344 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -1115,21 +1115,37 @@ func TestSetJobScheduleWindow(t *testing.T) { } func TestTiDBEnableResourceControl(t *testing.T) { + // setup the hooks for test + enable := false + EnableGlobalResourceControlFunc = func() { enable = true } + DisableGlobalResourceControlFunc = func() { enable = false } + setGlobalResourceControlFunc := func(enable bool) { + if enable { + EnableGlobalResourceControlFunc() + } else { + DisableGlobalResourceControlFunc() + } + } + SetGlobalResourceControl.Store(&setGlobalResourceControlFunc) + vars := NewSessionVars(nil) mock := NewMockGlobalAccessor4Tests() mock.SessionVars = vars vars.GlobalVarsAccessor = mock resourceControlEnabled := GetSysVar(TiDBEnableResourceControl) - // Default true + // Default false require.Equal(t, resourceControlEnabled.Value, Off) + require.Equal(t, enable, false) // Set to On err := mock.SetGlobalSysVar(context.Background(), TiDBEnableResourceControl, On) + require.NoError(t, err) val, err1 := mock.GetGlobalSysVar(TiDBEnableResourceControl) require.NoError(t, err1) require.Equal(t, On, val) + require.Equal(t, enable, true) // Set to off err = mock.SetGlobalSysVar(context.Background(), TiDBEnableResourceControl, Off) @@ -1137,4 +1153,5 @@ func TestTiDBEnableResourceControl(t *testing.T) { val, err1 = mock.GetGlobalSysVar(TiDBEnableResourceControl) require.NoError(t, err1) require.Equal(t, Off, val) + require.Equal(t, enable, false) } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index e0d59ee233fe0..c147fdda69ba7 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1265,6 +1265,16 @@ var ( SetExternalTimestamp func(ctx context.Context, ts uint64) error // GetExternalTimestamp is the func registered by staleread to get externaltimestamp from pd GetExternalTimestamp func(ctx context.Context) (uint64, error) + // SetGlobalResourceControl is the func registered by domain to set cluster resource control. + SetGlobalResourceControl atomic.Pointer[func(bool)] +) + +// Hooks functions for Cluster Resource Control. +var ( + // EnableGlobalResourceControlFunc is the function registered by tikv_driver to set cluster resource control. + EnableGlobalResourceControlFunc func() = func() {} + // DisableGlobalResourceControlFunc is the function registered by tikv_driver to unset cluster resource control. + DisableGlobalResourceControlFunc func() = func() {} ) func serverMemoryLimitDefaultValue() string { diff --git a/store/driver/BUILD.bazel b/store/driver/BUILD.bazel index e3b0480f7e11d..d68f71cd44839 100644 --- a/store/driver/BUILD.bazel +++ b/store/driver/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//kv", + "//sessionctx/variable", "//store/copr", "//store/driver/error", "//store/driver/txn", @@ -19,6 +20,7 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd//pkg/mcs/resource_manager/client", "@com_github_tikv_pd_client//:client", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//keepalive", diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index bf0f1272184dd..d732665bb45a4 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -28,6 +28,7 @@ import ( deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/copr" derr "github.com/pingcap/tidb/store/driver/error" txn_driver "github.com/pingcap/tidb/store/driver/txn" @@ -38,6 +39,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + rmclient "github.com/tikv/pd/pkg/mcs/resource_manager/client" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -53,6 +55,10 @@ var mc storeCache func init() { mc.cache = make(map[string]*tikvStore) rand.Seed(time.Now().UnixNano()) + + // Setup the Hooks to dynamic control global resource controller. + variable.EnableGlobalResourceControlFunc = tikv.EnableResourceControl + variable.DisableGlobalResourceControlFunc = tikv.DisableResourceControl } // Option is a function that changes some config of Driver @@ -86,6 +92,25 @@ func WithPDClientConfig(client config.PDClient) Option { } } +// TrySetupGlobalResourceController tries to setup global resource controller. +func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv.Storage) error { + var ( + store *tikvStore + ok bool + ) + if store, ok = s.(*tikvStore); !ok { + return errors.New("cannot setup up resource controller, should use tikv storage") + } + + control, err := rmclient.NewResourceGroupController(serverID, store.GetPDClient(), rmclient.DefaultRequestUnitConfig()) + if err != nil { + return err + } + tikv.SetResourceControlInterceptor(control) + control.Start(ctx) + return nil +} + // TiKVDriver implements engine TiKV. type TiKVDriver struct { keyspaceName string diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 9756d5bb65804..c0cfb83a53149 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -92,7 +92,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma err := store.Close() require.NoError(t, err) view.Stop() - resourcemanager.GlobalResourceManager.Reset() + resourcemanager.InstanceResourceManager.Reset() }) return dom } diff --git a/tidb-server/main.go b/tidb-server/main.go index 0843d624741d8..3c8c37d8cbd06 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -219,9 +219,13 @@ func main() { keyspaceName := config.GetGlobalKeyspaceName() - resourcemanager.GlobalResourceManager.Start() + resourcemanager.InstanceResourceManager.Start() storage, dom := createStoreAndDomain(keyspaceName) svr := createServer(storage, dom) + err = driver.TrySetupGlobalResourceController(context.Background(), dom.ServerID(), storage) + if err != nil { + logutil.BgLogger().Warn("failed to setup global resource controller", zap.Error(err)) + } // Register error API is not thread-safe, the caller MUST NOT register errors after initialization. // To prevent misuse, set a flag to indicate that register new error will panic immediately. @@ -233,7 +237,7 @@ func main() { svr.Close() cleanup(svr, storage, dom, graceful) cpuprofile.StopCPUProfiler() - resourcemanager.GlobalResourceManager.Stop() + resourcemanager.InstanceResourceManager.Stop() close(exited) }) topsql.SetupTopSQL() diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index abef899961657..6644a0e895650 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -81,7 +81,7 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri result.capacity.Add(size) result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size)) result.cond = sync.NewCond(result.lock) - err := resourcemanager.GlobalResourceManager.Register(result, name, component) + err := resourcemanager.InstanceResourceManager.Register(result, name, component) if err != nil { return nil, err } @@ -225,7 +225,7 @@ func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait() { close(p.stopCh) p.release() - defer resourcemanager.GlobalResourceManager.Unregister(p.Name()) + defer resourcemanager.InstanceResourceManager.Unregister(p.Name()) for { // Wait for all workers to exit and all task to be completed. if p.Running() == 0 && p.heartbeatDone.Load() && p.waitingTask.Load() == 0 {