Skip to content

Commit

Permalink
executor: add cache for approximate table count (pingcap#44979)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jul 3, 2023
1 parent 60784a3 commit 96dcbb7
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 64 deletions.
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ go_library(
"//executor/internal/builder",
"//executor/internal/exec",
"//executor/internal/mpp",
"//executor/internal/pdhelper",
"//executor/internal/util",
"//executor/metrics",
"//executor/mppcoordmanager",
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/internal/builder"
"github.com/pingcap/tidb/executor/internal/exec"
internalutil "github.com/pingcap/tidb/executor/internal/util"
"github.com/pingcap/tidb/executor/internal/pdhelper"
executor_metrics "github.com/pingcap/tidb/executor/metrics"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand Down Expand Up @@ -2787,7 +2787,7 @@ func (b *executorBuilder) getAdjustedSampleRate(task plannercore.AnalyzeColumnsT
}

func (b *executorBuilder) getApproximateTableCountFromStorage(tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) {
return internalutil.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName)
return pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName)
}

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask {
Expand Down
11 changes: 11 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/executor/internal/pdhelper"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -171,6 +172,16 @@ func init() {
CheckTableFastBucketSize.Store(1024)
}

// Start starts the background components used by the executor package.
// It delegates to pdhelper.GlobalPDHelper.Start.
func Start() {
pdhelper.GlobalPDHelper.Start()
}

// Stop stops the background components used by the executor package.
// It delegates to pdhelper.GlobalPDHelper.Stop.
func Stop() {
pdhelper.GlobalPDHelper.Stop()
}

// Action panics when storage usage exceeds storage quota.
func (a *globalPanicOnExceed) Action(t *memory.Tracker) {
a.mutex.Lock()
Expand Down
4 changes: 2 additions & 2 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/tidb/domain/resourcegroup"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/executor/internal/exec"
internalutil "github.com/pingcap/tidb/executor/internal/util"
"github.com/pingcap/tidb/executor/internal/pdhelper"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -2097,7 +2097,7 @@ func getRemainDurationForAnalyzeStatusHelper(
}
}
if tid > 0 && totalCnt == 0 {
totalCnt, _ = internalutil.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName)
totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName)
}
RemainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration)
}
Expand Down
30 changes: 30 additions & 0 deletions executor/internal/pdhelper/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "pdhelper",
srcs = ["pd.go"],
importpath = "github.com/pingcap/tidb/executor/internal/pdhelper",
visibility = ["//executor:__subpackages__"],
deps = [
"//kv",
"//sessionctx",
"//store/helper",
"//util",
"//util/sqlexec",
"@com_github_jellydator_ttlcache_v3//:ttlcache",
"@com_github_pingcap_failpoint//:failpoint",
],
)

go_test(
name = "pdhelper_test",
timeout = "short",
srcs = ["pd_test.go"],
embed = [":pdhelper"],
flaky = True,
deps = [
"//sessionctx",
"@com_github_jellydator_ttlcache_v3//:ttlcache",
"@com_github_stretchr_testify//require",
],
)
124 changes: 124 additions & 0 deletions executor/internal/pdhelper/pd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pdhelper

import (
"context"
"strconv"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/sqlexec"
)

var (
	// GlobalPDHelper is the process-wide PDHelper instance shared by the executor package.
	GlobalPDHelper = defaultPDHelper()

	// globalPDHelperOnce makes sure the background task launched by Start is only started once.
	globalPDHelperOnce sync.Once
)

// PDHelper is used to get some information from PD.
// It caches approximate table counts so repeated requests for the same table
// do not have to hit the storage every time.
type PDHelper struct {
// wg tracks the background goroutine launched by Start so that Stop can wait for it.
wg util.WaitGroupWrapper
// cacheForApproximateTableCountFromStorage maps the key built by approximateTableCountKey
// to the approximate row count of that table/partition.
cacheForApproximateTableCountFromStorage *ttlcache.Cache[string, float64]

// getApproximateTableCountFromStorageFunc computes the count on a cache miss.
// It is a field so that tests can substitute a fake implementation.
getApproximateTableCountFromStorageFunc func(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool)
}

// defaultPDHelper builds the PDHelper used as GlobalPDHelper: counts are loaded through
// getApproximateTableCountFromStorage and kept in a TTL cache configured with a
// 30-second TTL and a capacity of 1024*1024 entries.
func defaultPDHelper() *PDHelper {
	pdHelper := &PDHelper{
		getApproximateTableCountFromStorageFunc: getApproximateTableCountFromStorage,
	}
	pdHelper.cacheForApproximateTableCountFromStorage = ttlcache.New[string, float64](
		ttlcache.WithTTL[string, float64](30*time.Second),
		ttlcache.WithCapacity[string, float64](1024*1024),
	)
	return pdHelper
}

