From 0a5347975c582115c6014cd436a8bf42272f1e91 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 7 Feb 2023 14:34:14 +0800 Subject: [PATCH 1/3] store/copr: set upper limit for extra concurrency Signed-off-by: you06 --- store/copr/coprocessor.go | 10 ++++++++-- store/copr/coprocessor_test.go | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index eca0b8037daa6..b02570a38090f 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "runtime" "strconv" "strings" "sync" @@ -72,6 +73,8 @@ const ( smallTaskSigma = 0.5 ) +var smallTaskConcurrencyLimit = 20 * runtime.NumCPU() + // CopClient is coprocessor client. type CopClient struct { kv.RequestTypeSupportedChecker @@ -590,8 +593,11 @@ func smallTaskConcurrency(tasks []*copTask) (int, int) { } // Calculate the extra concurrency for small tasks // extra concurrency = tasks / (1 + sigma * sqrt(log(tasks ^ 2))) - extraConc := float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res)))) - return res, int(extraConc) + extraConc := int(float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res))))) + if extraConc > smallTaskConcurrencyLimit { + extraConc = smallTaskConcurrencyLimit + } + return res, extraConc } type copIterator struct { diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index f7b15ebfd682d..baa6175c04ce3 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -745,3 +745,21 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { // task[3] ["t"-"z"] require.Equal(t, tasks[3].RowCountHint, 10) } + +func TestSmallTaskConcurrency(t *testing.T) { + originSmallTaskLimit := smallTaskConcurrencyLimit + defer func() { + smallTaskConcurrencyLimit = originSmallTaskLimit + }() + smallTaskConcurrencyLimit = 10 + smallTaskCount := 1000 + tasks := make([]*copTask, 0, smallTaskCount) + for i := 0; i < smallTaskCount; i++ { + tasks = append(tasks, &copTask{ + RowCountHint: 1, + }) + } + count, conc := smallTaskConcurrency(tasks) + require.Equal(t, smallTaskConcurrencyLimit, conc) + require.Equal(t, smallTaskCount, count) +} From fd3cfab1467aa0c615500f703ac202a6a34b9152 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 7 Feb 2023 15:54:21 +0800 Subject: [PATCH 2/3] use runtime.GOMAXPROCS(0) Signed-off-by: you06 --- store/copr/coprocessor.go | 13 ++++++++----- store/copr/coprocessor_test.go | 21 ++++++++++----------- store/copr/store.go | 2 ++ 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index b1ff9f5d03c5d..2a4e217cba3ca 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "math" - "runtime" "strconv" "strings" "sync" @@ -71,15 +70,15 @@ const ( copNextMaxBackoff = 20000 CopSmallTaskRow = 32 // 32 is the initial batch size of TiKV smallTaskSigma = 0.5 + smallConcPerCore = 20 ) -var smallTaskConcurrencyLimit = 20 * runtime.NumCPU() - // CopClient is coprocessor client. type CopClient struct { kv.RequestTypeSupportedChecker store *Store replicaReadSeed uint32 + numcpu int } // Send builds the request and gets the coprocessor iterator response. @@ -203,7 +202,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if tryRowHint { var smallTasks int - smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks) + smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.numcpu) if len(tasks)-smallTasks < it.concurrency { it.concurrency = len(tasks) - smallTasks } @@ -583,7 +582,7 @@ func isSmallTask(task *copTask) bool { // smallTaskConcurrency counts the small tasks of tasks, // then returns the task count and extra concurrency for small tasks. -func smallTaskConcurrency(tasks []*copTask) (int, int) { +func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) { res := 0 for _, task := range tasks { if isSmallTask(task) { @@ -596,6 +595,10 @@ func smallTaskConcurrency(tasks []*copTask) (int, int) { // Calculate the extra concurrency for small tasks // extra concurrency = tasks / (1 + sigma * sqrt(log(tasks ^ 2))) extraConc := int(float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res))))) + if numcpu <= 0 { + numcpu = 1 + } + smallTaskConcurrencyLimit := smallConcPerCore * numcpu if extraConc > smallTaskConcurrencyLimit { extraConc = smallTaskConcurrencyLimit } diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index c817e7aa417ec..ed6f2c6f3cb81 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -696,7 +696,7 @@ func TestBasicSmallTaskConc(t *testing.T) { require.True(t, isSmallTask(&copTask{RowCountHint: 6})) require.True(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow})) require.False(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow + 1})) - _, conc := smallTaskConcurrency([]*copTask{}) + _, conc := smallTaskConcurrency([]*copTask{}, 16) require.GreaterOrEqual(t, conc, 0) } @@ -734,7 +734,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[2].RowCountHint, 3) // task[3] ["t"-"x", "y"-"z"] require.Equal(t, tasks[3].RowCountHint, 3+CopSmallTaskRow) - _, conc := smallTaskConcurrency(tasks) + _, conc := smallTaskConcurrency(tasks, 16) require.Equal(t, conc, 1) ranges = buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z") @@ -753,7 +753,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[2].RowCountHint, 3) // task[3] ["t"-"x", "y"-"z"] require.Equal(t, tasks[3].RowCountHint, 6) - _, conc = smallTaskConcurrency(tasks) + _, conc = smallTaskConcurrency(tasks, 16) require.Equal(t, conc, 2) // cross-region long range @@ -775,12 +775,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[3].RowCountHint, 10) } -func TestSmallTaskConcurrency(t *testing.T) { - originSmallTaskLimit := smallTaskConcurrencyLimit - defer func() { - smallTaskConcurrencyLimit = originSmallTaskLimit - }() - smallTaskConcurrencyLimit = 10 +func TestSmallTaskConcurrencyLimit(t *testing.T) { smallTaskCount := 1000 tasks := make([]*copTask, 0, smallTaskCount) for i := 0; i < smallTaskCount; i++ { @@ -788,7 +783,11 @@ func TestSmallTaskConcurrency(t *testing.T) { RowCountHint: 1, }) } - count, conc := smallTaskConcurrency(tasks) - require.Equal(t, smallTaskConcurrencyLimit, conc) + count, conc := smallTaskConcurrency(tasks, 1) + require.Equal(t, smallConcPerCore, conc) + require.Equal(t, smallTaskCount, count) + // also handle 0 value. + count, conc = smallTaskConcurrency(tasks, 0) + require.Equal(t, smallConcPerCore, conc) require.Equal(t, smallTaskCount, count) } diff --git a/store/copr/store.go b/store/copr/store.go index 32553961acc67..05c1fd6d8cc33 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -17,6 +17,7 @@ package copr import ( "context" "math/rand" + "runtime" "sync/atomic" "time" @@ -109,6 +110,7 @@ func (s *Store) GetClient() kv.Client { return &CopClient{ store: s, replicaReadSeed: s.nextReplicaReadSeed(), + numcpu: runtime.GOMAXPROCS(0), } } From bb5121086f0ac4e684a30369c3691629931ddf69 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 7 Feb 2023 17:02:37 +0800 Subject: [PATCH 3/3] avoid mutex contention Signed-off-by: you06 --- store/copr/coprocessor.go | 3 +-- store/copr/store.go | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 2a4e217cba3ca..e1c5fb0b91d00 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -78,7 +78,6 @@ type CopClient struct { kv.RequestTypeSupportedChecker store *Store replicaReadSeed uint32 - numcpu int } // Send builds the request and gets the coprocessor iterator response. @@ -202,7 +201,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if tryRowHint { var smallTasks int - smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.numcpu) + smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.store.numcpu) if len(tasks)-smallTasks < it.concurrency { it.concurrency = len(tasks) - smallTasks } diff --git a/store/copr/store.go b/store/copr/store.go index 05c1fd6d8cc33..afd1004bdba4d 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -77,6 +77,7 @@ type Store struct { *kvStore coprCache *coprCache replicaReadSeed uint32 + numcpu int } // NewStore creates a new store instance. @@ -91,6 +92,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store kvStore: &kvStore{store: s}, coprCache: coprCache, replicaReadSeed: rand.Uint32(), + numcpu: runtime.GOMAXPROCS(0), }, nil } @@ -110,7 +112,6 @@ func (s *Store) GetClient() kv.Client { return &CopClient{ store: s, replicaReadSeed: s.nextReplicaReadSeed(), - numcpu: runtime.GOMAXPROCS(0), } }