planner: introduce an interface for StatsHandle to simplify the code
qw4990 authored Oct 16, 2023
1 parent 79db4ec commit 4765838
Showing 11 changed files with 146 additions and 156 deletions.
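Note on the new abstraction: the statsutil.StatsHandle interface this commit threads through is defined outside the files shown here. Inferring only from the call sites in the hunks below, it plausibly exposes something like the following sketch; the method set is taken from this diff, but the exact signatures (in particular the return type of TableInfoByID) are assumptions, not the real definition.

type StatsHandle interface {
	// SPool returns the session pool used to run internal SQL statements.
	SPool() util.SessionPool
	// Lease returns the current stats lease (replaces the old statsLease field).
	Lease() time.Duration
	// GetLockedTables looks up locked tables, which auto analyze must skip.
	GetLockedTables(tableIDs ...int64) (map[int64]struct{}, error)
	// GetTableStats and GetPartitionStats look up cached statistics.
	GetTableStats(tblInfo *model.TableInfo) *statistics.Table
	GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table
	// AutoAnalyzeProcID assigns a process ID to an auto-analyze job
	// (replaces the old autoAnalyzeProcIDGetter function pointer).
	AutoAnalyzeProcID() uint64
	// SysProcTracker tracks analyze resource consumption.
	SysProcTracker() sessionctx.SysProcTracker
	// TableInfoByID resolves a physical table ID through the infoschema
	// (return type assumed here).
	TableInfoByID(is infoschema.InfoSchema, id int64) (table.Table, bool)
}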
103 changes: 30 additions & 73 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -44,60 +44,33 @@ import (
 // statsAnalyze implements util.StatsAnalyze.
 // statsAnalyze is used to handle auto-analyze and manage analyze jobs.
 type statsAnalyze struct {
-	pool statsutil.SessionPool
-
-	// TODO: use interfaces instead of raw function pointers below
-	sysProcTracker          sessionctx.SysProcTracker
-	getLockedTables         func(tableIDs ...int64) (map[int64]struct{}, error)
-	getTableStats           func(tblInfo *model.TableInfo) *statistics.Table
-	getPartitionStats       func(tblInfo *model.TableInfo, pid int64) *statistics.Table
-	autoAnalyzeProcIDGetter func() uint64
-	statsLease              time.Duration
+	statsHandle statsutil.StatsHandle
 }
 
 // NewStatsAnalyze creates a new StatsAnalyze.
-func NewStatsAnalyze(pool statsutil.SessionPool,
-	sysProcTracker sessionctx.SysProcTracker,
-	getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error),
-	getTableStats func(tblInfo *model.TableInfo) *statistics.Table,
-	getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table,
-	autoAnalyzeProcIDGetter func() uint64,
-	statsLease time.Duration) statsutil.StatsAnalyze {
-	return &statsAnalyze{pool: pool,
-		sysProcTracker:          sysProcTracker,
-		getLockedTables:         getLockedTables,
-		getTableStats:           getTableStats,
-		getPartitionStats:       getPartitionStats,
-		autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
-		statsLease:              statsLease}
+func NewStatsAnalyze(statsHandle statsutil.StatsHandle) statsutil.StatsAnalyze {
+	return &statsAnalyze{statsHandle: statsHandle}
 }
 
 // InsertAnalyzeJob inserts the analyze job to the storage.
 func (sa *statsAnalyze) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error {
-	return statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error {
+	return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
 		return insertAnalyzeJob(sctx, job, instance, procID)
 	})
 }
 
 // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime.
 func (sa *statsAnalyze) DeleteAnalyzeJobs(updateTime time.Time) error {
-	return statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error {
+	return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
 		_, _, err := statsutil.ExecRows(sctx, "DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat))
 		return err
 	})
 }
 
 // HandleAutoAnalyze analyzes the newly created table or index.
 func (sa *statsAnalyze) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) {
-	_ = statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error {
-		analyzed = HandleAutoAnalyze(sctx, &Opt{
-			StatsLease:              sa.statsLease,
-			GetLockedTables:         sa.getLockedTables,
-			GetTableStats:           sa.getTableStats,
-			GetPartitionStats:       sa.getPartitionStats,
-			SysProcTracker:          sa.sysProcTracker,
-			AutoAnalyzeProcIDGetter: sa.autoAnalyzeProcIDGetter,
-		}, is)
+	_ = statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+		analyzed = HandleAutoAnalyze(sctx, sa.statsHandle, is)
 		return nil
 	})
 	return
@@ -139,25 +112,9 @@ func getAutoAnalyzeParameters(sctx sessionctx.Context) map[string]string {
 	return parameters
 }
 
-// Opt is used to hold parameters for auto analyze.
-type Opt struct {
-	// SysProcTracker is used to track analyze resource consumption.
-	SysProcTracker sessionctx.SysProcTracker
-	// GetLockedTables is used to look up locked tables which will be skipped in auto analyze.
-	GetLockedTables func(tableIDs ...int64) (map[int64]struct{}, error)
-	// GetTableStats is used to look up table stats to decide whether to analyze the table.
-	GetTableStats func(tblInfo *model.TableInfo) *statistics.Table
-	// GetPartitionStats is used to look up partition stats to decide whether to analyze the partition.
-	GetPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table
-	// AutoAnalyzeProcIDGetter is used to assign job ID for analyze jobs.
-	AutoAnalyzeProcIDGetter func() uint64
-	// StatsLease is the current stats lease.
-	StatsLease time.Duration
-}
-
 // HandleAutoAnalyze analyzes the newly created table or index.
 func HandleAutoAnalyze(sctx sessionctx.Context,
-	opt *Opt,
+	statsHandle statsutil.StatsHandle,
 	is infoschema.InfoSchema) (analyzed bool) {
 	defer func() {
 		if r := recover(); r != nil {
@@ -207,7 +164,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
 		}
 	}
 
-	lockedTables, err := opt.GetLockedTables(tidsAndPids...)
+	lockedTables, err := statsHandle.GetLockedTables(tidsAndPids...)
 	if err != nil {
 		logutil.BgLogger().Error("check table lock failed",
 			zap.String("category", "stats"), zap.Error(err))
@@ -226,9 +183,9 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
 		}
 		pi := tblInfo.GetPartitionInfo()
 		if pi == nil {
-			statsTbl := opt.GetTableStats(tblInfo)
+			statsTbl := statsHandle.GetTableStats(tblInfo)
 			sql := "analyze table %n.%n"
-			analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O)
+			analyzed := autoAnalyzeTable(sctx, statsHandle, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O)
 			if analyzed {
 				// analyze one table at a time to let it get the freshest parameters.
 				// others will be analyzed next round which is just 3s later.
@@ -244,16 +201,16 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
 			}
 		}
 		if pruneMode == variable.Dynamic {
-			analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, opt, tblInfo, partitionDefs, db, autoAnalyzeRatio)
+			analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, statsHandle, tblInfo, partitionDefs, db, autoAnalyzeRatio)
 			if analyzed {
 				return true
 			}
 			continue
 		}
 		for _, def := range partitionDefs {
 			sql := "analyze table %n.%n partition %n"
-			statsTbl := opt.GetPartitionStats(tblInfo, def.ID)
-			analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O)
+			statsTbl := statsHandle.GetPartitionStats(tblInfo, def.ID)
+			analyzed := autoAnalyzeTable(sctx, statsHandle, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O)
 			if analyzed {
 				return true
 			}
@@ -267,21 +224,21 @@ func HandleAutoAnalyze(sctx sessionctx.Context,
 var AutoAnalyzeMinCnt int64 = 1000
 
 func autoAnalyzeTable(sctx sessionctx.Context,
-	opt *Opt,
+	statsHandle statsutil.StatsHandle,
 	tblInfo *model.TableInfo, statsTbl *statistics.Table,
 	ratio float64, sql string, params ...interface{}) bool {
 	if statsTbl.Pseudo || statsTbl.RealtimeCount < AutoAnalyzeMinCnt {
 		return false
 	}
-	if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*opt.StatsLease, ratio); needAnalyze {
+	if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*statsHandle.Lease(), ratio); needAnalyze {
 		escaped, err := sqlexec.EscapeSQL(sql, params...)
 		if err != nil {
 			return false
 		}
 		logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"), zap.String("sql", escaped), zap.String("reason", reason))
 		tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
 		statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer)
-		execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...)
+		execAutoAnalyze(sctx, statsHandle, tableStatsVer, sql, params...)
 		return true
 	}
 	for _, idx := range tblInfo.Indices {
@@ -295,7 +252,7 @@ func autoAnalyzeTable(sctx sessionctx.Context,
 			logutil.BgLogger().Info("auto analyze for unanalyzed", zap.String("category", "stats"), zap.String("sql", escaped))
 			tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
 			statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer)
-			execAutoAnalyze(sctx, opt, tableStatsVer, sqlWithIdx, paramsWithIdx...)
+			execAutoAnalyze(sctx, statsHandle, tableStatsVer, sqlWithIdx, paramsWithIdx...)
 			return true
 		}
 	}
@@ -347,18 +304,18 @@ func TableAnalyzed(tbl *statistics.Table) bool {
 }
 
 func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
-	opt *Opt,
+	statsHandle statsutil.StatsHandle,
 	tblInfo *model.TableInfo, partitionDefs []model.PartitionDefinition,
 	db string, ratio float64) bool {
 	tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
 	analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load())
 	partitionNames := make([]interface{}, 0, len(partitionDefs))
 	for _, def := range partitionDefs {
-		partitionStatsTbl := opt.GetPartitionStats(tblInfo, def.ID)
+		partitionStatsTbl := statsHandle.GetPartitionStats(tblInfo, def.ID)
 		if partitionStatsTbl.Pseudo || partitionStatsTbl.RealtimeCount < AutoAnalyzeMinCnt {
 			continue
 		}
-		if needAnalyze, _ := NeedAnalyzeTable(partitionStatsTbl, 20*opt.StatsLease, ratio); needAnalyze {
+		if needAnalyze, _ := NeedAnalyzeTable(partitionStatsTbl, 20*statsHandle.Lease(), ratio); needAnalyze {
 			partitionNames = append(partitionNames, def.Name.O)
 			statistics.CheckAnalyzeVerOnTable(partitionStatsTbl, &tableStatsVer)
 		}
@@ -380,7 +337,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
 			zap.String("table", tblInfo.Name.String()),
 			zap.Any("partitions", partitionNames),
 			zap.Int("analyze partition batch size", analyzePartitionBatchSize))
-		statsTbl := opt.GetTableStats(tblInfo)
+		statsTbl := statsHandle.GetTableStats(tblInfo)
 		statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer)
 		for i := 0; i < len(partitionNames); i += analyzePartitionBatchSize {
 			start := i
@@ -393,7 +350,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
 			logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"),
 				zap.String("table", tblInfo.Name.String()),
 				zap.Any("partitions", partitionNames[start:end]))
-			execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...)
+			execAutoAnalyze(sctx, statsHandle, tableStatsVer, sql, params...)
 		}
 		return true
 	}
@@ -402,14 +359,14 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
 			continue
 		}
 		for _, def := range partitionDefs {
-			partitionStatsTbl := opt.GetPartitionStats(tblInfo, def.ID)
+			partitionStatsTbl := statsHandle.GetPartitionStats(tblInfo, def.ID)
 			if _, ok := partitionStatsTbl.Indices[idx.ID]; !ok {
 				partitionNames = append(partitionNames, def.Name.O)
 				statistics.CheckAnalyzeVerOnTable(partitionStatsTbl, &tableStatsVer)
 			}
 		}
 		if len(partitionNames) > 0 {
-			statsTbl := opt.GetTableStats(tblInfo)
+			statsTbl := statsHandle.GetTableStats(tblInfo)
 			statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer)
 			for i := 0; i < len(partitionNames); i += analyzePartitionBatchSize {
 				start := i
@@ -424,7 +381,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context,
 					zap.String("table", tblInfo.Name.String()),
 					zap.String("index", idx.Name.String()),
 					zap.Any("partitions", partitionNames[start:end]))
-				execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...)
+				execAutoAnalyze(sctx, statsHandle, tableStatsVer, sql, params...)
 			}
 			return true
 		}
@@ -439,11 +396,11 @@ var execOptionForAnalyze = map[int]sqlexec.OptionFuncAlias{
 }
 
 func execAutoAnalyze(sctx sessionctx.Context,
-	opt *Opt,
+	statsHandle statsutil.StatsHandle,
 	statsVer int,
 	sql string, params ...interface{}) {
 	startTime := time.Now()
-	_, _, err := execAnalyzeStmt(sctx, opt, statsVer, sql, params...)
+	_, _, err := execAnalyzeStmt(sctx, statsHandle, statsVer, sql, params...)
 	dur := time.Since(startTime)
 	metrics.AutoAnalyzeHistogram.Observe(dur.Seconds())
 	if err != nil {
@@ -459,7 +416,7 @@ func execAutoAnalyze(sctx sessionctx.Context,
 }
 
 func execAnalyzeStmt(sctx sessionctx.Context,
-	opt *Opt,
+	statsHandle statsutil.StatsHandle,
 	statsVer int,
 	sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
 	pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load()
@@ -469,7 +426,7 @@ func execAnalyzeStmt(sctx sessionctx.Context,
 		sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot),
 		sqlexec.GetPartitionPruneModeOption(pruneMode),
 		sqlexec.ExecOptionUseCurSession,
-		sqlexec.ExecOptionWithSysProcTrack(opt.AutoAnalyzeProcIDGetter(), opt.SysProcTracker.Track, opt.SysProcTracker.UnTrack),
+		sqlexec.ExecOptionWithSysProcTrack(statsHandle.AutoAnalyzeProcID(), statsHandle.SysProcTracker().Track, statsHandle.SysProcTracker().UnTrack),
 	}
 	return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...)
 }
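With the Opt struct gone, wiring up the auto-analyze worker shrinks from seven arguments to one. A hypothetical before/after at the call site (which lives in one of the changed files not shown on this page; the receiver and field names are illustrative, not the actual ones):

// before: every dependency passed individually (names illustrative)
//   worker := autoanalyze.NewStatsAnalyze(h.pool, h.sysProcTracker,
//       h.GetLockedTables, h.GetTableStats, h.GetPartitionStats,
//       h.autoAnalyzeProcIDGetter, h.Lease())
// after: the stats handle carries all of them behind one interface
worker := autoanalyze.NewStatsAnalyze(h)

This is the usual parameter-object-to-interface step: once callers hand over the whole handle, adding a dependency no longer ripples through every constructor signature in the chain.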
2 changes: 1 addition & 1 deletion pkg/statistics/handle/bootstrap.go
@@ -70,7 +70,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error) {
 		return nil, errors.Trace(err)
 	}
 	defer terror.Call(rc.Close)
-	tables, err := cache.NewStatsCacheImpl(h.pool, h.TableInfoGetter, h.Lease(), h.TableStatsFromStorage)
+	tables, err := cache.NewStatsCacheImpl(h, h.TableStatsFromStorage)
 	if err != nil {
 		return nil, err
 	}
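Passing h directly into cache.NewStatsCacheImpl implies that *Handle itself now satisfies util.StatsHandle. The idiomatic way to pin that down at compile time is an interface assertion; whether the commit actually adds one is not visible in this diff, so treat the line below as a sketch:

// in package handle: compilation fails if *Handle stops implementing util.StatsHandle
var _ util.StatsHandle = (*Handle)(nil)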
19 changes: 7 additions & 12 deletions pkg/statistics/handle/cache/statscache.go
@@ -16,7 +16,6 @@ package cache
 
 import (
 	"sync/atomic"
-	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/pkg/config"
@@ -33,26 +32,22 @@ import (
 
 // StatsCacheImpl implements util.StatsCache.
 type StatsCacheImpl struct {
-	pool    util.SessionPool
-	tblInfo util.TableInfoGetter
 	atomic.Pointer[StatsCache]
 
+	statsHandle           util.StatsHandle
 	tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error)
-	statsLease time.Duration
 }
 
 // NewStatsCacheImpl creates a new StatsCache.
-func NewStatsCacheImpl(pool util.SessionPool, tblInfo util.TableInfoGetter, statsLease time.Duration,
+func NewStatsCacheImpl(statsHandle util.StatsHandle,
 	tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error),
 ) (util.StatsCache, error) {
 	newCache, err := NewStatsCache()
 	if err != nil {
 		return nil, err
 	}
 	result := &StatsCacheImpl{
-		pool:       pool,
-		tblInfo:    tblInfo,
-		statsLease: statsLease,
+		statsHandle:           statsHandle,
 		tableStatsFromStorage: tableStatsFromStorage,
 	}
 	result.Store(newCache)
@@ -61,7 +56,7 @@ func NewStatsCacheImpl(pool util.SessionPool, tblInfo util.TableInfoGetter, statsLease time.Duration,
 
 // NewStatsCacheImplForTest creates a new StatsCache for test.
 func NewStatsCacheImplForTest() (util.StatsCache, error) {
-	return NewStatsCacheImpl(nil, nil, 0, nil)
+	return NewStatsCacheImpl(nil, nil)
 }
 
 // Update reads stats meta from store and updates the stats map.
@@ -72,7 +67,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error {
 	// and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read
 	// the table stats of A0 if we read stats that greater than lastVersion which is B0.
 	// We can read the stats if the diff between commit time and version is less than three lease.
-	offset := util.DurationToTS(3 * s.statsLease)
+	offset := util.DurationToTS(3 * s.statsHandle.Lease())
 	if s.MaxTableStatsVersion() >= offset {
 		lastVersion = lastVersion - offset
 	} else {
@@ -81,7 +76,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error {
 
 	var rows []chunk.Row
 	var err error
-	err = util.CallWithSCtx(s.pool, func(sctx sessionctx.Context) error {
+	err = util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
 		rows, _, err = util.ExecRows(sctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion)
 		return err
 	})
@@ -95,7 +90,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error {
 		physicalID := row.GetInt64(1)
 		modifyCount := row.GetInt64(2)
 		count := row.GetInt64(3)
-		table, ok := s.tblInfo.TableInfoByID(is, physicalID)
+		table, ok := s.statsHandle.TableInfoByID(is, physicalID)
 		if !ok {
 			logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID))
 			deletedTableIDs = append(deletedTableIDs, physicalID)
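The three-lease offset in Update is easier to see with concrete numbers. Stats versions are TSO timestamps, whose bits above 18 hold physical milliseconds, so util.DurationToTS presumably just shifts a duration's milliseconds into that field. A minimal, self-contained sketch of the window arithmetic under that assumption, using the 3s cadence mentioned earlier in this diff:

package main

import (
	"fmt"
	"time"
)

// durationToTS mirrors the assumed behavior of util.DurationToTS:
// encode the duration's milliseconds in the physical part of a TSO.
func durationToTS(d time.Duration) uint64 {
	return uint64(d.Milliseconds()) << 18
}

func main() {
	lease := 3 * time.Second
	offset := durationToTS(3 * lease) // a 9-second re-read window

	// Some cached maximum stats version (arbitrary example value).
	lastVersion := uint64(450_000_000) << 18
	// Re-read anything committed within the last three leases, so a
	// transaction that started before an already-seen version but
	// committed after it is not skipped, as the comment in Update explains.
	if lastVersion >= offset {
		lastVersion -= offset
	} else {
		lastVersion = 0
	}
	fmt.Println(lastVersion)
}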
[diff truncated: the remaining 8 changed files are not shown]