// Start launches the background task of PDHelper, which currently is the cleanup
// loop of the TTL cache. The launch is guarded by globalPDHelperOnce, so only the
// first call has any effect.
func (p *PDHelper) Start() {
	launchCacheCleaner := func() {
		p.wg.Run(p.cacheForApproximateTableCountFromStorage.Start)
	}
	globalPDHelperOnce.Do(launchCacheCleaner)
}

// Stop stops the background task of PDHelper.
// It first stops the TTL cache and then waits for the goroutine launched by Start to exit.
// NOTE(review): confirm that the cache's Stop is safe to call when Start was never invoked.
func (p *PDHelper) Stop() {
p.cacheForApproximateTableCountFromStorage.Stop()
p.wg.Wait()
}

// approximateTableCountKey builds the cache key for a table or partition:
// the decimal table ID followed by the database, table and partition names,
// each preceded by an underscore.
func approximateTableCountKey(tid int64, dbName, tableName, partitionName string) string {
	var key strings.Builder
	key.WriteString(strconv.FormatInt(tid, 10))
	for _, name := range [...]string{dbName, tableName, partitionName} {
		key.WriteByte('_')
		key.WriteString(name)
	}
	return key.String()
}

// GetApproximateTableCountFromStorage gets the approximate count of the table.
//
// The count is served from the TTL cache when an entry exists for the key built from
// tid, dbName, tableName and partitionName. On a miss it is computed through
// getApproximateTableCountFromStorageFunc.
//
// The boolean result reports whether the count is usable. Only successful lookups are
// cached: a cache hit always reports true, so caching a failed lookup would turn a
// failure into a "successful" count of 0 until the entry expires.
//
// NOTE(review): ttlcache's Get may refresh an entry's TTL on every hit by default;
// confirm whether frequently requested keys should still expire after the configured TTL.
func (p *PDHelper) GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
	key := approximateTableCountKey(tid, dbName, tableName, partitionName)
	if item := p.cacheForApproximateTableCountFromStorage.Get(key); item != nil {
		return item.Value(), true
	}
	result, hasPD := p.getApproximateTableCountFromStorageFunc(sctx, tid, dbName, tableName, partitionName)
	if hasPD {
		// Do not remember failures; the next call should retry the lookup.
		p.cacheForApproximateTableCountFromStorage.Set(key, result, ttlcache.DefaultTTL)
	}
	return result, hasPD
}

// getApproximateTableCountFromStorage computes the approximate row count of the table
// (or partition) identified by tid without consulting any cache.
//
// It fetches the region statistics of tid from PD. When the table spans more than two
// regions, the StorageKeys reported by PD is returned. Otherwise the table is treated as
// small and an internal `select count(*)` is executed against dbName.tableName
// (restricted to partitionName when it is not empty).
//
// The boolean result is false, and the count is 0, when the store is not a helper.Storage,
// when the PD request fails, when sctx cannot execute restricted SQL, or when the
// count(*) query fails or returns no row.
func getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
	tikvStore, ok := sctx.GetStore().(helper.Storage)
	if !ok {
		return 0, false
	}
	regionStats := &helper.PDRegionStats{}
	pdHelper := helper.NewHelper(tikvStore)
	err := pdHelper.GetPDRegionStats(tid, regionStats, true)
	failpoint.Inject("calcSampleRateByStorageCount", func() {
		// Force the TiDB thinking that there's PD and the count of region is small.
		err = nil
		regionStats.Count = 1
		// Set a very large approximate count.
		regionStats.StorageKeys = 1000000
	})
	if err != nil {
		return 0, false
	}
	// If this table is not small, we directly use the count from PD,
	// since for a small table, it's possible that its data is in the same region with part of another large table.
	// Thus, we use the number of the regions of the table's table KV to decide whether the table is small.
	if regionStats.Count > 2 {
		return float64(regionStats.StorageKeys), true
	}
	// Otherwise, we use count(*) to calc its size, since it's very small, the table data can be filled in no more than 2 regions.
	// Use a checked assertion, like the store check above, so that a context which
	// cannot run restricted SQL yields "no result" instead of a panic.
	executor, ok := sctx.(sqlexec.RestrictedSQLExecutor)
	if !ok {
		return 0, false
	}
	sql := new(strings.Builder)
	sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", dbName, tableName)
	if partitionName != "" {
		sqlexec.MustFormatSQL(sql, " partition(%n)", partitionName)
	}
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	rows, _, err := executor.ExecRestrictedSQL(ctx, nil, sql.String())
	if err != nil {
		return 0, false
	}
	// If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row.
	if len(rows) == 0 || rows[0].Len() == 0 {
		return 0, false
	}
	return float64(rows[0].GetInt64(0)), true
}
67 changes: 67 additions & 0 deletions executor/internal/pdhelper/pd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pdhelper

