From cbb1b05dc9f0902f6a7382f016abd8b2d36df77f Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Jul 2022 11:46:49 +0800 Subject: [PATCH] *: support show ddl jobs for sub-jobs --- ddl/multi_schema_change_test.go | 35 ++++++++++++++++++++++++++++++ executor/executor.go | 16 ++++++++++++++ executor/infoschema_reader.go | 8 +++++++ executor/infoschema_reader_test.go | 5 +++++ parser/model/ddl.go | 6 ++++- 5 files changed, 69 insertions(+), 1 deletion(-) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 9f80cd19fbab4..4d91d83aefb8d 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -632,6 +632,41 @@ func TestMultiSchemaChangeModifyColumnsCancelled(t *testing.T) { Check(testkit.Rows("int")) } +func TestMultiSchemaChangeAdminShowDDLJobs(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + originHook := dom.DDL().GetHook() + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobRunBeforeExported = func(job *model.Job) { + assert.Equal(t, model.ActionMultiSchemaChange, job.Type) + if job.MultiSchemaInfo.SubJobs[0].SchemaState == model.StateDeleteOnly { + newTk := testkit.NewTestKit(t, store) + rows := newTk.MustQuery("admin show ddl jobs 1").Rows() + // 1 history job and 1 running job with 2 subjobs + assert.Equal(t, len(rows), 4) + assert.Equal(t, rows[1][1], "test") + assert.Equal(t, rows[1][2], "t") + assert.Equal(t, rows[1][3], "add index /* subjob */") + assert.Equal(t, rows[1][4], "delete only") + assert.Equal(t, rows[1][len(rows[1])-1], "running") + + assert.Equal(t, rows[2][3], "add index /* subjob */") + assert.Equal(t, rows[2][4], "none") + assert.Equal(t, rows[2][len(rows[2])-1], "queueing") + } + } + + tk.MustExec("create table t (a int, b int, c int)") + tk.MustExec("insert into t values (1, 2, 3)") + + dom.DDL().SetHook(hook) + tk.MustExec("alter table t add index t(a), add index t1(b)") + dom.DDL().SetHook(originHook) +} + func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) { store, dom, clean := testkit.CreateMockStoreAndDomain(t) defer clean() diff --git a/executor/executor.go b/executor/executor.go index 7d2839cb29e34..be6e75495fce0 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -568,6 +568,22 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che req.AppendNull(10) } req.AppendString(11, job.State.String()) + if job.Type == model.ActionMultiSchemaChange { + for _, subJob := range job.MultiSchemaInfo.SubJobs { + req.AppendInt64(0, job.ID) + req.AppendString(1, schemaName) + req.AppendString(2, tableName) + req.AppendString(3, subJob.Type.String()+" /* subjob */") + req.AppendString(4, subJob.SchemaState.String()) + req.AppendInt64(5, job.SchemaID) + req.AppendInt64(6, job.TableID) + req.AppendInt64(7, subJob.RowCount) + req.AppendNull(8) + req.AppendNull(9) + req.AppendNull(10) + req.AppendString(11, subJob.State.String()) + } + } } func ts2Time(timestamp uint64, loc *time.Location) types.Time { diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 6adcf3e01f851..bd25f2825cd69 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -1303,6 +1303,9 @@ func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error { for i := e.cursor; i < e.cursor+num; i++ { e.appendJobToChunk(req, e.runningJobs[i], checker) req.AppendString(12, e.runningJobs[i].Query) + for range e.runningJobs[i].MultiSchemaInfo.SubJobs { + req.AppendString(12, e.runningJobs[i].Query) + } } e.cursor += num count += num @@ -1318,6 +1321,11 @@ func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error { for _, job := range e.cacheJobs { e.appendJobToChunk(req, job, checker) req.AppendString(12, job.Query) + if job.Type == model.ActionMultiSchemaChange { + for range job.MultiSchemaInfo.SubJobs { + req.AppendString(12, job.Query) + } + } } e.cursor += len(e.cacheJobs) } diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go index 3f5631d43bff1..97531b2489140 100644 --- a/executor/infoschema_reader_test.go +++ b/executor/infoschema_reader_test.go @@ -243,6 +243,11 @@ func TestDDLJobs(t *testing.T) { DDLJobsTester.MustExec("set role r_priv") DDLJobsTester.MustQuery("select DB_NAME, TABLE_NAME from information_schema.DDL_JOBS where DB_NAME = 'test_ddl_jobs' and TABLE_NAME = 't';").Check( testkit.Rows("test_ddl_jobs t")) + + tk.MustExec("create table tt (a int);") + tk.MustExec("alter table tt add index t(a), add column b int") + tk.MustQuery("select db_name, table_name, job_type from information_schema.DDL_JOBS limit 3").Check( + testkit.Rows("test_ddl_jobs tt alter table multi-schema change", "test_ddl_jobs tt add index /* subjob */", "test_ddl_jobs tt add column /* subjob */")) } func TestKeyColumnUsage(t *testing.T) { diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 6be7ad22c571f..62f2fe5239e62 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -554,8 +554,12 @@ func (job *Job) DecodeArgs(args ...interface{}) error { // String implements fmt.Stringer interface. func (job *Job) String() string { rowCount := job.GetRowCount() - return fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v", + ret := fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v", job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer) + if job.Type != ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + ret += fmt.Sprintf(", Multi-Schema Change:true, Revertible:%v", job.MultiSchemaInfo.Revertible) + } + return ret } func (job *Job) hasDependentSchema(other *Job) (bool, error) {