Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server,executor: split ResultSet Close() to Finish() and Close() (#49224) #49304

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime/trace"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -91,6 +92,7 @@ type recordSet struct {
stmt *ExecStmt
lastErr error
txnStartTS uint64
once sync.Once
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -180,13 +182,31 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return alloc.Alloc(base.RetFieldTypes(), base.InitCap(), base.MaxChunkSize())
}

func (a *recordSet) Finish() error {
var err error
a.once.Do(func() {
err = a.executor.Close()
cteErr := resetCTEStorageMap(a.stmt.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if err == nil {
err = cteErr
}
})
if err != nil {
a.lastErr = err
}
return err
}

func (a *recordSet) Close() error {
err := a.executor.Close()
err1 := a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
err := a.Finish()
if err != nil {
return err
logutil.BgLogger().Error("close recordSet error", zap.Error(err))
}
return err1
a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
return err
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
Expand Down Expand Up @@ -868,7 +888,8 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
}

func (c *chunkRowRecordSet) Close() error {
return c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e exec.Executor) (_ sqlexec.RecordSet, retErr error) {
Expand Down Expand Up @@ -1445,19 +1466,10 @@ func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) error {
cteErr := resetCTEStorageMap(a.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if lastErr == nil {
// Only overwrite err when it's nil.
lastErr = cteErr
}
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
a.Ctx.GetSessionVars().StmtCtx.DetachMemDiskTracker()
return cteErr
}

// Clean CTE storage shared by different CTEFullScan executor within a SQL stmt.
Expand Down
12 changes: 8 additions & 4 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,7 @@ func (cc *clientConn) initConnect(ctx context.Context) error {
break
}
}
if err := rs.Close(); err != nil {
return err
}
rs.Close()
}
}
logutil.Logger(ctx).Debug("init_connect complete")
Expand Down Expand Up @@ -2014,7 +2012,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
// - If the rs is nil and err is not nil, the detachment will be done in
// the `handleNoDelay`.
if rs != nil {
defer terror.Call(rs.Close)
defer rs.Close()
}
if err != nil {
// If error is returned during the planner phase or the executor.Open
Expand Down Expand Up @@ -2282,6 +2280,12 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs resultset.ResultSet, b
stmtDetail.WriteSQLRespDuration += time.Since(start)
}
}
<<<<<<< HEAD
=======
if err := rs.Finish(); err != nil {
return false, err
}
>>>>>>> d23e1c379a5 (server,executor: split ResultSet Close() to Finish() and Close() (#49224))

if stmtDetail != nil {
start = time.Now()
Expand Down
3 changes: 1 addition & 2 deletions pkg/server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/server/internal/dump"
"github.com/pingcap/tidb/pkg/server/internal/parse"
Expand Down Expand Up @@ -304,7 +303,7 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
execStmt.SetText(charset.EncodingUTF8Impl, sql)
rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
if rs != nil {
defer terror.Call(rs.Close)
defer rs.Close()
}
if err != nil {
// If error is returned during the planner phase or the executor.Open
Expand Down
1 change: 1 addition & 0 deletions pkg/server/internal/resultset/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/server/internal/resultset",
visibility = ["//pkg/server:__subpackages__"],
deps = [
"//pkg/parser/terror",
"//pkg/planner/core",
"//pkg/server/internal/column",
"//pkg/types",
Expand Down
18 changes: 13 additions & 5 deletions pkg/server/internal/resultset/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/server/internal/column"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -30,11 +31,12 @@ type ResultSet interface {
Columns() []*column.Info
NewChunk(chunk.Allocator) *chunk.Chunk
Next(context.Context, *chunk.Chunk) error
Close() error
Close()
// IsClosed checks whether the result set is closed.
IsClosed() bool
FieldTypes() []*types.FieldType
SetPreparedStmt(stmt *core.PlanCacheStmt)
Finish() error
}

var _ ResultSet = &tidbResultSet{}
Expand Down Expand Up @@ -62,13 +64,19 @@ func (trs *tidbResultSet) Next(ctx context.Context, req *chunk.Chunk) error {
return trs.recordSet.Next(ctx, req)
}

func (trs *tidbResultSet) Close() error {
func (trs *tidbResultSet) Finish() error {
if x, ok := trs.recordSet.(interface{ Finish() error }); ok {
return x.Finish()
}
return nil
}

func (trs *tidbResultSet) Close() {
if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) {
return nil
return
}
err := trs.recordSet.Close()
terror.Call(trs.recordSet.Close)
trs.recordSet = nil
return err
}

// IsClosed implements ResultSet.IsClosed interface.
Expand Down
30 changes: 30 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2475,16 +2475,46 @@ const ExecStmtVarKey ExecStmtVarKeyType = 0
// RecordSet, so this struct exists and RecordSet.Close() is overrided handle that.
type execStmtResult struct {
sqlexec.RecordSet
<<<<<<< HEAD
se *session
sql sqlexec.Statement
=======
se *session
sql sqlexec.Statement
once sync.Once
closed bool
>>>>>>> d23e1c379a5 (server,executor: split ResultSet Close() to Finish() and Close() (#49224))
}

func (rs *execStmtResult) Finish() error {
var err error
rs.once.Do(func() {
var err1 error
if f, ok := rs.RecordSet.(interface{ Finish() error }); ok {
err1 = f.Finish()
}
err2 := finishStmt(context.Background(), rs.se, err, rs.sql)
err = stderrs.Join(err1, err2)
})
return err
}

func (rs *execStmtResult) Close() error {
<<<<<<< HEAD
se := rs.se
if err := rs.RecordSet.Close(); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
return finishStmt(context.Background(), se, nil, rs.sql)
=======
if rs.closed {
return nil
}
err1 := rs.Finish()
err2 := rs.RecordSet.Close()
rs.closed = true
return stderrs.Join(err1, err2)
>>>>>>> d23e1c379a5 (server,executor: split ResultSet Close() to Finish() and Close() (#49224))
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
Expand Down