Skip to content

Commit

Permalink
ttl: support telemetry for TTL (#40806)
Browse files Browse the repository at this point in the history
close #40519
  • Loading branch information
lcwangchao authored Jan 29, 2023
1 parent 5048568 commit b5be9f6
Show file tree
Hide file tree
Showing 5 changed files with 379 additions and 0 deletions.
2 changes: 2 additions & 0 deletions telemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"id.go",
"status.go",
"telemetry.go",
"ttl.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/telemetry",
Expand All @@ -24,6 +25,7 @@ go_library(
"//infoschema",
"//kv",
"//metrics",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//sessionctx",
Expand Down
3 changes: 3 additions & 0 deletions telemetry/data_feature_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type featureUsage struct {
AutoIDNoCache bool `json:"autoIDNoCache"`
IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"`
ResourceControlUsage *resourceControlUsage `json:"resourceControl"`
TTLUsage *ttlUsageCounter `json:"ttlUsage"`
}

type placementPolicyUsage struct {
Expand Down Expand Up @@ -117,6 +118,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag

usage.IndexMergeUsageCounter = getIndexMergeUsageInfo()

usage.TTLUsage = getTTLUsageInfo(ctx, sctx)

return &usage, nil
}

Expand Down
158 changes: 158 additions & 0 deletions telemetry/data_feature_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package telemetry_test

import (
"encoding/json"
"fmt"
"strings"
"testing"
"time"

_ "github.com/pingcap/tidb/autoid_service"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -619,3 +623,157 @@ func TestIndexMergeUsage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(2), usage.IndexMergeUsageCounter.IndexMergeUsed)
}

func TestTTLTelemetry(t *testing.T) {
timeFormat := "2006-01-02 15:04:05"
dateFormat := "2006-01-02"

now := time.Now()
curDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
if interval := curDate.Add(time.Hour * 24).Sub(now); interval > 0 && interval < 5*time.Minute {
// make sure testing is not running at the end of one day
time.Sleep(interval)
}

store, do := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.tidb_ttl_job_enable=0")

getTTLTable := func(name string) *model.TableInfo {
tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(name))
require.NoError(t, err)
require.NotNil(t, tbl.Meta().TTLInfo)
return tbl.Meta()
}

jobIDIdx := 1
insertTTLHistory := func(tblName string, partitionName string, createTime, finishTime, ttlExpire time.Time, scanError string, totalRows, errorRows int64, status string) {
defer func() {
jobIDIdx++
}()

tbl := getTTLTable(tblName)
tblID := tbl.ID
partitionID := tbl.ID
if partitionName != "" {
for _, def := range tbl.Partition.Definitions {
if def.Name.L == strings.ToLower(partitionName) {
partitionID = def.ID
}
}
require.NotEqual(t, tblID, partitionID)
}

summary := make(map[string]interface{})
summary["total_rows"] = totalRows
summary["success_rows"] = totalRows - errorRows
summary["error_rows"] = errorRows
summary["total_scan_task"] = 1
summary["scheduled_scan_task"] = 1
summary["finished_scan_task"] = 1
if scanError != "" {
summary["scan_task_err"] = scanError
}

summaryText, err := json.Marshal(summary)
require.NoError(t, err)

tk.MustExec("insert into "+
"mysql.tidb_ttl_job_history ("+
" job_id, table_id, parent_table_id, table_schema, table_name, partition_name, "+
" create_time, finish_time, ttl_expire, summary_text, "+
" expired_rows, deleted_rows, error_delete_rows, status) "+
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
jobIDIdx, partitionID, tblID, "test", tblName, partitionName,
createTime.Format(timeFormat), finishTime.Format(timeFormat), ttlExpire.Format(timeFormat), summaryText,
totalRows, totalRows-errorRows, errorRows, status,
)
}

oneDayAgoDate := curDate.Add(-24 * time.Hour)
// start today, end today
times11 := []time.Time{curDate.Add(time.Hour), curDate.Add(2 * time.Hour), curDate}
// start yesterday, end today
times21 := []time.Time{curDate.Add(-2 * time.Hour), curDate, curDate.Add(-3 * time.Hour)}
// start yesterday, end yesterday
times31 := []time.Time{oneDayAgoDate, oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-time.Hour)}
times32 := []time.Time{oneDayAgoDate.Add(2 * time.Hour), oneDayAgoDate.Add(3 * time.Hour), oneDayAgoDate.Add(time.Hour)}
times33 := []time.Time{oneDayAgoDate.Add(4 * time.Hour), oneDayAgoDate.Add(5 * time.Hour), oneDayAgoDate.Add(3 * time.Hour)}
// start 2 days ago, end yesterday
times41 := []time.Time{oneDayAgoDate.Add(-2 * time.Hour), oneDayAgoDate.Add(time.Hour), oneDayAgoDate.Add(-3 * time.Hour)}
// start two days ago, end two days ago
times51 := []time.Time{oneDayAgoDate.Add(-5 * time.Hour), oneDayAgoDate.Add(-4 * time.Hour), oneDayAgoDate.Add(-6 * time.Hour)}

