From b5be9f6db297b7990bc94c6de91309dbc263ed87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Sun, 29 Jan 2023 19:39:54 +0800 Subject: [PATCH] ttl: support telemetry for TTL (#40806) close pingcap/tidb#40519 --- telemetry/BUILD.bazel | 2 + telemetry/data_feature_usage.go | 3 + telemetry/data_feature_usage_test.go | 158 ++++++++++++++++++++ telemetry/main_test.go | 2 + telemetry/ttl.go | 214 +++++++++++++++++++++++++++ 5 files changed, 379 insertions(+) create mode 100644 telemetry/ttl.go diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index 1f032aa3f237a..a6c79f7de596f 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "id.go", "status.go", "telemetry.go", + "ttl.go", "util.go", ], importpath = "github.com/pingcap/tidb/telemetry", @@ -24,6 +25,7 @@ go_library( "//infoschema", "//kv", "//metrics", + "//parser/ast", "//parser/model", "//parser/mysql", "//sessionctx", diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 81bf7a9785a3a..8661ce13ecccb 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -60,6 +60,7 @@ type featureUsage struct { AutoIDNoCache bool `json:"autoIDNoCache"` IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"` ResourceControlUsage *resourceControlUsage `json:"resourceControl"` + TTLUsage *ttlUsageCounter `json:"ttlUsage"` } type placementPolicyUsage struct { @@ -117,6 +118,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.IndexMergeUsageCounter = getIndexMergeUsageInfo() + usage.TTLUsage = getTTLUsageInfo(ctx, sctx) + return &usage, nil } diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index a678bc681eb18..a667219ba50a8 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -15,12 +15,16 @@ package telemetry_test import ( + "encoding/json" "fmt" + "strings" "testing" + "time" _ "github.com/pingcap/tidb/autoid_service" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/testkit" @@ -619,3 +623,157 @@ func TestIndexMergeUsage(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), usage.IndexMergeUsageCounter.IndexMergeUsed) } + +func TestTTLTelemetry(t *testing.T) { + timeFormat := "2006-01-02 15:04:05" + dateFormat := "2006-01-02" + + now := time.Now() + curDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + if interval := curDate.Add(time.Hour * 24).Sub(now); interval > 0 && interval < 5*time.Minute { + // make sure testing is not running at the end of one day + time.Sleep(interval) + } + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_ttl_job_enable=0") + + getTTLTable := func(name string) *model.TableInfo { + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(name)) + require.NoError(t, err) + require.NotNil(t, tbl.Meta().TTLInfo) + return tbl.Meta() + } + + jobIDIdx := 1 + insertTTLHistory := func(tblName string, partitionName string, createTime, finishTime, ttlExpire time.Time, scanError string, totalRows, errorRows int64, status string) { + defer func() { + jobIDIdx++ + }() + + tbl := getTTLTable(tblName) + tblID := tbl.ID + partitionID := tbl.ID + if 
partitionName != "" { + for _, def := range tbl.Partition.Definitions { + if def.Name.L == strings.ToLower(partitionName) { + partitionID = def.ID + } + } + require.NotEqual(t, tblID, partitionID) + } + + summary := make(map[string]interface{}) + summary["total_rows"] = totalRows + summary["success_rows"] = totalRows - errorRows + summary["error_rows"] = errorRows + summary["total_scan_task"] = 1 + summary["scheduled_scan_task"] = 1 + summary["finished_scan_task"] = 1 + if scanError != "" { + summary["scan_task_err"] = scanError + } + + summaryText, err := json.Marshal(summary) + require.NoError(t, err) + + tk.MustExec("insert into "+ + "mysql.tidb_ttl_job_history ("+ + " job_id, table_id, parent_table_id, table_schema, table_name, partition_name, "+ + " create_time, finish_time, ttl_expire, summary_text, "+ + " expired_rows, deleted_rows, error_delete_rows, status) "+ + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + jobIDIdx, partitionID, tblID, "test", tblName, partitionName, + createTime.Format(timeFormat), finishTime.Format(timeFormat), ttlExpire.Format(timeFormat), summaryText, + totalRows, totalRows-errorRows, errorRows, status, + ) + } + + oneDayAgoDate := curDate.Add(-24 * time.Hour) + // start today, end today + times11 := []time.Time{curDate.Add(time.Hour), curDate.Add(2 * time.Hour), curDate} + // start yesterday, end today + times21 := []time.Time{curDate.Add(-2 * time.Hour), curDate, curDate.Add(-3 * time.Hour)} + // start yesterday, end yesterday + times31 := []time.Time{oneDayAgoDate, oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-time.Hour)} + times32 := []time.Time{oneDayAgoDate.Add(2 * time.Hour), oneDayAgoDate.Add(3 * time.Hour), oneDayAgoDate.Add(time.Hour)} + times33 := []time.Time{oneDayAgoDate.Add(4 * time.Hour), oneDayAgoDate.Add(5 * time.Hour), oneDayAgoDate.Add(3 * time.Hour)} + // start 2 days ago, end yesterday + times41 := []time.Time{oneDayAgoDate.Add(-2 * time.Hour), oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-3 * time.Hour)} + // start two days ago, end two days ago + times51 := []time.Time{oneDayAgoDate.Add(-5 * time.Hour), oneDayAgoDate.Add(-4 * time.Hour), oneDayAgoDate.Add(-6 * time.Hour)} + + tk.MustExec("create table t1 (t timestamp) TTL=`t` + interval 1 hour") + insertTTLHistory("t1", "", times11[0], times11[1], times11[2], "", 100000000, 0, "finished") + insertTTLHistory("t1", "", times21[0], times21[1], times21[2], "", 100000000, 0, "finished") + insertTTLHistory("t1", "", times31[0], times31[1], times31[2], "err1", 112600, 110000, "finished") + insertTTLHistory("t1", "", times32[0], times32[1], times32[2], "", 2600, 0, "timeout") + insertTTLHistory("t1", "", times33[0], times33[1], times33[2], "", 2600, 0, "finished") + insertTTLHistory("t1", "", times41[0], times41[1], times41[2], "", 2600, 0, "finished") + insertTTLHistory("t1", "", times51[0], times51[1], times51[2], "", 100000000, 1, "finished") + + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + checkTableHistWithDeleteRows := func(vals ...int64) { + require.Equal(t, 5, len(vals)) + require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDeleteRows)) + require.Equal(t, int64(10*1000), *usage.TTLUsage.TableHistWithDeleteRows[0].LessThan) + require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDeleteRows[0].Count) + require.Equal(t, int64(100*1000), *usage.TTLUsage.TableHistWithDeleteRows[1].LessThan) + require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDeleteRows[1].Count) + require.Equal(t, int64(1000*1000), 
*usage.TTLUsage.TableHistWithDeleteRows[2].LessThan) + require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDeleteRows[2].Count) + require.Equal(t, int64(10*1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[3].LessThan) + require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDeleteRows[3].Count) + require.True(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThanMax) + require.Nil(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThan) + require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDeleteRows[4].Count) + } + + checkTableHistWithDelay := func(vals ...int64) { + require.Equal(t, 5, len(vals)) + require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDelayTime)) + require.Equal(t, int64(1), *usage.TTLUsage.TableHistWithDelayTime[0].LessThan) + require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDelayTime[0].Count) + require.Equal(t, int64(6), *usage.TTLUsage.TableHistWithDelayTime[1].LessThan) + require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDelayTime[1].Count) + require.Equal(t, int64(24), *usage.TTLUsage.TableHistWithDelayTime[2].LessThan) + require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDelayTime[2].Count) + require.Equal(t, int64(72), *usage.TTLUsage.TableHistWithDelayTime[3].LessThan) + require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDelayTime[3].Count) + require.True(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThanMax) + require.Nil(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThan) + require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDelayTime[4].Count) + } + + require.False(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(1), usage.TTLUsage.TTLTables) + require.Equal(t, int64(1), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(0, 1, 0, 0, 0) + checkTableHistWithDelay(0, 0, 1, 0, 0) + + tk.MustExec("create table t2 (t timestamp) TTL=`t` + interval 20 hour") + tk.MustExec("set @@global.tidb_ttl_job_enable=1") + insertTTLHistory("t2", "", times31[0], times31[1], times31[2], "", 9999, 0, "finished") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.True(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(2), usage.TTLUsage.TTLTables) + require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 1, 0, 0, 0) + checkTableHistWithDelay(0, 1, 1, 0, 0) + + tk.MustExec("create table t3 (t timestamp) TTL=`t` + interval 1 hour TTL_ENABLE='OFF'") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.True(t, usage.TTLUsage.TTLJobEnabled) + require.Equal(t, int64(3), usage.TTLUsage.TTLTables) + require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables) + require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate) + checkTableHistWithDeleteRows(1, 1, 0, 0, 0) + checkTableHistWithDelay(0, 1, 1, 0, 1) +} diff --git a/telemetry/main_test.go b/telemetry/main_test.go index 0e8d98b2a4f6c..8478a3ead4084 100644 --- a/telemetry/main_test.go +++ b/telemetry/main_test.go @@ -41,6 +41,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + 
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), } goleak.VerifyTestMain(m, opts...) diff --git a/telemetry/ttl.go b/telemetry/ttl.go new file mode 100644 index 0000000000000..b9c8c0210fb0c --- /dev/null +++ b/telemetry/ttl.go @@ -0,0 +1,214 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" +) + +const ( + // selectDeletedRowsOneDaySQL selects the deleted rows for each table of last day + selectDeletedRowsOneDaySQL = `SELECT parent_table_id, CAST(SUM(deleted_rows) AS SIGNED) + FROM + mysql.tidb_ttl_job_history + WHERE + create_time >= CURDATE() - INTERVAL 7 DAY + AND finish_time >= CURDATE() - INTERVAL 1 DAY + AND finish_time < CURDATE() + GROUP BY parent_table_id;` + // selectDelaySQL selects the deletion delay in minute for each table at the end of last day + selectDelaySQL = `SELECT + parent_table_id, TIMESTAMPDIFF(MINUTE, MIN(tm), CURDATE()) AS ttl_minutes + FROM + ( + SELECT + table_id, + parent_table_id, + MAX(ttl_expire) AS tm + FROM + mysql.tidb_ttl_job_history + WHERE + create_time > CURDATE() - INTERVAL 7 DAY + AND finish_time < CURDATE() + AND status = 'finished' + AND JSON_VALID(summary_text) + AND summary_text ->> "$.scan_task_err" IS NULL + GROUP BY + table_id, parent_table_id + ) t + GROUP BY parent_table_id;` +) + +type ttlHistItem struct { + // LessThan is not null means it collects the count of items with condition [prevLessThan, LessThan) + // Notice that it's type is an int64 pointer to forbid serializing it when it is not set. 
+ LessThan *int64 `json:"less_than,omitempty"` + // LessThanMax is true means the condition is [prevLessThan, MAX) + LessThanMax bool `json:"less_than_max,omitempty"` + // Count is the count of items that fit the condition + Count int64 `json:"count"` +} + +type ttlUsageCounter struct { + TTLJobEnabled bool `json:"ttl_job_enabled"` + TTLTables int64 `json:"ttl_table_count"` + TTLJobEnabledTables int64 `json:"ttl_job_enabled_tables"` + TTLHistDate string `json:"ttl_hist_date"` + TableHistWithDeleteRows []*ttlHistItem `json:"table_hist_with_delete_rows"` + TableHistWithDelayTime []*ttlHistItem `json:"table_hist_with_delay_time"` +} + +func int64Pointer(val int64) *int64 { + v := val + return &v +} + +func (c *ttlUsageCounter) UpdateTableHistWithDeleteRows(rows int64) { + for _, item := range c.TableHistWithDeleteRows { + if item.LessThanMax || rows < *item.LessThan { + item.Count++ + return + } + } +} + +func (c *ttlUsageCounter) UpdateTableHistWithDelayTime(tblCnt int, hours int64) { + for _, item := range c.TableHistWithDelayTime { + if item.LessThanMax || hours < *item.LessThan { + item.Count += int64(tblCnt) + return + } + } +} + +func getTTLUsageInfo(ctx context.Context, sctx sessionctx.Context) (counter *ttlUsageCounter) { + counter = &ttlUsageCounter{ + TTLJobEnabled: variable.EnableTTLJob.Load(), + TTLHistDate: time.Now().Add(-24 * time.Hour).Format("2006-01-02"), + TableHistWithDeleteRows: []*ttlHistItem{ + { + LessThan: int64Pointer(10 * 1000), + }, + { + LessThan: int64Pointer(100 * 1000), + }, + { + LessThan: int64Pointer(1000 * 1000), + }, + { + LessThan: int64Pointer(10000 * 1000), + }, + { + LessThanMax: true, + }, + }, + TableHistWithDelayTime: []*ttlHistItem{ + { + LessThan: int64Pointer(1), + }, + { + LessThan: int64Pointer(6), + }, + { + LessThan: int64Pointer(24), + }, + { + LessThan: int64Pointer(72), + }, + { + LessThanMax: true, + }, + }, + } + + is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + if !ok { + // it should never happen + logutil.BgLogger().Error(fmt.Sprintf("GetDomainInfoSchema returns a invalid type: %T", is)) + return + } + + ttlTables := make(map[int64]*model.TableInfo) + for _, db := range is.AllSchemas() { + for _, tbl := range is.SchemaTables(db.Name) { + tblInfo := tbl.Meta() + if tblInfo.State != model.StatePublic || tblInfo.TTLInfo == nil { + continue + } + + counter.TTLTables++ + if tblInfo.TTLInfo.Enable { + counter.TTLJobEnabledTables++ + } + ttlTables[tblInfo.ID] = tblInfo + } + } + + exec := sctx.(sqlexec.RestrictedSQLExecutor) + rows, _, err := exec.ExecRestrictedSQL(ctx, nil, selectDeletedRowsOneDaySQL) + if err != nil { + logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDeletedRowsOneDaySQL), zap.Error(err)) + } else { + for _, row := range rows { + counter.UpdateTableHistWithDeleteRows(row.GetInt64(1)) + } + } + + rows, _, err = exec.ExecRestrictedSQL(ctx, nil, selectDelaySQL) + if err != nil { + logutil.BgLogger().Error("exec sql error", zap.String("SQL", selectDelaySQL), zap.Error(err)) + } else { + noHistoryTables := len(ttlTables) + for _, row := range rows { + tblID := row.GetInt64(0) + tbl, ok := ttlTables[tblID] + if !ok { + // table not exist, maybe truncated or deleted + continue + } + noHistoryTables-- + + evalIntervalSQL := fmt.Sprintf( + "SELECT TIMESTAMPDIFF(HOUR, CURDATE() - INTERVAL %d MINUTE, CURDATE() - INTERVAL %s %s)", + row.GetInt64(1), tbl.TTLInfo.IntervalExprStr, ast.TimeUnitType(tbl.TTLInfo.IntervalTimeUnit).String(), + ) + + innerRows, _, err := exec.ExecRestrictedSQL(ctx, nil, 
evalIntervalSQL) + if err != nil || len(innerRows) == 0 { + logutil.BgLogger().Error("exec sql error or empty rows returned", zap.String("SQL", evalIntervalSQL), zap.Error(err)) + continue + } + + hours := innerRows[0].GetInt64(0) + counter.UpdateTableHistWithDelayTime(1, hours) + } + + // When no history found for a table, use max delay + counter.UpdateTableHistWithDelayTime(noHistoryTables, math.MaxInt64) + } + return +}
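
Note (not part of the patch): both telemetry histograms above bucket values with half-open ranges. A `ttlHistItem` whose `LessThan` is non-nil counts values in `[previous LessThan, LessThan)`, and the final `LessThanMax` item catches everything at or above the last bound, which is why `UpdateTableHistWithDeleteRows` credits the first bucket whose upper bound exceeds the observed value. The standalone sketch below mirrors that logic with the same delete-rows bounds; the helper names and sample values are illustrative only and are not TiDB code.

// Minimal standalone sketch of the bucket logic in telemetry/ttl.go.
// It mirrors ttlHistItem / UpdateTableHistWithDeleteRows from the patch;
// the observed values fed in below are made-up examples.
package main

import "fmt"

type histItem struct {
	LessThan    *int64 // upper bound (exclusive); nil for the open-ended last bucket
	LessThanMax bool   // true means the bucket covers [previous bound, +inf)
	Count       int64
}

func int64Ptr(v int64) *int64 { return &v }

// update adds one observation, crediting the first bucket whose
// upper bound is greater than the value (or the open-ended last bucket).
func update(items []*histItem, v int64) {
	for _, it := range items {
		if it.LessThanMax || v < *it.LessThan {
			it.Count++
			return
		}
	}
}

func main() {
	// Same delete-rows bounds as the patch: 10k, 100k, 1M, 10M, then +inf.
	buckets := []*histItem{
		{LessThan: int64Ptr(10 * 1000)},
		{LessThan: int64Ptr(100 * 1000)},
		{LessThan: int64Ptr(1000 * 1000)},
		{LessThan: int64Ptr(10000 * 1000)},
		{LessThanMax: true},
	}
	for _, rows := range []int64{9999, 112600, 100000000} {
		update(buckets, rows)
	}
	for i, b := range buckets {
		fmt.Printf("bucket %d: count=%d\n", i, b.Count)
	}
}

The same loop serves the delay histogram via UpdateTableHistWithDelayTime, only with hour bounds (1, 6, 24, 72, +inf) and a table count added per bucket instead of one.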
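
The per-table delay figure that feeds the second histogram works as follows: selectDelaySQL finds, for each parent table, the newest ttl_expire among clean finished jobs (taking the oldest such watermark across partitions), and evalIntervalSQL converts that into whole hours behind the expected watermark CURDATE() - TTL interval. The interval is evaluated in SQL because TTL units such as MONTH have no fixed duration. As a rough equivalent for a single, non-partitioned table with a fixed-duration interval (an assumption made purely for illustration, not how the patch computes it):

// Illustrative-only sketch of the delay metric, assuming the TTL interval
// can be expressed as a fixed time.Duration; the patch instead evaluates
// the interval via TIMESTAMPDIFF in SQL so that calendar units also work.
package main

import (
	"fmt"
	"time"
)

// delayHours reports how many whole hours the newest expired watermark
// (MAX(ttl_expire) of clean, finished jobs) lags behind the watermark that
// should already be expired at the start of today: curDate - ttlInterval.
// A result of zero or less means the table is fully caught up and lands in
// the "< 1 hour" bucket.
func delayHours(curDate, maxTTLExpire time.Time, ttlInterval time.Duration) int64 {
	expected := curDate.Add(-ttlInterval) // rows older than this should already be gone
	return int64(expected.Sub(maxTTLExpire) / time.Hour)
}

func main() {
	curDate := time.Date(2023, 1, 29, 0, 0, 0, 0, time.UTC)
	// Example: TTL = 1 hour, last clean job expired rows up to 22:00 two days earlier.
	lastExpire := time.Date(2023, 1, 27, 22, 0, 0, 0, time.UTC)
	fmt.Println(delayHours(curDate, lastExpire, time.Hour)) // prints 25
}

This matches the expectations in TestTTLTelemetry: for t1 (TTL = 1 hour) the newest clean watermark is oneDayAgoDate+3h, giving a 20-hour delay and the [6, 24) bucket; for t2 (TTL = 20 hours) the watermark oneDayAgoDate-1h gives a 5-hour delay and the [1, 6) bucket.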