Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Drop partition DDL handling for overlapping partitions during State Changes | tidb-test=pr/2402 #56082

Merged
merged 40 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b8a37f2
Test to show DROP PARTITION anomaly and PK violation
mjonss Sep 5, 2024
7421840
minor code move of rollback.
mjonss Sep 7, 2024
948c153
Merge remote-tracking branch 'pingcap/master' into drop-partition-ano…
mjonss Sep 12, 2024
e722642
Merge branch 'drop-partition-anomaly' into drop-partition-no-global-i…
mjonss Sep 12, 2024
c22283d
WIP, need to fix BatchPointGet, otherwise seems to work.
mjonss Sep 13, 2024
095d65e
WIP, handling DROP PARTITION also for point get as well as list default
mjonss Sep 15, 2024
17190c8
Update for LIST COLUMNS during Drop Partition
mjonss Sep 15, 2024
a398928
Works for BatchPointGet too now
mjonss Sep 15, 2024
087d96b
linting + bazel_prepare
mjonss Sep 15, 2024
1898533
Added DDLAction also for reorganize partition
mjonss Sep 15, 2024
750e703
minor code improvement during linting
mjonss Sep 16, 2024
cff8ff0
Missed to add readable partitions back for full range partition pruni…
mjonss Sep 16, 2024
0f98a56
order of cleanup important
mjonss Sep 16, 2024
cda02eb
Return FullRange instead of array of each index from partition pruning
mjonss Sep 16, 2024
74f2f52
Test results changes to have 'all' if all partitions is included, ins…
mjonss Sep 16, 2024
e4461f2
Updated test results
mjonss Sep 16, 2024
d01ea7d
Added tests for LIST [COLUMNS] with Default partition.
mjonss Sep 17, 2024
2792c6c
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 17, 2024
967d8c3
Removed SkipIfFailpointDisabled testkit function
mjonss Sep 19, 2024
254bde1
Replaced small function with direct check instead.
mjonss Sep 19, 2024
ec30ba6
Added ReplaceWithOverlappingPartitionIdx
mjonss Sep 19, 2024
fc6692c
Missed one place of checkAddListPartitions to replace
mjonss Sep 19, 2024
9f4a3bc
Updated function comment
mjonss Sep 19, 2024
e835791
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 19, 2024
dcb9b96
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 20, 2024
733e7c4
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 23, 2024
f0276da
Simplified code for overlapping dropping range partition
mjonss Sep 24, 2024
37c31be
Updated function comments about Overlapping Dropping partition
mjonss Sep 25, 2024
0e7b911
Minor refactoring according to review comments.
mjonss Sep 25, 2024
132f6fc
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 25, 2024
585267d
reusing pi.IsDropping where reviewer asked for it :)
mjonss Sep 25, 2024
288b4f8
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 25, 2024
619837e
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 25, 2024
ae23d01
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 26, 2024
f3eeaad
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 27, 2024
ea5ec86
Changed order of cleaning partition info states
mjonss Sep 27, 2024
0267517
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 28, 2024
d3b1351
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Sep 29, 2024
bafa836
Updated function comment.
mjonss Oct 9, 2024
6ecceb5
Merge remote-tracking branch 'pingcap/master' into drop-partition-no-…
mjonss Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2282,9 +2282,8 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s
}
if pi.Type == pmodel.PartitionTypeList {
// TODO: make sure that checks in ddl_api and ddl_worker is the same.
err = checkAddListPartitions(meta)
if err != nil {
return errors.Trace(err)
if meta.Partition.GetDefaultListPartition() != -1 {
return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
}
}

Expand Down
199 changes: 124 additions & 75 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func checkAddPartition(t *meta.Mutator, job *model.Job) (*model.TableInfo, *mode
func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
// Handle the rolling back job
if job.IsRollingback() {
ver, err := w.onDropTablePartition(jobCtx, job)
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -344,20 +344,6 @@ func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, [
return physicalTableIDs, partNames, rollbackBundles
}

// Check if current table already contains DEFAULT list partition
func checkAddListPartitions(tblInfo *model.TableInfo) error {
for i := range tblInfo.Partition.Definitions {
for j := range tblInfo.Partition.Definitions[i].InValues {
for _, val := range tblInfo.Partition.Definitions[i].InValues[j] {
if val == "DEFAULT" { // should already be normalized
return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
}
}
}
}
return nil
}

// checkAddPartitionValue check add Partition Values,
// For Range: values less than value must be strictly increasing for each partition.
// For List: if a Default partition exists,
Expand Down Expand Up @@ -398,9 +384,8 @@ func checkAddPartitionValue(meta *model.TableInfo, part *model.PartitionInfo) er
}
}
case pmodel.PartitionTypeList:
err := checkAddListPartitions(meta)
if err != nil {
return err
if meta.Partition.GetDefaultListPartition() != -1 {
return dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("ADD List partition, already contains DEFAULT partition. Please use REORGANIZE PARTITION instead")
}
}
return nil
Expand Down Expand Up @@ -2144,75 +2129,118 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames
return infosync.UpdateLabelRules(ctx, patch)
}

// onDropTablePartition deletes old partition meta.
func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
// rollbackLikeDropPartition does rollback for Reorganize partition and Add partition.
// It will drop newly created partitions that has not yet been used, including cleaning
// up label rules and bundles as well as changed indexes due to global flag.
func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
mjonss marked this conversation as resolved.
Show resolved Hide resolved
args, err := model.GetTablePartitionArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
partNames, partInfo := args.PartNames, args.PartInfo
partInfo := args.PartInfo
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
if job.Type != model.ActionDropTablePartition {
// If rollback from reorganize partition, remove DroppingDefinitions from tableInfo
tblInfo.Partition.DroppingDefinitions = nil
// If rollback from adding table partition, remove addingDefinitions from tableInfo.
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}
tblInfo.Partition.DroppingDefinitions = nil
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}

