From 424b531a93c63277fa600a80cd2eac5f31ccf726 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Fri, 3 Feb 2023 18:12:02 +0800 Subject: [PATCH] ttl: fix TTL will delete unexpected rows when timezone changed --- ttl/client/notification.go | 2 +- ttl/session/session.go | 24 ++-- ttl/sqlbuilder/sql.go | 8 +- ttl/sqlbuilder/sql_test.go | 106 +++++++++--------- ttl/ttlworker/job_manager_integration_test.go | 40 +++++++ ttl/ttlworker/scan_test.go | 2 +- 6 files changed, 113 insertions(+), 69 deletions(-) diff --git a/ttl/client/notification.go b/ttl/client/notification.go index 6c44cd0dd7aa9..d65405b5bf06b 100644 --- a/ttl/client/notification.go +++ b/ttl/client/notification.go @@ -73,7 +73,7 @@ func (c *mockClient) WatchNotification(_ context.Context, typ string) clientv3.W c.Lock() defer c.Unlock() - ch := make(chan clientv3.WatchResponse, 1) + ch := make(chan clientv3.WatchResponse, 8) c.notificationWatchers[typ] = append(c.notificationWatchers[typ], ch) return ch } diff --git a/ttl/session/session.go b/ttl/session/session.go index 8fe2d1674e722..a413cdadf03ff 100644 --- a/ttl/session/session.go +++ b/ttl/session/session.go @@ -148,21 +148,23 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode // ResetWithGlobalTimeZone resets the session time zone to global time zone func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error { sessVar := s.GetSessionVars() - globalTZ, err := sessVar.GetGlobalSystemVar(ctx, variable.TimeZone) - if err != nil { - return err - } + if sessVar.TimeZone != nil { + globalTZ, err := sessVar.GetGlobalSystemVar(ctx, variable.TimeZone) + if err != nil { + return err + } - tz, err := sessVar.GetSessionOrGlobalSystemVar(ctx, variable.TimeZone) - if err != nil { - return err - } + tz, err := sessVar.GetSessionOrGlobalSystemVar(ctx, variable.TimeZone) + if err != nil { + return err + } - if globalTZ == tz { - return nil + if globalTZ == tz { + return nil + } } - _, err = s.ExecuteSQL(ctx, "SET @@time_zone=@@global.time_zone") + _, err := s.ExecuteSQL(ctx, "SET @@time_zone=@@global.time_zone") return err } diff --git a/ttl/sqlbuilder/sql.go b/ttl/sqlbuilder/sql.go index 29b0a094026d3..23cd292e78210 100644 --- a/ttl/sqlbuilder/sql.go +++ b/ttl/sqlbuilder/sql.go @@ -32,8 +32,6 @@ import ( "github.com/pkg/errors" ) -const dateTimeFormat = "2006-01-02 15:04:05.999999" - func writeHex(in io.Writer, d types.Datum) error { _, err := fmt.Fprintf(in, "x'%s'", hex.EncodeToString(d.GetBytes())) return err @@ -183,9 +181,9 @@ func (b *SQLBuilder) WriteExpireCondition(expire time.Time) error { b.writeColNames([]*model.ColumnInfo{b.tbl.TimeColumn}, false) b.restoreCtx.WritePlain(" < ") - b.restoreCtx.WritePlain("'") - b.restoreCtx.WritePlain(expire.Format(dateTimeFormat)) - b.restoreCtx.WritePlain("'") + b.restoreCtx.WritePlain("FROM_UNIXTIME(") + b.restoreCtx.WritePlain(strconv.FormatInt(expire.Unix(), 10)) + b.restoreCtx.WritePlain(")") b.hasWriteExpireCond = true return nil } diff --git a/ttl/sqlbuilder/sql_test.go b/ttl/sqlbuilder/sql_test.go index 76e42e6c5ca18..d93acccf19963 100644 --- a/ttl/sqlbuilder/sql_test.go +++ b/ttl/sqlbuilder/sql_test.go @@ -82,17 +82,17 @@ func TestEscape(t *testing.T) { { tp: "select", ds: [][]types.Datum{d("key1'\";123`456")}, - sql: "SELECT LOW_PRIORITY `col1\"';123``456` FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` > 'key1\\'\\\";123`456' AND `time\"';123``456` < '1970-01-01 00:00:00'", + sql: "SELECT LOW_PRIORITY `col1\"';123``456` FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` > 'key1\\'\\\";123`456' AND `time\"';123``456` < FROM_UNIXTIME(0)", }, { tp: "delete", ds: [][]types.Datum{d("key2'\";123`456")}, - sql: "DELETE LOW_PRIORITY FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` IN ('key2\\'\\\";123`456') AND `time\"';123``456` < '1970-01-01 00:00:00'", + sql: "DELETE LOW_PRIORITY FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` IN ('key2\\'\\\";123`456') AND `time\"';123``456` < FROM_UNIXTIME(0)", }, { tp: "delete", ds: [][]types.Datum{d("key3'\";123`456"), d("key4'`\"")}, - sql: "DELETE LOW_PRIORITY FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` IN ('key3\\'\\\";123`456', 'key4\\'`\\\"') AND `time\"';123``456` < '1970-01-01 00:00:00'", + sql: "DELETE LOW_PRIORITY FROM `testp;\"';123``456`.`tp\"';123``456` PARTITION(`p1\"';123``456`) WHERE `col1\"';123``456` IN ('key3\\'\\\";123`456', 'key4\\'`\\\"') AND `time\"';123``456` < FROM_UNIXTIME(0)", }, } @@ -115,7 +115,8 @@ func TestEscape(t *testing.T) { var tbName *ast.TableName var keyColumnName, timeColumnName string var values []string - var timeString string + var timeFunc string + var timeTS int64 switch c.tp { case "select": stmt, ok := stmts[0].(*ast.SelectStmt) @@ -127,7 +128,8 @@ func TestEscape(t *testing.T) { values = []string{cond1.R.(ast.ValueExpr).GetValue().(string)} cond2 := and.R.(*ast.BinaryOperationExpr) timeColumnName = cond2.L.(*ast.ColumnNameExpr).Name.Name.O - timeString = cond2.R.(ast.ValueExpr).GetValue().(string) + timeFunc = cond2.R.(*ast.FuncCallExpr).FnName.L + timeTS = cond2.R.(*ast.FuncCallExpr).Args[0].(ast.ValueExpr).GetValue().(int64) case "delete": stmt, ok := stmts[0].(*ast.DeleteStmt) require.True(t, ok) @@ -142,7 +144,8 @@ func TestEscape(t *testing.T) { } cond2 := and.R.(*ast.BinaryOperationExpr) timeColumnName = cond2.L.(*ast.ColumnNameExpr).Name.Name.O - timeString = cond2.R.(ast.ValueExpr).GetValue().(string) + timeFunc = cond2.R.(*ast.FuncCallExpr).FnName.L + timeTS = cond2.R.(*ast.FuncCallExpr).Args[0].(ast.ValueExpr).GetValue().(int64) default: require.FailNow(t, "invalid tp: %s", c.tp) } @@ -156,7 +159,8 @@ func TestEscape(t *testing.T) { for i, row := range c.ds { require.Equal(t, row[0].GetString(), values[i]) } - require.Equal(t, "1970-01-01 00:00:00", timeString) + require.Equal(t, "from_unixtime", timeFunc) + require.Equal(t, int64(0), timeTS) } } @@ -461,14 +465,14 @@ func TestSQLBuilder(t *testing.T) { shLoc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) must(b.WriteExpireCondition(time.UnixMilli(0).In(shLoc))) - mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 08:00:00'") + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < FROM_UNIXTIME(0)") b = sqlbuilder.NewSQLBuilder(t1) must(b.WriteSelect()) must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("c3"))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) - mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1' AND `id` <= 'c3' AND `time` < '1970-01-01 00:00:00'") + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1' AND `id` <= 'c3' AND `time` < FROM_UNIXTIME(0)") b = sqlbuilder.NewSQLBuilder(t1) must(b.WriteSelect()) @@ -498,7 +502,7 @@ func TestSQLBuilder(t *testing.T) { must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) must(b.WriteOrderBy(t1.KeyColumns, false)) must(b.WriteLimit(128)) - mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1\\';\\'' AND `id` <= 'a2\\\"' AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 128") + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1\\';\\'' AND `id` <= 'a2\\\"' AND `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 128") b = sqlbuilder.NewSQLBuilder(t2) must(b.WriteSelect()) @@ -511,7 +515,7 @@ func TestSQLBuilder(t *testing.T) { must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) must(b.WriteOrderBy(t2.KeyColumns, false)) must(b.WriteLimit(100)) - mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE (`a`, `b`) <= ('x2', 21) AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b` ASC LIMIT 100") + mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE (`a`, `b`) <= ('x2', 21) AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b` ASC LIMIT 100") b = sqlbuilder.NewSQLBuilder(t2) must(b.WriteSelect()) @@ -520,7 +524,7 @@ func TestSQLBuilder(t *testing.T) { must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) must(b.WriteOrderBy(t2.KeyColumns, false)) must(b.WriteLimit(100)) - mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE `a` = 'x3' AND `b` > 31 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b` ASC LIMIT 100") + mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE `a` = 'x3' AND `b` > 31 AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b` ASC LIMIT 100") // test build delete queries b = sqlbuilder.NewSQLBuilder(t1) @@ -532,46 +536,46 @@ func TestSQLBuilder(t *testing.T) { must(b.WriteDelete()) must(b.WriteInCondition(t1.KeyColumns, d("a"))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) - mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a') AND `time` < '1970-01-01 00:00:00'") + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a') AND `time` < FROM_UNIXTIME(0)") b = sqlbuilder.NewSQLBuilder(t1) must(b.WriteDelete()) must(b.WriteInCondition(t1.KeyColumns, d("a"), d("b"))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) - mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a', 'b') AND `time` < '1970-01-01 00:00:00'") + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a', 'b') AND `time` < FROM_UNIXTIME(0)") b = sqlbuilder.NewSQLBuilder(t1) must(b.WriteDelete()) must(b.WriteInCondition(t2.KeyColumns, d("a", 1))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) must(b.WriteLimit(100)) - mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1)) AND `time` < '1970-01-01 00:00:00' LIMIT 100") + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1)) AND `time` < FROM_UNIXTIME(0) LIMIT 100") b = sqlbuilder.NewSQLBuilder(t1) must(b.WriteDelete()) must(b.WriteInCondition(t2.KeyColumns, d("a", 1), d("b", 2))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) must(b.WriteLimit(100)) - mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < '1970-01-01 00:00:00' LIMIT 100") + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < FROM_UNIXTIME(0) LIMIT 100") b = sqlbuilder.NewSQLBuilder(t1) must(b.WriteDelete()) must(b.WriteInCondition(t2.KeyColumns, d("a", 1), d("b", 2))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) - mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < '1970-01-01 00:00:00'") + mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < FROM_UNIXTIME(0)") // test select partition table b = sqlbuilder.NewSQLBuilder(tp) must(b.WriteSelect()) must(b.WriteCommonCondition(tp.KeyColumns, ">", d("a1"))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) - mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` > 'a1' AND `time` < '1970-01-01 00:00:00'") + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` > 'a1' AND `time` < FROM_UNIXTIME(0)") b = sqlbuilder.NewSQLBuilder(tp) must(b.WriteDelete()) must(b.WriteInCondition(tp.KeyColumns, d("a"), d("b"))) must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) - mustBuild(b, "DELETE LOW_PRIORITY FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` IN ('a', 'b') AND `time` < '1970-01-01 00:00:00'") + mustBuild(b, "DELETE LOW_PRIORITY FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` IN ('a', 'b') AND `time` < FROM_UNIXTIME(0)") } func TestScanQueryGenerator(t *testing.T) { @@ -624,7 +628,7 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 3, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 3", }, { nil, 5, "", @@ -637,7 +641,7 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 3, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 3", }, { [][]types.Datum{}, 5, "", @@ -652,11 +656,11 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 3, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` >= 1 AND `id` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` >= 1 AND `id` < 100 AND `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 3", }, { result(d(10), 3), 5, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 10 AND `id` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 10 AND `id` < 100 AND `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 5", }, { result(d(15), 4), 5, @@ -670,15 +674,15 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 3, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 3", }, { result(d(2), 3), 5, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 2 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 2 AND `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 5", }, { result(d(4), 5), 6, - "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 4 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 6", + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 4 AND `time` < FROM_UNIXTIME(0) ORDER BY `id` ASC LIMIT 6", }, { result(d(7), 5), 5, "", @@ -691,7 +695,7 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { nil, 5, "", @@ -704,7 +708,7 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { nil, 5, "", @@ -717,7 +721,7 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { [][]types.Datum{}, 5, "", @@ -730,7 +734,7 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0xf0}), 4), 5, "", @@ -745,39 +749,39 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` >= x'0e' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` >= x'0e' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x1a}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x20}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "y", []byte{0x0a}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'y' AND `c` > x'0a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'y' AND `c` > x'0a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "y", []byte{0x11}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'y' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'y' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "z", []byte{0x02}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(3, "a", []byte{0x01}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` = 'a' AND `c` > x'01' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` = 'a' AND `c` > x'01' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(3, "a", []byte{0x11}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` > 'a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` > 'a' AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(3, "c", []byte{0x12}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 3 AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 3 AND (`a`, `b`, `c`) < (100, 'z', x'ff') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(5, "e", []byte{0xa1}), 4), 5, "", @@ -792,19 +796,19 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` >= 1 AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` >= 1 AND `a` < 100 AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x1a}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND `a` < 100 AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x20}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND `a` < 100 AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "y", []byte{0x0a}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND `a` < 100 AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, }, }, @@ -816,19 +820,19 @@ func TestScanQueryGenerator(t *testing.T) { path: [][]interface{}{ { nil, 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` >= 'x' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` >= 'x' AND (`a`, `b`) < (100, 'z') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x1a}), 5), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`) < (100, 'z') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "x", []byte{0x20}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`) < (100, 'z') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, { result(d(1, "y", []byte{0x0a}), 4), 5, - "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5", + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`) < (100, 'z') AND `time` < FROM_UNIXTIME(0) ORDER BY `a`, `b`, `c` ASC LIMIT 5", }, }, }, @@ -898,25 +902,25 @@ func TestBuildDeleteSQL(t *testing.T) { tbl: t1, expire: time.UnixMilli(0).In(time.UTC), rows: [][]types.Datum{d(1)}, - sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (1) AND `time` < '1970-01-01 00:00:00' LIMIT 1", + sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (1) AND `time` < FROM_UNIXTIME(0) LIMIT 1", }, { tbl: t1, expire: time.UnixMilli(0).In(time.UTC), rows: [][]types.Datum{d(2), d(3), d(4)}, - sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (2, 3, 4) AND `time` < '1970-01-01 00:00:00' LIMIT 3", + sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (2, 3, 4) AND `time` < FROM_UNIXTIME(0) LIMIT 3", }, { tbl: t2, expire: time.UnixMilli(0).In(time.UTC), rows: [][]types.Datum{d(1, "a")}, - sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a')) AND `time` < '1970-01-01 00:00:00' LIMIT 1", + sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a')) AND `time` < FROM_UNIXTIME(0) LIMIT 1", }, { tbl: t2, expire: time.UnixMilli(0).In(time.UTC), rows: [][]types.Datum{d(1, "a"), d(2, "b")}, - sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a'), (2, 'b')) AND `time` < '1970-01-01 00:00:00' LIMIT 2", + sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a'), (2, 'b')) AND `time` < FROM_UNIXTIME(0) LIMIT 2", }, } diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index e2e864344fde3..a1cafd7a7cb66 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -261,6 +261,46 @@ func TestTriggerTTLJob(t *testing.T) { tk.MustQuery("select id from t order by id asc").Check(testkit.Rows("2", "4")) } +func TestTTLDeleteWithTimeZoneChange(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval") + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.time_zone='Asia/Shanghai'") + tk.MustExec("set @@time_zone='Asia/Shanghai'") + + tk.MustExec("create table t1(id int primary key, t datetime) TTL=`t` + INTERVAL 1 DAY TTL_ENABLE='OFF'") + tbl1, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + tblID1 := tbl1.Meta().ID + tk.MustExec("insert into t1 values(1, NOW()), (2, NOW() - INTERVAL 31 HOUR), (3, NOW() - INTERVAL 33 HOUR)") + + tk.MustExec("create table t2(id int primary key, t timestamp) TTL=`t` + INTERVAL 1 DAY TTL_ENABLE='OFF'") + tbl2, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) + require.NoError(t, err) + tblID2 := tbl2.Meta().ID + tk.MustExec("insert into t2 values(1, NOW()), (2, NOW() - INTERVAL 31 HOUR), (3, NOW() - INTERVAL 33 HOUR)") + + tk.MustExec("set @@global.time_zone='UTC'") + tk.MustExec("set @@time_zone='UTC'") + tk.MustExec("alter table t1 TTL_ENABLE='ON'") + tk.MustExec("alter table t2 TTL_ENABLE='ON'") + + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) + defer cancel() + cli := do.TTLJobManager().GetCommandCli() + _, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t1") + _, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t2") + + waitTTLJobFinished(t, tk, tblID1) + tk.MustQuery("select id from t1 order by id asc").Check(testkit.Rows("1", "2")) + + waitTTLJobFinished(t, tk, tblID2) + tk.MustQuery("select id from t2 order by id asc").Check(testkit.Rows("1")) +} + func waitTTLJobFinished(t *testing.T, tk *testkit.TestKit, tableID int64) { start := time.Now() for time.Since(start) < time.Minute { diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go index 1c280a5b0f2b2..3659ce0843617 100644 --- a/ttl/ttlworker/scan_test.go +++ b/ttl/ttlworker/scan_test.go @@ -248,7 +248,7 @@ func (t *mockScanTask) selectSQL(i int) string { if i == 0 { op = ">=" } - return fmt.Sprintf("SELECT LOW_PRIORITY `_tidb_rowid` FROM `test`.`t1` WHERE `_tidb_rowid` %s %d AND `time` < '1970-01-01 08:00:00' ORDER BY `_tidb_rowid` ASC LIMIT 3", op, i*100) + return fmt.Sprintf("SELECT LOW_PRIORITY `_tidb_rowid` FROM `test`.`t1` WHERE `_tidb_rowid` %s %d AND `time` < FROM_UNIXTIME(0) ORDER BY `_tidb_rowid` ASC LIMIT 3", op, i*100) } func (t *mockScanTask) runDoScanForTest(delTaskCnt int, errString string) *ttlScanTaskExecResult {