diff --git a/executor/adapter.go b/executor/adapter.go index b2fa6b90644e2..33422ab0bed9c 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -148,12 +148,7 @@ func (a *recordSet) NewChunk() *chunk.Chunk { func (a *recordSet) Close() error { err := a.executor.Close() - a.stmt.FinishExecuteStmt(a.txnStartTS, a.lastErr == nil, false) - a.stmt.logAudit() - // Detach the disk tracker from GlobalDiskUsageTracker after every execution - if stmtCtx := a.stmt.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil && stmtCtx.DiskTracker != nil { - stmtCtx.DiskTracker.DetachFromGlobalTracker() - } + a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr) return err } @@ -413,10 +408,11 @@ func getMaxExecutionTime(sctx sessionctx.Context) uint64 { } type chunkRowRecordSet struct { - rows []chunk.Row - idx int - fields []*ast.ResultField - e Executor + rows []chunk.Row + idx int + fields []*ast.ResultField + e Executor + execStmt *ExecStmt } func (c *chunkRowRecordSet) Fields() []*ast.ResultField { @@ -437,6 +433,7 @@ func (c *chunkRowRecordSet) NewChunk() *chunk.Chunk { } func (c *chunkRowRecordSet) Close() error { + c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil) return nil } @@ -468,7 +465,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor } if req.NumRows() == 0 { fields := colNames2ResultFields(e.Schema(), a.OutputNames, a.Ctx.GetSessionVars().CurrentDB) - return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil + return &chunkRowRecordSet{rows: rows, fields: fields, e: e, execStmt: a}, nil } iter := chunk.NewIterator4Chunk(req) for r := iter.Begin(); r != iter.End(); r = iter.Next() { @@ -792,6 +789,16 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo } } +// CloseRecordSet will finish the execution of current statement and do some record work +func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) { + a.FinishExecuteStmt(txnStartTS, lastErr == nil, false) + a.logAudit() + // Detach the disk tracker from GlobalDiskUsageTracker after every execution + if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil && stmtCtx.DiskTracker != nil { + stmtCtx.DiskTracker.DetachFromGlobalTracker() + } +} + // LogSlowQuery is used to print the slow query in the log files. func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { sessVars := a.Ctx.GetSessionVars()