diff --git a/executor/builder.go b/executor/builder.go index 3f4939f732f21..68568f294a3bb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -221,6 +221,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildShowDDLJobs(v) case *plannercore.ShowDDLJobQueries: return b.buildShowDDLJobQueries(v) + case *plannercore.ShowDDLJobQueriesWithRange: + return b.buildShowDDLJobQueriesWithRange(v) case *plannercore.ShowSlow: return b.buildShowSlow(v) case *plannercore.PhysicalShow: @@ -390,6 +392,15 @@ func (b *executorBuilder) buildShowDDLJobQueries(v *plannercore.ShowDDLJobQuerie return e } +func (b *executorBuilder) buildShowDDLJobQueriesWithRange(v *plannercore.ShowDDLJobQueriesWithRange) Executor { + e := &ShowDDLJobQueriesWithRangeExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), + offset: v.Offset, + limit: v.Limit, + } + return e +} + func (b *executorBuilder) buildShowSlow(v *plannercore.ShowSlow) Executor { e := &ShowSlowExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), diff --git a/executor/executor.go b/executor/executor.go index 2dfcae01d52f2..c122a4c2f4a2e 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -671,6 +671,80 @@ func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, req *chunk.Chunk) erro return nil } +// ShowDDLJobQueriesWithRangeExec represents a show DDL job queries with range executor. +// The jobs id that is given by 'admin show ddl job queries' statement, +// can be searched within a specified range in history jobs using offset and limit. +type ShowDDLJobQueriesWithRangeExec struct { + baseExecutor + + cursor int + jobs []*model.Job + offset uint64 + limit uint64 +} + +// Open implements the Executor Open interface. +func (e *ShowDDLJobQueriesWithRangeExec) Open(ctx context.Context) error { + var err error + var jobs []*model.Job + if err := e.baseExecutor.Open(ctx); err != nil { + return err + } + session, err := e.getSysSession() + if err != nil { + return err + } + err = sessiontxn.NewTxn(context.Background(), session) + if err != nil { + return err + } + defer func() { + // releaseSysSession will rollbacks txn automatically. + e.releaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), session) + }() + txn, err := session.Txn(true) + if err != nil { + return err + } + session.GetSessionVars().SetInTxn(true) + + m := meta.NewMeta(txn) + jobs, err = ddl.GetAllDDLJobs(session, m) + if err != nil { + return err + } + + historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, int(e.offset+e.limit)) + if err != nil { + return err + } + + e.jobs = append(e.jobs, jobs...) + e.jobs = append(e.jobs, historyJobs...) + + return nil +} + +// Next implements the Executor Next interface. +func (e *ShowDDLJobQueriesWithRangeExec) Next(ctx context.Context, req *chunk.Chunk) error { + req.GrowAndReset(e.maxChunkSize) + if e.cursor >= len(e.jobs) { + return nil + } + if int(e.limit) > len(e.jobs) { + return nil + } + numCurBatch := mathutil.Min(req.Capacity(), len(e.jobs)-e.cursor) + for i := e.cursor; i < e.cursor+numCurBatch; i++ { + if i >= int(e.offset) && i < int(e.offset+e.limit) { + req.AppendString(0, strconv.FormatInt(e.jobs[i].ID, 10)) + req.AppendString(1, e.jobs[i].Query) + } + } + e.cursor += numCurBatch + return nil +} + // Open implements the Executor Open interface. func (e *ShowDDLJobsExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { diff --git a/executor/executor_test.go b/executor/executor_test.go index db2aebede77a6..f473077a4f0a3 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -5668,6 +5668,31 @@ func TestAdmin(t *testing.T) { result.Check(testkit.Rows(historyJobs[0].Query)) require.NoError(t, err) + // show DDL job queries with range test + tk.MustExec("use test") + tk.MustExec("drop table if exists admin_test2") + tk.MustExec("create table admin_test2 (c1 int, c2 int, c3 int default 1, index (c1))") + tk.MustExec("drop table if exists admin_test3") + tk.MustExec("create table admin_test3 (c1 int, c2 int, c3 int default 1, index (c1))") + tk.MustExec("drop table if exists admin_test4") + tk.MustExec("create table admin_test4 (c1 int, c2 int, c3 int default 1, index (c1))") + tk.MustExec("drop table if exists admin_test5") + tk.MustExec("create table admin_test5 (c1 int, c2 int, c3 int default 1, index (c1))") + tk.MustExec("drop table if exists admin_test6") + tk.MustExec("create table admin_test6 (c1 int, c2 int, c3 int default 1, index (c1))") + tk.MustExec("drop table if exists admin_test7") + tk.MustExec("create table admin_test7 (c1 int, c2 int, c3 int default 1, index (c1))") + tk.MustExec("drop table if exists admin_test8") + tk.MustExec("create table admin_test8 (c1 int, c2 int, c3 int default 1, index (c1))") + historyJobs, err = ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), ddl.DefNumHistoryJobs) + result = tk.MustQuery(`admin show ddl job queries limit 3`) + result.Check(testkit.Rows(fmt.Sprintf("%d %s", historyJobs[0].ID, historyJobs[0].Query), fmt.Sprintf("%d %s", historyJobs[1].ID, historyJobs[1].Query), fmt.Sprintf("%d %s", historyJobs[2].ID, historyJobs[2].Query))) + result = tk.MustQuery(`admin show ddl job queries limit 3, 2`) + result.Check(testkit.Rows(fmt.Sprintf("%d %s", historyJobs[3].ID, historyJobs[3].Query), fmt.Sprintf("%d %s", historyJobs[4].ID, historyJobs[4].Query))) + result = tk.MustQuery(`admin show ddl job queries limit 3 offset 2`) + result.Check(testkit.Rows(fmt.Sprintf("%d %s", historyJobs[2].ID, historyJobs[2].Query), fmt.Sprintf("%d %s", historyJobs[3].ID, historyJobs[3].Query), fmt.Sprintf("%d %s", historyJobs[4].ID, historyJobs[4].Query))) + require.NoError(t, err) + // check table test tk.MustExec("create table admin_test1 (c1 int, c2 int default 1, index (c1))") tk.MustExec("insert admin_test1 (c1) values (21),(22)") diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 9568096deb589..97820b5ec2cb1 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -76,6 +76,14 @@ type ShowDDLJobQueries struct { JobIDs []int64 } +// ShowDDLJobQueriesWithRange is for showing DDL job queries sql with specified limit and offset. +type ShowDDLJobQueriesWithRange struct { + baseSchemaProducer + + Limit uint64 + Offset uint64 +} + // ShowNextRowID is for showing the next global row ID. type ShowNextRowID struct { baseSchemaProducer diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index b5c923ff6dd27..b5a1d243cec5f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1408,6 +1408,10 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan, p := &ShowDDLJobQueries{JobIDs: as.JobIDs} p.setSchemaAndNames(buildShowDDLJobQueriesFields()) ret = p + case ast.AdminShowDDLJobQueriesWithRange: + p := &ShowDDLJobQueriesWithRange{Limit: as.LimitSimple.Count, Offset: as.LimitSimple.Offset} + p.setSchemaAndNames(buildShowDDLJobQueriesWithRangeFields()) + ret = p case ast.AdminShowSlow: p := &ShowSlow{ShowSlow: as.ShowSlow} p.setSchemaAndNames(buildShowSlowSchema()) @@ -2818,6 +2822,13 @@ func buildShowDDLJobQueriesFields() (*expression.Schema, types.NameSlice) { return schema.col2Schema(), schema.names } +func buildShowDDLJobQueriesWithRangeFields() (*expression.Schema, types.NameSlice) { + schema := newColumnsWithNames(2) + schema.Append(buildColumnWithName("", "JOB_ID", mysql.TypeVarchar, 64)) + schema.Append(buildColumnWithName("", "QUERY", mysql.TypeVarchar, 256)) + return schema.col2Schema(), schema.names +} + func buildShowSlowSchema() (*expression.Schema, types.NameSlice) { longlongSize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong) tinySize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeTiny)