Skip to content

Commit

Permalink
executor: add some comments and fix typos (pingcap#46987)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Sep 14, 2023
1 parent 82c3af1 commit b75e8f0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 13 deletions.
4 changes: 2 additions & 2 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,15 +488,15 @@ func encodeHandleKey(ran *ranger.Range) ([]byte, []byte) {
// 1. signedRanges is less or equal than MaxInt64
// 2. unsignedRanges is greater than MaxInt64
//
// We do this because every key of tikv is encoded as an int64. As a result, MaxUInt64 is small than zero when
// We do this because every key of tikv is encoded as an int64. As a result, MaxUInt64 is smaller than zero when
// interpreted as an int64 variable.
//
// This function does the following:
// 1. split ranges into two groups as described above.
// 2. if there's a range that straddles the int64 boundary, split it into two ranges, which results in one smaller and
// one greater than MaxInt64.
//
// if `KeepOrder` is false, we merge the two groups of ranges into one group, to save an rpc call later
// if `KeepOrder` is false, we merge the two groups of ranges into one group, to save a rpc call later
// if `desc` is false, return signed ranges first, vice versa.
func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc bool, isCommonHandle bool) ([]*ranger.Range, []*ranger.Range) {
if isCommonHandle || len(ranges) == 0 || ranges[0].LowVal[0].Kind() == types.KindInt64 {
Expand Down
32 changes: 24 additions & 8 deletions executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,16 @@ type AnalyzeColumnsExecV2 struct {

func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.AnalyzeResults {
analyzeResult := e.analyzeColumnsPushDownV2()
// do not retry if succeed / not oom error / not auto-analyze / samplerate not set
if analyzeResult.Err == nil || analyzeResult.Err != errAnalyzeOOM ||
!e.ctx.GetSessionVars().InRestrictedSQL ||
e.analyzePB.ColReq == nil || *e.analyzePB.ColReq.SampleRate <= 0 {
if e.notRetryable(analyzeResult) {
return analyzeResult
}

finishJobWithLog(e.ctx, analyzeResult.Job, analyzeResult.Err)
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
if statsHandle == nil {
return analyzeResult
}

var statsTbl *statistics.Table
tid := e.tableID.GetStatisticsID()
if tid == e.tableInfo.ID {
Expand All @@ -76,6 +75,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.A
if statsTbl == nil || statsTbl.RealtimeCount <= 0 {
return analyzeResult
}

newSampleRate := math.Min(1, float64(config.DefRowsForSampleRate)/float64(statsTbl.RealtimeCount))
if newSampleRate >= *e.analyzePB.ColReq.SampleRate {
return analyzeResult
Expand All @@ -87,6 +87,13 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.A
return e.analyzeColumnsPushDownV2()
}

// Do **not** retry if succeed / not oom error / not auto-analyze / samplerate not set.
func (e *AnalyzeColumnsExecV2) notRetryable(analyzeResult *statistics.AnalyzeResults) bool {
return analyzeResult.Err == nil || analyzeResult.Err != errAnalyzeOOM ||
!e.ctx.GetSessionVars().InRestrictedSQL ||
e.analyzePB.ColReq == nil || *e.analyzePB.ColReq.SampleRate <= 0
}

func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeResults {
var ranges []*ranger.Range
if hc := e.handleCols; hc != nil {
Expand All @@ -98,6 +105,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
} else {
ranges = ranger.FullIntRange(false)
}

collExtStats := e.ctx.GetSessionVars().EnableExtendedStats
specialIndexes := make([]*model.IndexInfo, 0, len(e.indexes))
specialIndexesOffsets := make([]int, 0, len(e.indexes))
Expand Down Expand Up @@ -126,15 +134,15 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh)
})
defer wg.Wait()
count, hists, topns, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh)
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
cLen := len(e.analyzePB.ColReq.ColumnsInfo)
colGroupResult := &statistics.AnalyzeResult{
Hist: hists[cLen:],
TopNs: topns[cLen:],
TopNs: topNs[cLen:],
Fms: fmSketches[cLen:],
IsIndex: 1,
}
Expand All @@ -148,7 +156,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
}
colResult := &statistics.AnalyzeResult{
Hist: hists[:cLen],
TopNs: topns[:cLen],
TopNs: topNs[:cLen],
Fms: fmSketches[:cLen],
}
return &statistics.AnalyzeResults{
Expand Down Expand Up @@ -235,6 +243,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
extStats *statistics.ExtendedStatsColl,
err error,
) {
// Open memory tracker and resultHandler.
if err = e.open(ranges); err != nil {
return 0, nil, nil, nil, nil, err
}
Expand All @@ -243,27 +252,34 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
err = err1
}
}()

l := len(e.analyzePB.ColReq.ColumnsInfo) + len(e.analyzePB.ColReq.ColumnGroups)
rootRowCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
for i := 0; i < l; i++ {
rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}

sc := e.ctx.GetSessionVars().StmtCtx
statsConcurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return 0, nil, nil, nil, nil, err
}

// Start workers to merge the result from collectors.
mergeResultCh := make(chan *samplingMergeResult, statsConcurrency)
mergeTaskCh := make(chan []byte, statsConcurrency)
e.samplingMergeWg = &util.WaitGroupWrapper{}
e.samplingMergeWg.Add(statsConcurrency)
for i := 0; i < statsConcurrency; i++ {
go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i)
}

