Skip to content

Commit

Permalink
stmtsummary: add persistence implementation (#40814)
Browse files Browse the repository at this point in the history
ref #40812
  • Loading branch information
mornyx authored Feb 9, 2023
1 parent 9f5cc51 commit 1167faf
Show file tree
Hide file tree
Showing 33 changed files with 4,861 additions and 128 deletions.
2 changes: 1 addition & 1 deletion bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ go_library(
"//util/memory",
"//util/parser",
"//util/sqlexec",
"//util/stmtsummary",
"//util/stmtsummary/v2:stmtsummary",
"//util/table-filter",
"//util/timeutil",
"@org_golang_x_exp//maps",
Expand Down
4 changes: 2 additions & 2 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
utilparser "github.com/pingcap/tidb/util/parser"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
stmtsummaryv2 "github.com/pingcap/tidb/util/stmtsummary/v2"
tablefilter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tidb/util/timeutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -892,7 +892,7 @@ func (h *BindHandle) CaptureBaselines() {
parser4Capture := parser.New()
captureFilter := h.extractCaptureFilterFromStorage()
emptyCaptureFilter := captureFilter.isEmpty()
bindableStmts := stmtsummary.StmtSummaryByDigestMap.GetMoreThanCntBindableStmt(captureFilter.frequency)
bindableStmts := stmtsummaryv2.GetMoreThanCntBindableStmt(captureFilter.frequency)
for _, bindableStmt := range bindableStmts {
stmt, err := parser4Capture.ParseOneStmt(bindableStmt.Query, bindableStmt.Charset, bindableStmt.Collation)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,20 @@ type Instance struct {
DDLSlowOprThreshold uint32 `toml:"ddl_slow_threshold" json:"ddl_slow_threshold"`
// ExpensiveQueryTimeThreshold indicates the time threshold of expensive query.
ExpensiveQueryTimeThreshold uint64 `toml:"tidb_expensive_query_time_threshold" json:"tidb_expensive_query_time_threshold"`
// StmtSummaryEnablePersistent indicates whether to enable file persistence for stmtsummary.
StmtSummaryEnablePersistent bool `toml:"tidb_stmt_summary_enable_persistent" json:"tidb_stmt_summary_enable_persistent"`
// StmtSummaryFilename indicates the file name written by stmtsummary
// when StmtSummaryEnablePersistent is true.
StmtSummaryFilename string `toml:"tidb_stmt_summary_filename" json:"tidb_stmt_summary_filename"`
// StmtSummaryFileMaxDays indicates how many days the files written by
// stmtsummary will be kept when StmtSummaryEnablePersistent is true.
StmtSummaryFileMaxDays int `toml:"tidb_stmt_summary_file_max_days" json:"tidb_stmt_summary_file_max_days"`
// StmtSummaryFileMaxSize indicates the maximum size (in mb) of a single file
// written by stmtsummary when StmtSummaryEnablePersistent is true.
StmtSummaryFileMaxSize int `toml:"tidb_stmt_summary_file_max_size" json:"tidb_stmt_summary_file_max_size"`
// StmtSummaryFileMaxBackups indicates the maximum number of files written
// by stmtsummary when StmtSummaryEnablePersistent is true.
StmtSummaryFileMaxBackups int `toml:"tidb_stmt_summary_file_max_backups" json:"tidb_stmt_summary_file_max_backups"`

// These variables exist in both 'instance' section and another place.
// The configuration in 'instance' section takes precedence.
Expand Down Expand Up @@ -901,6 +915,11 @@ var defaultConf = Config{
EnablePProfSQLCPU: false,
DDLSlowOprThreshold: DefDDLSlowOprThreshold,
ExpensiveQueryTimeThreshold: DefExpensiveQueryTimeThreshold,
StmtSummaryEnablePersistent: false,
StmtSummaryFilename: "tidb-statements.log",
StmtSummaryFileMaxDays: 3,
StmtSummaryFileMaxSize: 64,
StmtSummaryFileMaxBackups: 0,
EnableSlowLog: *NewAtomicBool(logutil.DefaultTiDBEnableSlowLog),
SlowThreshold: logutil.DefaultSlowThreshold,
RecordPlanInSlowLog: logutil.DefaultRecordPlanInSlowLog,
Expand Down
4 changes: 4 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ go_library(
"slow_query.go",
"sort.go",
"split.go",
"stmtsummary.go",
"table_reader.go",
"trace.go",
"union_scan.go",
Expand Down Expand Up @@ -194,6 +195,7 @@ go_library(
"//util/size",
"//util/sqlexec",
"//util/stmtsummary",
"//util/stmtsummary/v2:stmtsummary",
"//util/stringutil",
"//util/table-filter",
"//util/timeutil",
Expand Down Expand Up @@ -326,6 +328,7 @@ go_test(
"split_test.go",
"stale_txn_test.go",
"statement_context_test.go",
"stmtsummary_test.go",
"table_readers_required_rows_test.go",
"temporary_table_test.go",
"tikv_regions_peers_table_test.go",
Expand Down Expand Up @@ -419,6 +422,7 @@ go_test(
"//util/rowcodec",
"//util/set",
"//util/sqlexec",
"//util/stmtsummary/v2:stmtsummary",
"//util/stringutil",
"//util/tableutil",
"//util/timeutil",
Expand Down
5 changes: 3 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
stmtsummaryv2 "github.com/pingcap/tidb/util/stmtsummary/v2"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
Expand Down Expand Up @@ -1804,7 +1805,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
}

// Internal SQLs must also be recorded to keep the consistency of `PrevStmt` and `PrevStmtDigest`.
if !stmtsummary.StmtSummaryByDigestMap.Enabled() || ((sessVars.InRestrictedSQL || len(userString) == 0) && !stmtsummary.StmtSummaryByDigestMap.EnabledInternal()) {
if !stmtsummaryv2.Enabled() || ((sessVars.InRestrictedSQL || len(userString) == 0) && !stmtsummaryv2.EnabledInternal()) {
sessVars.SetPrevStmtDigest("")
return
}
Expand Down Expand Up @@ -1921,7 +1922,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
if a.retryCount > 0 {
stmtExecInfo.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
}
stmtsummary.StmtSummaryByDigestMap.AddStatement(stmtExecInfo)
stmtsummaryv2.Add(stmtExecInfo)
}

// GetTextToLog return the query text to log.
Expand Down
17 changes: 8 additions & 9 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1937,8 +1937,6 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableTiFlashReplica),
strings.ToLower(infoschema.TableTiDBServersInfo),
strings.ToLower(infoschema.TableTiKVStoreStatus),
strings.ToLower(infoschema.TableStatementsSummaryEvicted),
strings.ToLower(infoschema.ClusterTableStatementsSummaryEvicted),
strings.ToLower(infoschema.TableClientErrorsSummaryGlobal),
strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
Expand Down Expand Up @@ -1993,16 +1991,18 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
}
case strings.ToLower(infoschema.TableStatementsSummary),
strings.ToLower(infoschema.TableStatementsSummaryHistory),
strings.ToLower(infoschema.TableStatementsSummaryEvicted),
strings.ToLower(infoschema.ClusterTableStatementsSummary),
strings.ToLower(infoschema.ClusterTableStatementsSummaryHistory),
strings.ToLower(infoschema.ClusterTableStatementsSummary):
strings.ToLower(infoschema.ClusterTableStatementsSummaryEvicted):
var extractor *plannercore.StatementsSummaryExtractor
if v.Extractor != nil {
extractor = v.Extractor.(*plannercore.StatementsSummaryExtractor)
}
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &stmtSummaryTableRetriever{
table: v.Table,
columns: v.Columns,
extractor: v.Extractor.(*plannercore.StatementsSummaryExtractor),
},
retriever: buildStmtSummaryRetriever(b.ctx, v.Table, v.Columns, extractor),
}
case strings.ToLower(infoschema.TableColumns):
return &MemTableReaderExec{
Expand All @@ -2016,7 +2016,6 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
viewOutputNamesMap: make(map[int64]types.NameSlice),
},
}

case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
memTracker := memory.NewTracker(v.ID(), -1)
memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker)
Expand Down
67 changes: 0 additions & 67 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ import (
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"go.uber.org/zap"
Expand Down Expand Up @@ -158,9 +157,6 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
e.dataForTableTiFlashReplica(sctx, dbs)
case infoschema.TableTiKVStoreStatus:
err = e.dataForTiKVStoreStatus(sctx)
case infoschema.TableStatementsSummaryEvicted,
infoschema.ClusterTableStatementsSummaryEvicted:
err = e.setDataForStatementsSummaryEvicted(sctx)
case infoschema.TableClientErrorsSummaryGlobal,
infoschema.TableClientErrorsSummaryByUser,
infoschema.TableClientErrorsSummaryByHost:
Expand Down Expand Up @@ -2306,22 +2302,6 @@ func (e *memtableRetriever) dataForTableTiFlashReplica(ctx sessionctx.Context, s
e.rows = rows
}

func (e *memtableRetriever) setDataForStatementsSummaryEvicted(ctx sessionctx.Context) error {
if !hasPriv(ctx, mysql.ProcessPriv) {
return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}
e.rows = stmtsummary.StmtSummaryByDigestMap.ToEvictedCountDatum()
switch e.table.Name.O {
case infoschema.ClusterTableStatementsSummaryEvicted:
rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
if err != nil {
return err
}
e.rows = rows
}
return nil
}

func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context, tableName string) error {
// Seeing client errors should require the PROCESS privilege, with the exception of errors for your own user.
// This is similar to information_schema.processlist, which is the closest comparison.
Expand Down Expand Up @@ -2476,50 +2456,6 @@ func (e *memtableRetriever) setDataForClusterMemoryUsageOpsHistory(ctx sessionct
return nil
}

type stmtSummaryTableRetriever struct {
dummyCloser
table *model.TableInfo
columns []*model.ColumnInfo
retrieved bool
extractor *plannercore.StatementsSummaryExtractor
}

// retrieve implements the infoschemaRetriever interface
func (e *stmtSummaryTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.retrieved {
return nil, nil
}
e.retrieved = true

var err error
var instanceAddr string
switch e.table.Name.O {
case infoschema.ClusterTableStatementsSummary,
infoschema.ClusterTableStatementsSummaryHistory:
instanceAddr, err = infoschema.GetInstanceAddr(sctx)
if err != nil {
return nil, err
}
}
user := sctx.GetSessionVars().User
reader := stmtsummary.NewStmtSummaryReader(user, hasPriv(sctx, mysql.ProcessPriv), e.columns, instanceAddr, sctx.GetSessionVars().StmtCtx.TimeZone)
if e.extractor.Enable {
checker := stmtsummary.NewStmtSummaryChecker(e.extractor.Digests)
reader.SetChecker(checker)
}
var rows [][]types.Datum
switch e.table.Name.O {
case infoschema.TableStatementsSummary,
infoschema.ClusterTableStatementsSummary:
rows = reader.GetStmtSummaryCurrentRows()
case infoschema.TableStatementsSummaryHistory,
infoschema.ClusterTableStatementsSummaryHistory:
rows = reader.GetStmtSummaryHistoryRows()
}

return rows, nil
}

// tidbTrxTableRetriever is the memtable retriever for the TIDB_TRX and CLUSTER_TIDB_TRX table.
type tidbTrxTableRetriever struct {
dummyCloser
Expand Down Expand Up @@ -3018,9 +2954,6 @@ func (e *hugeMemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
}

func adjustColumns(input [][]types.Datum, outColumns []*model.ColumnInfo, table *model.TableInfo) [][]types.Datum {
if table.Name.O == infoschema.TableStatementsSummary {
return input
}
if len(outColumns) == len(table.Columns) {
return input
}
Expand Down
28 changes: 1 addition & 27 deletions executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,33 +275,7 @@ func (sc *slowLogChecker) isTimeValid(t types.Time) bool {
}

func getOneLine(reader *bufio.Reader) ([]byte, error) {
var resByte []byte
lineByte, isPrefix, err := reader.ReadLine()
if isPrefix {
// Need to read more data.
resByte = make([]byte, len(lineByte), len(lineByte)*2)
} else {
resByte = make([]byte, len(lineByte))
}
// Use copy here to avoid shallow copy problem.
copy(resByte, lineByte)
if err != nil {
return resByte, err
}

var tempLine []byte
for isPrefix {
tempLine, isPrefix, err = reader.ReadLine()
resByte = append(resByte, tempLine...) // nozero
// Use the max value of max_allowed_packet to check the single line length.
if len(resByte) > int(variable.MaxOfMaxAllowedPacket) {
return resByte, errors.Errorf("single line length exceeds limit: %v", variable.MaxOfMaxAllowedPacket)
}
if err != nil {
return resByte, err
}
}
return resByte, err
return util.ReadLine(reader, int(variable.MaxOfMaxAllowedPacket))
}

type offset struct {
Expand Down
Loading

0 comments on commit 1167faf

Please sign in to comment.