diff --git a/executor/ddl_test.go b/executor/ddl_test.go index f5b22e47876a2..9d1a9cbedc976 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -287,6 +287,19 @@ func (s *testSuite3) TestCreateView(c *C) { c.Assert(err.Error(), Equals, "update view v_issue_16253 is not supported now.") } +func (s *testSuite3) TestViewRecursion(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table if not exists t(a int)") + tk.MustExec("create definer='root'@'localhost' view recursive_view1 as select * from t") + tk.MustExec("create definer='root'@'localhost' view recursive_view2 as select * from recursive_view1") + tk.MustExec("drop table t") + tk.MustExec("rename table recursive_view2 to t") + _, err := tk.Exec("select * from recursive_view1") + c.Assert(terror.ErrorEqual(err, plannercore.ErrViewRecursive), IsTrue) + tk.MustExec("drop view recursive_view1, t") +} + func (s *testSuite3) TestIssue16250(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/errors.go b/planner/core/errors.go index a8af76b340aa1..203f294c020cd 100644 --- a/planner/core/errors.go +++ b/planner/core/errors.go @@ -126,6 +126,7 @@ var ( ErrWindowDuplicateName = terror.ClassOptimizer.New(codeWindowDuplicateName, mysql.MySQLErrName[mysql.ErrWindowDuplicateName]) ErrPartitionClauseOnNonpartitioned = terror.ClassOptimizer.New(codePartitionClauseOnNonpartitioned, mysql.MySQLErrName[mysql.ErrPartitionClauseOnNonpartitioned]) ErrNoSuchTable = terror.ClassOptimizer.New(mysql.ErrNoSuchTable, mysql.MySQLErrName[mysql.ErrNoSuchTable]) + ErrViewRecursive = terror.ClassOptimizer.New(mysql.ErrViewRecursive, mysql.MySQLErrName[mysql.ErrViewRecursive]) errTooBigPrecision = terror.ClassExpression.New(mysql.ErrTooBigPrecision, mysql.MySQLErrName[mysql.ErrTooBigPrecision]) ErrDBaccessDenied = terror.ClassOptimizer.New(mysql.ErrDBaccessDenied, mysql.MySQLErrName[mysql.ErrDBaccessDenied]) ErrTableaccessDenied = terror.ClassOptimizer.New(mysql.ErrTableaccessDenied, mysql.MySQLErrName[mysql.ErrTableaccessDenied]) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 939d82442f5df..5ba86c89a371a 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/opcode" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -48,6 +49,7 @@ import ( driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/plancodec" + "github.com/pingcap/tidb/util/set" ) const ( @@ -2313,9 +2315,6 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName) (L b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName.L, tableInfo.Name.L, "", authErr) if tableInfo.IsView() { - if b.capFlag&collectUnderlyingViewName != 0 { - b.underlyingViewNames.Insert(dbName.L + "." + tn.Name.L) - } return b.BuildDataSourceFromView(ctx, dbName, tableInfo) } @@ -2431,8 +2430,33 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName) (L return result, nil } +// checkRecursiveView checks whether this view is recursively defined. +func (b *PlanBuilder) checkRecursiveView(dbName model.CIStr, tableName model.CIStr) (func(), error) { + viewFullName := dbName.L + "." + tableName.L + if b.buildingViewStack == nil { + b.buildingViewStack = set.NewStringSet() + } + // If this view has already been on the building stack, it means + // this view contains a recursive definition. + if b.buildingViewStack.Exist(viewFullName) { + return nil, ErrViewRecursive.GenWithStackByArgs(dbName.O, tableName.O) + } + // If the view is being renamed, we return the mysql compatible error message. + if b.capFlag&renameView != 0 && viewFullName == b.renamingViewName { + return nil, ErrNoSuchTable.GenWithStackByArgs(dbName.O, tableName.O) + } + b.buildingViewStack.Insert(viewFullName) + return func() { delete(b.buildingViewStack, viewFullName) }, nil +} + // BuildDataSourceFromView is used to build LogicalPlan from view func (b *PlanBuilder) BuildDataSourceFromView(ctx context.Context, dbName model.CIStr, tableInfo *model.TableInfo) (LogicalPlan, error) { + deferFunc, err := b.checkRecursiveView(dbName, tableInfo.Name) + if err != nil { + return nil, err + } + defer deferFunc() + charset, collation := b.ctx.GetSessionVars().GetCharsetInfo() viewParser := parser.New() viewParser.EnableWindowFunc(b.ctx.GetSessionVars().EnableWindowFunction) @@ -2444,7 +2468,10 @@ func (b *PlanBuilder) BuildDataSourceFromView(ctx context.Context, dbName model. b.visitInfo = make([]visitInfo, 0) selectLogicalPlan, err := b.Build(ctx, selectNode) if err != nil { - err = ErrViewInvalid.GenWithStackByArgs(dbName.O, tableInfo.Name.O) + if terror.ErrorNotEqual(err, ErrViewRecursive) && + terror.ErrorNotEqual(err, ErrNoSuchTable) { + err = ErrViewInvalid.GenWithStackByArgs(dbName.O, tableInfo.Name.O) + } return nil, err } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 424a1efdb8738..69cf61d43782b 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -170,9 +170,9 @@ const ( // canExpandAST indicates whether the origin AST can be expanded during plan // building. ONLY used for `CreateViewStmt` now. canExpandAST - // collectUnderlyingViewName indicates whether to collect the underlying - // view names of a CreateViewStmt during plan building. - collectUnderlyingViewName + // renameView indicates a view is being renamed, so we cannot use the origin + // definition of that view. + renameView ) // PlanBuilder builds Plan from an ast.Node. @@ -207,8 +207,11 @@ type PlanBuilder struct { // SelectLock need this information to locate the lock on partitions. partitionedTable []table.PartitionedTable - // CreateView needs this information to check whether exists nested view. - underlyingViewNames set.StringSet + + // buildingViewStack is used to check whether there is a recursive view. + buildingViewStack set.StringSet + // renamingViewName is the name of the view which is being renamed. + renamingViewName string } // GetVisitInfo gets the visitInfo of the PlanBuilder. @@ -2225,20 +2228,16 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err v.ReferTable.Name.L, "", authErr) } case *ast.CreateViewStmt: - b.capFlag |= canExpandAST - b.capFlag |= collectUnderlyingViewName + b.capFlag |= canExpandAST | renameView + b.renamingViewName = v.ViewName.Schema.L + "." + v.ViewName.Name.L defer func() { b.capFlag &= ^canExpandAST - b.capFlag &= ^collectUnderlyingViewName + b.capFlag &= ^renameView }() - b.underlyingViewNames = set.NewStringSet() plan, err := b.Build(ctx, v.Select) if err != nil { return nil, err } - if b.underlyingViewNames.Exist(v.ViewName.Schema.L + "." + v.ViewName.Name.L) { - return nil, ErrNoSuchTable.GenWithStackByArgs(v.ViewName.Schema.O, v.ViewName.Name.O) - } schema := plan.Schema() if v.Cols == nil { adjustOverlongViewColname(plan.(LogicalPlan))