// Start read data from resultHandler and send them to mergeTaskCh.
if err = readDataAndSendTask(e.ctx, e.resultHandler, mergeTaskCh, e.memTracker); err != nil {
return 0, nil, nil, nil, nil, getAnalyzePanicErr(err)
}

// Merge the result from collectors.
mergeWorkerPanicCnt := 0
for mergeWorkerPanicCnt < statsConcurrency {
mergeResult, ok := <-mergeResultCh
Expand Down Expand Up @@ -291,7 +307,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
return 0, nil, nil, nil, nil, err
}

// handling virtual columns
// Handling virtual columns.
virtualColIdx := buildVirtualColumnIndex(e.schemaForVirtualColEval, e.colsInfo)
if len(virtualColIdx) > 0 {
fieldTps := make([]*types.FieldType, 0, len(virtualColIdx))
Expand Down
22 changes: 19 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2670,7 +2670,11 @@ func (b *executorBuilder) buildAnalyzeIndexIncremental(task plannercore.AnalyzeI
return analyzeTask
}

func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, schemaForVirtualColEval *expression.Schema) *analyzeTask {
func (b *executorBuilder) buildAnalyzeSamplingPushdown(
task plannercore.AnalyzeColumnsTask,
opts map[ast.AnalyzeOptionType]uint64,
schemaForVirtualColEval *expression.Schema,
) *analyzeTask {
if task.V2Options != nil {
opts = task.V2Options.FilledOpts
}
Expand Down Expand Up @@ -2844,7 +2848,12 @@ func (b *executorBuilder) getApproximateTableCountFromStorage(tid int64, task pl
return pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName)
}

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask {
func (b *executorBuilder) buildAnalyzeColumnsPushdown(
task plannercore.AnalyzeColumnsTask,
opts map[ast.AnalyzeOptionType]uint64,
autoAnalyze string,
schemaForVirtualColEval *expression.Schema,
) *analyzeTask {
if task.StatsVersion == statistics.Version2 {
return b.buildAnalyzeSamplingPushdown(task, opts, schemaForVirtualColEval)
}
Expand Down Expand Up @@ -3085,7 +3094,13 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
if enableFastAnalyze {
b.buildAnalyzeFastColumn(e, task, v.Opts)
} else {
columns, _, err := expression.ColumnInfos2ColumnsAndNames(b.ctx, model.NewCIStr(task.AnalyzeInfo.DBName), task.TblInfo.Name, task.ColsInfo, task.TblInfo)
columns, _, err := expression.ColumnInfos2ColumnsAndNames(
b.ctx,
model.NewCIStr(task.AnalyzeInfo.DBName),
task.TblInfo.Name,
task.ColsInfo,
task.TblInfo,
)
if err != nil {
b.err = err
return nil
Expand All @@ -3094,6 +3109,7 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
e.tasks = append(e.tasks, b.buildAnalyzeColumnsPushdown(task, v.Opts, autoAnalyze, schema))
}
}
// Other functions may set b.err, so we need to check it here.
if b.err != nil {
return nil
}
Expand Down

0 comments on commit b75e8f0

Please sign in to comment.