ddl: change drop partition and truncate partition's job args to support multi partition id array (#18419) #18930

Merged: 1 commit, merged on Aug 3, 2020
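The heart of the change is the shape of the DDL job arguments: both the drop-partition and truncate-partition jobs now carry a slice (partition names, or physical partition IDs) instead of a single value, so one job can cover several partitions and the delete-range cleanup can iterate over all of them. The round trip looks roughly like the sketch below; the `job` struct and helpers are simplified stand-ins for `model.Job`, using plain `encoding/json` to mimic how `Args` are serialized into `RawArgs`, so treat the encoding details as an assumption rather than TiDB's exact implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a simplified stand-in for model.Job: Args are serialized into
// RawArgs when the job is queued, and decoded again by the DDL worker.
type job struct {
	Type    string
	RawArgs json.RawMessage
}

func newJob(typ string, args ...interface{}) (*job, error) {
	raw, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	return &job{Type: typ, RawArgs: raw}, nil
}

// decodeArgs mirrors the spirit of job.DecodeArgs: it unmarshals RawArgs
// into the pointers supplied by the caller.
func (j *job) decodeArgs(args ...interface{}) error {
	return json.Unmarshal(j.RawArgs, &args)
}

func main() {
	// After this change the args are slices, so a single job can cover
	// several partitions.
	dropJob, _ := newJob("drop partition", []string{"p0", "p1"})
	truncJob, _ := newJob("truncate partition", []int64{101, 102})

	var partNames []string
	if err := dropJob.decodeArgs(&partNames); err != nil {
		panic(err)
	}
	var physicalTableIDs []int64
	if err := truncJob.decodeArgs(&physicalTableIDs); err != nil {
		panic(err)
	}
	fmt.Println(partNames, physicalTableIDs) // [p0 p1] [101 102]
}
```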
8 changes: 5 additions & 3 deletions ddl/ddl_api.go
@@ -2564,14 +2564,15 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
if err != nil {
return errors.Trace(err)
}
pids := []int64{pid}

job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
Type: model.ActionTruncateTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{pid},
Args: []interface{}{pids},
}

err = d.doDDLJob(ctx, job)
@@ -2603,7 +2604,8 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
}

partName := spec.PartitionNames[0].L
err = checkDropTablePartition(meta, partName)
partNames := []string{partName}
err = checkDropTablePartition(meta, partNames)
if err != nil {
if ErrDropPartitionNonExistent.Equal(err) && spec.IfExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
@@ -2618,7 +2620,7 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
SchemaName: schema.Name.L,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partName},
Args: []interface{}{partNames},
}

err = d.doDDLJob(ctx, job)
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
@@ -792,7 +792,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {

// test truncate table partition failed caused by canceled.
test = &tests[24]
truncateTblPartitionArgs := []interface{}{partitionTblInfo.Partition.Definitions[0].ID}
truncateTblPartitionArgs := []interface{}{[]int64{partitionTblInfo.Partition.Definitions[0].ID}}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, partitionTblInfo.ID, test.act, truncateTblPartitionArgs, &test.cancelState)
c.Check(checkErr, IsNil)
changedTable = testGetTable(c, d, dbInfo.ID, partitionTblInfo.ID)
14 changes: 9 additions & 5 deletions ddl/delete_range.go
@@ -289,13 +289,17 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
return doInsert(s, job.ID, tableID, startKey, endKey, now)
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
var physicalTableID int64
if err := job.DecodeArgs(&physicalTableID); err != nil {
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
return doInsert(s, job.ID, physicalTableID, startKey, endKey, now)
for _, physicalTableID := range physicalTableIDs {
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
if err := doInsert(s, job.ID, physicalTableID, startKey, endKey, now); err != nil {
return errors.Trace(err)
}
}
// ActionAddIndex and ActionAddPrimaryKey also need this, because they must be rolled back when canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
tableID := job.TableID
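Each dropped or truncated partition is a separate physical table, so the loop above records one [start, end) key range per physical table ID. Below is a minimal sketch of that mapping, reusing `tablecodec.EncodeTablePrefix` as the diff does; the `buildDeleteRanges` helper and the `deleteRange` struct are hypothetical names for illustration only.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/tablecodec"
)

// deleteRange describes one [StartKey, EndKey) span to purge for a single
// physical table (here: one partition). The type is illustrative only.
type deleteRange struct {
	PhysicalTableID int64
	StartKey        kv.Key
	EndKey          kv.Key
}

// buildDeleteRanges is a hypothetical helper mirroring the loop in
// insertJobIntoDeleteRangeTable: one key range per physical table ID.
func buildDeleteRanges(physicalTableIDs []int64) []deleteRange {
	ranges := make([]deleteRange, 0, len(physicalTableIDs))
	for _, id := range physicalTableIDs {
		ranges = append(ranges, deleteRange{
			PhysicalTableID: id,
			StartKey:        tablecodec.EncodeTablePrefix(id),
			EndKey:          tablecodec.EncodeTablePrefix(id + 1),
		})
	}
	return ranges
}

func main() {
	for _, r := range buildDeleteRanges([]int64{101, 102}) {
		fmt.Printf("table %d: [%x, %x)\n", r.PhysicalTableID, r.StartKey, r.EndKey)
	}
}
```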
99 changes: 57 additions & 42 deletions ddl/partition.go
@@ -562,44 +562,54 @@ func validRangePartitionType(col *model.ColumnInfo) bool {
}

// checkDropTablePartition checks if the partition exists and does not allow deleting the last existing partition in the table.
func checkDropTablePartition(meta *model.TableInfo, partName string) error {
func checkDropTablePartition(meta *model.TableInfo, partLowerNames []string) error {
pi := meta.Partition
if pi.Type != model.PartitionTypeRange && pi.Type != model.PartitionTypeList {
return errOnlyOnRangeListPartition.GenWithStackByArgs("DROP")
}
oldDefs := pi.Definitions
for _, def := range oldDefs {
if strings.EqualFold(def.Name.L, strings.ToLower(partName)) {
if len(oldDefs) == 1 {
return errors.Trace(ErrDropLastPartition)
for _, pn := range partLowerNames {
found := false
for _, def := range oldDefs {
if def.Name.L == pn {
found = true
break
}
return nil
}
if !found {
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(pn))
}
}
if len(oldDefs) == len(partLowerNames) {
return errors.Trace(ErrDropLastPartition)
}
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(partName))
return nil
}

// removePartitionInfo deletes the dropped partitions from the table info and returns their physical table IDs.
func removePartitionInfo(tblInfo *model.TableInfo, partName string) int64 {
func removePartitionInfo(tblInfo *model.TableInfo, partLowerNames []string) []int64 {
oldDefs := tblInfo.Partition.Definitions
newDefs := make([]model.PartitionDefinition, 0, len(oldDefs)-1)
var pid int64
for i := 0; i < len(oldDefs); i++ {
if !strings.EqualFold(oldDefs[i].Name.L, strings.ToLower(partName)) {
continue
var pids []int64
for _, partName := range partLowerNames {
for i := 0; i < len(oldDefs); i++ {
if oldDefs[i].Name.L != partName {
continue
}
pids = append(pids, oldDefs[i].ID)
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
break
}
pid = oldDefs[i].ID
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
break
}

tblInfo.Partition.Definitions = newDefs
return pid
return pids
}

// onDropTablePartition deletes old partition meta.
func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var partName string
if err := job.DecodeArgs(&partName); err != nil {
var partNames []string
if err := job.DecodeArgs(&partNames); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
@@ -608,12 +618,12 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}
// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
err = checkDropTablePartition(tblInfo, partName)
err = checkDropTablePartition(tblInfo, partNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
physicalTableID := removePartitionInfo(tblInfo, partName)
physicalTableIDs := removePartitionInfo(tblInfo, partNames)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -622,15 +632,15 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
// A background job will be created to delete old partition data.
job.Args = []interface{}{physicalTableID}
job.Args = []interface{}{physicalTableIDs}
return ver, nil
}

// onTruncateTablePartition truncates old partition meta.
func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
var ver int64
var oldID int64
if err := job.DecodeArgs(&oldID); err != nil {
var oldIDs []int64
if err := job.DecodeArgs(&oldIDs); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
@@ -643,33 +653,38 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(ErrPartitionMgmtOnNonpartitioned)
}

var newPartition *model.PartitionDefinition
for i := 0; i < len(pi.Definitions); i++ {
def := &pi.Definitions[i]
if def.ID == oldID {
pid, err1 := t.GenGlobalID()
if err != nil {
return ver, errors.Trace(err1)
newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
for _, oldID := range oldIDs {
for i := 0; i < len(pi.Definitions); i++ {
def := &pi.Definitions[i]
if def.ID == oldID {
pid, err1 := t.GenGlobalID()
if err1 != nil {
return ver, errors.Trace(err1)
}
def.ID = pid
// A shallow copy is enough here: the event handler only uses def.ID.
newPartitions = append(newPartitions, *def)
break
}
def.ID = pid
newPartition = def
break
}
}
if newPartition == nil {
if len(newPartitions) == 0 {
return ver, table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O)
}

// Clear the tiflash replica available status.
if tblInfo.TiFlashReplica != nil {
tblInfo.TiFlashReplica.Available = false
// Mark the partition replicas as unavailable.
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
if id == oldID {
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
break
for _, oldID := range oldIDs {
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
if id == oldID {
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
break
}
}
}
}
@@ -681,9 +696,9 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: []model.PartitionDefinition{*newPartition}}})
asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: newPartitions}})
// A background job will be created to delete old partition data.
job.Args = []interface{}{oldID}
job.Args = []interface{}{oldIDs}
return ver, nil
}

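In summary, the reworked `checkDropTablePartition` enforces two rules per job: every named partition must exist (`ErrDropPartitionNonExistent`), and a single job may not drop every partition of the table (`ErrDropLastPartition`). A standalone sketch of just that rule follows; the function name `checkDrop` and the error values are hypothetical stand-ins for the model-level types used above.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for ErrDropPartitionNonExistent and ErrDropLastPartition.
var (
	errPartitionNonExistent = errors.New("partition does not exist")
	errDropLastPartition    = errors.New("cannot remove all partitions, use DROP TABLE instead")
)

// checkDrop mirrors the validation in checkDropTablePartition: every name in
// toDrop must exist in existing, and toDrop may not cover every partition.
// Names are assumed to be lower-cased already, as in the real code.
func checkDrop(existing, toDrop []string) error {
	defs := make(map[string]struct{}, len(existing))
	for _, name := range existing {
		defs[name] = struct{}{}
	}
	for _, name := range toDrop {
		if _, ok := defs[name]; !ok {
			return fmt.Errorf("%w: %s", errPartitionNonExistent, name)
		}
	}
	if len(toDrop) == len(existing) {
		return errDropLastPartition
	}
	return nil
}

func main() {
	existing := []string{"p0", "p1", "p2"}
	fmt.Println(checkDrop(existing, []string{"p0", "p1"}))       // <nil>
	fmt.Println(checkDrop(existing, []string{"p9"}))             // partition does not exist: p9
	fmt.Println(checkDrop(existing, []string{"p0", "p1", "p2"})) // cannot remove all partitions, ...
}
```

Note that the length comparison is against the full definition list, so dropping all partitions in one statement is rejected instead of leaving an empty partitioned table behind.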
154 changes: 154 additions & 0 deletions ddl/partition_test.go
@@ -0,0 +1,154 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
)

var _ = Suite(&testPartitionSuite{})

type testPartitionSuite struct {
store kv.Storage
}

func (s *testPartitionSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_store")
}

func (s *testPartitionSuite) TearDownSuite(c *C) {
err := s.store.Close()
c.Assert(err, IsNil)
}

func (s *testPartitionSuite) TestDropAndTruncatePartition(c *C) {
d := testNewDDLAndStart(
context.Background(),
c,
WithStore(s.store),
WithLease(testLease),
)
defer d.Stop()
dbInfo := testSchemaInfo(c, d, "test_partition")
testCreateSchema(c, testNewContext(d), d, dbInfo)
// Generate 5 partitions in the table info.
tblInfo, partIDs := buildTableInfoWithPartition(c, d)
ctx := testNewContext(d)
testCreateTable(c, ctx, d, dbInfo, tblInfo)

testDropPartition(c, ctx, d, dbInfo, tblInfo, []string{"p0", "p1"})

testTruncatePartition(c, ctx, d, dbInfo, tblInfo, []int64{partIDs[3], partIDs[4]})
}

func buildTableInfoWithPartition(c *C, d *ddl) (*model.TableInfo, []int64) {
tbl := &model.TableInfo{
Name: model.NewCIStr("t"),
}
col := &model.ColumnInfo{
Name: model.NewCIStr("c"),
Offset: 1,
State: model.StatePublic,
FieldType: *types.NewFieldType(mysql.TypeLong),
ID: allocateColumnID(tbl),
}
genIDs, err := d.genGlobalIDs(1)
c.Assert(err, IsNil)
tbl.ID = genIDs[0]
tbl.Columns = []*model.ColumnInfo{col}
tbl.Charset = "utf8"
tbl.Collate = "utf8_bin"

partIDs, err := d.genGlobalIDs(5)
c.Assert(err, IsNil)
partInfo := &model.PartitionInfo{
Type: model.PartitionTypeRange,
Expr: tbl.Columns[0].Name.L,
Enable: true,
Definitions: []model.PartitionDefinition{
{
ID: partIDs[0],
Name: model.NewCIStr("p0"),
LessThan: []string{"100"},
},
{
ID: partIDs[1],
Name: model.NewCIStr("p1"),
LessThan: []string{"200"},
},
{
ID: partIDs[2],
Name: model.NewCIStr("p2"),
LessThan: []string{"300"},
},
{
ID: partIDs[3],
Name: model.NewCIStr("p3"),
LessThan: []string{"400"},
},
{
ID: partIDs[4],
Name: model.NewCIStr("p4"),
LessThan: []string{"500"},
},
},
}
tbl.Partition = partInfo
return tbl, partIDs
}

func buildDropPartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames},
}
}

func testDropPartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
job := buildDropPartitionJob(dbInfo, tblInfo, partNames)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildTruncatePartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionTruncateTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{pids},
}
}

func testTruncatePartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
job := buildTruncatePartitionJob(dbInfo, tblInfo, pids)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}