Skip to content

Commit

Permalink
executor: add syntax 'ADMIN SHOW DDL JOB QUERIES LIMIT m OFFSET n' to…
Browse files Browse the repository at this point in the history
… retrieve DDL commands within a certain range (#36480)

close #36198
  • Loading branch information
lyzx2001 authored Jul 25, 2022
1 parent 6314548 commit c4c0665
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 0 deletions.
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildShowDDLJobs(v)
case *plannercore.ShowDDLJobQueries:
return b.buildShowDDLJobQueries(v)
case *plannercore.ShowDDLJobQueriesWithRange:
return b.buildShowDDLJobQueriesWithRange(v)
case *plannercore.ShowSlow:
return b.buildShowSlow(v)
case *plannercore.PhysicalShow:
Expand Down Expand Up @@ -390,6 +392,15 @@ func (b *executorBuilder) buildShowDDLJobQueries(v *plannercore.ShowDDLJobQuerie
return e
}

func (b *executorBuilder) buildShowDDLJobQueriesWithRange(v *plannercore.ShowDDLJobQueriesWithRange) Executor {
e := &ShowDDLJobQueriesWithRangeExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
offset: v.Offset,
limit: v.Limit,
}
return e
}

func (b *executorBuilder) buildShowSlow(v *plannercore.ShowSlow) Executor {
e := &ShowSlowExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
Expand Down
74 changes: 74 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,80 @@ func (e *ShowDDLJobQueriesExec) Next(ctx context.Context, req *chunk.Chunk) erro
return nil
}

// ShowDDLJobQueriesWithRangeExec represents a show DDL job queries with range executor.
// The jobs id that is given by 'admin show ddl job queries' statement,
// can be searched within a specified range in history jobs using offset and limit.
type ShowDDLJobQueriesWithRangeExec struct {
baseExecutor

cursor int
jobs []*model.Job
offset uint64
limit uint64
}

// Open implements the Executor Open interface.
func (e *ShowDDLJobQueriesWithRangeExec) Open(ctx context.Context) error {
var err error
var jobs []*model.Job
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
session, err := e.getSysSession()
if err != nil {
return err
}
err = sessiontxn.NewTxn(context.Background(), session)
if err != nil {
return err
}
defer func() {
// releaseSysSession will rollbacks txn automatically.
e.releaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), session)
}()
txn, err := session.Txn(true)
if err != nil {
return err
}
session.GetSessionVars().SetInTxn(true)

m := meta.NewMeta(txn)
jobs, err = ddl.GetAllDDLJobs(session, m)
if err != nil {
return err
}

historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, int(e.offset+e.limit))
if err != nil {
return err
}

e.jobs = append(e.jobs, jobs...)
e.jobs = append(e.jobs, historyJobs...)

return nil
}

// Next implements the Executor Next interface.
func (e *ShowDDLJobQueriesWithRangeExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobs) {
return nil
}
if int(e.limit) > len(e.jobs) {
return nil
}
numCurBatch := mathutil.Min(req.Capacity(), len(e.jobs)-e.cursor)
for i := e.cursor; i < e.cursor+numCurBatch; i++ {
if i >= int(e.offset) && i < int(e.offset+e.limit) {
req.AppendString(0, strconv.FormatInt(e.jobs[i].ID, 10))
req.AppendString(1, e.jobs[i].Query)
}
}
e.cursor += numCurBatch
return nil
}

// Open implements the Executor Open interface.
func (e *ShowDDLJobsExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5668,6 +5668,31 @@ func TestAdmin(t *testing.T) {
result.Check(testkit.Rows(historyJobs[0].Query))
require.NoError(t, err)

// show DDL job queries with range test
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test2")
tk.MustExec("create table admin_test2 (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("drop table if exists admin_test3")
tk.MustExec("create table admin_test3 (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("drop table if exists admin_test4")
tk.MustExec("create table admin_test4 (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("drop table if exists admin_test5")
tk.MustExec("create table admin_test5 (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("drop table if exists admin_test6")
tk.MustExec("create table admin_test6 (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("drop table if exists admin_test7")
tk.MustExec("create table admin_test7 (c1 int, c2 int, c3 int default 1, index (c1))")
tk.MustExec("drop table if exists admin_test8")
tk.MustExec("create table admin_test8 (c1 int, c2 int, c3 int default 1, index (c1))")
historyJobs, err = ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), ddl.DefNumHistoryJobs)
result = tk.MustQuery(`admin show ddl job queries limit 3`)
result.Check(testkit.Rows(fmt.Sprintf("%d %s", historyJobs[0].ID, historyJobs[0].Query), fmt.Sprintf("%d %s", historyJobs[1].ID, historyJobs[1].Query), fmt.Sprintf("%d %s", historyJobs[2].ID, historyJobs[2].Query)))
result = tk.MustQuery(`admin show ddl job queries limit 3, 2`)
result.Check(testkit.Rows(fmt.Sprintf("%d %s", historyJobs[3].ID, historyJobs[3].Query), fmt.Sprintf("%d %s", historyJobs[4].ID, historyJobs[4].Query)))
result = tk.MustQuery(`admin show ddl job queries limit 3 offset 2`)
result.Check(testkit.Rows(fmt.Sprintf("%d %s", historyJobs[2].ID, historyJobs[2].Query), fmt.Sprintf("%d %s", historyJobs[3].ID, historyJobs[3].Query), fmt.Sprintf("%d %s", historyJobs[4].ID, historyJobs[4].Query)))
require.NoError(t, err)

// check table test
tk.MustExec("create table admin_test1 (c1 int, c2 int default 1, index (c1))")
tk.MustExec("insert admin_test1 (c1) values (21),(22)")
Expand Down
8 changes: 8 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ type ShowDDLJobQueries struct {
JobIDs []int64
}

// ShowDDLJobQueriesWithRange is for showing DDL job queries sql with specified limit and offset.
type ShowDDLJobQueriesWithRange struct {
baseSchemaProducer

Limit uint64
Offset uint64
}

// ShowNextRowID is for showing the next global row ID.
type ShowNextRowID struct {
baseSchemaProducer
Expand Down
11 changes: 11 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,10 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan,
p := &ShowDDLJobQueries{JobIDs: as.JobIDs}
p.setSchemaAndNames(buildShowDDLJobQueriesFields())
ret = p
case ast.AdminShowDDLJobQueriesWithRange:
p := &ShowDDLJobQueriesWithRange{Limit: as.LimitSimple.Count, Offset: as.LimitSimple.Offset}
p.setSchemaAndNames(buildShowDDLJobQueriesWithRangeFields())
ret = p
case ast.AdminShowSlow:
p := &ShowSlow{ShowSlow: as.ShowSlow}
p.setSchemaAndNames(buildShowSlowSchema())
Expand Down Expand Up @@ -2818,6 +2822,13 @@ func buildShowDDLJobQueriesFields() (*expression.Schema, types.NameSlice) {
return schema.col2Schema(), schema.names
}

func buildShowDDLJobQueriesWithRangeFields() (*expression.Schema, types.NameSlice) {
schema := newColumnsWithNames(2)
schema.Append(buildColumnWithName("", "JOB_ID", mysql.TypeVarchar, 64))
schema.Append(buildColumnWithName("", "QUERY", mysql.TypeVarchar, 256))
return schema.col2Schema(), schema.names
}

func buildShowSlowSchema() (*expression.Schema, types.NameSlice) {
longlongSize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeLonglong)
tinySize, _ := mysql.GetDefaultFieldLengthAndDecimal(mysql.TypeTiny)
Expand Down

0 comments on commit c4c0665

Please sign in to comment.