Skip to content

Commit

Permalink
Merge branch 'master' into fk-in-replace
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 22, 2022
2 parents 5303daf + 5fdd3bd commit cce6b0d
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 16 deletions.
2 changes: 1 addition & 1 deletion domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := replayer.GeneratePlanReplayerFile()
file, fileName, err := replayer.GeneratePlanReplayerFile(task.IsContinuesCapture)
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand Down
18 changes: 10 additions & 8 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,16 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
if _, ok := stmtNode.(*ast.SelectStmt); ok {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {

func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile()
e.DumpInfo.File, e.DumpInfo.FileName, err = replayer.GeneratePlanReplayerFile(false)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_library(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//sessiontxn",
"//statistics/handle",
"//store",
"//store/driver/error",
"//store/gcworker",
Expand Down Expand Up @@ -91,6 +92,7 @@ go_library(
"//util/topsql/stmtstats",
"//util/versioninfo",
"@com_github_blacktear23_go_proxyprotocol//:go-proxyprotocol",
"@com_github_burntsushi_toml//:toml",
"@com_github_gorilla_mux//:mux",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
Expand Down
196 changes: 193 additions & 3 deletions server/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,26 @@
package server

import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/BurntSushi/toml"
"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
Expand All @@ -32,9 +43,11 @@ import (

// PlanReplayerHandler is the handler for dumping plan replayer file.
type PlanReplayerHandler struct {
infoGetter *infosync.InfoSyncer
address string
statusPort uint
is infoschema.InfoSchema
statsHandle *handle.Handle
infoGetter *infosync.InfoSyncer
address string
statusPort uint
}

func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
Expand All @@ -46,6 +59,12 @@ func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
if s.dom != nil && s.dom.InfoSyncer() != nil {
prh.infoGetter = s.dom.InfoSyncer()
}
if s.dom != nil && s.dom.InfoSchema() != nil {
prh.is = s.dom.InfoSchema()
}
if s.dom != nil && s.dom.StatsHandle() != nil {
prh.statsHandle = s.dom.StatsHandle()
}
return prh
}

Expand All @@ -61,6 +80,8 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
urlPath: fmt.Sprintf("plan_replayer/dump/%s", name),
downloadedFilename: "plan_replayer",
scheme: util.InternalHTTPSchema(),
statsHandle: prh.statsHandle,
is: prh.is,
}
handleDownloadFile(handler, w, req)
}
Expand Down Expand Up @@ -93,6 +114,13 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
writeError(w, err)
return
}
if handler.downloadedFilename == "plan_replayer" {
content, err = handlePlanReplayerContinuesCaptureFile(content, path, handler)
if err != nil {
writeError(w, err)
return
}
}
_, err = w.Write(content)
if err != nil {
writeError(w, err)
Expand Down Expand Up @@ -175,6 +203,9 @@ type downloadFileHandler struct {
statusPort uint
urlPath string
downloadedFilename string

statsHandle *handle.Handle
is infoschema.InfoSchema
}

func isExists(path string) (bool, error) {
Expand All @@ -187,3 +218,162 @@ func isExists(path string) (bool, error) {
}
return true, nil
}

func handlePlanReplayerContinuesCaptureFile(content []byte, path string, handler downloadFileHandler) ([]byte, error) {
if !strings.Contains(handler.filePath, "continues_replayer") {
return content, nil
}
b := bytes.NewReader(content)
zr, err := zip.NewReader(b, int64(len(content)))
if err != nil {
return nil, err
}
startTS, err := loadSQLMetaFile(zr)
if err != nil {
return nil, err
}
if startTS == 0 {
return content, nil
}
tbls, err := loadSchemaMeta(zr, handler.is)
if err != nil {
return nil, err
}
for _, tbl := range tbls {
jsonStats, err := handler.statsHandle.DumpHistoricalStatsBySnapshot(tbl.dbName, tbl.info, startTS)
if err != nil {
return nil, err
}
tbl.jsonStats = jsonStats
}
newPath, err := dumpJSONStatsIntoZip(tbls, content, path)
if err != nil {
return nil, err
}
//nolint: gosec
file, err := os.Open(newPath)
if err != nil {
return nil, err
}
content, err = io.ReadAll(file)
if err != nil {
return nil, err
}
err = file.Close()
if err != nil {
return nil, err
}
return content, nil
}

func loadSQLMetaFile(z *zip.Reader) (uint64, error) {
for _, zipFile := range z.File {
if zipFile.Name == domain.PlanReplayerSQLMetaFile {
varMap := make(map[string]string)
v, err := zipFile.Open()
if err != nil {
return 0, errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
_, err = toml.DecodeReader(v, &varMap)
if err != nil {
return 0, errors.AddStack(err)
}
startTS, err := strconv.ParseUint(varMap[domain.PlanReplayerSQLMetaStartTS], 10, 64)
if err != nil {
return 0, err
}
return startTS, nil
}
}
return 0, nil
}

func loadSchemaMeta(z *zip.Reader, is infoschema.InfoSchema) (map[int64]*tblInfo, error) {
r := make(map[int64]*tblInfo, 0)
for _, zipFile := range z.File {
if zipFile.Name == fmt.Sprintf("schema/%v", domain.PlanReplayerSchemaMetaFile) {
v, err := zipFile.Open()
if err != nil {
return nil, errors.AddStack(err)
}
//nolint: errcheck,all_revive
defer v.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(v)
if err != nil {
return nil, errors.AddStack(err)
}
rows := strings.Split(buf.String(), "\n")
for _, row := range rows {
s := strings.Split(row, ";")
databaseName := s[0]
tableName := s[1]
t, err := is.TableByName(model.NewCIStr(databaseName), model.NewCIStr(tableName))
if err != nil {
return nil, err
}
r[t.Meta().ID] = &tblInfo{
info: t.Meta(),
dbName: databaseName,
tblName: tableName,
}
}
break
}
}
return r, nil
}

func dumpJSONStatsIntoZip(tbls map[int64]*tblInfo, content []byte, path string) (string, error) {
zr, err := zip.NewReader(bytes.NewReader(content), int64(len(content)))
if err != nil {
return "", err
}
newPath := fmt.Sprintf("copy_%v.zip", path[0:len(path)-4])
zf, err := os.Create(newPath)
if err != nil {
return "", err
}
zw := zip.NewWriter(zf)
for _, f := range zr.File {
err = zw.Copy(f)
if err != nil {
logutil.BgLogger().Error("copy plan replayer zip file failed", zap.Error(err))
return "", err
}
}
for _, tbl := range tbls {
w, err := zw.Create(fmt.Sprintf("stats/%v.%v.json", tbl.dbName, tbl.tblName))
if err != nil {
return "", err
}
data, err := json.Marshal(tbl.jsonStats)
if err != nil {
return "", err
}
_, err = w.Write(data)
if err != nil {
return "", err
}
}
err = zw.Close()
if err != nil {
logutil.BgLogger().Error("Closing file failed", zap.Error(err))
return "", err
}
err = zf.Close()
if err != nil {
logutil.BgLogger().Error("Closing file failed", zap.Error(err))
return "", err
}
return newPath, nil
}

type tblInfo struct {
info *model.TableInfo
jsonStats *handle.JSONTable
dbName string
tblName string
}
1 change: 1 addition & 0 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (h *Handle) ClearOutdatedHistoryStats() error {
}
sql = "delete from mysql.stats_history where NOW() - create_time >= %? "
_, err = exec.ExecuteInternal(ctx, sql, variable.HistoricalStatsDuration.Load().Seconds())
logutil.BgLogger().Info("clear outdated historical stats")
return err
}

Expand Down
9 changes: 6 additions & 3 deletions util/replayer/replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type PlanReplayerTaskKey struct {
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
func GeneratePlanReplayerFile(isContinues bool) (*os.File, string, error) {
path := GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
fileName, err := generatePlanReplayerFileName(isContinues)
if err != nil {
return nil, "", errors.AddStack(err)
}
Expand All @@ -50,7 +50,7 @@ func GeneratePlanReplayerFile() (*os.File, string, error) {
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
func generatePlanReplayerFileName(isContinues bool) (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
Expand All @@ -60,6 +60,9 @@ func generatePlanReplayerFileName() (string, error) {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
if isContinues {
return fmt.Sprintf("continues_replayer_%v_%v.zip", key, time), nil
}
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}

Expand Down

0 comments on commit cce6b0d

Please sign in to comment.