Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: Support plan replayer continuous capture #39926

Merged
merged 11 commits into from
Dec 15, 2022
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_library(
"//util/memory",
"//util/memoryusagealarm",
"//util/printer",
"//util/replayer",
"//util/servermemorylimit",
"//util/sqlexec",
"@com_github_burntsushi_toml//:toml",
Expand Down Expand Up @@ -122,6 +123,7 @@ go_test(
"//testkit/testsetup",
"//util",
"//util/mock",
"//util/replayer",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
7 changes: 4 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/memoryusagealarm"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -890,7 +891,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
infoCache: infoschema.NewCache(16),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
indexUsageSyncLease: idxUsageSyncLease,
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
onClose: onClose,
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
mdlCheckTableInfo: &mdlCheckTableInfo{
Expand Down Expand Up @@ -1647,8 +1648,8 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work
}
taskCH := make(chan *PlanReplayerDumpTask, 16)
taskStatus := &planReplayerDumpTaskStatus{}
taskStatus.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[PlanReplayerTaskKey]struct{}{}
taskStatus.finishedTaskMu.finishedTask = map[replayer.PlanReplayerTaskKey]struct{}{}
taskStatus.runningTaskMu.runningTasks = map[replayer.PlanReplayerTaskKey]struct{}{}

do.planReplayerHandle.planReplayerTaskDumpHandle = &planReplayerTaskDumpHandle{
taskCH: taskCH,
Expand Down
129 changes: 46 additions & 83 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ package domain

import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
Expand All @@ -29,7 +27,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
Expand All @@ -41,6 +38,7 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
Expand All @@ -55,13 +53,6 @@ type dumpFileGcChecker struct {
planReplayerTaskStatus *planReplayerDumpTaskStatus
}

// GetPlanReplayerDirName returns the plan replayer directory path.
// The directory is called "replayer" and is located in the same directory
// as the log file named by the global config (Log.File.Filename).
func GetPlanReplayerDirName() string {
	logFile := config.GetGlobalConfig().Log.File.Filename
	return filepath.Join(filepath.Dir(logFile), "replayer")
}

// parseType returns the part of s that precedes the first underscore.
// When s contains no underscore, s is returned unchanged.
func parseType(s string) string {
	if idx := strings.IndexByte(s, '_'); idx >= 0 {
		return s[:idx]
	}
	return s
}
Expand Down Expand Up @@ -188,7 +179,9 @@ func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
case h.planReplayerTaskDumpHandle.taskCH <- task:
// We remove the task key as soon as the task has been put into the channel successfully. If the task
// fails to dump, the task handle will re-add the task in the next loop.
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
if !task.IsContinuesCapture {
h.planReplayerTaskCollectorHandle.removeTask(task.PlanReplayerTaskKey)
}
default:
// TODO: add metrics here
// directly discard the task if the task channel is full in order not to block the query process
Expand All @@ -200,7 +193,7 @@ func (h *planReplayerHandle) SendTask(task *PlanReplayerDumpTask) {
type planReplayerTaskCollectorHandle struct {
taskMu struct {
sync.RWMutex
tasks map[PlanReplayerTaskKey]struct{}
tasks map[replayer.PlanReplayerTaskKey]struct{}
}
ctx context.Context
sctx sessionctx.Context
Expand All @@ -212,7 +205,7 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
if err != nil {
return err
}
tasks := make([]PlanReplayerTaskKey, 0)
tasks := make([]replayer.PlanReplayerTaskKey, 0)
for _, key := range allKeys {
unhandled, err := checkUnHandledReplayerTask(h.ctx, h.sctx, key)
if err != nil {
Expand All @@ -227,8 +220,8 @@ func (h *planReplayerTaskCollectorHandle) CollectPlanReplayerTask() error {
}

// GetTasks gets all tasks
func (h *planReplayerTaskCollectorHandle) GetTasks() []PlanReplayerTaskKey {
tasks := make([]PlanReplayerTaskKey, 0)
func (h *planReplayerTaskCollectorHandle) GetTasks() []replayer.PlanReplayerTaskKey {
tasks := make([]replayer.PlanReplayerTaskKey, 0)
h.taskMu.RLock()
defer h.taskMu.RUnlock()
for taskKey := range h.taskMu.tasks {
Expand All @@ -237,8 +230,8 @@ func (h *planReplayerTaskCollectorHandle) GetTasks() []PlanReplayerTaskKey {
return tasks
}

func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey) {
r := make(map[PlanReplayerTaskKey]struct{})
func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []replayer.PlanReplayerTaskKey) {
r := make(map[replayer.PlanReplayerTaskKey]struct{})
for _, task := range tasks {
r[task] = struct{}{}
}
Expand All @@ -247,13 +240,13 @@ func (h *planReplayerTaskCollectorHandle) setupTasks(tasks []PlanReplayerTaskKey
h.taskMu.tasks = r
}

func (h *planReplayerTaskCollectorHandle) removeTask(taskKey PlanReplayerTaskKey) {
func (h *planReplayerTaskCollectorHandle) removeTask(taskKey replayer.PlanReplayerTaskKey) {
h.taskMu.Lock()
defer h.taskMu.Unlock()
delete(h.taskMu.tasks, taskKey)
}

func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]PlanReplayerTaskKey, error) {
func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context.Context) ([]replayer.PlanReplayerTaskKey, error) {
exec := h.sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, "select sql_digest, plan_digest from mysql.plan_replayer_task")
if err != nil {
Expand All @@ -267,10 +260,10 @@ func (h *planReplayerTaskCollectorHandle) collectAllPlanReplayerTask(ctx context
if rows, err = sqlexec.DrainRecordSet(ctx, rs, 8); err != nil {
return nil, errors.Trace(err)
}
allKeys := make([]PlanReplayerTaskKey, 0, len(rows))
allKeys := make([]replayer.PlanReplayerTaskKey, 0, len(rows))
for _, row := range rows {
sqlDigest, planDigest := row.GetString(0), row.GetString(1)
allKeys = append(allKeys, PlanReplayerTaskKey{
allKeys = append(allKeys, replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest,
PlanDigest: planDigest,
})
Expand All @@ -282,13 +275,13 @@ type planReplayerDumpTaskStatus struct {
// running task records the task running by all workers in order to avoid multi workers running the same task key
runningTaskMu struct {
sync.RWMutex
runningTasks map[PlanReplayerTaskKey]struct{}
runningTasks map[replayer.PlanReplayerTaskKey]struct{}
}

// finished task records the finished task in order to avoid running finished task key
finishedTaskMu struct {
sync.RWMutex
finishedTask map[PlanReplayerTaskKey]struct{}
finishedTask map[replayer.PlanReplayerTaskKey]struct{}
}
}

Expand All @@ -303,7 +296,7 @@ func (r *planReplayerDumpTaskStatus) GetRunningTaskStatusLen() int {
func (r *planReplayerDumpTaskStatus) CleanFinishedTaskStatus() {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
r.finishedTaskMu.finishedTask = map[replayer.PlanReplayerTaskKey]struct{}{}
}

// GetFinishedTaskStatusLen is used for unit tests
Expand Down Expand Up @@ -346,7 +339,7 @@ func (r *planReplayerDumpTaskStatus) setTaskFinished(task *PlanReplayerDumpTask)
func (r *planReplayerDumpTaskStatus) clearFinishedTask() {
r.finishedTaskMu.Lock()
defer r.finishedTaskMu.Unlock()
r.finishedTaskMu.finishedTask = map[PlanReplayerTaskKey]struct{}{}
r.finishedTaskMu.finishedTask = map[replayer.PlanReplayerTaskKey]struct{}{}
}

type planReplayerTaskDumpWorker struct {
Expand All @@ -373,7 +366,7 @@ func (w *planReplayerTaskDumpWorker) run() {
// HandleTask handles the task
func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (success bool) {
defer func() {
if success {
if success && task.IsContinuesCapture {
w.status.setTaskFinished(task)
}
}()
Expand All @@ -391,7 +384,7 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
return true
}

file, fileName, err := GeneratePlanReplayerFile()
file, fileName, err := replayer.GeneratePlanReplayerFile()
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task file failed",
zap.String("sqlDigest", taskKey.SQLDigest),
Expand All @@ -404,26 +397,28 @@ func (w *planReplayerTaskDumpWorker) HandleTask(task *PlanReplayerDumpTask) (suc
task.EncodedPlan, _ = task.EncodePlan(task.SessionVars.StmtCtx, false)
jsStats := make(map[int64]*handle.JSONTable)
is := GetDomain(w.sctx).InfoSchema()
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
if task.IsCapture && !task.IsContinuesCapture {
for tblID, stat := range task.TblStats {
tbl, ok := is.TableByID(tblID)
if !ok {
return false
}
schema, ok := is.SchemaByTable(tbl.Meta())
if !ok {
return false
}
r, err := handle.GenJSONTableFromStats(schema.Name.String(), tbl.Meta(), stat.(*statistics.Table))
if err != nil {
logutil.BgLogger().Warn("generate plan replayer capture task json stats failed",
zap.String("sqlDigest", taskKey.SQLDigest),
zap.String("planDigest", taskKey.PlanDigest),
zap.Error(err))
return false
}
jsStats[tblID] = r
}
jsStats[tblID] = r
task.JSONTblStats = jsStats
}
task.JSONTblStats = jsStats
err = DumpPlanReplayerInfo(w.ctx, w.sctx, task)
if err != nil {
logutil.BgLogger().Warn("dump plan replayer capture task result failed",
Expand Down Expand Up @@ -461,7 +456,7 @@ func (h *planReplayerTaskDumpHandle) DrainTask() *PlanReplayerDumpTask {
return <-h.taskCH
}

func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task PlanReplayerTaskKey) (bool, error) {
func checkUnHandledReplayerTask(ctx context.Context, sctx sessionctx.Context, task replayer.PlanReplayerTaskKey) (bool, error) {
exec := sctx.(sqlexec.SQLExecutor)
rs, err := exec.ExecuteInternal(ctx, fmt.Sprintf("select * from mysql.plan_replayer_status where sql_digest = '%v' and plan_digest = '%v' and fail_reason is null", task.SQLDigest, task.PlanDigest))
if err != nil {
Expand Down Expand Up @@ -512,15 +507,9 @@ type PlanReplayerStatusRecord struct {
FailedReason string
}

// PlanReplayerTaskKey is the key of a plan replayer task: a SQL digest
// paired with a plan digest.
type PlanReplayerTaskKey struct {
	SQLDigest, PlanDigest string
}

// PlanReplayerDumpTask wraps the params for plan replayer dump
type PlanReplayerDumpTask struct {
PlanReplayerTaskKey
replayer.PlanReplayerTaskKey

// tmp variables stored during the query
EncodePlan func(*stmtctx.StatementContext, bool) (string, string)
Expand All @@ -537,35 +526,9 @@ type PlanReplayerDumpTask struct {

FileName string
Zf *os.File
}

// GeneratePlanReplayerFile generates a plan replayer file.
// It makes sure the plan replayer directory exists, asks
// generatePlanReplayerFileName for a file name and creates that file inside
// the directory. It returns the opened file and its base name. On failure
// the file is nil, the name is empty and the error is wrapped with
// errors.AddStack.
func GeneratePlanReplayerFile() (*os.File, string, error) {
	dir := GetPlanReplayerDirName()
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return nil, "", errors.AddStack(err)
	}

	name, err := generatePlanReplayerFileName()
	if err != nil {
		return nil, "", errors.AddStack(err)
	}

	f, err := os.Create(filepath.Join(dir, name))
	if err != nil {
		return nil, "", errors.AddStack(err)
	}
	return f, name, nil
}

func generatePlanReplayerFileName() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err := rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
// IsCapture indicates whether the task is from capture
IsCapture bool
// IsContinuesCapture indicates whether the task is from continuous capture
IsContinuesCapture bool
}
Loading