diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index aaaf2475e800b..96239262b42c6 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -16,6 +16,12 @@ package handle import ( "context" "fmt" +<<<<<<< HEAD +======= + "sort" + "strconv" + "strings" +>>>>>>> 6266817ce... statistics: batch insert topn and bucket when saving table stats (#35326) "sync" "sync/atomic" "time" @@ -652,7 +658,245 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg if err != nil { return } +<<<<<<< HEAD if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes()); err != nil { +======= + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, cnt, results.Snapshot, tableID); err != nil { + return err + } + statsVer = version + } + // 2. Save histograms. + const maxInsertLength = 1024 * 1024 + for _, result := range results.Ars { + for i, hg := range result.Hist { + // It's normal virtual column, skip it. + if hg == nil { + continue + } + var cms *statistics.CMSketch + if results.StatsVer != statistics.Version2 { + cms = result.Cms[i] + } + cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms) + if err != nil { + return err + } + fmSketch, err := statistics.EncodeFMSketch(result.Fms[i]) + if err != nil { + return err + } + // Delete outdated data + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { + return err + } + if topN := result.TopNs[i]; topN != nil { + for j := 0; j < len(topN.TopN); { + end := j + batchInsertSize + if end > len(topN.TopN) { + end = len(topN.TopN) + } + sql := new(strings.Builder) + sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ") + for k := j; k < end; k++ { + val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, topN.TopN[k].Encoded, topN.TopN[k].Count) + if k > j { + val = "," + val + } + if k > j && sql.Len()+len(val) > maxInsertLength { + end = k + break + } + sql.WriteString(val) + } + j = end + if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil { + return err + } + } + } + if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { + return err + } + if fmSketch != nil && needDumpFMS { + if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil { + return err + } + } + if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", + tableID, result.IsIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, results.StatsVer, statistics.AnalyzeFlag, hg.Correlation); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { + return err + } + sc := h.mu.ctx.GetSessionVars().StmtCtx + var lastAnalyzePos []byte + for j := 0; j < len(hg.Buckets); { + end := j + batchInsertSize + if end > len(hg.Buckets) { + end = len(hg.Buckets) + } + sql := new(strings.Builder) + sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ") + for k := j; k < end; k++ { + count := hg.Buckets[k].Count + if k > 0 { + count -= hg.Buckets[k-1].Count + } + var upperBound types.Datum + upperBound, err = hg.GetUpper(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return err + } + if k == len(hg.Buckets)-1 { + lastAnalyzePos = upperBound.GetBytes() + } + var lowerBound types.Datum + lowerBound, err = hg.GetLower(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return err + } + val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, k, count, hg.Buckets[k].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[k].NDV) + if k > j { + val = "," + val + } + if k > j && sql.Len()+len(val) > maxInsertLength { + end = k + break + } + sql.WriteString(val) + } + j = end + if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil { + return err + } + } + if len(lastAnalyzePos) > 0 { + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil { + return err + } + } + if result.IsIndex == 0 { + if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil { + return err + } + } + } + } + // 3. Save extended statistics. + extStats := results.ExtStats + if extStats == nil || len(extStats.Stats) == 0 { + return nil + } + var bytes []byte + var statsStr string + for name, item := range extStats.Stats { + bytes, err = json.Marshal(item.ColIDs) + if err != nil { + return err + } + strColIDs := string(bytes) + switch item.Tp { + case ast.StatsTypeCardinality, ast.StatsTypeCorrelation: + statsStr = fmt.Sprintf("%f", item.ScalarVals) + case ast.StatsTypeDependency: + statsStr = item.StringVals + } + if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, StatsStatusAnalyzed); err != nil { + return err + } + } + return +} + +// SaveStatsToStorage saves the stats to storage. +// TODO: refactor to reduce the number of parameters +func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool) (err error) { + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = h.recordHistoricalStatsMeta(tableID, statsVer) + } + }() + h.mu.Lock() + defer h.mu.Unlock() + ctx := context.TODO() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(context.Background(), exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + + version := txn.StartTS() + // If the count is less than 0, then we do not want to update the modify count and count. + if count >= 0 { + _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count) values (%?, %?, %?)", version, tableID, count) + } else { + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID) + } + if err != nil { + return err + } + statsVer = version + cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms) + if err != nil { + return err + } + // Delete outdated data + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + return err + } + if topN != nil { + for _, meta := range topN.TopN { + if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Encoded, meta.Count); err != nil { + return err + } + } + } + if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + return err + } + flag := 0 + if isAnalyzed == 1 { + flag = statistics.AnalyzeFlag + } + if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", + tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, statsVersion, flag, hg.Correlation); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + return err + } + sc := h.mu.ctx.GetSessionVars().StmtCtx + var lastAnalyzePos []byte + for i := range hg.Buckets { + count := hg.Buckets[i].Count + if i > 0 { + count -= hg.Buckets[i-1].Count + } + var upperBound types.Datum + upperBound, err = hg.GetUpper(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + if i == len(hg.Buckets)-1 { + lastAnalyzePos = upperBound.GetBytes() + } + var lowerBound types.Datum + lowerBound, err = hg.GetLower(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[i].NDV); err != nil { +>>>>>>> 6266817ce... statistics: batch insert topn and bucket when saving table stats (#35326) return err } }