cherry pick pingcap#35326 to release-4.0
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
xuyifangreeneyes authored and ti-srebot committed Jun 20, 2022
1 parent 75f81d2 commit 91bbde9
Showing 1 changed file with 244 additions and 0 deletions.
statistics/handle/handle.go (244 additions, 0 deletions)
@@ -16,6 +16,12 @@ package handle
import (
"context"
"fmt"
<<<<<<< HEAD
=======
"sort"
"strconv"
"strings"
>>>>>>> 6266817ce... statistics: batch insert topn and bucket when saving table stats (#35326)
"sync"
"sync/atomic"
"time"
@@ -652,7 +658,245 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
		if err != nil {
			return
		}
<<<<<<< HEAD
		if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes()); err != nil {
=======
		if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, cnt, results.Snapshot, tableID); err != nil {
			return err
		}
		statsVer = version
	}
	// 2. Save histograms.
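	// TopN and bucket rows below are written in batches: up to batchInsertSize rows per INSERT statement, and a batch is cut short early if its SQL text would exceed maxInsertLength bytes.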
	const maxInsertLength = 1024 * 1024
	for _, result := range results.Ars {
		for i, hg := range result.Hist {
			// It's normal virtual column, skip it.
			if hg == nil {
				continue
			}
			var cms *statistics.CMSketch
			if results.StatsVer != statistics.Version2 {
				cms = result.Cms[i]
			}
			cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms)
			if err != nil {
				return err
			}
			fmSketch, err := statistics.EncodeFMSketch(result.Fms[i])
			if err != nil {
				return err
			}
			// Delete outdated data
			if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
				return err
			}
			if topN := result.TopNs[i]; topN != nil {
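				// Flush the TopN entries in chunks; each chunk becomes one multi-row INSERT into mysql.stats_top_n.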
				for j := 0; j < len(topN.TopN); {
					end := j + batchInsertSize
					if end > len(topN.TopN) {
						end = len(topN.TopN)
					}
					sql := new(strings.Builder)
					sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ")
					for k := j; k < end; k++ {
						val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, topN.TopN[k].Encoded, topN.TopN[k].Count)
						if k > j {
							val = "," + val
						}
						if k > j && sql.Len()+len(val) > maxInsertLength {
							end = k
							break
						}
						sql.WriteString(val)
					}
					j = end
					if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
						return err
					}
				}
			}
			if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
				return err
			}
			if fmSketch != nil && needDumpFMS {
				if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil {
					return err
				}
			}
			if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
				tableID, result.IsIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, results.StatsVer, statistics.AnalyzeFlag, hg.Correlation); err != nil {
				return err
			}
			if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
				return err
			}
			sc := h.mu.ctx.GetSessionVars().StmtCtx
			var lastAnalyzePos []byte
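			// Buckets are flushed with the same chunked multi-row INSERTs; lastAnalyzePos records the upper bound of the final bucket so last_analyze_pos can be updated afterwards.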
			for j := 0; j < len(hg.Buckets); {
				end := j + batchInsertSize
				if end > len(hg.Buckets) {
					end = len(hg.Buckets)
				}
				sql := new(strings.Builder)
				sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ")
				for k := j; k < end; k++ {
					count := hg.Buckets[k].Count
					if k > 0 {
						count -= hg.Buckets[k-1].Count
					}
					var upperBound types.Datum
					upperBound, err = hg.GetUpper(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
					if err != nil {
						return err
					}
					if k == len(hg.Buckets)-1 {
						lastAnalyzePos = upperBound.GetBytes()
					}
					var lowerBound types.Datum
					lowerBound, err = hg.GetLower(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
					if err != nil {
						return err
					}
					val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, k, count, hg.Buckets[k].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[k].NDV)
					if k > j {
						val = "," + val
					}
					if k > j && sql.Len()+len(val) > maxInsertLength {
						end = k
						break
					}
					sql.WriteString(val)
				}
				j = end
				if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
					return err
				}
			}
			if len(lastAnalyzePos) > 0 {
				if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil {
					return err
				}
			}
			if result.IsIndex == 0 {
				if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil {
					return err
				}
			}
		}
	}
	// 3. Save extended statistics.
	extStats := results.ExtStats
	if extStats == nil || len(extStats.Stats) == 0 {
		return nil
	}
	var bytes []byte
	var statsStr string
	for name, item := range extStats.Stats {
		bytes, err = json.Marshal(item.ColIDs)
		if err != nil {
			return err
		}
		strColIDs := string(bytes)
		switch item.Tp {
		case ast.StatsTypeCardinality, ast.StatsTypeCorrelation:
			statsStr = fmt.Sprintf("%f", item.ScalarVals)
		case ast.StatsTypeDependency:
			statsStr = item.StringVals
		}
		if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, StatsStatusAnalyzed); err != nil {
			return err
		}
	}
	return
}

// SaveStatsToStorage saves the stats to storage.
// TODO: refactor to reduce the number of parameters
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool) (err error) {
	statsVer := uint64(0)
	defer func() {
		if err == nil && statsVer != 0 {
			err = h.recordHistoricalStatsMeta(tableID, statsVer)
		}
	}()
	h.mu.Lock()
	defer h.mu.Unlock()
	ctx := context.TODO()
	exec := h.mu.ctx.(sqlexec.SQLExecutor)
	_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
	if err != nil {
		return errors.Trace(err)
	}
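	// finishTransaction commits the pessimistic transaction when err is nil and rolls it back otherwise.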
	defer func() {
		err = finishTransaction(context.Background(), exec, err)
	}()
	txn, err := h.mu.ctx.Txn(true)
	if err != nil {
		return errors.Trace(err)
	}

	version := txn.StartTS()
	// If the count is less than 0, then we do not want to update the modify count and count.
	if count >= 0 {
		_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count) values (%?, %?, %?)", version, tableID, count)
	} else {
		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID)
	}
	if err != nil {
		return err
	}
	statsVer = version
	cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms)
	if err != nil {
		return err
	}
	// Delete outdated data
	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
		return err
	}
	if topN != nil {
		for _, meta := range topN.TopN {
			if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Encoded, meta.Count); err != nil {
				return err
			}
		}
	}
	if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
		return err
	}
	flag := 0
	if isAnalyzed == 1 {
		flag = statistics.AnalyzeFlag
	}
	if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
		tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, statsVersion, flag, hg.Correlation); err != nil {
		return err
	}
	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
		return err
	}
	sc := h.mu.ctx.GetSessionVars().StmtCtx
	var lastAnalyzePos []byte
	for i := range hg.Buckets {
		count := hg.Buckets[i].Count
		if i > 0 {
			count -= hg.Buckets[i-1].Count
		}
		var upperBound types.Datum
		upperBound, err = hg.GetUpper(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
		if err != nil {
			return
		}
		if i == len(hg.Buckets)-1 {
			lastAnalyzePos = upperBound.GetBytes()
		}
		var lowerBound types.Datum
		lowerBound, err = hg.GetLower(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
		if err != nil {
			return
		}
		if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[i].NDV); err != nil {
>>>>>>> 6266817ce... statistics: batch insert topn and bucket when saving table stats (#35326)
			return err
		}
	}
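
The batching pattern this cherry-pick introduces can be sketched in isolation. The helper below is illustrative only (buildBatchValues and its parameters are not part of the TiDB code): it groups pre-escaped value tuples into multi-row INSERT statements, at most batchSize tuples per statement, starting a new statement early once the SQL text would exceed maxLen bytes, mirroring the loops over topN.TopN and hg.Buckets above.

package main

import (
	"fmt"
	"strings"
)

// buildBatchValues groups pre-escaped value tuples into multi-row INSERT
// statements: at most batchSize tuples per statement, and a statement is cut
// short once its text would grow beyond maxLen bytes. The k > j guard means a
// statement always carries at least one tuple, so the loop always progresses.
func buildBatchValues(prefix string, rows []string, batchSize, maxLen int) []string {
	var stmts []string
	for j := 0; j < len(rows); {
		end := j + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		sql := new(strings.Builder)
		sql.WriteString(prefix)
		for k := j; k < end; k++ {
			val := rows[k]
			if k > j {
				val = "," + val
			}
			if k > j && sql.Len()+len(val) > maxLen {
				end = k
				break
			}
			sql.WriteString(val)
		}
		j = end
		stmts = append(stmts, sql.String())
	}
	return stmts
}

func main() {
	rows := []string{"(1, 0, 1, 'a', 10)", "(1, 0, 1, 'b', 9)", "(1, 0, 1, 'c', 8)"}
	for _, stmt := range buildBatchValues("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ", rows, 2, 1024) {
		fmt.Println(stmt)
	}
}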