Skip to content

Commit

Permalink
session: do not keep history when the transaction retry is disabled (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and coocood committed Jul 12, 2019
1 parent d36d663 commit 7f3bff8
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3098,9 +3098,9 @@ func (s *testDBSuite2) TestLockTables(c *C) {
// Test lock table by other session in transaction and commit with retry.
tk.MustExec("unlock tables")
tk2.MustExec("unlock tables")
tk.MustExec("set @@session.tidb_disable_txn_auto_retry=0")
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=1")
tk.MustExec("set @@session.tidb_disable_txn_auto_retry=0")
tk2.MustExec("lock tables t1 write")
_, err = tk.Exec("commit")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue, Commentf("err: %v\n", err))
Expand Down
61 changes: 48 additions & 13 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,8 @@ var (
)

type stmtRecord struct {
stmtID uint32
st sqlexec.Statement
stmtCtx *stmtctx.StatementContext
params []interface{}
}

// StmtHistory holds all histories of statements in a txn.
Expand All @@ -146,12 +144,10 @@ type StmtHistory struct {
}

// Add appends a stmt to history list.
func (h *StmtHistory) Add(stmtID uint32, st sqlexec.Statement, stmtCtx *stmtctx.StatementContext, params ...interface{}) {
func (h *StmtHistory) Add(st sqlexec.Statement, stmtCtx *stmtctx.StatementContext) {
s := &stmtRecord{
stmtID: stmtID,
st: st,
stmtCtx: stmtCtx,
params: append(([]interface{})(nil), params...),
}
h.history = append(h.history, s)
}
Expand Down Expand Up @@ -451,14 +447,8 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
err := s.doCommit(ctx)
if err != nil {
commitRetryLimit := s.sessionVars.RetryLimit
if s.sessionVars.DisableTxnAutoRetry && !s.sessionVars.InRestrictedSQL {
// Do not retry non-autocommit transactions.
// For autocommit single statement transactions, the history count is always 1.
// For explicit transactions, the statement count is more than 1.
history := GetHistory(s)
if history.Count() > 1 {
commitRetryLimit = 0
}
if !s.sessionVars.TxnCtx.CouldRetry {
commitRetryLimit = 0
}
// Don't retry in BatchInsert mode. As a counter-example, insert into t1 select * from t2,
// BatchInsert already commit the first batch 1000 rows, then it commit 1000-2000 and retry the statement,
Expand Down Expand Up @@ -517,6 +507,13 @@ func (s *session) CommitTxn(ctx context.Context) error {
if commitDetail != nil {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}

failpoint.Inject("keepHistory", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(err)
}
})

s.sessionVars.TxnCtx.Cleanup()
s.recordTransactionCounter(err)
return err
Expand Down Expand Up @@ -1247,10 +1244,48 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
}
return &s.txn, nil
}

// isTxnRetryable (if returns true) means the transaction could retry.
// If the transaction is in pessimistic mode, do not retry.
// If the session is already in transaction, enable retry or internal SQL could retry.
// If not, the transaction could always retry, because it should be auto committed transaction.
// Anyway the retry limit is 0, the transaction could not retry.
func (s *session) isTxnRetryable() bool {
sessVars := s.sessionVars

// The pessimistic transaction no need to retry.
if sessVars.TxnCtx.IsPessimistic {
return false
}

// If retry limit is 0, the transaction could not retry.
if sessVars.RetryLimit == 0 {
return false
}

// If the session is not InTxn, it is an auto-committed transaction.
// The auto-committed transaction could always retry.
if !sessVars.InTxn() {
return true
}

// The internal transaction could always retry.
if sessVars.InRestrictedSQL {
return true
}

// If the retry is enabled, the transaction could retry.
if !sessVars.DisableTxnAutoRetry {
return true
}

return false
}

func (s *session) NewTxn(ctx context.Context) error {
if s.txn.Valid() {
txnID := s.txn.StartTS()
Expand Down
78 changes: 76 additions & 2 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (s *testSessionSuite) TestRetryCleanTxn(c *C) {
c.Assert(err, IsNil)
stmt, _ := session.Compile(context.TODO(), tk.Se, stmtNode)
executor.ResetContextOfStmt(tk.Se, stmtNode)
history.Add(0, stmt, tk.Se.GetSessionVars().StmtCtx)
history.Add(stmt, tk.Se.GetSessionVars().StmtCtx)
_, err = tk.Exec("commit")
c.Assert(err, NotNil)
txn, err := tk.Se.Txn(false)
Expand All @@ -559,6 +559,7 @@ func (s *testSessionSuite) TestReadOnlyNotInHistory(c *C) {
tk.MustExec("create table history (a int)")
tk.MustExec("insert history values (1), (2), (3)")
tk.MustExec("set @@autocommit = 0")
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustQuery("select * from history")
history := session.GetHistory(tk.Se)
c.Assert(history.Count(), Equals, 0)
Expand All @@ -572,6 +573,76 @@ func (s *testSessionSuite) TestReadOnlyNotInHistory(c *C) {
c.Assert(history.Count(), Equals, 0)
}

func (s *testSessionSuite) TestNoHistoryWhenDisableRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table history (a int)")
tk.MustExec("set @@autocommit = 0")

// retry_limit = 0 will not add history.
tk.MustExec("set @@tidb_retry_limit = 0")
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 0)

// Disable auto_retry will add history for auto committed only
tk.MustExec("set @@autocommit = 1")
tk.MustExec("set @@tidb_retry_limit = 10")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 1")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/keepHistory", `1*return(true)->return(false)`), IsNil)
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 1)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/keepHistory"), IsNil)
tk.MustExec("begin")
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 0)
tk.MustExec("commit")

