From 2e8fa9ad1c17a30eef6c8a4c2e5a5bcd330d97b8 Mon Sep 17 00:00:00 2001 From: BornChanger Date: Tue, 7 Nov 2023 20:11:48 +0800 Subject: [PATCH 1/2] pkg: support the TSO format for asof expression Signed-off-by: BornChanger --- executor/recover_test.go | 61 ++++++++++++++++++++++++++++++++++++ sessiontxn/staleread/util.go | 7 +++++ 2 files changed, 68 insertions(+) diff --git a/executor/recover_test.go b/executor/recover_test.go index 6fd3426f9c7b6..ef5d68aabb558 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -446,6 +446,67 @@ func TestFlashbackWithSafeTs(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout")) } +func TestFlashbackTSOWithSafeTs(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`)) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + // Set GC safe point. + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + time.Sleep(time.Second) + ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + flashbackTs := oracle.GetTimeFromTS(ts) + testcases := []struct { + name string + sql string + injectSafeTS uint64 + // compareWithSafeTS will be 0 if FlashbackTS==SafeTS, -1 if FlashbackTS < SafeTS, and +1 if FlashbackTS > SafeTS. + compareWithSafeTS int + }{ + { + name: "5 seconds ago to now, safeTS 5 secs ago", + sql: fmt.Sprintf("flashback cluster to timestamp '%d'", ts), + injectSafeTS: oracle.GoTimeToTS(flashbackTs), + compareWithSafeTS: 0, + }, + { + name: "10 seconds ago to now, safeTS 5 secs ago", + sql: fmt.Sprintf("flashback cluster to timestamp '%d'", ts), + injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)), + compareWithSafeTS: -1, + }, + { + name: "5 seconds ago to now, safeTS 10 secs ago", + sql: fmt.Sprintf("flashback cluster to timestamp '%d'", ts), + injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(-10 * time.Second)), + compareWithSafeTS: 1, + }, + } + for _, testcase := range testcases { + t.Log(testcase.name) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", testcase.injectSafeTS))) + if testcase.compareWithSafeTS == 1 { + start := time.Now() + tk.MustContainErrMsg(testcase.sql, + "cannot set flashback timestamp after min-resolved-ts") + // When set `flashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`. + require.Less(t, time.Since(start), time.Second) + } else { + tk.MustExec(testcase.sql) + } + } + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout")) +} + func TestFlashbackRetryGetMinSafeTime(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go index 814861ffddcce..d2cc7e4863446 100644 --- a/sessiontxn/staleread/util.go +++ b/sessiontxn/staleread/util.go @@ -16,6 +16,7 @@ package staleread import ( "context" + "strconv" "time" "github.com/pingcap/failpoint" @@ -30,6 +31,7 @@ import ( ) // CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS. +// tsExpr could be an expression of TSO or a timestamp func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) { sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) { failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) { @@ -49,6 +51,11 @@ func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr as return 0, errAsOf.FastGenWithCause("as of timestamp cannot be NULL") } + // if tsVal is TSO already, return it directly. + if tso, err := strconv.ParseUint(tsVal.GetString(), 10, 64); err == nil { + return tso, nil + } + toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) // We need at least the millionsecond here, so set fsp to 3. toTypeTimestamp.SetDecimal(3) From fb0a117679066125fa8ba607d42deae36042bc0a Mon Sep 17 00:00:00 2001 From: BornChanger <97348524+BornChanger@users.noreply.github.com> Date: Fri, 8 Dec 2023 16:29:41 +0800 Subject: [PATCH 2/2] Update recover_test.go --- executor/recover_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/executor/recover_test.go b/executor/recover_test.go index ef5d68aabb558..0b8c342b1111c 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -450,8 +450,8 @@ func TestFlashbackTSOWithSafeTs(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`)) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -490,7 +490,7 @@ func TestFlashbackTSOWithSafeTs(t *testing.T) { } for _, testcase := range testcases { t.Log(testcase.name) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", testcase.injectSafeTS))) if testcase.compareWithSafeTS == 1 { start := time.Now() @@ -502,9 +502,9 @@ func TestFlashbackTSOWithSafeTs(t *testing.T) { tk.MustExec(testcase.sql) } } - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout")) } func TestFlashbackRetryGetMinSafeTime(t *testing.T) {