ddl: support concurrent ddl (#32169)
* 1. init ddl tables

create the tidb_ddl_job, tidb_ddl_reorg, and tidb_ddl_history tables with a raw meta write; these 3 tables replace the ddl job queue and the reorg and history hash tables
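
A minimal sketch (not this commit's exact code) of what a raw meta write for one of these tables can look like; the TableInfo below is a stub, while the real commit fills in the full column definitions for tidb_ddl_job:

    package example

    import (
        "context"

        "github.com/pingcap/tidb/ddl"
        "github.com/pingcap/tidb/kv"
        "github.com/pingcap/tidb/meta"
        "github.com/pingcap/tidb/parser/model"
    )

    // createDDLJobTable writes the table metadata directly through the meta
    // layer instead of executing a CREATE TABLE statement.
    func createDDLJobTable(ctx context.Context, store kv.Storage, mysqlDBID int64) error {
        tblInfo := &model.TableInfo{
            ID:    ddl.JobTableID, // fixed table ID reserved near meta.MaxInt48
            Name:  model.NewCIStr(ddl.JobTable),
            State: model.StatePublic,
            // Columns and indexes are omitted in this sketch.
        }
        return kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
            return meta.NewMeta(txn).CreateTableOrView(mysqlDBID, tblInfo)
        })
    }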

* 2. setup concurrent ddl env and add ddl worker pool

adds the ddl worker pool definition; the ddl job manager finds a runnable job and ships it to a worker in the worker pool.
Also, this commit provides a sessionctx wrapper, used only inside the ddl package; it just wraps begin, commit and execute (sketched below)
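
A minimal sketch of such a wrapper, assuming it sits on top of sessionctx.Context and runs SQL through sqlexec.SQLExecutor; method names and error handling are illustrative, not the commit's exact implementation:

    package example

    import (
        "context"

        "github.com/pingcap/tidb/parser/terror"
        "github.com/pingcap/tidb/sessionctx"
        "github.com/pingcap/tidb/util/chunk"
        "github.com/pingcap/tidb/util/sqlexec"
    )

    // session is a thin wrapper used only by DDL-internal code.
    type session struct {
        sessionctx.Context
    }

    func (s *session) begin() error {
        _, err := s.execute(context.Background(), "begin")
        return err
    }

    func (s *session) commit() error {
        _, err := s.execute(context.Background(), "commit")
        return err
    }

    func (s *session) rollback() {
        _, err := s.execute(context.Background(), "rollback")
        terror.Log(err)
    }

    // execute runs an internal SQL statement and drains its result set.
    func (s *session) execute(ctx context.Context, sql string) ([]chunk.Row, error) {
        rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
        if err != nil || rs == nil {
            return nil, err
        }
        defer terror.Call(rs.Close)
        return sqlexec.DrainRecordSet(ctx, rs, 8)
    }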

* 3. add ddl manager to handle ddl job
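
A toy sketch of that manager's dispatch loop; the channel shape and type names are illustrative, not the real ddl_workerpool.go API:

    package example

    import (
        "context"

        "github.com/pingcap/tidb/parser/model"
    )

    // workerPool hands out idle workers; each entry runs one DDL job.
    type workerPool struct {
        idle chan func(*model.Job)
    }

    // dispatchLoop waits for a signal that a job may be runnable, fetches it,
    // and ships it to an idle worker from the pool.
    func dispatchLoop(ctx context.Context, pool *workerPool, notify <-chan struct{}, getJob func() *model.Job) {
        for {
            select {
            case <-ctx.Done():
                return
            case <-notify: // a job was enqueued, or a periodic tick fired
            }
            job := getJob()
            if job == nil {
                continue // nothing runnable right now
            }
            run := <-pool.idle // block until a worker is free
            go run(job)        // the worker returns itself to pool.idle when done
        }
    }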

* 4. reorg handler for concurrent ddl

implements the table-based counterpart of the reorg information handling.

* 5. manage ddl jobs for concurrent ddl

add the table-based counterparts of adding a job, deleting a job, and the other operations on history jobs.
because many of these functions now need a session, the callers are changed accordingly (see the sketch below)
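
A sketch of how "add job" can become a plain INSERT now that jobs live in a table; the column list is illustrative, not the exact schema introduced here:

    package example

    import (
        "context"
        "fmt"

        "github.com/pingcap/tidb/parser/model"
        "github.com/pingcap/tidb/sessionctx"
        "github.com/pingcap/tidb/util/sqlexec"
    )

    // insertDDLJob serializes the job as the old queue did and writes it into
    // tidb_ddl_job through the caller-supplied session.
    func insertDDLJob(ctx context.Context, se sessionctx.Context, job *model.Job, needReorg bool) error {
        raw, err := job.Encode(true)
        if err != nil {
            return err
        }
        sql := fmt.Sprintf("insert into mysql.tidb_ddl_job (job_id, reorg, job_meta) values (%d, %t, x'%x')",
            job.ID, needReorg, raw)
        rs, err := se.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
        if rs != nil {
            _ = rs.Close()
        }
        return err
    }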

* 6. add metrics for concurrent ddl

add metrics

* 7. support multiple tables

* 8. fix test

* 9. migrate ddl between table and queue

support switching between the old and new ddl frameworks by migrating the existing ddl jobs between the queue and the tables (one direction is sketched below)
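
A sketch of the queue-to-table direction, assuming a caller-supplied enqueueToTable helper (for example the insert sketched under step 5); the real migration also handles the reverse direction and the reorg/history data:

    package example

    import (
        "github.com/pingcap/tidb/meta"
        "github.com/pingcap/tidb/parser/model"
    )

    // migrateQueueToTable drains the old meta-based job queue and hands every
    // job to the table writer.
    func migrateQueueToTable(m *meta.Meta, enqueueToTable func(*model.Job) error) error {
        for {
            job, err := m.DeQueueDDLJob()
            if err != nil {
                return err
            }
            if job == nil { // the old queue is drained
                return nil
            }
            if err := enqueueToTable(job); err != nil {
                return err
            }
        }
    }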

* 10. check tikv version and set the reorg worker count according to the CPU count
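
The exact formula is not shown on this page; a hedged sketch of a CPU-based default, where the divisor and the cap are illustrative:

    package example

    import "runtime"

    // defaultReorgWorkerCnt derives a reorg (backfill) worker count from the
    // number of CPUs, clamped to a sane range.
    func defaultReorgWorkerCnt() int {
        cnt := runtime.NumCPU() / 4
        if cnt < 1 {
            cnt = 1
        }
        if cnt > 32 {
            cnt = 32
        }
        return cnt
    }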

* *: add featuretag on tests

Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>

* use fixed table IDs for the 3 tables

* remove ctx value

* add GetSchemaVersionWithNonEmptyDiff function

* address comments from tangenta and zimulala

* use only one etcd path

* make ActionRenameTable support multi-schema

* reset the sql digest to make Top SQL work correctly

* add comment

* fix test

* remove 0 for schema version lock

Co-authored-by: xiongjiwei <xiongjiwei1996@outlook.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Co-authored-by: wjHuang <huangwenjun1997@gmail.com>
4 people authored Jul 20, 2022
1 parent fbbae46 commit ab513bf
Showing 69 changed files with 2,868 additions and 302 deletions.
3 changes: 3 additions & 0 deletions Makefile
@@ -438,6 +438,9 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test
bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test

bazel_build: bazel_ci_prepare
mkdir -p bin
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
"//meta",
"//meta/autoid",
"//parser/model",
"//sessionctx",
"//statistics/handle",
"//util",
"//util/codec",
21 changes: 16 additions & 5 deletions br/pkg/backup/client.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
@@ -472,21 +473,31 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
}

// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastBackupTS, backupTS uint64) error {
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
snapMeta := meta.NewSnapshotMeta(snapshot)
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
lastSnapMeta := meta.NewSnapshotMeta(lastSnapshot)
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return errors.Trace(err)
}
allJobs, err := ddl.GetAllDDLJobs(snapMeta)
backupSchemaVersion, err := snapMeta.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return errors.Trace(err)
}

version, err := store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
}
newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs, err := ddl.GetAllDDLJobs(se, newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
historyJobs, err := ddl.GetAllHistoryDDLJobs(snapMeta)
historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
if err != nil {
return errors.Trace(err)
}
@@ -500,7 +511,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
2 changes: 1 addition & 1 deletion br/pkg/backup/client_test.go
@@ -280,7 +280,7 @@ func TestSkipUnsupportedDDLJob(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.cluster.Storage, lastTS, ts)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
4 changes: 2 additions & 2 deletions br/pkg/restore/db_test.go
@@ -194,7 +194,7 @@ func TestFilterDDLJobs(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
@@ -258,7 +258,7 @@ func TestFilterDDLJobsV2(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
@@ -34,6 +34,7 @@ go_library(
"//config",
"//kv",
"//parser/mysql",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics/handle",
3 changes: 2 additions & 1 deletion br/pkg/task/backup.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/types"
@@ -399,7 +400,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}

metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metawriter, mgr.GetStorage(), cfg.LastBackupTS, backupTS)
err = backup.WriteBackupDDLJobs(metawriter, se.(sessionctx.Context), mgr.GetStorage(), cfg.LastBackupTS, backupTS)
if err != nil {
return errors.Trace(err)
}
4 changes: 4 additions & 0 deletions ddl/BUILD.bazel
@@ -6,16 +6,19 @@ go_library(
"backfilling.go",
"callback.go",
"column.go",
"constant.go",
"ddl.go",
"ddl_algorithm.go",
"ddl_api.go",
"ddl_tiflash_api.go",
"ddl_worker.go",
"ddl_workerpool.go",
"delete_range.go",
"delete_range_util.go",
"foreign_key.go",
"generated_column.go",
"index.go",
"job_table.go",
"mock.go",
"multi_schema_change.go",
"options.go",
@@ -144,6 +147,7 @@ go_test(
"index_change_test.go",
"index_modify_test.go",
"integration_test.go",
"job_table_test.go",
"main_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
2 changes: 1 addition & 1 deletion ddl/backfilling.go
@@ -409,7 +409,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,

if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey)
err1 := reorgInfo.UpdateReorgMeta(nextKey, w.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
14 changes: 14 additions & 0 deletions ddl/callback.go
@@ -55,6 +55,10 @@ type Callback interface {
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
// OnGetJobBefore is called before getting job.
OnGetJobBefore(jobType string)
// OnGetJobAfter is called after getting job.
OnGetJobAfter(jobType string, job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
@@ -86,6 +90,16 @@ func (*BaseCallback) OnWatched(ctx context.Context) {
// Nothing to do.
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (c *BaseCallback) OnGetJobBefore(jobType string) {
// Nothing to do.
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (c *BaseCallback) OnGetJobAfter(jobType string, job *model.Job) {
// Nothing to do.
}

// DomainReloader is used to avoid import loop.
type DomainReloader interface {
Reload() error
20 changes: 20 additions & 0 deletions ddl/callback_test.go
@@ -52,6 +52,8 @@ type TestDDLCallback struct {
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
}

// OnChanged mock the same behavior with the main DDL hook.
@@ -118,6 +120,24 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
tc.BaseCallback.OnWatched(ctx)
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

func TestCallback(t *testing.T) {
cb := &BaseCallback{}
require.Nil(t, cb.OnChanged(nil))
8 changes: 4 additions & 4 deletions ddl/column.go
@@ -701,7 +701,7 @@ func (w *worker) doModifyColumnTypeWithData(
// Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will
// be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`.
job.SchemaState = model.StateDeleteOnly
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(0)
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String()).Set(0)
job.Args = append(job.Args, changingCol, changingIdxs, rmIdxIDs)
case model.StateDeleteOnly:
// Column from null to not null.
@@ -791,7 +791,7 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
rh := newReorgHandler(t)
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
@@ -1062,7 +1062,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
// Write the reorg info to store so the whole reorganize process can recover from panic.
err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey)
err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
logutil.BgLogger().Info("[ddl] update column and indexes",
zap.Int64("jobID", reorgInfo.Job.ID),
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -1103,7 +1103,7 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
backfillWorker: newBackfillWorker(sessCtx, id, t, reorgInfo),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues("update_col_rate"),
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
sqlMode: reorgInfo.ReorgMeta.SQLMode,
35 changes: 35 additions & 0 deletions ddl/constant.go
@@ -0,0 +1,35 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/pingcap/tidb/meta"
)

const (
// JobTable stores the information of DDL jobs.
JobTable = "tidb_ddl_job"
// ReorgTable stores the information of DDL reorganization.
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
// ReorgTableID is the table ID of `tidb_ddl_reorg`.
ReorgTableID = meta.MaxInt48 - 2
// HistoryTableID is the table ID of `tidb_ddl_history`.
HistoryTableID = meta.MaxInt48 - 3
)
4 changes: 2 additions & 2 deletions ddl/db_change_test.go
@@ -1378,7 +1378,7 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage, dom *doma
require.NoError(t, err)
txn, err := sess.Txn(true)
require.NoError(t, err)
jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
jobs, err := ddl.GetAllDDLJobs(sess, meta.NewMeta(txn))
require.NoError(t, err)
qLen = len(jobs)
if qLen == 2 {
@@ -1407,7 +1407,7 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage, dom *doma
require.NoError(t, err)
txn, err := sess.Txn(true)
require.NoError(t, err)
jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
jobs, err := ddl.GetAllDDLJobs(sess, meta.NewMeta(txn))
require.NoError(t, err)
qLen = len(jobs)
if qLen == 1 {
4 changes: 3 additions & 1 deletion ddl/db_test.go
@@ -647,7 +647,7 @@ func TestAddExpressionIndexRollback(t *testing.T) {
txn, err := ctx.Txn(true)
require.NoError(t, err)
m := meta.NewMeta(txn)
element, start, end, physicalID, err := m.GetDDLReorgHandle(currJob)
element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err))
require.Nil(t, element)
require.Nil(t, start)
@@ -1274,6 +1274,8 @@ func TestCancelJobWriteConflict(t *testing.T) {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL", `return(true)`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL")) }()
rs, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}