// Enable auto_retry will add history for both.
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/keepHistory", `1*return(true)->return(false)`), IsNil)
tk.MustExec("insert history values (1)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/keepHistory"), IsNil)
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 1)
tk.MustExec("begin")
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 2)
tk.MustExec("commit")
}

func (s *testSessionSuite) TestNoRetryForCurrentTxn(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table history (a int)")
tk.MustExec("insert history values (1)")

// Firstly, disable retry.
tk.MustExec("set tidb_disable_txn_auto_retry = 1")
tk.MustExec("begin")
tk.MustExec("update history set a = 2")
// Enable retry now.
tk.MustExec("set tidb_disable_txn_auto_retry = 0")

tk1.MustExec("update history set a = 3")
c.Assert(tk.ExecToErr("commit"), NotNil)
}

func (s *testSessionSuite) TestRetryForCurrentTxn(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table history (a int)")
tk.MustExec("insert history values (1)")

// Firstly, enable retry.
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("update history set a = 2")
// Disable retry now.
tk.MustExec("set tidb_disable_txn_auto_retry = 1")

tk1.MustExec("update history set a = 3")
tk.MustExec("commit")
tk.MustQuery("select * from history").Check(testkit.Rows("2"))
}

// TestTruncateAlloc tests that the auto_increment ID does not reuse the old table's allocator.
func (s *testSessionSuite) TestTruncateAlloc(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
Expand Down Expand Up @@ -990,6 +1061,7 @@ func (s *testSessionSuite) TestBinaryReadOnly(c *C) {
id2, _, _, err := tk.Se.PrepareStmt("insert into t values (?)")
c.Assert(err, IsNil)
tk.MustExec("set autocommit = 0")
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
_, err = tk.Se.ExecutePreparedStmt(context.Background(), id, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 0)
Expand Down Expand Up @@ -2177,6 +2249,7 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
defer func() {
config.GetGlobalConfig().Performance.StmtCountLimit = saved
}()
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("insert into stmt_count_limit values (1)")
tk.MustExec("insert into stmt_count_limit values (2)")
Expand All @@ -2195,6 +2268,7 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
func (s *testSessionSuite) TestBatchCommit(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set tidb_batch_commit = 1")
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustExec("create table t (id int)")
saved := config.GetGlobalConfig().Performance
config.GetGlobalConfig().Performance.StmtCountLimit = 3
Expand Down Expand Up @@ -2440,7 +2514,7 @@ func (s *testSchemaSuite) TestDisableTxnAutoRetry(c *C) {
// session 1 starts a transaction early.
// execute a select statement to clear retry history.
tk1.MustExec("select 1")
tk1.Se.NewTxn(context.Background())
tk1.Se.PrepareTxnCtx(context.Background())
// session 2 update the value.
tk2.MustExec("update no_retry set id = 4")
// AutoCommit update will retry, so it would not fail.
Expand Down
17 changes: 10 additions & 7 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,14 @@ func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessV
return se.CommitTxn(ctx)
}

return checkStmtLimit(ctx, sctx, se, sessVars)
return checkStmtLimit(ctx, sctx, se)
}

func checkStmtLimit(ctx context.Context, sctx sessionctx.Context, se *session, sessVars *variable.SessionVars) error {
func checkStmtLimit(ctx context.Context, sctx sessionctx.Context, se *session) error {
// If the user insert, insert, insert ... but never commit, TiDB would OOM.
// So we limit the statement count in a transaction here.
var err error
sessVars := se.GetSessionVars()
history := GetHistory(sctx)
if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) {
if !sessVars.BatchCommit {
Expand All @@ -195,7 +196,7 @@ func checkStmtLimit(ctx context.Context, sctx sessionctx.Context, se *session, s
// The last history could not be "commit"/"rollback" statement.
// It means it is impossible to start a new transaction at the end of the transaction.
// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
sessVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
return err
}
Expand All @@ -222,12 +223,14 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}
rs, err = s.Exec(ctx)
sessVars := se.GetSessionVars()
// All the history should be added here.
sessVars.TxnCtx.StatementCount++
if !s.IsReadOnly(sessVars) {
if err == nil && !sessVars.TxnCtx.IsPessimistic {
GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx)
// All the history should be added here.
if err == nil && sessVars.TxnCtx.CouldRetry {
GetHistory(sctx).Add(s, sessVars.StmtCtx)
}

// Handle the stmt commit/rollback.
if txn, err1 := sctx.Txn(false); err1 == nil {
if txn.Valid() {
if err != nil {
Expand All @@ -240,8 +243,8 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
logutil.BgLogger().Error("get txn error", zap.Error(err1))
}
}

err = finishStmt(ctx, sctx, se, sessVars, err)

if se.txn.pending() {
// After run statement finish, txn state is still pending means the
// statement never need a Txn(), such as:
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type TransactionContext struct {
DirtyDB interface{}
Binlog interface{}
InfoSchema interface{}
CouldRetry bool
History interface{}
SchemaVersion int64
StartTS uint64
Expand Down Expand Up @@ -135,7 +136,7 @@ func (tc *TransactionContext) UpdateDeltaForTable(tableID int64, delta int64, co

// Cleanup clears up transaction info that no longer use.
func (tc *TransactionContext) Cleanup() {
//tc.InfoSchema = nil; we cannot do it now, because some operation like handleFieldList depend on this.
// tc.InfoSchema = nil; we cannot do it now, because some operation like handleFieldList depend on this.
tc.DirtyDB = nil
tc.Binlog = nil
tc.History = nil
Expand Down

0 comments on commit 7f3bff8

Please sign in to comment.