import (
"testing"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pingcap/tidb/sessionctx"
"github.com/stretchr/testify/require"
)

// globalMockClient is the package-level mock whose method replaces the real storage lookup in TestTTLCache.
var globalMockClient mockClient

// mockClient is a fake count provider that records how often it is asked for a count.
type mockClient struct {
// missCnt is incremented on every call to getFakeApproximateTableCountFromStorage,
// i.e. every time the cache under test misses.
missCnt int
}

// getMissCnt returns how many times the fake lookup has been invoked so far.
func (m *mockClient) getMissCnt() int {
return m.missCnt
}

// getFakeApproximateTableCountFromStorage stands in for the real storage lookup.
// It ignores all arguments, bumps the miss counter and always reports a count of 1.0 with success.
func (m *mockClient) getFakeApproximateTableCountFromStorage(_ sessionctx.Context, _ int64, _, _, _ string) (float64, bool) {
m.missCnt++
return 1.0, true
}

func TestTTLCache(t *testing.T) {
cache := ttlcache.New[string, float64](
ttlcache.WithTTL[string, float64](100*time.Millisecond),
ttlcache.WithCapacity[string, float64](2),
)
helper := &PDHelper{
cacheForApproximateTableCountFromStorage: cache,
getApproximateTableCountFromStorageFunc: globalMockClient.getFakeApproximateTableCountFromStorage,
}
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss
require.Equal(t, 1, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Hit
require.Equal(t, 1, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition") // Miss
require.Equal(t, 2, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Miss
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss
require.Equal(t, 4, globalMockClient.getMissCnt())
helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Hit
require.Equal(t, 4, globalMockClient.getMissCnt())
time.Sleep(200 * time.Millisecond)
// All is miss.
helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition")
helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition")
helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition")
require.Equal(t, 7, globalMockClient.getMissCnt())
}
5 changes: 0 additions & 5 deletions executor/internal/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/executor/internal/util",
visibility = ["//executor:__subpackages__"],
deps = [
"//kv",
"//sessionctx",
"//store/helper",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-tipb",
],
)
54 changes: 0 additions & 54 deletions executor/internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,3 @@
// limitations under the License.

package util

import (
"context"
"strings"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/util/sqlexec"
)

// GetApproximateTableCountFromStorage gets the approximate count of the table.
// It returns the StorageKeys reported by PD when the table spans more than two regions,
// and otherwise runs an internal `select count(*)`. The boolean result is false, with a
// count of 0, when the store is not a helper.Storage, when the PD request fails, or when
// the count(*) query fails or returns no row.
func GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) {
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return 0, false
}
regionStats := &helper.PDRegionStats{}
pdHelper := helper.NewHelper(tikvStore)
err := pdHelper.GetPDRegionStats(tid, regionStats, true)
failpoint.Inject("calcSampleRateByStorageCount", func() {
// Force the TiDB thinking that there's PD and the count of region is small.
err = nil
regionStats.Count = 1
// Set a very large approximate count.
regionStats.StorageKeys = 1000000
})
if err != nil {
return 0, false
}
// If this table is not small, we directly use the count from PD,
// since for a small table, it's possible that its data is in the same region with part of another large table.
// Thus, we use the number of the regions of the table's table KV to decide whether the table is small.
if regionStats.Count > 2 {
return float64(regionStats.StorageKeys), true
}
// Otherwise, we use count(*) to calc its size, since it's very small, the table data can be filled in no more than 2 regions.
sql := new(strings.Builder)
sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", dbName, tableName)
if partitionName != "" {
sqlexec.MustFormatSQL(sql, " partition(%n)", partitionName)
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, sql.String())
if err != nil {
return 0, false
}
// If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row.
if len(rows) == 0 || rows[0].Len() == 0 {
return 0, false
}
return float64(rows[0].GetInt64(0)), true
}
3 changes: 2 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func main() {
setupMetrics()

keyspaceName := keyspace.GetKeyspaceNameBySettings()

executor.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
svr := createServer(storage, dom)
Expand All @@ -251,6 +251,7 @@ func main() {
cleanup(svr, storage, dom)
cpuprofile.StopCPUProfiler()
resourcemanager.InstanceResourceManager.Stop()
executor.Stop()
close(exited)
})
topsql.SetupTopSQL()
Expand Down

0 comments on commit 96dcbb7

Please sign in to comment.