diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel
index 0a2a6112e3ac8..6e3f77a106d86 100644
--- a/ttl/ttlworker/BUILD.bazel
+++ b/ttl/ttlworker/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
         "scan.go",
         "session.go",
         "task_manager.go",
+        "timer_sync.go",
         "worker.go",
     ],
     importpath = "github.com/pingcap/tidb/ttl/ttlworker",
@@ -17,10 +18,12 @@ go_library(
     deps = [
         "//infoschema",
         "//kv",
+        "//parser/model",
         "//parser/terror",
         "//sessionctx",
         "//sessionctx/variable",
         "//store/driver/error",
+        "//timer/api",
         "//ttl/cache",
         "//ttl/client",
         "//ttl/metrics",
@@ -38,6 +41,7 @@ go_library(
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_tikv_client_go_v2//tikv",
         "@io_etcd_go_etcd_client_v3//:client",
+        "@org_golang_x_exp//slices",
         "@org_golang_x_time//rate",
         "@org_uber_go_multierr//:multierr",
         "@org_uber_go_zap//:zap",
@@ -55,11 +59,12 @@ go_test(
         "session_test.go",
         "task_manager_integration_test.go",
         "task_manager_test.go",
+        "timer_sync_test.go",
     ],
     embed = [":ttlworker"],
     flaky = True,
     race = "on",
-    shard_count = 35,
+    shard_count = 36,
     deps = [
         "//domain",
         "//infoschema",
@@ -73,6 +78,8 @@ go_test(
         "//statistics/handle",
         "//store/mockstore",
         "//testkit",
+        "//timer/api",
+        "//timer/tablestore",
         "//ttl/cache",
         "//ttl/client",
         "//ttl/metrics",
diff --git a/ttl/ttlworker/timer_sync.go b/ttl/ttlworker/timer_sync.go
new file mode 100644
index 0000000000000..8a23f57e87394
--- /dev/null
+++ b/ttl/ttlworker/timer_sync.go
@@ -0,0 +1,269 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/parser/model"
+	timerapi "github.com/pingcap/tidb/timer/api"
+	"github.com/pingcap/tidb/ttl/cache"
+	"github.com/pingcap/tidb/ttl/session"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+	"golang.org/x/exp/slices"
+)
+
+const (
+	timerKeyPrefix                 = "/tidb/ttl/physical_table/"
+	timerHookClass                 = "tidb.ttl"
+	fullRefreshTimersCacheInterval = 10 * time.Minute
+	timerDelayDeleteInterval       = 10 * time.Minute
+)
+
+// TTLTimerData is the data stored in each timer for TTL
+type TTLTimerData struct {
+	TableID    int64 `json:"table_id"`
+	PhysicalID int64 `json:"physical_id"`
+}
+
+// TTLTimersSyncer is used to sync timers for TTL
+type TTLTimersSyncer struct {
+	pool           sessionPool
+	cli            timerapi.TimerClient
+	key2Timers     map[string]*timerapi.TimerRecord
+	lastPullTimers time.Time
+	delayDelete    time.Duration
+}
+
+// NewTTLTimerSyncer creates a new TTLTimersSyncer
+func NewTTLTimerSyncer(pool sessionPool, cli timerapi.TimerClient) *TTLTimersSyncer {
+	return &TTLTimersSyncer{
+		pool:        pool,
+		cli:         cli,
+		key2Timers:  make(map[string]*timerapi.TimerRecord),
+		delayDelete: timerDelayDeleteInterval,
+	}
+}
+
+// SetDelayDeleteInterval sets the interval for delaying the deletion of a timer
+// It's better not to delete a timer immediately when the related table does not exist. The reason is that the information schema
+// is synced asynchronously, so a newly created table's meta may not have been synced to the current node yet.
+func (g *TTLTimersSyncer) SetDelayDeleteInterval(interval time.Duration) {
+	g.delayDelete = interval
+}
+
+// SyncTimers syncs timers with TTL tables
+func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSchema) {
+	if time.Since(g.lastPullTimers) > fullRefreshTimersCacheInterval {
+		newKey2Timers := make(map[string]*timerapi.TimerRecord, len(g.key2Timers))
+		timers, err := g.cli.GetTimers(ctx, timerapi.WithKeyPrefix(timerKeyPrefix))
+		if err != nil {
+			logutil.BgLogger().Error("failed to pull timers", zap.Error(err))
+			return
+		}
+
+		for _, timer := range timers {
+			newKey2Timers[timer.Key] = timer
+		}
+		g.key2Timers = newKey2Timers
+		g.lastPullTimers = time.Now()
+	}
+
+	se, err := getSession(g.pool)
+	if err != nil {
+		logutil.BgLogger().Error("failed to sync TTL timers", zap.Error(err))
+		return
+	}
+	defer se.Close()
+
+	currentTimerKeys := make(map[string]struct{})
+	for _, db := range is.AllSchemas() {
+		for _, tbl := range is.SchemaTables(db.Name) {
+			tblInfo := tbl.Meta()
+			if tblInfo.State != model.StatePublic || tblInfo.TTLInfo == nil {
+				continue
+			}
+			for _, key := range g.syncTimersForTable(ctx, se, db.Name, tblInfo) {
+				currentTimerKeys[key] = struct{}{}
+			}
+		}
+	}
+
+	for key, timer := range g.key2Timers {
+		if _, ok := currentTimerKeys[key]; ok {
+			continue
+		}
+
+		if time.Since(timer.CreateTime) > g.delayDelete {
+			if _, err = g.cli.DeleteTimer(ctx, timer.ID); err != nil {
+				logutil.BgLogger().Error("failed to delete timer", zap.Error(err), zap.String("timerID", timer.ID))
+			}
+		}
+	}
+}
+
+func (g *TTLTimersSyncer) syncTimersForTable(ctx context.Context, se session.Session, schema model.CIStr, tblInfo *model.TableInfo) []string {
+	if tblInfo.Partition == nil {
+		return []string{g.syncOneTimer(ctx, se, schema, tblInfo, nil)}
+	}
+
+	defs := tblInfo.Partition.Definitions
+	keys := make([]string, 0, len(defs))
+	for i := range defs {
+		keys = append(
+			keys,
+			g.syncOneTimer(ctx, se, schema, tblInfo, &defs[i]),
+		)
+	}
+	return keys
+}
+
+func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, schema model.CIStr, tblInfo *model.TableInfo, partition *model.PartitionDefinition) (key string) {
+	key = buildTimerKey(tblInfo, partition)
+	tags := getTimerTags(schema, tblInfo, partition)
+	ttlInfo := tblInfo.TTLInfo
+	existTimer, ok := g.key2Timers[key]
+	if ok && slices.Equal(existTimer.Tags, tags) && existTimer.Enable == ttlInfo.Enable && existTimer.SchedPolicyExpr == ttlInfo.JobInterval {
+		return
+	}
+
+	timer, err := g.cli.GetTimerByKey(ctx, key)
+	if err != nil && !errors.ErrorEqual(err, timerapi.ErrTimerNotExist) {
+		logutil.BgLogger().Error("failed to get timer for TTL table", zap.Error(err), zap.String("key", key))
+		return
+	}
+
+	if errors.ErrorEqual(err, timerapi.ErrTimerNotExist) {
+		var watermark time.Time
+		ttlTableStatus, err := getTTLTableStatus(ctx, se, tblInfo, partition)
+		if err != nil {
+			logutil.BgLogger().Warn("failed to get TTL table status", zap.Error(err), zap.String("key", key))
+		}
+
+		if ttlTableStatus != nil {
+			watermark = ttlTableStatus.LastJobStartTime
+		}
+
+		dataObj := &TTLTimerData{
+			TableID:    tblInfo.ID,
+			PhysicalID: tblInfo.ID,
+		}
+
+		if partition != nil {
+			dataObj.PhysicalID = partition.ID
+		}
+
+		data, err := json.Marshal(dataObj)
+		if err != nil {
+			logutil.BgLogger().Error("failed to marshal TTL data object", zap.Error(err))
+			return
+		}
+
+		timer, 
err = g.cli.CreateTimer(ctx, timerapi.TimerSpec{ + Key: key, + Tags: tags, + Data: data, + SchedPolicyType: timerapi.SchedEventInterval, + SchedPolicyExpr: ttlInfo.JobInterval, + HookClass: timerHookClass, + Watermark: watermark, + Enable: ttlInfo.Enable, + }) + if err != nil { + logutil.BgLogger().Error("failed to create new timer", + zap.Error(err), + zap.String("key", key), + zap.Strings("tags", tags), + ) + return + } + g.key2Timers[key] = timer + return + } + + err = g.cli.UpdateTimer(ctx, timer.ID, + timerapi.WithSetTags(tags), + timerapi.WithSetSchedExpr(timerapi.SchedEventInterval, tblInfo.TTLInfo.JobInterval), + timerapi.WithSetEnable(tblInfo.TTLInfo.Enable), + ) + + if err != nil { + logutil.BgLogger().Error("failed to update timer", + zap.Error(err), + zap.String("timerID", timer.ID), + zap.String("key", key), + zap.Strings("tags", tags), + ) + return + } + + timer, err = g.cli.GetTimerByID(ctx, timer.ID) + if err != nil { + logutil.BgLogger().Error("failed to get timer", + zap.Error(err), + zap.String("timerID", timer.ID), + ) + return + } + + g.key2Timers[timer.Key] = timer + return +} + +func getTimerTags(schema model.CIStr, tblInfo *model.TableInfo, partition *model.PartitionDefinition) []string { + dbTag := fmt.Sprintf("db=%s", schema.O) + tblTag := fmt.Sprintf("table=%s", tblInfo.Name.O) + if partition != nil { + return []string{ + dbTag, tblTag, + fmt.Sprintf("partition=%s", partition.Name.O), + } + } + + return []string{dbTag, tblTag} +} + +func buildTimerKey(tblInfo *model.TableInfo, partition *model.PartitionDefinition) string { + physicalID := tblInfo.ID + if partition != nil { + physicalID = partition.ID + } + return fmt.Sprintf("%s%d/%d", timerKeyPrefix, tblInfo.ID, physicalID) +} + +func getTTLTableStatus(ctx context.Context, se session.Session, tblInfo *model.TableInfo, partition *model.PartitionDefinition) (*cache.TableStatus, error) { + pid := tblInfo.ID + if partition != nil { + pid = partition.ID + } + + sql, args := cache.SelectFromTTLTableStatusWithID(pid) + rows, err := se.ExecuteSQL(ctx, sql, args...) + if err != nil { + return nil, err + } + + if len(rows) == 0 { + return nil, nil + } + + return cache.RowToTableStatus(se, rows[0]) +} diff --git a/ttl/ttlworker/timer_sync_test.go b/ttl/ttlworker/timer_sync_test.go new file mode 100644 index 0000000000000..eb08f681e878a --- /dev/null +++ b/ttl/ttlworker/timer_sync_test.go @@ -0,0 +1,238 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ttlworker_test + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" + timerapi "github.com/pingcap/tidb/timer/api" + "github.com/pingcap/tidb/timer/tablestore" + "github.com/pingcap/tidb/ttl/ttlworker" + "github.com/stretchr/testify/require" +) + +func TestTTLTimerSync(t *testing.T) { + store, do := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(tablestore.CreateTimerTableSQL("test", "test_timers")) + timerStore := tablestore.NewTableTimerStore(1, do.SysSessionPool(), "test", "test_timers", nil) + defer timerStore.Close() + + tk.MustExec("set @@global.tidb_ttl_job_enable=0") + tk.MustExec("create table t0(t timestamp)") + tk.MustExec("create table t1(t timestamp) TTL=`t`+interval 1 HOUR") + tk.MustExec("create table t2(t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='1m' ttl_enable='OFF'") + tk.MustExec("create table t3(t timestamp, t2 timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='1h'") + tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" + + "partition p0 values less than (10)," + + "partition p1 values less than (100)," + + "partition p2 values less than (1000)" + + ")") + var zeroTime time.Time + wm1 := time.Unix(3600*24*12, 0) + wm2 := time.Unix(3600*24*24, 0) + insertTTLTableStatusWatermark(t, do, tk, "test", "t1", "", zeroTime) + insertTTLTableStatusWatermark(t, do, tk, "test", "t2", "", wm1) + insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p0", zeroTime) + insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2) + + cli := timerapi.NewDefaultTimerClient(timerStore) + sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + + // first sync + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 6) + timer1 := checkTimerWithTableMeta(t, do, cli, "test", "t1", "", zeroTime) + timer2 := checkTimerWithTableMeta(t, do, cli, "test", "t2", "", wm1) + timer3 := checkTimerWithTableMeta(t, do, cli, "test", "t3", "", zeroTime) + timerP10 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p0", zeroTime) + timerP11 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p1", wm2) + timerP12 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p2", zeroTime) + + // create table/partition + tk.MustExec("create table t4(t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='2h'") + tk.MustExec("create table t5(t timestamp)") + tk.MustExec("alter table tp1 add partition (partition p3 values less than(10000))") + tk.MustExec("create table tp2(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='6h' partition by range(a) (" + + "partition p0 values less than (10)," + + "partition p1 values less than (100)" + + ")") + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 10) + timer4 := checkTimerWithTableMeta(t, do, cli, "test", "t4", "", zeroTime) + checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p3", zeroTime) + timerP20 := checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p0", zeroTime) + timerP21 := checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p1", zeroTime) + checkTimersNotChange(t, cli, timer1, timer2, timer3, timerP10, timerP11, timerP12) + + // update table + tk.MustExec("alter table t1 ttl_enable='OFF'") + tk.MustExec("alter table t2 ttl_job_interval='6m'") + tk.MustExec("alter table t3 
TTL=`t2`+interval 2 HOUR") + tk.MustExec("alter table t5 TTL=`t`+interval 10 HOUR ttl_enable='OFF'") + tk.MustExec("alter table tp1 ttl_job_interval='3m'") + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 11) + checkTimerWithTableMeta(t, do, cli, "test", "t1", "", zeroTime) + timer2 = checkTimerWithTableMeta(t, do, cli, "test", "t2", "", wm1) + timer5 := checkTimerWithTableMeta(t, do, cli, "test", "t5", "", zeroTime) + timerP10 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p0", zeroTime) + timerP11 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p1", wm2) + timerP12 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p2", zeroTime) + timerP13 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p3", zeroTime) + checkTimersNotChange(t, cli, timer3, timer4, timerP20, timerP21) + + // rename table + tk.MustExec("rename table t1 to t1a") + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 11) + timer1 = checkTimerWithTableMeta(t, do, cli, "test", "t1a", "", zeroTime) + checkTimersNotChange(t, cli, timer1, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12, timerP13, timerP20, timerP21) + + // truncate table/partition + oldTimer2 := timer2 + oldTimerP11 := timerP11 + oldTimerP20 := timerP20 + oldTimerP21 := timerP21 + tk.MustExec("truncate table t2") + tk.MustExec("alter table tp1 truncate partition p1") + tk.MustExec("truncate table tp2") + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 15) + timer2 = checkTimerWithTableMeta(t, do, cli, "test", "t2", "", zeroTime) + require.NotEqual(t, oldTimer2.ID, timer2.ID) + timerP11 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p1", zeroTime) + require.NotEqual(t, oldTimerP11.ID, timerP11.ID) + timerP20 = checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p0", zeroTime) + require.NotEqual(t, oldTimerP20.ID, timerP20.ID) + timerP21 = checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p1", zeroTime) + require.NotEqual(t, oldTimerP21.ID, timerP21.ID) + checkTimersNotChange(t, cli, timer1, oldTimer2, timer3, timer4, timer5, timerP10, oldTimerP11, timerP12, timerP13, oldTimerP20, oldTimerP21) + + // drop table/partition + tk.MustExec("drop table t1a") + tk.MustExec("alter table tp1 drop partition p3") + tk.MustExec("drop table tp2") + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 15) + checkTimersNotChange(t, cli, oldTimer2, oldTimerP11, oldTimerP20, oldTimerP21) + checkTimersNotChange(t, cli, timer1, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12, timerP13, timerP20, timerP21) + + // clear deleted tables + sync.SetDelayDeleteInterval(time.Millisecond) + time.Sleep(time.Second) + sync.SyncTimers(context.TODO(), do.InfoSchema()) + checkTimerCnt(t, cli, 7) + checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12) +} + +func insertTTLTableStatusWatermark(t *testing.T, do *domain.Domain, tk *testkit.TestKit, db, table, partition string, watermark time.Time) { + tbl, err := do.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) + require.NoError(t, err) + tblInfo := tbl.Meta() + physicalID := tblInfo.ID + var par model.PartitionDefinition + if partition != "" { + for _, def := range tblInfo.Partition.Definitions { + if def.Name.L == model.NewCIStr(partition).L { + par = def + } + } + require.NotNil(t, par) + physicalID = par.ID + } + + if watermark.IsZero() { + tk.MustExec("insert into mysql.tidb_ttl_table_status (table_id, parent_table_id) values (?, ?)", 
physicalID, tblInfo.ID) + return + } + + tk.MustExec( + "insert into mysql.tidb_ttl_table_status (table_id, parent_table_id, last_job_id, last_job_start_time, last_job_finish_time, last_job_ttl_expire) values(?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), FROM_UNIXTIME(?))", + physicalID, tblInfo.ID, uuid.NewString(), watermark.Unix(), watermark.Add(time.Minute).Unix(), watermark.Add(-time.Minute).Unix(), + ) +} + +func checkTimerCnt(t *testing.T, cli timerapi.TimerClient, cnt int) { + timers, err := cli.GetTimers(context.TODO()) + require.NoError(t, err) + require.Equal(t, cnt, len(timers)) +} + +func checkTimersNotChange(t *testing.T, cli timerapi.TimerClient, timers ...*timerapi.TimerRecord) { + for i, timer := range timers { + tm, err := cli.GetTimerByID(context.TODO(), timer.ID) + require.NoError(t, err) + require.Equal(t, *timer, *tm, fmt.Sprintf("index: %d", i)) + } +} + +func checkTimerWithTableMeta(t *testing.T, do *domain.Domain, cli timerapi.TimerClient, db, table, partition string, watermark time.Time) *timerapi.TimerRecord { + is := do.InfoSchema() + dbInfo, ok := is.SchemaByName(model.NewCIStr(db)) + require.True(t, ok) + tbl, err := is.TableByName(model.NewCIStr(db), model.NewCIStr(table)) + require.NoError(t, err) + tblInfo := tbl.Meta() + physicalID := tblInfo.ID + var par model.PartitionDefinition + if partition != "" { + for _, def := range tblInfo.Partition.Definitions { + if def.Name.L == model.NewCIStr(partition).L { + par = def + } + } + require.NotNil(t, par) + physicalID = par.ID + } + + key := fmt.Sprintf("/tidb/ttl/physical_table/%d/%d", tblInfo.ID, physicalID) + timer, err := cli.GetTimerByKey(context.TODO(), key) + require.NoError(t, err) + + require.Equal(t, tblInfo.TTLInfo.Enable, timer.Enable) + require.Equal(t, timerapi.SchedEventInterval, timer.SchedPolicyType) + require.Equal(t, tblInfo.TTLInfo.JobInterval, timer.SchedPolicyExpr) + if partition == "" { + require.Equal(t, []string{ + fmt.Sprintf("db=%s", dbInfo.Name.O), + fmt.Sprintf("table=%s", tblInfo.Name.O), + }, timer.Tags) + } else { + require.Equal(t, []string{ + fmt.Sprintf("db=%s", dbInfo.Name.O), + fmt.Sprintf("table=%s", tblInfo.Name.O), + fmt.Sprintf("partition=%s", par.Name.O), + }, timer.Tags) + } + + require.NotNil(t, timer.Data) + var timerData ttlworker.TTLTimerData + require.NoError(t, json.Unmarshal(timer.Data, &timerData)) + require.Equal(t, tblInfo.ID, timerData.TableID) + require.Equal(t, physicalID, timerData.PhysicalID) + require.Equal(t, watermark.Unix(), timer.Watermark.Unix()) + return timer +}
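
A minimal usage sketch (not part of the patch): the test above drives TTLTimersSyncer directly, so the snippet below shows how a caller might wire the new API into a periodic loop. Only NewTTLTimerSyncer, SetDelayDeleteInterval, SyncTimers, and the timer store/client constructors already used in timer_sync_test.go come from this change; the package name, the runTTLTimerSyncLoop helper, the timer table location, and the one-minute cadence are illustrative assumptions.

package ttlsyncsketch

import (
	"context"
	"time"

	"github.com/pingcap/tidb/domain"
	timerapi "github.com/pingcap/tidb/timer/api"
	"github.com/pingcap/tidb/timer/tablestore"
	"github.com/pingcap/tidb/ttl/ttlworker"
)

// runTTLTimerSyncLoop (hypothetical) keeps one timer per TTL physical table,
// keyed as /tidb/ttl/physical_table/<tableID>/<physicalID>, in sync with the
// TTL tables visible in the current information schema.
func runTTLTimerSyncLoop(ctx context.Context, do *domain.Domain) {
	// Back the timer client with a timer table; the test uses a scratch table
	// created via tablestore.CreateTimerTableSQL("test", "test_timers").
	store := tablestore.NewTableTimerStore(1, do.SysSessionPool(), "test", "test_timers", nil)
	defer store.Close()

	cli := timerapi.NewDefaultTimerClient(store)
	syncer := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli)

	// Keep timers of dropped tables around for a while in case the local
	// information schema has not caught up yet.
	syncer.SetDelayDeleteInterval(10 * time.Minute)

	ticker := time.NewTicker(time.Minute) // sync cadence is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			syncer.SyncTimers(ctx, do.InfoSchema())
		}
	}
}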