tk.MustExec("create table t1 (t timestamp) TTL=`t` + interval 1 hour")
insertTTLHistory("t1", "", times11[0], times11[1], times11[2], "", 100000000, 0, "finished")
insertTTLHistory("t1", "", times21[0], times21[1], times21[2], "", 100000000, 0, "finished")
insertTTLHistory("t1", "", times31[0], times31[1], times31[2], "err1", 112600, 110000, "finished")
insertTTLHistory("t1", "", times32[0], times32[1], times32[2], "", 2600, 0, "timeout")
insertTTLHistory("t1", "", times33[0], times33[1], times33[2], "", 2600, 0, "finished")
insertTTLHistory("t1", "", times41[0], times41[1], times41[2], "", 2600, 0, "finished")
insertTTLHistory("t1", "", times51[0], times51[1], times51[2], "", 100000000, 1, "finished")

usage, err := telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
checkTableHistWithDeleteRows := func(vals ...int64) {
require.Equal(t, 5, len(vals))
require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDeleteRows))
require.Equal(t, int64(10*1000), *usage.TTLUsage.TableHistWithDeleteRows[0].LessThan)
require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDeleteRows[0].Count)
require.Equal(t, int64(100*1000), *usage.TTLUsage.TableHistWithDeleteRows[1].LessThan)
require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDeleteRows[1].Count)
require.Equal(t, int64(1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[2].LessThan)
require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDeleteRows[2].Count)
require.Equal(t, int64(10*1000*1000), *usage.TTLUsage.TableHistWithDeleteRows[3].LessThan)
require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDeleteRows[3].Count)
require.True(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThanMax)
require.Nil(t, usage.TTLUsage.TableHistWithDeleteRows[4].LessThan)
require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDeleteRows[4].Count)
}

checkTableHistWithDelay := func(vals ...int64) {
require.Equal(t, 5, len(vals))
require.Equal(t, 5, len(usage.TTLUsage.TableHistWithDelayTime))
require.Equal(t, int64(1), *usage.TTLUsage.TableHistWithDelayTime[0].LessThan)
require.Equal(t, vals[0], usage.TTLUsage.TableHistWithDelayTime[0].Count)
require.Equal(t, int64(6), *usage.TTLUsage.TableHistWithDelayTime[1].LessThan)
require.Equal(t, vals[1], usage.TTLUsage.TableHistWithDelayTime[1].Count)
require.Equal(t, int64(24), *usage.TTLUsage.TableHistWithDelayTime[2].LessThan)
require.Equal(t, vals[2], usage.TTLUsage.TableHistWithDelayTime[2].Count)
require.Equal(t, int64(72), *usage.TTLUsage.TableHistWithDelayTime[3].LessThan)
require.Equal(t, vals[3], usage.TTLUsage.TableHistWithDelayTime[3].Count)
require.True(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThanMax)
require.Nil(t, usage.TTLUsage.TableHistWithDelayTime[4].LessThan)
require.Equal(t, vals[4], usage.TTLUsage.TableHistWithDelayTime[4].Count)
}

require.False(t, usage.TTLUsage.TTLJobEnabled)
require.Equal(t, int64(1), usage.TTLUsage.TTLTables)
require.Equal(t, int64(1), usage.TTLUsage.TTLJobEnabledTables)
require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate)
checkTableHistWithDeleteRows(0, 1, 0, 0, 0)
checkTableHistWithDelay(0, 0, 1, 0, 0)

tk.MustExec("create table t2 (t timestamp) TTL=`t` + interval 20 hour")
tk.MustExec("set @@global.tidb_ttl_job_enable=1")
insertTTLHistory("t2", "", times31[0], times31[1], times31[2], "", 9999, 0, "finished")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.True(t, usage.TTLUsage.TTLJobEnabled)
require.Equal(t, int64(2), usage.TTLUsage.TTLTables)
require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables)
require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate)
checkTableHistWithDeleteRows(1, 1, 0, 0, 0)
checkTableHistWithDelay(0, 1, 1, 0, 0)

tk.MustExec("create table t3 (t timestamp) TTL=`t` + interval 1 hour TTL_ENABLE='OFF'")
usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.True(t, usage.TTLUsage.TTLJobEnabled)
require.Equal(t, int64(3), usage.TTLUsage.TTLTables)
require.Equal(t, int64(2), usage.TTLUsage.TTLJobEnabledTables)
require.Equal(t, oneDayAgoDate.Format(dateFormat), usage.TTLUsage.TTLHistDate)
checkTableHistWithDeleteRows(1, 1, 0, 0, 0)
checkTableHistWithDelay(0, 1, 1, 0, 1)
}
2 changes: 2 additions & 0 deletions telemetry/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
Loading

0 comments on commit b5be9f6

Please sign in to comment.