diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 398810aa45451..0d689a369bc96 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -123,6 +123,7 @@ go_library( "//executor/internal/builder", "//executor/internal/exec", "//executor/internal/mpp", + "//executor/internal/pdhelper", "//executor/internal/util", "//executor/metrics", "//executor/mppcoordmanager", diff --git a/executor/builder.go b/executor/builder.go index 873efc799c2fc..03e0ff26354df 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -37,7 +37,7 @@ import ( "github.com/pingcap/tidb/executor/aggfuncs" "github.com/pingcap/tidb/executor/internal/builder" "github.com/pingcap/tidb/executor/internal/exec" - internalutil "github.com/pingcap/tidb/executor/internal/util" + "github.com/pingcap/tidb/executor/internal/pdhelper" executor_metrics "github.com/pingcap/tidb/executor/metrics" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -2787,7 +2787,7 @@ func (b *executorBuilder) getAdjustedSampleRate(task plannercore.AnalyzeColumnsT } func (b *executorBuilder) getApproximateTableCountFromStorage(tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) { - return internalutil.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName) + return pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName) } func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask { diff --git a/executor/executor.go b/executor/executor.go index ad8691530f691..621984f0c25a6 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/executor/internal/exec" + 
"github.com/pingcap/tidb/executor/internal/pdhelper" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -171,6 +172,16 @@ func init() { CheckTableFastBucketSize.Store(1024) } +// Start the backend components +func Start() { + pdhelper.GlobalPDHelper.Start() +} + +// Stop the backend components +func Stop() { + pdhelper.GlobalPDHelper.Stop() +} + // Action panics when storage usage exceeds storage quota. func (a *globalPanicOnExceed) Action(t *memory.Tracker) { a.mutex.Lock() diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 88f3b51675719..87e979a729590 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -37,7 +37,7 @@ import ( "github.com/pingcap/tidb/domain/resourcegroup" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/executor/internal/exec" - internalutil "github.com/pingcap/tidb/executor/internal/util" + "github.com/pingcap/tidb/executor/internal/pdhelper" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -2097,7 +2097,7 @@ func getRemainDurationForAnalyzeStatusHelper( } } if tid > 0 && totalCnt == 0 { - totalCnt, _ = internalutil.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName) + totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName) } RemainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration) } diff --git a/executor/internal/pdhelper/BUILD.bazel b/executor/internal/pdhelper/BUILD.bazel new file mode 100644 index 0000000000000..5bf8a8d78d01d --- /dev/null +++ b/executor/internal/pdhelper/BUILD.bazel @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "pdhelper", + srcs = ["pd.go"], + importpath = "github.com/pingcap/tidb/executor/internal/pdhelper", + visibility = 
["//executor:__subpackages__"], + deps = [ + "//kv", + "//sessionctx", + "//store/helper", + "//util", + "//util/sqlexec", + "@com_github_jellydator_ttlcache_v3//:ttlcache", + "@com_github_pingcap_failpoint//:failpoint", + ], +) + +go_test( + name = "pdhelper_test", + timeout = "short", + srcs = ["pd_test.go"], + embed = [":pdhelper"], + flaky = True, + deps = [ + "//sessionctx", + "@com_github_jellydator_ttlcache_v3//:ttlcache", + "@com_github_stretchr_testify//require", + ], +) diff --git a/executor/internal/pdhelper/pd.go b/executor/internal/pdhelper/pd.go new file mode 100644 index 0000000000000..00f4d3903d832 --- /dev/null +++ b/executor/internal/pdhelper/pd.go @@ -0,0 +1,124 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdhelper + +import ( + "context" + "strconv" + "strings" + "sync" + "time" + + "github.com/jellydator/ttlcache/v3" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/helper" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/sqlexec" +) + +// GlobalPDHelper is the global variable for PDHelper. +var GlobalPDHelper = defaultPDHelper() +var globalPDHelperOnce sync.Once + +// PDHelper is used to get some information from PD. 
+type PDHelper struct {
+	wg                                       util.WaitGroupWrapper
+	cacheForApproximateTableCountFromStorage *ttlcache.Cache[string, float64]
+
+	// getApproximateTableCountFromStorageFunc performs the real lookup on a cache miss.
+	// It is a field so that a different implementation can be plugged in.
+	getApproximateTableCountFromStorageFunc func(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool)
+
+	// mu guards running, which records whether this helper's cleanup loop has been
+	// launched and not yet stopped. The state is per instance, so one helper cannot
+	// suppress the start of another.
+	mu      sync.Mutex
+	running bool
+}
+
+// defaultPDHelper builds a PDHelper that queries the real storage and caches each
+// result for 30 seconds in a cache capped at 1024*1024 entries.
+func defaultPDHelper() *PDHelper {
+	cache := ttlcache.New[string, float64](
+		ttlcache.WithTTL[string, float64](30*time.Second),
+		ttlcache.WithCapacity[string, float64](1024*1024),
+	)
+	return &PDHelper{
+		cacheForApproximateTableCountFromStorage: cache,
+		getApproximateTableCountFromStorageFunc:  getApproximateTableCountFromStorage,
+	}
+}
+
+// Start is used to start the background task of PDHelper. Currently, the background task is used to clean up TTL cache.
+// Calling Start on a helper that is already running is a no-op.
+func (p *PDHelper) Start() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.running {
+		return
+	}
+	p.running = true
+	p.wg.Run(p.cacheForApproximateTableCountFromStorage.Start)
+}
+
+// Stop stops the background task of PDHelper and waits for it to exit.
+// It is a no-op when the helper is not running.
+// NOTE(review): ttlcache documents Cache.Stop as blocking until the cleanup process exits,
+// so calling it without a running loop is expected to hang; confirm against the pinned version.
+func (p *PDHelper) Stop() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if !p.running {
+		return
+	}
+	p.cacheForApproximateTableCountFromStorage.Stop()
+	p.wg.Wait()
+	p.running = false
+}
+
+// approximateTableCountKey builds the cache key by joining the table ID and the
+// database, table and partition names with underscores.
+func approximateTableCountKey(tid int64, dbName, tableName, partitionName string) string {
+	return strings.Join([]string{strconv.FormatInt(tid, 10), dbName, tableName, partitionName}, "_")
+}
+
+// GetApproximateTableCountFromStorage gets the approximate count of the table.
+// The second return value reports whether a count could be obtained. Successful lookups are
+// cached under a key derived from the arguments; failed lookups are never cached, so a cache
+// hit always carries a count that was really obtained.
+func (p *PDHelper) GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
+	key := approximateTableCountKey(tid, dbName, tableName, partitionName)
+	if item := p.cacheForApproximateTableCountFromStorage.Get(key); item != nil {
+		return item.Value(), true
+	}
+	result, hasPD := p.getApproximateTableCountFromStorageFunc(sctx, tid, dbName, tableName, partitionName)
+	if !hasPD {
+		// Do not cache a failure: the hit path above always reports true, so a cached
+		// failure would later be returned as if it were a valid count.
+		return result, false
+	}
+	p.cacheForApproximateTableCountFromStorage.Set(key, result, ttlcache.DefaultTTL)
+	return result, true
+}
+
+// getApproximateTableCountFromStorage is the uncached lookup. It asks PD for the region
+// statistics of the table and, when the table spans more than two regions, returns PD's
+// storage key count. Otherwise it runs a count(*) on the table (or partition).
+// It returns (0, false) whenever the count cannot be obtained.
+func getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
+	tikvStore, ok := sctx.GetStore().(helper.Storage)
+	if !ok {
+		return 0, false
+	}
+	regionStats := &helper.PDRegionStats{}
+	pdHelper := helper.NewHelper(tikvStore)
+	err := pdHelper.GetPDRegionStats(tid, regionStats, true)
+	failpoint.Inject("calcSampleRateByStorageCount", func() {
+		// Force the TiDB thinking that there's PD and the count of region is small.
+		err = nil
+		regionStats.Count = 1
+		// Set a very large approximate count.
+		regionStats.StorageKeys = 1000000
+	})
+	if err != nil {
+		return 0, false
+	}
+	// If this table is not small, we directly use the count from PD,
+	// since for a small table, it's possible that its data is in the same region with part of another large table.
+	// Thus, we use the number of the regions of the table's table KV to decide whether the table is small.
+	if regionStats.Count > 2 {
+		return float64(regionStats.StorageKeys), true
+	}
+	// Otherwise, we use count(*) to calc its size, since it's very small, the table data can be filled in no more than 2 regions.
+	sql := new(strings.Builder)
+	sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", dbName, tableName)
+	if partitionName != "" {
+		sqlexec.MustFormatSQL(sql, " partition(%n)", partitionName)
+	}
+	// Report a failure instead of panicking when the session cannot run restricted SQL,
+	// in line with every other failure path of this function.
+	executor, ok := sctx.(sqlexec.RestrictedSQLExecutor)
+	if !ok {
+		return 0, false
+	}
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
+	rows, _, err := executor.ExecRestrictedSQL(ctx, nil, sql.String())
+	if err != nil {
+		return 0, false
+	}
+	// If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row.
+	if len(rows) == 0 || rows[0].Len() == 0 {
+		return 0, false
+	}
+	return float64(rows[0].GetInt64(0)), true
+}
diff --git a/executor/internal/pdhelper/pd_test.go b/executor/internal/pdhelper/pd_test.go
new file mode 100644
index 0000000000000..5ca14a9e0a7c8
--- /dev/null
+++ b/executor/internal/pdhelper/pd_test.go
@@ -0,0 +1,67 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package pdhelper + +import ( + "testing" + "time" + + "github.com/jellydator/ttlcache/v3" + "github.com/pingcap/tidb/sessionctx" + "github.com/stretchr/testify/require" +) + +var globalMockClient mockClient + +type mockClient struct { + missCnt int +} + +func (m *mockClient) getMissCnt() int { + return m.missCnt +} + +func (m *mockClient) getFakeApproximateTableCountFromStorage(_ sessionctx.Context, _ int64, _, _, _ string) (float64, bool) { + m.missCnt++ + return 1.0, true +} + +func TestTTLCache(t *testing.T) { + cache := ttlcache.New[string, float64]( + ttlcache.WithTTL[string, float64](100*time.Millisecond), + ttlcache.WithCapacity[string, float64](2), + ) + helper := &PDHelper{ + cacheForApproximateTableCountFromStorage: cache, + getApproximateTableCountFromStorageFunc: globalMockClient.getFakeApproximateTableCountFromStorage, + } + helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss + require.Equal(t, 1, globalMockClient.getMissCnt()) + helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Hit + require.Equal(t, 1, globalMockClient.getMissCnt()) + helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition") // Miss + require.Equal(t, 2, globalMockClient.getMissCnt()) + helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Miss + helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss + require.Equal(t, 4, globalMockClient.getMissCnt()) + helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Hit + require.Equal(t, 4, globalMockClient.getMissCnt()) + time.Sleep(200 * time.Millisecond) + // All is miss. 
+ helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") + helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition") + helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") + require.Equal(t, 7, globalMockClient.getMissCnt()) +} diff --git a/executor/internal/util/BUILD.bazel b/executor/internal/util/BUILD.bazel index a697a759a9b77..94dc9d923c2b9 100644 --- a/executor/internal/util/BUILD.bazel +++ b/executor/internal/util/BUILD.bazel @@ -9,12 +9,7 @@ go_library( importpath = "github.com/pingcap/tidb/executor/internal/util", visibility = ["//executor:__subpackages__"], deps = [ - "//kv", - "//sessionctx", - "//store/helper", - "//util/sqlexec", "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_tipb//go-tipb", ], ) diff --git a/executor/internal/util/util.go b/executor/internal/util/util.go index 8773e9ef988a2..3aa79ce5134ff 100644 --- a/executor/internal/util/util.go +++ b/executor/internal/util/util.go @@ -13,57 +13,3 @@ // limitations under the License. package util - -import ( - "context" - "strings" - - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/store/helper" - "github.com/pingcap/tidb/util/sqlexec" -) - -// GetApproximateTableCountFromStorage gets the approximate count of the table. -func GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) { - tikvStore, ok := sctx.GetStore().(helper.Storage) - if !ok { - return 0, false - } - regionStats := &helper.PDRegionStats{} - pdHelper := helper.NewHelper(tikvStore) - err := pdHelper.GetPDRegionStats(tid, regionStats, true) - failpoint.Inject("calcSampleRateByStorageCount", func() { - // Force the TiDB thinking that there's PD and the count of region is small. 
- err = nil - regionStats.Count = 1 - // Set a very large approximate count. - regionStats.StorageKeys = 1000000 - }) - if err != nil { - return 0, false - } - // If this table is not small, we directly use the count from PD, - // since for a small table, it's possible that it's data is in the same region with part of another large table. - // Thus, we use the number of the regions of the table's table KV to decide whether the table is small. - if regionStats.Count > 2 { - return float64(regionStats.StorageKeys), true - } - // Otherwise, we use count(*) to calc it's size, since it's very small, the table data can be filled in no more than 2 regions. - sql := new(strings.Builder) - sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", dbName, tableName) - if partitionName != "" { - sqlexec.MustFormatSQL(sql, " partition(%n)", partitionName) - } - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, sql.String()) - if err != nil { - return 0, false - } - // If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row. - if len(rows) == 0 || rows[0].Len() == 0 { - return 0, false - } - return float64(rows[0].GetInt64(0)), true -} diff --git a/tidb-server/main.go b/tidb-server/main.go index 835ae99859022..c51a80c702145 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -235,7 +235,7 @@ func main() { setupMetrics() keyspaceName := keyspace.GetKeyspaceNameBySettings() - + executor.Start() resourcemanager.InstanceResourceManager.Start() storage, dom := createStoreAndDomain(keyspaceName) svr := createServer(storage, dom) @@ -251,6 +251,7 @@ func main() { cleanup(svr, storage, dom) cpuprofile.StopCPUProfiler() resourcemanager.InstanceResourceManager.Stop() + executor.Stop() close(exited) }) topsql.SetupTopSQL()