if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if job.Type == model.ActionAlterTablePartitioning {
// ALTER TABLE t PARTITION BY ... creates an additional
// Table ID
// Note, for REMOVE PARTITIONING, it is the same
// as for the single partition, to be changed to table.
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if partInfo.Type != pmodel.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
// Reset if it was normal table before
if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
tblInfo.Partition = nil
}
}

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
dropIndices = append(dropIndices, indexInfo)
}
}
for _, indexInfo := range dropIndices {
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)
var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
dropIndices = append(dropIndices, indexInfo)
}
}
for _, indexInfo := range dropIndices {
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)
}
if tblInfo.Partition != nil {
tblInfo.Partition.ClearReorgIntermediateInfo()
}

if tblInfo.Partition.Type == pmodel.PartitionTypeNone {
tblInfo.Partition = nil
} else {
tblInfo.Partition.ClearReorgIntermediateInfo()
}
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
args.OldPhysicalTblIDs = physicalTableIDs
job.FillFinishedArgs(args)
return ver, nil
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
args.OldPhysicalTblIDs = physicalTableIDs
job.FillFinishedArgs(args)
return ver, nil
}

// onDropTablePartition deletes old partition meta.
// States:
// StateNone
//
// Old partitions are queued to be deleted (delete_range), global index up-to-date
//
// StateDeleteReorganization
//
// Old partitions are not accessible/used by any sessions.
// Inserts/updates of global index which still have entries pointing to old partitions
// will overwrite those entries
// In the background we are reading all old partitions and deleting their entries from
// the global indexes.
//
// StateDeleteOnly
//
// old partitions are no longer visible, but if there is inserts/updates to the global indexes,
// duplicate key errors will be given, even if the entries are from dropped partitions
// Note that overlapping ranges (i.e. a dropped partitions with 'less than (N)' will now .. ?!?
//
// StateWriteOnly
//
// old partitions are blocked for read and write. But for read we are allowing
// "overlapping" partition to be read instead. Which means that write can only
// happen in the 'overlapping' partitions original range, not into the extended
// range open by the dropped partitions.
//
// StatePublic
//
// Original state, unaware of DDL
func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
partNames := args.PartNames
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

var physicalTableIDs []int64
Expand All @@ -2221,15 +2249,30 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
originalState := job.SchemaState
switch job.SchemaState {
case model.StatePublic:
// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
// Here we mark the partitions to be dropped, so they are not read or written
err = CheckDropTablePartition(tblInfo, partNames)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// Reason, see https://github.com/pingcap/tidb/issues/55888
// Only mark the partitions as to be dropped, so they are not used, but not yet removed.
originalDefs := tblInfo.Partition.Definitions
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
tblInfo.Partition.Definitions = originalDefs
tblInfo.Partition.DDLState = model.StateWriteOnly
tblInfo.Partition.DDLAction = model.ActionDropTablePartition

job.SchemaState = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
case model.StateWriteOnly:
// Since the previous state do not use the dropping partitions,
// we can now actually remove them, allowing to write into the overlapping range
// of the higher range partition or LIST default partition.
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, partNames)
if err != nil {
// TODO: Add failpoint error/cancel injection and test failure/rollback and cancellation!
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}
Expand Down Expand Up @@ -2265,12 +2308,14 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
return ver, err
}

tblInfo.Partition.DDLState = model.StateDeleteOnly
job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because tidb does not maintaining the state check in partitionDefinition.
// This state is not a real 'DeleteOnly' state, because tidb does not maintain the state check in partitionDefinition.
// Insert this state to confirm all servers can not see the old partitions when reorg is running,
// so that no new data will be inserted into old partitions when reorganizing.
tblInfo.Partition.DDLState = model.StateDeleteReorganization
job.SchemaState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
case model.StateDeleteReorganization:
Expand Down Expand Up @@ -2330,6 +2375,8 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}
droppedDefs := tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []any{physicalTableIDs} // TODO remove it.
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
Expand Down Expand Up @@ -2464,6 +2511,7 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
pi.DroppingDefinitions = truncatingDefinitions
pi.NewPartitionIDs = newIDs[:]

tblInfo.Partition.DDLAction = model.ActionTruncateTablePartition
job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
Expand Down Expand Up @@ -3072,7 +3120,7 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job) (*model.TableInfo, [
func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
// Handle the rolling back job
if job.IsRollingback() {
ver, err := w.onDropTablePartition(jobCtx, job)
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -3277,6 +3325,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = model.StateDeleteOnly
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
18 changes: 3 additions & 15 deletions pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,17 +360,12 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job,
}
args.PartNames = partNames
model.FillRollbackArgsForAddPartition(job, args)
/*
_, err = job.Encode(true)
if err != nil {
return ver, errors.Trace(err)
}
*/
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}
Expand Down Expand Up @@ -427,7 +422,7 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
}
// We cannot drop the index here, we need to wait until
// the next schema version
// i.e. rollback in onDropTablePartition
// i.e. rollback in rollbackLikeDropPartition
// New index that became public in this state,
// mark it to be dropped in next schema version
if indexInfo.Global {
Expand Down Expand Up @@ -508,13 +503,6 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
}
args.PartNames = partNames
job.FillArgs(args)
/*
_, err = job.Encode(true)
if err != nil {
return ver, errors.Trace(err)
}
*/
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_test(
srcs = [
"db_partition_test.go",
"main_test.go",
"multi_domain_test.go",
"placement_test.go",
"reorg_partition_test.go",
],
Expand Down Expand Up @@ -39,6 +40,7 @@ go_test(
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/dbterror",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
Loading