diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 7a0b4b8095582..9c1b9a28b5a7a 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -357,10 +357,10 @@ func TableExists(ctx context.Context, db utils.QueryExecutor, schema, table stri query := "SELECT 1 from INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?" var exist string err := db.QueryRowContext(ctx, query, schema, table).Scan(&exist) - switch { - case err == nil: + switch err { + case nil: return true, nil - case err == sql.ErrNoRows: + case sql.ErrNoRows: return false, nil default: return false, errors.Annotatef(err, "check table exists failed") @@ -372,10 +372,10 @@ func SchemaExists(ctx context.Context, db utils.QueryExecutor, schema string) (b query := "SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?" var exist string err := db.QueryRowContext(ctx, query, schema).Scan(&exist) - switch { - case err == nil: + switch err { + case nil: return true, nil - case err == sql.ErrNoRows: + case sql.ErrNoRows: return false, nil default: return false, errors.Annotatef(err, "check schema exists failed") diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go index 6a1379b3e9524..d7eac8ec2fada 100644 --- a/br/pkg/restore/systable_restore.go +++ b/br/pkg/restore/systable_restore.go @@ -222,8 +222,7 @@ func (rc *Client) getDatabaseByName(name string) (*database, bool) { func (rc *Client) afterSystemTablesReplaced(tables []string) error { var err error for _, table := range tables { - switch { - case table == "user": + if table == "user" { if rc.fullClusterRestore { log.Info("privilege system table restored, please reconnect to make it effective") err = rc.dom.NotifyUpdatePrivilege() diff --git a/build/BUILD.bazel b/build/BUILD.bazel index d0f8d380b0b89..4cfd244da6e19 100644 --- a/build/BUILD.bazel +++ b/build/BUILD.bazel @@ -80,6 +80,7 @@ STATICHECK_ANALYZERS = [ "SA6001", "SA6002", "SA6005", + "QF1002", "U1000", ] diff --git a/build/nogo_config.json b/build/nogo_config.json index 06d746067620f..bfeb63e550285 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -1178,5 +1178,12 @@ ".*_test.go": "ignore test code", "external/": "no need to vet third party code" } + }, + "QF1002": { + "exclude_files": { + "parser/parser.go": "parser/parser.go code", + ".*_test.go": "ignore test code", + "external/": "no need to vet third party code" + } } } diff --git a/privilege/privileges/cache.go b/privilege/privileges/cache.go index 9159777340b67..892366a6f250d 100644 --- a/privilege/privileges/cache.go +++ b/privilege/privileges/cache.go @@ -746,8 +746,7 @@ func (p *MySQLPrivilege) decodeUserTableRow(row chunk.Row, fs []*ast.ResultField func (p *MySQLPrivilege) decodeGlobalPrivTableRow(row chunk.Row, fs []*ast.ResultField) error { var value globalPrivRecord for i, f := range fs { - switch { - case f.ColumnAsName.L == "priv": + if f.ColumnAsName.L == "priv" { privData := row.GetString(i) if len(privData) > 0 { var privValue GlobalPrivValue @@ -770,7 +769,7 @@ func (p *MySQLPrivilege) decodeGlobalPrivTableRow(row chunk.Row, fs []*ast.Resul } } } - default: + } else { value.assignUserOrHost(row, i, f) } } @@ -784,10 +783,10 @@ func (p *MySQLPrivilege) decodeGlobalPrivTableRow(row chunk.Row, fs []*ast.Resul func (p *MySQLPrivilege) decodeGlobalGrantsTableRow(row chunk.Row, fs []*ast.ResultField) error { var value dynamicPrivRecord for i, f := range fs { - switch { - case f.ColumnAsName.L == "priv": + switch f.ColumnAsName.L { + case "priv": value.PrivilegeName = strings.ToUpper(row.GetString(i)) - case f.ColumnAsName.L == "with_grant_option": + case "with_grant_option": value.GrantOption = row.GetEnum(i).String() == "Y" default: value.assignUserOrHost(row, i, f) @@ -827,14 +826,14 @@ func (p *MySQLPrivilege) decodeDBTableRow(row chunk.Row, fs []*ast.ResultField) func (p *MySQLPrivilege) decodeTablesPrivTableRow(row chunk.Row, fs []*ast.ResultField) error { var value tablesPrivRecord for i, f := range fs { - switch { - case f.ColumnAsName.L == "db": + switch f.ColumnAsName.L { + case "db": value.DB = row.GetString(i) - case f.ColumnAsName.L == "table_name": + case "table_name": value.TableName = row.GetString(i) - case f.ColumnAsName.L == "table_priv": + case "table_priv": value.TablePriv = decodeSetToPrivilege(row.GetSet(i)) - case f.ColumnAsName.L == "column_priv": + case "column_priv": value.ColumnPriv = decodeSetToPrivilege(row.GetSet(i)) default: value.assignUserOrHost(row, i, f) @@ -847,14 +846,14 @@ func (p *MySQLPrivilege) decodeTablesPrivTableRow(row chunk.Row, fs []*ast.Resul func (p *MySQLPrivilege) decodeRoleEdgesTable(row chunk.Row, fs []*ast.ResultField) error { var fromUser, fromHost, toHost, toUser string for i, f := range fs { - switch { - case f.ColumnAsName.L == "from_host": + switch f.ColumnAsName.L { + case "from_host": fromHost = row.GetString(i) - case f.ColumnAsName.L == "from_user": + case "from_user": fromUser = row.GetString(i) - case f.ColumnAsName.L == "to_host": + case "to_host": toHost = row.GetString(i) - case f.ColumnAsName.L == "to_user": + case "to_user": toUser = row.GetString(i) } } @@ -872,10 +871,10 @@ func (p *MySQLPrivilege) decodeRoleEdgesTable(row chunk.Row, fs []*ast.ResultFie func (p *MySQLPrivilege) decodeDefaultRoleTableRow(row chunk.Row, fs []*ast.ResultField) error { var value defaultRoleRecord for i, f := range fs { - switch { - case f.ColumnAsName.L == "default_role_host": + switch f.ColumnAsName.L { + case "default_role_host": value.DefaultRoleHost = row.GetString(i) - case f.ColumnAsName.L == "default_role_user": + case "default_role_user": value.DefaultRoleUser = row.GetString(i) default: value.assignUserOrHost(row, i, f) @@ -888,20 +887,20 @@ func (p *MySQLPrivilege) decodeDefaultRoleTableRow(row chunk.Row, fs []*ast.Resu func (p *MySQLPrivilege) decodeColumnsPrivTableRow(row chunk.Row, fs []*ast.ResultField) error { var value columnsPrivRecord for i, f := range fs { - switch { - case f.ColumnAsName.L == "db": + switch f.ColumnAsName.L { + case "db": value.DB = row.GetString(i) - case f.ColumnAsName.L == "table_name": + case "table_name": value.TableName = row.GetString(i) - case f.ColumnAsName.L == "column_name": + case "column_name": value.ColumnName = row.GetString(i) - case f.ColumnAsName.L == "timestamp": + case "timestamp": var err error value.Timestamp, err = row.GetTime(i).GoTime(time.Local) if err != nil { return errors.Trace(err) } - case f.ColumnAsName.L == "column_priv": + case "column_priv": value.ColumnPriv = decodeSetToPrivilege(row.GetSet(i)) default: value.assignUserOrHost(row, i, f) diff --git a/resourcemanager/pooltask/BUILD.bazel b/resourcemanager/pooltask/BUILD.bazel deleted file mode 100644 index 176bd1871cd9e..0000000000000 --- a/resourcemanager/pooltask/BUILD.bazel +++ /dev/null @@ -1,26 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "pooltask", - srcs = [ - "task.go", - "task_manager.go", - "task_manager_iterator.go", - "task_manager_scheduler.go", - ], - importpath = "github.com/pingcap/tidb/resourcemanager/pooltask", - visibility = ["//visibility:public"], - deps = [ - "//util/channel", - "@org_uber_go_atomic//:atomic", - ], -) - -go_test( - name = "pooltask_test", - timeout = "short", - srcs = ["task_test.go"], - embed = [":pooltask"], - flaky = True, - deps = ["@com_github_stretchr_testify//require"], -) diff --git a/resourcemanager/pooltask/task.go b/resourcemanager/pooltask/task.go deleted file mode 100644 index 774ef58c5e82f..0000000000000 --- a/resourcemanager/pooltask/task.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pooltask - -import ( - "sync" - "sync/atomic" - - "github.com/pingcap/tidb/util/channel" -) - -// Context is a interface that can be used to create a context. -type Context[T any] interface { - GetContext() T -} - -// NilContext is to create a nil as context -type NilContext struct{} - -// GetContext is to get a nil as context -func (NilContext) GetContext() any { - return nil -} - -const ( - // PendingTask is a task waiting to start. - PendingTask int32 = iota - // RunningTask is a task running. - RunningTask - // StopTask is a stop task. - StopTask -) - -// TaskBox is a box which contains all info about pool task. -type TaskBox[T any, U any, C any, CT any, TF Context[CT]] struct { - constArgs C - contextFunc TF - wg *sync.WaitGroup - task chan Task[T] - resultCh chan U - taskID uint64 - status atomic.Int32 // task manager is able to make this task stop, wait or running -} - -// GetStatus is to get the status of task. -func (t *TaskBox[T, U, C, CT, TF]) GetStatus() int32 { - return t.status.Load() -} - -// SetStatus is to set the status of task. -func (t *TaskBox[T, U, C, CT, TF]) SetStatus(s int32) { - t.status.Store(s) -} - -// NewTaskBox is to create a task box for pool. -func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contextFunc TF, wg *sync.WaitGroup, taskCh chan Task[T], resultCh chan U, taskID uint64) TaskBox[T, U, C, CT, TF] { - // We still need to do some work after a TaskBox finishes. - // So we need to add 1 to waitgroup. After we finish the work, we need to call TaskBox.Finish() - wg.Add(1) - return TaskBox[T, U, C, CT, TF]{ - constArgs: constArgs, - contextFunc: contextFunc, - wg: wg, - task: taskCh, - resultCh: resultCh, - taskID: taskID, - } -} - -// TaskID is to get the task id. -func (t *TaskBox[T, U, C, CT, TF]) TaskID() uint64 { - return t.taskID -} - -// ConstArgs is to get the const args. -func (t *TaskBox[T, U, C, CT, TF]) ConstArgs() C { - return t.constArgs -} - -// GetTaskCh is to get the task channel. -func (t *TaskBox[T, U, C, CT, TF]) GetTaskCh() chan Task[T] { - return t.task -} - -// GetResultCh is to get result channel -func (t *TaskBox[T, U, C, CT, TF]) GetResultCh() chan U { - return t.resultCh -} - -// GetContextFunc is to get context func. -func (t *TaskBox[T, U, C, CT, TF]) GetContextFunc() TF { - return t.contextFunc -} - -// Done is to set the pooltask status to complete. -func (t *TaskBox[T, U, C, CT, TF]) Done() { - t.wg.Done() -} - -// Finish is to set the TaskBox finish status. -func (t *TaskBox[T, U, C, CT, TF]) Finish() { - t.wg.Done() -} - -// Clone is to copy the box -func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] { - newBox := NewTaskBox[T, U, C, CT, TF](t.constArgs, t.contextFunc, t.wg, t.task, t.resultCh, t.taskID) - return &newBox -} - -// GPool is a goroutine pool. -type GPool[T any, U any, C any, CT any, TF Context[CT]] interface { - Tune(size int32) - DeleteTask(id uint64) - StopTask(id uint64) -} - -// TaskController is a controller that can control or watch the pool. -type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct { - pool GPool[T, U, C, CT, TF] - productExitCh chan struct{} - wg *sync.WaitGroup - taskID uint64 - resultCh chan U - inputCh chan Task[T] -} - -// NewTaskController create a controller to deal with pooltask's status. -func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, productExitCh chan struct{}, wg *sync.WaitGroup, inputCh chan Task[T], resultCh chan U) TaskController[T, U, C, CT, TF] { - return TaskController[T, U, C, CT, TF]{ - pool: p, - taskID: taskID, - productExitCh: productExitCh, - wg: wg, - resultCh: resultCh, - inputCh: inputCh, - } -} - -// Wait is to wait the pool task to stop. -func (t *TaskController[T, U, C, CT, TF]) Wait() { - t.wg.Wait() - close(t.resultCh) - t.pool.DeleteTask(t.taskID) -} - -// Stop is to send stop command to the task. But you still need to wait the task to stop. -func (t *TaskController[T, U, C, CT, TF]) Stop() { - close(t.productExitCh) - // Clear all the task in the task queue and mark all task complete. - // so that ```t.Wait``` is able to close resultCh - for range t.inputCh { - t.wg.Done() - } - t.pool.StopTask(t.TaskID()) - // Clear the resultCh to avoid blocking the consumer put result into the channel and cannot exit. - channel.Clear(t.resultCh) -} - -// TaskID is to get the task id. -func (t *TaskController[T, U, C, CT, TF]) TaskID() uint64 { - return t.taskID -} - -// Task is a task that can be executed. -type Task[T any] struct { - Task T -} diff --git a/resourcemanager/pooltask/task_manager.go b/resourcemanager/pooltask/task_manager.go deleted file mode 100644 index e87c222569792..0000000000000 --- a/resourcemanager/pooltask/task_manager.go +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pooltask - -import ( - "container/list" - "sync" - "time" - - "go.uber.org/atomic" -) - -const shard int = 8 - -func getShardID(id uint64) uint64 { - return id % uint64(shard) -} - -type tContainer[T any, U any, C any, CT any, TF Context[CT]] struct { - task *TaskBox[T, U, C, CT, TF] -} - -type meta[T any, U any, C any, CT any, TF Context[CT]] struct { - stats *list.List - createTS time.Time - initialConcurrency int32 - running atomic.Int32 -} - -func newStats[T any, U any, C any, CT any, TF Context[CT]](concurrency int32) *meta[T, U, C, CT, TF] { - s := &meta[T, U, C, CT, TF]{ - createTS: time.Now(), - stats: list.New(), - initialConcurrency: concurrency, - } - return s -} - -func (m *meta[T, U, C, CT, TF]) getOriginConcurrency() int32 { - return m.initialConcurrency -} - -// TaskStatusContainer is a container that can control or watch the pool. -type TaskStatusContainer[T any, U any, C any, CT any, TF Context[CT]] struct { - stats map[uint64]*meta[T, U, C, CT, TF] - rw sync.RWMutex -} - -// TaskManager is a manager that can control or watch the pool. -type TaskManager[T any, U any, C any, CT any, TF Context[CT]] struct { - task []TaskStatusContainer[T, U, C, CT, TF] - running atomic.Int32 - concurrency int32 -} - -// NewTaskManager create a new pool task manager. -func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskManager[T, U, C, CT, TF] { - task := make([]TaskStatusContainer[T, U, C, CT, TF], shard) - for i := 0; i < shard; i++ { - task[i] = TaskStatusContainer[T, U, C, CT, TF]{ - stats: make(map[uint64]*meta[T, U, C, CT, TF]), - } - } - return TaskManager[T, U, C, CT, TF]{ - task: task, - concurrency: c, - } -} - -// RegisterTask register a task to the manager. -func (t *TaskManager[T, U, C, CT, TF]) RegisterTask(taskID uint64, concurrency int32) { - id := getShardID(taskID) - t.task[id].rw.Lock() - t.task[id].stats[taskID] = newStats[T, U, C, CT, TF](concurrency) - t.task[id].rw.Unlock() -} - -// DeleteTask delete a task from the manager. -func (t *TaskManager[T, U, C, CT, TF]) DeleteTask(taskID uint64) { - shardID := getShardID(taskID) - t.task[shardID].rw.Lock() - delete(t.task[shardID].stats, taskID) - t.task[shardID].rw.Unlock() -} - -// hasTask check if the task is in the manager. -func (t *TaskManager[T, U, C, CT, TF]) hasTask(taskID uint64) bool { - shardID := getShardID(taskID) - t.task[shardID].rw.Lock() - defer t.task[shardID].rw.Unlock() - _, ok := t.task[shardID].stats[taskID] - return ok -} - -// AddSubTask AddTask add a task to the manager. -func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T, U, C, CT, TF]) { - shardID := getShardID(taskID) - tc := tContainer[T, U, C, CT, TF]{ - task: task, - } - t.running.Inc() - t.task[shardID].rw.Lock() - t.task[shardID].stats[taskID].stats.PushBack(tc) - t.task[shardID].stats[taskID].running.Inc() // running job in this task - t.task[shardID].rw.Unlock() -} - -// ExitSubTask is to exit a task, and it will decrease the count of running pooltask. -func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) { - shardID := getShardID(taskID) - t.running.Dec() // total running tasks - t.task[shardID].rw.Lock() - t.task[shardID].stats[taskID].running.Dec() // running job in this task - t.task[shardID].rw.Unlock() -} - -// Running return the count of running job in this task. -func (t *TaskManager[T, U, C, CT, TF]) Running(taskID uint64) int32 { - shardID := getShardID(taskID) - t.task[shardID].rw.Lock() - defer t.task[shardID].rw.Unlock() - return t.task[shardID].stats[taskID].running.Load() -} - -// StopTask is to stop a task by TaskID. -func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) { - shardID := getShardID(taskID) - t.task[shardID].rw.Lock() - defer t.task[shardID].rw.Unlock() - // When call the StopTask, the task may have been deleted from the manager. - s, ok := t.task[shardID].stats[taskID] - if ok { - l := s.stats - for e := l.Front(); e != nil; e = e.Next() { - e.Value.(tContainer[T, U, C, CT, TF]).task.SetStatus(StopTask) - } - } -} - -// GetOriginConcurrency return the concurrency of the pool at the init. -func (t *TaskManager[T, U, C, CT, TF]) GetOriginConcurrency() int32 { - return t.concurrency -} diff --git a/resourcemanager/pooltask/task_manager_iterator.go b/resourcemanager/pooltask/task_manager_iterator.go deleted file mode 100644 index ada5994599ff5..0000000000000 --- a/resourcemanager/pooltask/task_manager_iterator.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pooltask - -import ( - "container/list" - "time" -) - -func (t *TaskManager[T, U, C, CT, TF]) getBoostTask() (tid uint64, result *TaskBox[T, U, C, CT, TF]) { - // boost task - // 1、the count of running task is less than concurrency - // 2、less run time, more possible to boost - tid, element := t.iter(canBoost[T, U, C, CT, TF]) - if element != nil { - return tid, element.Value.(tContainer[T, U, C, CT, TF]).task - } - return 0, nil -} - -func (t *TaskManager[T, U, C, CT, TF]) pauseTask() { - // pause task, - // 1、more run time, more possible to pause - // 2、if task have been boosted, first to pause. - tid, result := t.iter(canPause[T, U, C, CT, TF]) - if result != nil { - result.Value.(tContainer[T, U, C, CT, TF]).task.status.CompareAndSwap(RunningTask, StopTask) - // delete it from list - shardID := getShardID(tid) - t.task[shardID].rw.Lock() - defer t.task[shardID].rw.Unlock() - t.task[shardID].stats[tid].stats.Remove(result) - } -} - -func (t *TaskManager[T, U, C, CT, TF]) iter(fn func(m *meta[T, U, C, CT, TF], max time.Time) (*list.Element, bool)) (tid uint64, result *list.Element) { - var compareTS time.Time - for i := 0; i < shard; i++ { - breakFind := func(index int) (breakFind bool) { - t.task[i].rw.RLock() - defer t.task[i].rw.RUnlock() - for id, stats := range t.task[i].stats { - if result == nil { - result = findTask[T, U, C, CT, TF](stats, RunningTask) - tid = id - compareTS = stats.createTS - continue - } - newResult, pauseFind := fn(stats, compareTS) - if pauseFind { - result = newResult - tid = id - compareTS = stats.createTS - return true - } - if newResult != nil { - result = newResult - tid = id - compareTS = stats.createTS - } - } - return false - }(shard) - if breakFind { - break - } - } - return tid, result -} - -func canPause[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], max time.Time) (result *list.Element, isBreak bool) { - if m.initialConcurrency < m.running.Load() { - box := findTask[T, U, C, CT, TF](m, RunningTask) - if box != nil { - return box, true - } - } - if m.createTS.Before(max) { - box := findTask[T, U, C, CT, TF](m, RunningTask) - if box != nil { - return box, false - } - } - return nil, false -} - -func canBoost[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], min time.Time) (result *list.Element, isBreak bool) { - if m.running.Load() < m.initialConcurrency { - box := getTask[T, U, C, CT, TF](m) - if box != nil { - return box, true - } - } - if m.createTS.After(min) { - box := getTask[T, U, C, CT, TF](m) - if box != nil { - return box, false - } - } - return nil, false -} - -func findTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], status int32) *list.Element { - for e := m.stats.Front(); e != nil; e = e.Next() { - box := e.Value.(tContainer[T, U, C, CT, TF]) - if box.task.status.Load() == status { - return e - } - } - return nil -} - -func getTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF]) *list.Element { - e := m.stats.Front() - if e != nil { - return e - } - return nil -} diff --git a/resourcemanager/pooltask/task_manager_scheduler.go b/resourcemanager/pooltask/task_manager_scheduler.go deleted file mode 100644 index 73c5ee46f099a..0000000000000 --- a/resourcemanager/pooltask/task_manager_scheduler.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pooltask - -// Overclock is to increase the concurrency of pool. -func (t *TaskManager[T, U, C, CT, TF]) Overclock(capacity int) (tid uint64, task *TaskBox[T, U, C, CT, TF]) { - if t.running.Load() >= int32(capacity) { - return - } - return t.getBoostTask() -} - -// Downclock is to decrease the concurrency of pool. -func (t *TaskManager[T, U, C, CT, TF]) Downclock(capacity int) { - if t.running.Load() <= int32(capacity) { - return - } - t.pauseTask() -} diff --git a/resourcemanager/pooltask/task_test.go b/resourcemanager/pooltask/task_test.go deleted file mode 100644 index b4f189fb14525..0000000000000 --- a/resourcemanager/pooltask/task_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pooltask - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestTaskManager(t *testing.T) { - size := 32 - taskConcurrency := 8 - tm := NewTaskManager[int, int, int, any, NilContext](int32(size)) - tm.RegisterTask(1, int32(taskConcurrency)) - for i := 0; i < taskConcurrency; i++ { - tid := NewTaskBox[int, int, int, any, NilContext](1, NilContext{}, &sync.WaitGroup{}, make(chan Task[int]), make(chan int), 1) - tm.AddSubTask(1, &tid) - } - for i := 0; i < taskConcurrency; i++ { - tm.ExitSubTask(1) - } - require.Equal(t, int32(0), tm.Running(1)) - require.True(t, tm.hasTask(1)) - tm.DeleteTask(1) - require.False(t, tm.hasTask(1)) -}