Skip to content

Commit

Permalink
ttl: TTL support split scan ranges for some primary key with more tha…
Browse files Browse the repository at this point in the history
…n one columns (#39896) (#39908)

close #39895
  • Loading branch information
ti-chi-bot authored Dec 20, 2022
1 parent 2ed9959 commit adfe96a
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 19 deletions.
4 changes: 2 additions & 2 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -17768,9 +17768,9 @@
"targets": [
{
"exemplar": true,
"expr": "sum(tidb_server_ttl_job_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type)",
"expr": "sum(tidb_server_ttl_job_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)",
"interval": "",
"legendFormat": "{{ type }}",
"legendFormat": "{{ instance }} {{ type }}",
"queryType": "randomWalk",
"refId": "A"
}
Expand Down
30 changes: 29 additions & 1 deletion ttl/cache/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func createTTLTable(t *testing.T, tk *testkit.TestKit, name string, option strin
return createTTLTableWithSQL(t, tk, name, fmt.Sprintf("create table test.%s(id %s primary key, t timestamp) TTL = `t` + interval 1 day", name, option))
}

func create2PKTTLTable(t *testing.T, tk *testkit.TestKit, name string, option string) *cache.PhysicalTable {
return createTTLTableWithSQL(t, tk, name, fmt.Sprintf("create table test.%s(id %s, id2 int, t timestamp, primary key(id, id2)) TTL = `t` + interval 1 day", name, option))
}

func createTTLTableWithSQL(t *testing.T, tk *testkit.TestKit, name string, sql string) *cache.PhysicalTable {
tk.MustExec(sql)
is, ok := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema)
Expand Down Expand Up @@ -279,6 +283,7 @@ func TestSplitTTLScanRangesWithSignedInt(t *testing.T) {
createTTLTable(t, tk, "t4", "int"),
createTTLTable(t, tk, "t5", "bigint"),
createTTLTable(t, tk, "t6", ""), // no clustered
create2PKTTLTable(t, tk, "t7", "tinyint"),
}

tikvStore := newMockTiKVStore(t)
Expand Down Expand Up @@ -345,6 +350,7 @@ func TestSplitTTLScanRangesWithUnsignedInt(t *testing.T) {
createTTLTable(t, tk, "t3", "mediumint unsigned"),
createTTLTable(t, tk, "t4", "int unsigned"),
createTTLTable(t, tk, "t5", "bigint unsigned"),
create2PKTTLTable(t, tk, "t6", "tinyint unsigned"),
}

tikvStore := newMockTiKVStore(t)
Expand Down Expand Up @@ -413,6 +419,7 @@ func TestSplitTTLScanRangesWithBytes(t *testing.T) {
createTTLTable(t, tk, "t2", "char(32) CHARACTER SET BINARY"),
createTTLTable(t, tk, "t3", "varchar(32) CHARACTER SET BINARY"),
createTTLTable(t, tk, "t4", "bit(32)"),
create2PKTTLTable(t, tk, "t5", "binary(32)"),
}

tikvStore := newMockTiKVStore(t)
Expand Down Expand Up @@ -467,6 +474,7 @@ func TestNoTTLSplitSupportTables(t *testing.T) {
createTTLTable(t, tk, "t2", "varchar(32) CHARACTER SET UTF8MB4"),
createTTLTable(t, tk, "t3", "double"),
createTTLTable(t, tk, "t4", "decimal(32, 2)"),
create2PKTTLTable(t, tk, "t5", "char(32) CHARACTER SET UTF8MB4"),
}

tikvStore := newMockTiKVStore(t)
Expand Down Expand Up @@ -547,6 +555,14 @@ func TestGetNextBytesHandleDatum(t *testing.T) {
key: buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}),
result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0},
},
{
key: append(buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}), 0),
result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0, 0},
},
{
key: append(buildBytesRowKey([]byte{1, 2, 3, 4, 5, 6, 7, 8, 0}), 1),
result: []byte{1, 2, 3, 4, 5, 6, 7, 8, 0, 0},
},
{
key: []byte{},
result: []byte{},
Expand Down Expand Up @@ -634,7 +650,7 @@ func TestGetNextBytesHandleDatum(t *testing.T) {
bs[len(bs)-10] = 254
return bs
},
result: []byte{1, 2, 3, 4, 5, 6, 7},
result: []byte{1, 2, 3, 4, 5, 6, 7, 0},
},
{
// recordPrefix + bytesFlag + [1, 2, 3, 4, 5, 6, 7, 0, 253, 9, 0, 0, 0, 0, 0, 0, 0, 248]
Expand Down Expand Up @@ -739,6 +755,18 @@ func TestGetNextIntHandle(t *testing.T) {
key: tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MinInt64)),
result: math.MinInt64,
},
{
key: append(tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(7)), 0),
result: 8,
},
{
key: append(tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MaxInt64)), 0),
isNull: true,
},
{
key: append(tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(math.MinInt64)), 0),
result: math.MinInt64 + 1,
},
{
key: []byte{},
result: math.MinInt64,
Expand Down
13 changes: 8 additions & 5 deletions ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
}, nil
}

