ttl: support splitting scan ranges for primary keys with more than one column #39896

Merged: 3 commits, Dec 14, 2022
4 changes: 2 additions & 2 deletions metrics/grafana/tidb.json
@@ -17776,9 +17776,9 @@
"targets": [
{
"exemplar": true,
"expr": "sum(tidb_server_ttl_job_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type)",
"expr": "sum(tidb_server_ttl_job_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)",
"interval": "",
"legendFormat": "{{ type }}",
"legendFormat": "{{ instance }} {{ type }}",
"queryType": "randomWalk",
"refId": "A"
}
30 changes: 29 additions & 1 deletion ttl/cache/split_test.go
@@ -233,6 +233,10 @@ func createTTLTable(t *testing.T, tk *testkit.TestKit, name string, option strin
return createTTLTableWithSQL(t, tk, name, fmt.Sprintf("create table test.%s(id %s primary key, t timestamp) TTL = `t` + interval 1 day", name, option))
}

func create2PKTTLTable(t *testing.T, tk *testkit.TestKit, name string, option string) *cache.PhysicalTable {
return createTTLTableWithSQL(t, tk, name, fmt.Sprintf("create table test.%s(id %s, id2 int, t timestamp, primary key(id, id2)) TTL = `t` + interval 1 day", name, option))
}

func createTTLTableWithSQL(t *testing.T, tk *testkit.TestKit, name string, sql string) *cache.PhysicalTable {
tk.MustExec(sql)
is, ok := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
@@ -273,6 +277,7 @@ func TestSplitTTLScanRangesWithSignedInt(t *testing.T) {
createTTLTable(t, tk, "t4", "int"),
createTTLTable(t, tk, "t5", "bigint"),
createTTLTable(t, tk, "t6", ""), // no clustered
create2PKTTLTable(t, tk, "t7", "tinyint"),
}

tikvStore := newMockTiKVStore(t)
@@ -334,6 +339,7 @@ func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) {
createTTLTable(t, tk, "t3", "mediumint unsigned"),
createTTLTable(t, tk, "t4", "int unsigned"),
createTTLTable(t, tk, "t5", "bigint unsigned"),
create2PKTTLTable(t, tk, "t6", "tinyint unsigned"),
}

tikvStore := newMockTiKVStore(t)
@@ -397,6 +403,7 @@ func TestSplitTTLScanRangesWithBytes(t *testing.T) {
createTTLTable(t, tk, "t2", "char(32) CHARACTER SET BINARY"),
createTTLTable(t, tk, "t3", "varchar(32) CHARACTER SET BINARY"),
createTTLTable(t, tk, "t4", "bit(32)"),
create2PKTTLTable(t, tk, "t5", "binary(32)"),
}

tikvStore := newMockTiKVStore(t)
@@ -446,6 +453,7 @@ func TestNoTTLSplitSupportTables(t *testing.T) {
createTTLTable(t, tk, "t2", "varchar(32) CHARACTER SET UTF8MB4"),
createTTLTable(t, tk, "t3", "double"),
createTTLTable(t, tk, "t4", "decimal(32, 2)"),
create2PKTTLTable(t, tk, "t5", "char(32) CHARACTER SET UTF8MB4"),
}

tikvStore := newMockTiKVStore(t)
@@ -526,6 +534,14 @@ func TestGetNextBytesHandleDatum(t *testing.T) {
key: buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}),
result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0},
},
{
key: append(buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}), 0),
result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0, 0},
},
{
key: append(buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}), 1),
result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0, 0},
},
{
key: []byte{},
result: []byte{},
@@ -613,7 +629,7 @@ func TestGetNextBytesHandleDatum(t *testing.T) {
bs[len(bs)-10] = 254
return bs
},
- result: []byte{1, 2, 3, 4, 5, 6, 7},
+ result: []byte{1, 2, 3, 4, 5, 6, 7, 0},
},
{
// recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 0, 253, 9, 0, 0, 0, 0, 0, 0, 0, 248]
@@ -718,6 +734,18 @@ func TestGetNextIntHandle(t *testing.T) {
key: tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MinInt64)),
result: math.MinInt64,
},
{
key: append(tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(7)), 0),
result: 8,
},
{
key: append(tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MaxInt64)), 0),
isNull: true,
},
{
key: append(tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MinInt64)), 0),
result: math.MinInt64 + 1,
},
{
key: []byte{},
result: math.MinInt64,
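The new TestGetNextIntHandle cases above cover region split keys that carry extra bytes after a complete 8-byte integer handle. In that situation the smallest handle whose row key is greater than or equal to the split key is handle+1, and a handle of MaxInt64 cannot be advanced, which the cache layer reports as a NULL datum. A minimal standalone sketch of that expectation follows; it is an illustration only, not the TiDB implementation, and the helper name nextIntHandle is made up:

package main

import (
	"fmt"
	"math"
)

// nextIntHandle mirrors what the new test cases expect from
// cache.GetNextIntHandle when the row key has trailing bytes after the
// decoded int handle: advance to the next handle, or report "null" when the
// handle is already MaxInt64 and has no successor.
func nextIntHandle(handle int64, hasTrailingBytes bool) (next int64, isNull bool) {
	if !hasTrailingBytes {
		return handle, false
	}
	if handle == math.MaxInt64 {
		return 0, true // no representable next handle
	}
	return handle + 1, false
}

func main() {
	fmt.Println(nextIntHandle(7, true))             // 8 false
	fmt.Println(nextIntHandle(math.MaxInt64, true)) // 0 true
	fmt.Println(nextIntHandle(math.MinInt64, true)) // MinInt64+1 false
}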
13 changes: 8 additions & 5 deletions ttl/cache/table.go
@@ -166,9 +166,9 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
}, nil
}

- // ValidateKey validates a key
- func (t *PhysicalTable) ValidateKey(key []types.Datum) error {
- if len(t.KeyColumns) != len(key) {
+ // ValidateKeyPrefix validates a key prefix
+ func (t *PhysicalTable) ValidateKeyPrefix(key []types.Datum) error {
+ if len(key) > len(t.KeyColumns) {
return errors.Errorf("invalid key length: %d, expected %d", len(key), len(t.KeyColumns))
}
return nil
@@ -198,7 +198,7 @@ func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session,

// SplitScanRanges split ranges for TTL scan
func (t *PhysicalTable) SplitScanRanges(ctx context.Context, store kv.Storage, splitCnt int) ([]ScanRange, error) {
- if len(t.KeyColumns) != 1 || splitCnt <= 1 {
+ if len(t.KeyColumns) < 1 || splitCnt <= 1 {
return []ScanRange{newFullRange()}, nil
}

@@ -431,7 +431,10 @@ func GetNextBytesHandleDatum(key kv.Key, recordPrefix []byte) (d types.Datum) {
return d
}

- if _, v, err := codec.DecodeOne(encodedVal); err == nil {
+ if remain, v, err := codec.DecodeOne(encodedVal); err == nil {
+ if len(remain) > 0 {
+ v.SetBytes(kv.Key(v.GetBytes()).Next())
+ }
return v
}

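The GetNextBytesHandleDatum change above handles region split keys that do not end exactly at a record boundary: DecodeOne now also returns the remaining bytes, and when there is a remainder the decoded bytes handle is advanced with kv.Key.Next() so the resulting range bound still covers the split key. A minimal sketch of that "next key" semantics follows, assuming only that Next appends a 0x00 byte; the helper below is illustrative, not TiDB code:

package main

import (
	"bytes"
	"fmt"
)

// nextKey returns the smallest byte string strictly greater than k that still
// has k as a prefix, i.e. k with a 0x00 byte appended. This is the same idea
// as kv.Key.Next() used in GetNextBytesHandleDatum when the decoded handle is
// only a prefix of the actual split key.
func nextKey(k []byte) []byte {
	next := make([]byte, len(k)+1)
	copy(next, k)
	return next // trailing byte is already 0x00
}

func main() {
	h := []byte{1, 2, 3, 4, 5, 6, 7, 8, 0}
	fmt.Println(bytes.Equal(nextKey(h), []byte{1, 2, 3, 4, 5, 6, 7, 8, 0, 0})) // true
}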
18 changes: 7 additions & 11 deletions ttl/sqlbuilder/sql.go
@@ -319,16 +319,12 @@ type ScanQueryGenerator struct {

// NewScanQueryGenerator creates a new ScanQueryGenerator
func NewScanQueryGenerator(tbl *cache.PhysicalTable, expire time.Time, rangeStart []types.Datum, rangeEnd []types.Datum) (*ScanQueryGenerator, error) {
- if len(rangeStart) > 0 {
- if err := tbl.ValidateKey(rangeStart); err != nil {
- return nil, err
- }
+ if err := tbl.ValidateKeyPrefix(rangeStart); err != nil {
+ return nil, err
}

- if len(rangeEnd) > 0 {
- if err := tbl.ValidateKey(rangeEnd); err != nil {
- return nil, err
- }
+ if err := tbl.ValidateKeyPrefix(rangeEnd); err != nil {
+ return nil, err
}

return &ScanQueryGenerator{
@@ -393,11 +389,11 @@ func (g *ScanQueryGenerator) setStack(key []types.Datum) error {
return nil
}

- if err := g.tbl.ValidateKey(key); err != nil {
+ if err := g.tbl.ValidateKeyPrefix(key); err != nil {
return err
}

- g.stack = g.stack[:cap(g.stack)]
+ g.stack = g.stack[:len(key)]
for i := 0; i < len(key); i++ {
g.stack[i] = key[0 : i+1]
}
@@ -440,7 +436,7 @@ func (g *ScanQueryGenerator) buildSQL() (string, error) {
}

if len(g.keyRangeEnd) > 0 {
- if err := b.WriteCommonCondition(g.tbl.KeyColumns, "<", g.keyRangeEnd); err != nil {
+ if err := b.WriteCommonCondition(g.tbl.KeyColumns[0:len(g.keyRangeEnd)], "<", g.keyRangeEnd); err != nil {
return "", err
}
}
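With ValidateKeyPrefix, a scan range bound may now cover only a leading subset of the key columns, so buildSQL compares the end bound against just the matching column prefix (hence KeyColumns[0:len(g.keyRangeEnd)] above). A rough sketch of the resulting condition shape follows, matching the SQL expected by the new test cases below; prefixEndCondition is a made-up helper for illustration, not the sqlbuilder API:

package main

import (
	"fmt"
	"strings"
)

// prefixEndCondition renders an upper-bound condition for a key prefix.
// With a single bound value it degenerates to a plain comparison; with more
// values it uses a row-value comparison over the leading key columns.
func prefixEndCondition(keyCols []string, boundVals []string) string {
	if len(boundVals) == 1 {
		return fmt.Sprintf("`%s` < %s", keyCols[0], boundVals[0])
	}
	cols := make([]string, len(boundVals))
	for i := range boundVals {
		cols[i] = "`" + keyCols[i] + "`"
	}
	return fmt.Sprintf("(%s) < (%s)", strings.Join(cols, ", "), strings.Join(boundVals, ", "))
}

func main() {
	cols := []string{"a", "b", "c"}
	fmt.Println(prefixEndCondition(cols, []string{"100"}))        // `a` < 100
	fmt.Println(prefixEndCondition(cols, []string{"100", "'z'"})) // (`a`, `b`) < (100, 'z')
}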
48 changes: 48 additions & 0 deletions ttl/sqlbuilder/sql_test.go
@@ -702,6 +702,54 @@ func TestScanQueryGenerator(t *testing.T) {
},
},
},
{
tbl: t2,
expire: time.UnixMilli(0).In(time.UTC),
rangeStart: d(1),
rangeEnd: d(100),
path: [][]interface{}{
{
nil, 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` >= 1 AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x1a}), 5), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x20}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "y", []byte{0x0a}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
},
},
{
tbl: t2,
expire: time.UnixMilli(0).In(time.UTC),
rangeStart: d(1, "x"),
rangeEnd: d(100, "z"),
path: [][]interface{}{
{
nil, 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` >= 'x' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x1a}), 5), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x20}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "y", []byte{0x0a}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
},
},
}

for i, c := range cases {
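Taken together, the change lets a TTL scan split work across multiple ranges even when the clustered primary key has several columns. A rough sketch of how a caller might wire the two touched packages together is below; the constructors and import paths come from this diff, but the ScanRange field names (Start, End), the package name, and the splitCnt value are assumptions for illustration only.

package ttlworker // hypothetical package name

import (
	"context"
	"time"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/ttl/cache"
	"github.com/pingcap/tidb/ttl/sqlbuilder"
)

// buildScanGenerators splits the physical table into at most splitCnt ranges
// and creates one ScanQueryGenerator per range. Start/End on cache.ScanRange
// are assumed field names for the range bounds.
func buildScanGenerators(ctx context.Context, store kv.Storage, tbl *cache.PhysicalTable,
	expire time.Time, splitCnt int) ([]*sqlbuilder.ScanQueryGenerator, error) {
	ranges, err := tbl.SplitScanRanges(ctx, store, splitCnt)
	if err != nil {
		return nil, err
	}
	gens := make([]*sqlbuilder.ScanQueryGenerator, 0, len(ranges))
	for _, r := range ranges {
		g, err := sqlbuilder.NewScanQueryGenerator(tbl, expire, r.Start, r.End)
		if err != nil {
			return nil, err
		}
		gens = append(gens, g)
	}
	return gens, nil
}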