diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 573f746a6b0d3..e1c5fb0b91d00 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -70,6 +70,7 @@ const ( copNextMaxBackoff = 20000 CopSmallTaskRow = 32 // 32 is the initial batch size of TiKV smallTaskSigma = 0.5 + smallConcPerCore = 20 ) // CopClient is coprocessor client. @@ -200,7 +201,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if tryRowHint { var smallTasks int - smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks) + smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.store.numcpu) if len(tasks)-smallTasks < it.concurrency { it.concurrency = len(tasks) - smallTasks } @@ -580,7 +581,7 @@ func isSmallTask(task *copTask) bool { // smallTaskConcurrency counts the small tasks of tasks, // then returns the task count and extra concurrency for small tasks. -func smallTaskConcurrency(tasks []*copTask) (int, int) { +func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) { res := 0 for _, task := range tasks { if isSmallTask(task) { @@ -592,8 +593,15 @@ func smallTaskConcurrency(tasks []*copTask) (int, int) { } // Calculate the extra concurrency for small tasks // extra concurrency = tasks / (1 + sigma * sqrt(log(tasks ^ 2))) - extraConc := float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res)))) - return res, int(extraConc) + extraConc := int(float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res))))) + if numcpu <= 0 { + numcpu = 1 + } + smallTaskConcurrencyLimit := smallConcPerCore * numcpu + if extraConc > smallTaskConcurrencyLimit { + extraConc = smallTaskConcurrencyLimit + } + return res, extraConc } type copIterator struct { diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index c94d441932d8c..ed6f2c6f3cb81 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -696,7 +696,7 @@ func TestBasicSmallTaskConc(t *testing.T) { require.True(t, isSmallTask(&copTask{RowCountHint: 6})) require.True(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow})) require.False(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow + 1})) - _, conc := smallTaskConcurrency([]*copTask{}) + _, conc := smallTaskConcurrency([]*copTask{}, 16) require.GreaterOrEqual(t, conc, 0) } @@ -734,7 +734,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[2].RowCountHint, 3) // task[3] ["t"-"x", "y"-"z"] require.Equal(t, tasks[3].RowCountHint, 3+CopSmallTaskRow) - _, conc := smallTaskConcurrency(tasks) + _, conc := smallTaskConcurrency(tasks, 16) require.Equal(t, conc, 1) ranges = buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z") @@ -753,7 +753,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[2].RowCountHint, 3) // task[3] ["t"-"x", "y"-"z"] require.Equal(t, tasks[3].RowCountHint, 6) - _, conc = smallTaskConcurrency(tasks) + _, conc = smallTaskConcurrency(tasks, 16) require.Equal(t, conc, 2) // cross-region long range @@ -774,3 +774,20 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { // task[3] ["t"-"z"] require.Equal(t, tasks[3].RowCountHint, 10) } + +func TestSmallTaskConcurrencyLimit(t *testing.T) { + smallTaskCount := 1000 + tasks := make([]*copTask, 0, smallTaskCount) + for i := 0; i < smallTaskCount; i++ { + tasks = append(tasks, &copTask{ + RowCountHint: 1, + }) + } + count, conc := smallTaskConcurrency(tasks, 1) + require.Equal(t, smallConcPerCore, conc) + require.Equal(t, smallTaskCount, count) + // also handle 0 value. + count, conc = smallTaskConcurrency(tasks, 0) + require.Equal(t, smallConcPerCore, conc) + require.Equal(t, smallTaskCount, count) +} diff --git a/store/copr/store.go b/store/copr/store.go index 32553961acc67..afd1004bdba4d 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -17,6 +17,7 @@ package copr import ( "context" "math/rand" + "runtime" "sync/atomic" "time" @@ -76,6 +77,7 @@ type Store struct { *kvStore coprCache *coprCache replicaReadSeed uint32 + numcpu int } // NewStore creates a new store instance. @@ -90,6 +92,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store kvStore: &kvStore{store: s}, coprCache: coprCache, replicaReadSeed: rand.Uint32(), + numcpu: runtime.GOMAXPROCS(0), }, nil }