// ValidateKey validates a key
func (t *PhysicalTable) ValidateKey(key []types.Datum) error {
if len(t.KeyColumns) != len(key) {
// ValidateKeyPrefix validates a key prefix
func (t *PhysicalTable) ValidateKeyPrefix(key []types.Datum) error {
if len(key) > len(t.KeyColumns) {
return errors.Errorf("invalid key length: %d, expected %d", len(key), len(t.KeyColumns))
}
return nil
Expand Down Expand Up @@ -198,7 +198,7 @@ func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session,

// SplitScanRanges split ranges for TTL scan
func (t *PhysicalTable) SplitScanRanges(ctx context.Context, store kv.Storage, splitCnt int) ([]ScanRange, error) {
if len(t.KeyColumns) != 1 || splitCnt <= 1 {
if len(t.KeyColumns) < 1 || splitCnt <= 1 {
return []ScanRange{newFullRange()}, nil
}

Expand Down Expand Up @@ -431,7 +431,10 @@ func GetNextBytesHandleDatum(key kv.Key, recordPrefix []byte) (d types.Datum) {
return d
}

if _, v, err := codec.DecodeOne(encodedVal); err == nil {
if remain, v, err := codec.DecodeOne(encodedVal); err == nil {
if len(remain) > 0 {
v.SetBytes(kv.Key(v.GetBytes()).Next())
}
return v
}

Expand Down
18 changes: 7 additions & 11 deletions ttl/sqlbuilder/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,12 @@ type ScanQueryGenerator struct {

// NewScanQueryGenerator creates a new ScanQueryGenerator
func NewScanQueryGenerator(tbl *cache.PhysicalTable, expire time.Time, rangeStart []types.Datum, rangeEnd []types.Datum) (*ScanQueryGenerator, error) {
if len(rangeStart) > 0 {
if err := tbl.ValidateKey(rangeStart); err != nil {
return nil, err
}
if err := tbl.ValidateKeyPrefix(rangeStart); err != nil {
return nil, err
}

if len(rangeEnd) > 0 {
if err := tbl.ValidateKey(rangeEnd); err != nil {
return nil, err
}
if err := tbl.ValidateKeyPrefix(rangeEnd); err != nil {
return nil, err
}

return &ScanQueryGenerator{
Expand Down Expand Up @@ -393,11 +389,11 @@ func (g *ScanQueryGenerator) setStack(key []types.Datum) error {
return nil
}

if err := g.tbl.ValidateKey(key); err != nil {
if err := g.tbl.ValidateKeyPrefix(key); err != nil {
return err
}

g.stack = g.stack[:cap(g.stack)]
g.stack = g.stack[:len(key)]
for i := 0; i < len(key); i++ {
g.stack[i] = key[0 : i+1]
}
Expand Down Expand Up @@ -440,7 +436,7 @@ func (g *ScanQueryGenerator) buildSQL() (string, error) {
}

if len(g.keyRangeEnd) > 0 {
if err := b.WriteCommonCondition(g.tbl.KeyColumns, "<", g.keyRangeEnd); err != nil {
if err := b.WriteCommonCondition(g.tbl.KeyColumns[0:len(g.keyRangeEnd)], "<", g.keyRangeEnd); err != nil {
return "", err
}
}
Expand Down
48 changes: 48 additions & 0 deletions ttl/sqlbuilder/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,54 @@ func TestScanQueryGenerator(t *testing.T) {
},
},
},
{
tbl: t2,
expire: time.UnixMilli(0).In(time.UTC),
rangeStart: d(1),
rangeEnd: d(100),
path: [][]interface{}{
{
nil, 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` >= 1 AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x1a}), 5), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x20}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "y", []byte{0x0a}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND `a` < 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
},
},
{
tbl: t2,
expire: time.UnixMilli(0).In(time.UTC),
rangeStart: d(1, "x"),
rangeEnd: d(100, "z"),
path: [][]interface{}{
{
nil, 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` >= 'x' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x1a}), 5), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "x", []byte{0x20}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
{
result(d(1, "y", []byte{0x0a}), 4), 5,
"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`) < (100, 'z') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
},
},
},
}

for i, c := range cases {
Expand Down

0 comments on commit adfe96a

Please sign in to comment.