Skip to content

Commit

Permalink
parser,ddl: add background job syntax and meta management for resourc…
Browse files Browse the repository at this point in the history
…e control (pingcap#45189)

ref pingcap#44517
  • Loading branch information
glorv authored Jul 11, 2023
1 parent bd59ea3 commit 43763d2
Show file tree
Hide file tree
Showing 20 changed files with 11,066 additions and 10,776 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3195,8 +3195,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:yCWc6ZivbE5Ia7VxznP4R3tW6Njgyc3Wn3ZhnoKXy8U=",
version = "v0.0.0-20230626064255-5f60f7f5d8e2",
sum = "h1:TN9FcS+r19rKyrsPJDPfcXWkztVHfbpZ9Xkic6kE+v0=",
version = "v0.0.0-20230703085931-3788ab4ee6b3",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//util",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
64 changes: 58 additions & 6 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/tikv/client-go/v2/oracle"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -3166,7 +3168,8 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement
}

// SetDirectResourceGroupSettings tries to set the ResourceGroupSettings.
func SetDirectResourceGroupSettings(resourceGroupSettings *model.ResourceGroupSettings, opt *ast.ResourceGroupOption) error {
func SetDirectResourceGroupSettings(groupInfo *model.ResourceGroupInfo, opt *ast.ResourceGroupOption) error {
resourceGroupSettings := groupInfo.ResourceGroupSettings
switch opt.Tp {
case ast.ResourceRURate:
resourceGroupSettings.RURate = opt.UintValue
Expand All @@ -3189,12 +3192,24 @@ func SetDirectResourceGroupSettings(resourceGroupSettings *model.ResourceGroupSe
}
resourceGroupSettings.BurstLimit = limit
case ast.ResourceGroupRunaway:
if len(opt.ResourceGroupRunawayOptionList) == 0 {
if len(opt.RunawayOptionList) == 0 {
resourceGroupSettings.Runaway = nil
}
for _, opt := range opt.ResourceGroupRunawayOptionList {
err := SetDirectResourceGroupRunawayOption(resourceGroupSettings, opt.Tp, opt.StrValue, opt.IntValue)
if err != nil {
for _, opt := range opt.RunawayOptionList {
if err := SetDirectResourceGroupRunawayOption(resourceGroupSettings, opt.Tp, opt.StrValue, opt.IntValue); err != nil {
return err
}
}
case ast.ResourceGroupBackground:
if groupInfo.Name.L != rg.DefaultResourceGroupName {
// FIXME: this is a temporary restriction, so we don't add a error-code for it.
return errors.New("unsupported operation. Currently, only the default resource group support change background settings")
}
if len(opt.BackgroundOptions) == 0 {
resourceGroupSettings.Background = nil
}
for _, opt := range opt.BackgroundOptions {
if err := SetDirectResourceGroupBackgroundOption(resourceGroupSettings, opt); err != nil {
return err
}
}
Expand Down Expand Up @@ -3233,6 +3248,43 @@ func SetDirectResourceGroupRunawayOption(resourceGroupSettings *model.ResourceGr
return nil
}

// SetDirectResourceGroupBackgroundOption set background configs of the ResourceGroupSettings.
func SetDirectResourceGroupBackgroundOption(resourceGroupSettings *model.ResourceGroupSettings, opt *ast.ResourceGroupBackgroundOption) error {
if resourceGroupSettings.Background == nil {
resourceGroupSettings.Background = &model.ResourceGroupBackgroundSettings{}
}
switch opt.Type {
case ast.BackgroundOptionTaskNames:
jobTypes, err := parseBackgroundJobTypes(opt.StrValue)
if err != nil {
return err
}
resourceGroupSettings.Background.JobTypes = jobTypes
default:
return errors.Trace(errors.New("unknown background option type"))
}
return nil
}

func parseBackgroundJobTypes(t string) ([]string, error) {
if len(t) == 0 {
return []string{}, nil
}

segs := strings.Split(t, ",")
res := make([]string, 0, len(segs))
for _, s := range segs {
ty := strings.ToLower(strings.TrimSpace(s))
if len(ty) > 0 {
if !slices.Contains(kvutil.ExplicitTypeList, ty) {
return nil, infoschema.ErrResourceGroupInvalidBackgroundTaskName.GenWithStackByArgs(ty)
}
res = append(res, ty)
}
}
return res, nil
}

// handleTableOptions updates tableInfo according to table options.
func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) error {
var ttlOptionsHandled bool
Expand Down Expand Up @@ -8284,7 +8336,7 @@ func buildResourceGroup(oldGroup *model.ResourceGroupInfo, options []*ast.Resour
*groupInfo.ResourceGroupSettings = *oldGroup.ResourceGroupSettings
}
for _, opt := range options {
err := SetDirectResourceGroupSettings(groupInfo.ResourceGroupSettings, opt)
err := SetDirectResourceGroupSettings(groupInfo, opt)
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
group.RunawaySettings = runaway
}

if options.Background != nil {
group.BackgroundSettings = &rmpb.BackgroundSettings{
JobTypes: options.Background.JobTypes,
}
}

if options.RURate > 0 {
group.Mode = rmpb.GroupMode_RUMode
group.RUSettings = &rmpb.GroupRequestUnitSettings{
Expand All @@ -72,6 +78,7 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
}
return group, nil
}

// Only support RU mode now
return nil, ErrUnknownResourceGroupMode
}
38 changes: 24 additions & 14 deletions ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func TestResourceGroupBasic(t *testing.T) {
tk.MustExec("set global tidb_enable_resource_control = 'on'")

// test default resource group.
tk.MustQuery("select * from information_schema.resource_groups where name = 'default'").Check(testkit.Rows("default UNLIMITED MEDIUM YES <nil>"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'default'").Check(testkit.Rows("default UNLIMITED MEDIUM YES <nil> <nil>"))
tk.MustExec("alter resource group `default` RU_PER_SEC=1000 PRIORITY=LOW")
tk.MustQuery("select * from information_schema.resource_groups where name = 'default'").Check(testkit.Rows("default 1000 LOW YES <nil>"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'default'").Check(testkit.Rows("default 1000 LOW YES <nil> <nil>"))
tk.MustContainErrMsg("drop resource group `default`", "can't drop reserved resource group")

tk.MustExec("create resource group x RU_PER_SEC=1000")
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestResourceGroupBasic(t *testing.T) {
re.Equal(model.WatchSimilar, g.Runaway.WatchType)
re.Equal(uint64(time.Minute*10/time.Millisecond), g.Runaway.WatchDurationMs)

tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 MEDIUM YES EXEC_ELAPSED='15s', ACTION=DRYRUN, WATCH=SIMILAR DURATION='10m0s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 MEDIUM YES EXEC_ELAPSED='15s', ACTION=DRYRUN, WATCH=SIMILAR DURATION='10m0s' <nil>"))

tk.MustExec("drop resource group x")
g = testResourceGroupNameFromIS(t, tk.Session(), "x")
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestResourceGroupBasic(t *testing.T) {
}
g = testResourceGroupNameFromIS(t, tk.Session(), "y")
checkFunc(g)
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 5000 MEDIUM YES EXEC_ELAPSED='15s', ACTION=KILL"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 5000 MEDIUM YES EXEC_ELAPSED='15s', ACTION=KILL <nil>"))
tk.MustExec("drop resource group y")
g = testResourceGroupNameFromIS(t, tk.Session(), "y")
re.Nil(g)
Expand All @@ -166,30 +166,30 @@ func TestResourceGroupBasic(t *testing.T) {

// Check information schema table information_schema.resource_groups
tk.MustExec("create resource group x RU_PER_SEC=1000 PRIORITY=LOW")
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 1000 LOW NO <nil>"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 1000 LOW NO <nil> <nil>"))
tk.MustExec("alter resource group x RU_PER_SEC=2000 BURSTABLE QUERY_LIMIT=(EXEC_ELAPSED='15s' action kill)")
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 LOW YES EXEC_ELAPSED='15s', ACTION=KILL"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 2000 LOW YES EXEC_ELAPSED='15s', ACTION=KILL <nil>"))
tk.MustQuery("show create resource group x").Check(testkit.Rows("x CREATE RESOURCE GROUP `x` RU_PER_SEC=2000, PRIORITY=LOW, BURSTABLE, QUERY_LIMIT=(EXEC_ELAPSED=\"15s\" ACTION=KILL)"))
tk.MustExec("CREATE RESOURCE GROUP `x_new` RU_PER_SEC=2000 PRIORITY=LOW BURSTABLE=true QUERY_LIMIT=(EXEC_ELAPSED=\"15s\" ACTION=KILL)")
tk.MustQuery("select * from information_schema.resource_groups where name = 'x_new'").Check(testkit.Rows("x_new 2000 LOW YES EXEC_ELAPSED='15s', ACTION=KILL"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'x_new'").Check(testkit.Rows("x_new 2000 LOW YES EXEC_ELAPSED='15s', ACTION=KILL <nil>"))
tk.MustExec("alter resource group x BURSTABLE=false RU_PER_SEC=3000")
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 3000 LOW NO EXEC_ELAPSED='15s', ACTION=KILL"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'x'").Check(testkit.Rows("x 3000 LOW NO EXEC_ELAPSED='15s', ACTION=KILL <nil>"))
tk.MustQuery("show create resource group x").Check(testkit.Rows("x CREATE RESOURCE GROUP `x` RU_PER_SEC=3000, PRIORITY=LOW, QUERY_LIMIT=(EXEC_ELAPSED=\"15s\" ACTION=KILL)"))

tk.MustExec("create resource group y BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='1s' action COOLDOWN WATCH EXACT duration '1h')")
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 2000 MEDIUM YES EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 2000 MEDIUM YES EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s' <nil>"))
tk.MustQuery("show create resource group y").Check(testkit.Rows("y CREATE RESOURCE GROUP `y` RU_PER_SEC=2000, PRIORITY=MEDIUM, BURSTABLE, QUERY_LIMIT=(EXEC_ELAPSED=\"1s\" ACTION=COOLDOWN WATCH=EXACT DURATION=\"1h0m0s\")"))
tk.MustExec("CREATE RESOURCE GROUP `y_new` RU_PER_SEC=2000 PRIORITY=MEDIUM QUERY_LIMIT=(EXEC_ELAPSED=\"1s\" ACTION=COOLDOWN WATCH EXACT DURATION=\"1h0m0s\")")
tk.MustQuery("select * from information_schema.resource_groups where name = 'y_new'").Check(testkit.Rows("y_new 2000 MEDIUM NO EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'y_new'").Check(testkit.Rows("y_new 2000 MEDIUM NO EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s' <nil>"))
tk.MustExec("alter resource group y_new RU_PER_SEC=3000")
tk.MustQuery("select * from information_schema.resource_groups where name = 'y_new'").Check(testkit.Rows("y_new 3000 MEDIUM NO EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'y_new'").Check(testkit.Rows("y_new 3000 MEDIUM NO EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s' <nil>"))

tk.MustExec("alter resource group y RU_PER_SEC=4000")
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 4000 MEDIUM YES EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 4000 MEDIUM YES EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s' <nil>"))
tk.MustQuery("show create resource group y").Check(testkit.Rows("y CREATE RESOURCE GROUP `y` RU_PER_SEC=4000, PRIORITY=MEDIUM, BURSTABLE, QUERY_LIMIT=(EXEC_ELAPSED=\"1s\" ACTION=COOLDOWN WATCH=EXACT DURATION=\"1h0m0s\")"))

tk.MustExec("alter resource group y RU_PER_SEC=4000 PRIORITY=HIGH BURSTABLE")
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 4000 HIGH YES EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'y'").Check(testkit.Rows("y 4000 HIGH YES EXEC_ELAPSED='1s', ACTION=COOLDOWN, WATCH=EXACT DURATION='1h0m0s' <nil>"))
tk.MustQuery("show create resource group y").Check(testkit.Rows("y CREATE RESOURCE GROUP `y` RU_PER_SEC=4000, PRIORITY=HIGH, BURSTABLE, QUERY_LIMIT=(EXEC_ELAPSED=\"1s\" ACTION=COOLDOWN WATCH=EXACT DURATION=\"1h0m0s\")"))

tk.MustQuery("select count(*) from information_schema.resource_groups").Check(testkit.Rows("5"))
Expand All @@ -207,6 +207,16 @@ func TestResourceGroupBasic(t *testing.T) {
tk.MustExec("alter user usr3 resource group ``")
tk.MustExec("alter user usr3 resource group `DeFault`")
tk.MustQuery("select user_attributes from mysql.user where user = 'usr3'").Check(testkit.Rows(`{"resource_group": "default"}`))

tk.MustExec("alter resource group default ru_per_sec = 1000, priority = medium, background = (task_names = 'lightning, BR');")
tk.MustQuery("select * from information_schema.resource_groups where name = 'default'").Check(testkit.Rows("default 1000 MEDIUM YES <nil> TASK_NAMES='lightning,br'"))
tk.MustQuery("show create resource group default").Check(testkit.Rows("default CREATE RESOURCE GROUP `default` RU_PER_SEC=1000, PRIORITY=MEDIUM, BURSTABLE, BACKGROUND=(TASK_NAMES='lightning,br')"))
g = testResourceGroupNameFromIS(t, tk.Session(), "default")
require.EqualValues(t, g.Background.JobTypes, []string{"lightning", "br"})

tk.MustContainErrMsg("create resource group bg ru_per_sec = 1000 background = (task_names = 'lightning')", "unsupported operation")
tk.MustContainErrMsg("alter resource group x background=(task_names='')", "unsupported operation")
tk.MustGetErrCode("alter resource group default background=(task_names='a,b,c')", mysql.ErrResourceGroupInvalidBackgroundTaskName)
}

func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name string) *model.ResourceGroupInfo {
Expand Down Expand Up @@ -234,7 +244,7 @@ func TestResourceGroupRunaway(t *testing.T) {
tk.MustExec("set global tidb_enable_resource_control='on'")
tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)")
tk.MustExec("create resource group rg2 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT duration '1s')")
tk.MustQuery("select * from information_schema.resource_groups where name = 'rg2'").Check(testkit.Rows("rg2 2000 MEDIUM YES EXEC_ELAPSED='50ms', ACTION=KILL, WATCH=EXACT DURATION='1s'"))
tk.MustQuery("select * from information_schema.resource_groups where name = 'rg2'").Check(testkit.Rows("rg2 2000 MEDIUM YES EXEC_ELAPSED='50ms', ACTION=KILL, WATCH=EXACT DURATION='1s' <nil>"))
tk.MustQuery("select /*+ resource_group(rg1) */ * from t").Check(testkit.Rows("1"))
tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1"))

Expand Down
1 change: 1 addition & 0 deletions domain/infosync/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (m *mockResourceManagerClient) AddResourceGroup(ctx context.Context, group
func (m *mockResourceManagerClient) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) {
m.Lock()
defer m.Unlock()

m.groups[group.Name] = group
value, err := proto.Marshal(group)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,13 +1133,14 @@ const (
ErrPausedDDLJob = 8262

// Resource group errors.
ErrResourceGroupExists = 8248
ErrResourceGroupNotExists = 8249
ErrResourceGroupSupportDisabled = 8250
ErrResourceGroupConfigUnavailable = 8251
ErrResourceGroupThrottled = 8252
ErrResourceGroupQueryRunawayInterrupted = 8253
ErrResourceGroupQueryRunawayQuarantine = 8254
ErrResourceGroupExists = 8248
ErrResourceGroupNotExists = 8249
ErrResourceGroupSupportDisabled = 8250
ErrResourceGroupConfigUnavailable = 8251
ErrResourceGroupThrottled = 8252
ErrResourceGroupQueryRunawayInterrupted = 8253
ErrResourceGroupQueryRunawayQuarantine = 8254
ErrResourceGroupInvalidBackgroundTaskName = 8255

// TiKV/PD/TiFlash errors.
ErrPDServerTimeout = 9001
Expand Down
13 changes: 7 additions & 6 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,12 +1134,13 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrResourceGroupExists: mysql.Message("Resource group '%-.192s' already exists", nil),
ErrResourceGroupNotExists: mysql.Message("Unknown resource group '%-.192s'", nil),

ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil),
ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil),
ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil),
ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query", nil),
ErrResourceGroupQueryRunawayQuarantine: mysql.Message("Quarantined and interrupted because of being in runaway watch list", nil),
ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil),
ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil),
ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil),
ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query", nil),
ErrResourceGroupQueryRunawayQuarantine: mysql.Message("Quarantined and interrupted because of being in runaway watch list", nil),
ErrResourceGroupInvalidBackgroundTaskName: mysql.Message("Unknown background task name '%-.192s'", nil),

// TiKV/PD errors.
ErrPDServerTimeout: mysql.Message("PD server timeout: %s", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,11 @@ error = '''
Quarantined and interrupted because of being in runaway watch list
'''

["executor:8255"]
error = '''
Unknown background task name '%-.192s'
'''

["expression:1139"]
error = '''
Got error '%-.64s' from regexp
Expand Down
Loading

0 comments on commit 43763d2

Please sign in to comment.