diff --git a/executor/adapter.go b/executor/adapter.go index 46fd51f1c33fb..20f23ad955d35 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -136,6 +136,7 @@ func (a *recordSet) NewChunk() *chunk.Chunk { func (a *recordSet) Close() error { err := a.executor.Close() +<<<<<<< HEAD // `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`. a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil, false) a.stmt.SummaryStmt() @@ -143,6 +144,9 @@ func (a *recordSet) Close() error { pps := types.CloneRow(sessVars.PreparedParams) sessVars.PrevStmt = FormatSQL(a.stmt.OriginText(), pps) a.stmt.logAudit() +======= + a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr) +>>>>>>> b32e834... server: let select for update could be recorded for slow log and statements (#16743) return err } @@ -334,10 +338,11 @@ func getMaxExecutionTime(sctx sessionctx.Context, stmtNode ast.StmtNode) uint64 } type chunkRowRecordSet struct { - rows []chunk.Row - idx int - fields []*ast.ResultField - e Executor + rows []chunk.Row + idx int + fields []*ast.ResultField + e Executor + execStmt *ExecStmt } func (c *chunkRowRecordSet) Fields() []*ast.ResultField { @@ -358,6 +363,7 @@ func (c *chunkRowRecordSet) NewChunk() *chunk.Chunk { } func (c *chunkRowRecordSet) Close() error { + c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil) return nil } @@ -388,8 +394,13 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor break } if req.NumRows() == 0 { +<<<<<<< HEAD fields := schema2ResultFields(e.Schema(), a.Ctx.GetSessionVars().CurrentDB) return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil +======= + fields := colNames2ResultFields(e.Schema(), a.OutputNames, a.Ctx.GetSessionVars().CurrentDB) + return &chunkRowRecordSet{rows: rows, fields: fields, e: e, execStmt: a}, nil +>>>>>>> b32e834... server: let select for update could be recorded for slow log and statements (#16743) } iter := chunk.NewIterator4Chunk(req) for r := iter.Begin(); r != iter.End(); r = iter.Next() { @@ -682,6 +693,44 @@ func FormatSQL(sql string, pps variable.PreparedParams) stringutil.StringerFunc } } +<<<<<<< HEAD +======= +var ( + sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal) + sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral) +) + +// FinishExecuteStmt is used to record some information after `ExecStmt` execution finished: +// 1. record slow log if needed. +// 2. record summary statement. +// 3. record execute duration metric. +// 4. update the `PrevStmt` in session variable. +func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults bool) { + // `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`. + a.LogSlowQuery(txnTS, succ, hasMoreResults) + a.SummaryStmt(succ) + sessVars := a.Ctx.GetSessionVars() + pps := types.CloneRow(sessVars.PreparedParams) + sessVars.PrevStmt = FormatSQL(a.OriginText(), pps) + executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile + if sessVars.InRestrictedSQL { + sessionExecuteRunDurationInternal.Observe(executeDuration.Seconds()) + } else { + sessionExecuteRunDurationGeneral.Observe(executeDuration.Seconds()) + } +} + +// CloseRecordSet will finish the execution of current statement and do some record work +func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) { + a.FinishExecuteStmt(txnStartTS, lastErr == nil, false) + a.logAudit() + // Detach the disk tracker from GlobalDiskUsageTracker after every execution + if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil && stmtCtx.DiskTracker != nil { + stmtCtx.DiskTracker.DetachFromGlobalTracker() + } +} + +>>>>>>> b32e834... server: let select for update could be recorded for slow log and statements (#16743) // LogSlowQuery is used to print the slow query in the log files. func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { sessVars := a.Ctx.GetSessionVars()