From ea2e4fc28c232fb0dad74d3300b99786d812a45b Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Tue, 9 Apr 2024 21:26:42 -0700 Subject: [PATCH 1/2] Enable clippy::pedantic::large_futures lint --- Cargo.toml | 4 ++++ benchmarks/Cargo.toml | 3 +++ clippy.toml | 4 ++++ datafusion-examples/Cargo.toml | 3 +++ datafusion/common-runtime/Cargo.toml | 3 +++ datafusion/common/Cargo.toml | 3 +++ datafusion/core/Cargo.toml | 3 +++ datafusion/execution/Cargo.toml | 3 +++ datafusion/expr/Cargo.toml | 3 +++ datafusion/functions-aggregate/Cargo.toml | 3 +++ datafusion/functions-array/Cargo.toml | 3 +++ datafusion/functions/Cargo.toml | 3 +++ datafusion/optimizer/Cargo.toml | 3 +++ datafusion/physical-expr-common/Cargo.toml | 3 +++ datafusion/physical-expr/Cargo.toml | 3 +++ datafusion/physical-plan/Cargo.toml | 3 +++ datafusion/proto/Cargo.toml | 3 +++ datafusion/proto/gen/Cargo.toml | 3 +++ datafusion/sql/Cargo.toml | 3 +++ datafusion/sqllogictest/Cargo.toml | 3 +++ datafusion/substrait/Cargo.toml | 3 +++ datafusion/wasmtest/Cargo.toml | 3 +++ docs/Cargo.toml | 3 +++ test-utils/Cargo.toml | 3 +++ 24 files changed, 74 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index c5a1aa9c8ef8..1d882817acc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,3 +126,7 @@ opt-level = 3 overflow-checks = false panic = 'unwind' rpath = false + +[workspace.lints.clippy] +# Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml) +large_futures = "warn" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index a9fbab1ce41f..ff8f4851f95b 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -26,6 +26,9 @@ repository = { workspace = true } license = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [features] ci = [] default = ["mimalloc"] diff --git a/clippy.toml b/clippy.toml index 62d8263085df..908f51664542 100644 --- a/clippy.toml +++ b/clippy.toml @@ -6,3 +6,7 @@ disallowed-methods = [ disallowed-types = [ { path = "std::time::Instant", reason = "Use `datafusion_common::instant::Instant` instead for WASM compatibility" }, ] + +# Lowering the threshold to help prevent stack overflows (default is 16384) +# See: https://rust-lang.github.io/rust-clippy/master/index.html#/large_futures +future-size-threshold = 10000 \ No newline at end of file diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 4966143782ba..0074a2b8d40c 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -29,6 +29,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [[example]] name = "flight_sql_server" path = "examples/flight/flight_sql_server.rs" diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index 7ed8b2cf2975..c10436087675 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_common_runtime" path = "src/lib.rs" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 167307f37108..2391b2f83087 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_common" path = "src/lib.rs" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3ee6471ca966..2bd552aacc44 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -32,6 +32,9 @@ authors = { workspace = true } # https://github.com/foresterre/cargo-msrv/issues/590 rust-version = "1.73" +[lints] +workspace = true + [lib] name = "datafusion" path = "src/lib.rs" diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 84c878bf10dc..a00b3354eb73 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_execution" path = "src/lib.rs" diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 6f6147d36883..2759572581ea 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_expr" path = "src/lib.rs" diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index d42932d8abdd..be354acb4851 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_functions_aggregate" path = "src/lib.rs" diff --git a/datafusion/functions-array/Cargo.toml b/datafusion/functions-array/Cargo.toml index 6ef9c6b055af..eb1ef9e03f31 100644 --- a/datafusion/functions-array/Cargo.toml +++ b/datafusion/functions-array/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [features] [lib] diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index f9985069413b..577ecdb7461d 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [features] # enable core functions core_expressions = [] diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 2b96d0bc5626..b1a6953501a6 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_optimizer" path = "src/lib.rs" diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 89a41a5d10ce..d1202c83d526 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_physical_expr_common" path = "src/lib.rs" diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index fe72a7a46fcb..5261f1c8968d 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_physical_expr" path = "src/lib.rs" diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 6863f2646000..25e1a6ad5bd3 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_physical_plan" path = "src/lib.rs" diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 325cd8704ccf..ecb41e4e263e 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -32,6 +32,9 @@ rust-version = "1.73" # Exclude proto files so crates.io consumers don't need protoc exclude = ["*.proto"] +[lints] +workspace = true + [lib] name = "datafusion_proto" path = "src/lib.rs" diff --git a/datafusion/proto/gen/Cargo.toml b/datafusion/proto/gen/Cargo.toml index 01ce92ee9e8e..ca93706419e4 100644 --- a/datafusion/proto/gen/Cargo.toml +++ b/datafusion/proto/gen/Cargo.toml @@ -29,6 +29,9 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lints] +workspace = true + [dependencies] # Pin these dependencies so that the generated output is deterministic pbjson-build = "=0.6.2" diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index b9f6dc259eb7..ef3ed265c7ab 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -28,6 +28,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_sql" path = "src/lib.rs" diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 384c5b7153c3..c652c8041ff1 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -26,6 +26,9 @@ repository = { workspace = true } rust-version = { workspace = true } version = { workspace = true } +[lints] +workspace = true + [lib] name = "datafusion_sqllogictest" path = "src/lib.rs" diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 37444e8632c7..ace8da906dc2 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -28,6 +28,9 @@ authors = { workspace = true } # Specify MSRV here as `cargo msrv` doesn't support workspace version rust-version = "1.73" +[lints] +workspace = true + [dependencies] async-recursion = "1.0" chrono = { workspace = true } diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml index 2f8521b19b74..46e157aecfd9 100644 --- a/datafusion/wasmtest/Cargo.toml +++ b/datafusion/wasmtest/Cargo.toml @@ -27,6 +27,9 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [lib] crate-type = ["cdylib", "rlib"] diff --git a/docs/Cargo.toml b/docs/Cargo.toml index d5e114075c95..14398c841579 100644 --- a/docs/Cargo.toml +++ b/docs/Cargo.toml @@ -28,5 +28,8 @@ license = { workspace = true } authors = { workspace = true } rust-version = { workspace = true } +[lints] +workspace = true + [dependencies] datafusion = { workspace = true } diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 1e86457f844a..325a2cc2fcc4 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -22,6 +22,9 @@ edition = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lints] +workspace = true + [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } From 0a2b63324965141ff0b6c28ebf68a8c50db2fd22 Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Tue, 9 Apr 2024 21:27:29 -0700 Subject: [PATCH 2/2] Lighten DataFrame size and fix large futures warnings --- datafusion/core/src/dataframe/mod.rs | 174 +++++++++++++------ datafusion/core/src/dataframe/parquet.rs | 7 +- datafusion/core/src/execution/context/mod.rs | 47 +++-- 3 files changed, 158 insertions(+), 70 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 683cb809a5b1..6b4046dede87 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -156,7 +156,8 @@ impl Default for DataFrameWriteOptions { /// ``` #[derive(Debug, Clone)] pub struct DataFrame { - session_state: SessionState, + // Box the (large) SessionState to reduce the size of DataFrame on the stack + session_state: Box, plan: LogicalPlan, } @@ -168,7 +169,7 @@ impl DataFrame { /// `DataFrame` from an existing datasource. pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self { Self { - session_state, + session_state: Box::new(session_state), plan, } } @@ -234,7 +235,10 @@ impl DataFrame { }; let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Expand each list element of a column to multiple rows. @@ -273,7 +277,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .unnest_column_with_options(column, options)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a DataFrame with only rows for which `predicate` evaluates to @@ -298,7 +305,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .filter(predicate)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that aggregates the rows of the current @@ -329,7 +339,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .aggregate(group_expr, aggr_expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new DataFrame that adds the result of evaluating one or more @@ -338,7 +351,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .window(window_exprs)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Returns a new `DataFrame` with a limited number of rows. @@ -363,7 +379,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .limit(skip, fetch)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows. @@ -387,7 +406,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .union(dataframe.plan)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the distinct union of two [`DataFrame`]s. @@ -409,12 +431,13 @@ impl DataFrame { /// # } /// ``` pub fn union_distinct(self, dataframe: DataFrame) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan) - .union_distinct(dataframe.plan)? - .build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan) + .union_distinct(dataframe.plan)? + .build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` with all duplicated rows removed. @@ -432,10 +455,11 @@ impl DataFrame { /// # } /// ``` pub fn distinct(self) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan).distinct()?.build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that has statistics for a DataFrame. @@ -603,15 +627,18 @@ impl DataFrame { describe_record_batch.schema(), vec![vec![describe_record_batch]], )?; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::scan( - UNNAMED_TABLE, - provider_as_source(Arc::new(provider)), - None, - )? - .build()?, - )) + + let plan = LogicalPlanBuilder::scan( + UNNAMED_TABLE, + provider_as_source(Arc::new(provider)), + None, + )? + .build()?; + + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Sort the DataFrame by the specified sorting expressions. @@ -637,7 +664,10 @@ impl DataFrame { /// ``` pub fn sort(self, expr: Vec) -> Result { let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using explicitly specified @@ -691,7 +721,10 @@ impl DataFrame { filter, )? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using the specified @@ -741,7 +774,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .join_on(right.plan, join_type, expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Repartition a DataFrame based on a logical partitioning scheme. @@ -762,7 +798,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .repartition(partitioning_scheme)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return the total number of rows in this `DataFrame`. @@ -867,7 +906,7 @@ impl DataFrame { /// Return a new [`TaskContext`] which would be used to execute this DataFrame pub fn task_ctx(&self) -> TaskContext { - TaskContext::from(&self.session_state) + TaskContext::from(self.session_state.as_ref()) } /// Executes this DataFrame and returns a stream over a single partition @@ -973,7 +1012,7 @@ impl DataFrame { /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`] pub fn into_parts(self) -> (SessionState, LogicalPlan) { - (self.session_state, self.plan) + (*self.session_state, self.plan) } /// Return the [`LogicalPlan`] represented by this DataFrame without running @@ -1027,7 +1066,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .explain(verbose, analyze)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a `FunctionRegistry` used to plan udf's calls @@ -1046,7 +1088,7 @@ impl DataFrame { /// # } /// ``` pub fn registry(&self) -> &dyn FunctionRegistry { - &self.session_state + self.session_state.as_ref() } /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1066,10 +1108,11 @@ impl DataFrame { pub fn intersect(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::intersect(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1089,11 +1132,11 @@ impl DataFrame { pub fn except(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::except(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Execute this `DataFrame` and write the results to `table_name`. @@ -1118,7 +1161,13 @@ impl DataFrame { write_options.overwrite, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to CSV file(s). @@ -1166,7 +1215,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to JSON file(s). @@ -1215,7 +1270,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Add an additional column to the DataFrame. @@ -1261,7 +1322,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Rename one column by applying a new projection. This is a no-op if the column to be @@ -1326,7 +1390,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(self.plan) .project(projection)? .build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Replace all parameters in logical plan with the specified @@ -1388,7 +1455,10 @@ impl DataFrame { /// ``` pub fn with_param_values(self, query_values: impl Into) -> Result { let plan = self.plan.with_param_values(query_values)?; - Ok(Self::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Cache DataFrame as a memory table. @@ -1405,7 +1475,7 @@ impl DataFrame { /// # } /// ``` pub async fn cache(self) -> Result { - let context = SessionContext::new_with_state(self.session_state.clone()); + let context = SessionContext::new_with_state((*self.session_state).clone()); // The schema is consistent with the output let plan = self.clone().create_physical_plan().await?; let schema = plan.schema(); diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 7cc3201bf7e4..0ec46df0ae5d 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -68,7 +68,12 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9b5a5fef8cb9..215bdbc3d579 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -471,24 +471,37 @@ impl SessionContext { /// [`SQLOptions::verify_plan`]. pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result { match plan { - LogicalPlan::Ddl(ddl) => match ddl { - DdlStatement::CreateExternalTable(cmd) => { - self.create_external_table(&cmd).await - } - DdlStatement::CreateMemoryTable(cmd) => { - self.create_memory_table(cmd).await - } - DdlStatement::CreateView(cmd) => self.create_view(cmd).await, - DdlStatement::CreateCatalogSchema(cmd) => { - self.create_catalog_schema(cmd).await + LogicalPlan::Ddl(ddl) => { + // Box::pin avoids allocating the stack space within this function's frame + // for every one of these individual async functions, decreasing the risk of + // stack overflows. + match ddl { + DdlStatement::CreateExternalTable(cmd) => { + Box::pin(async move { self.create_external_table(&cmd).await }) + as std::pin::Pin + Send>> + } + DdlStatement::CreateMemoryTable(cmd) => { + Box::pin(self.create_memory_table(cmd)) + } + DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)), + DdlStatement::CreateCatalogSchema(cmd) => { + Box::pin(self.create_catalog_schema(cmd)) + } + DdlStatement::CreateCatalog(cmd) => { + Box::pin(self.create_catalog(cmd)) + } + DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)), + DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)), + DdlStatement::DropCatalogSchema(cmd) => { + Box::pin(self.drop_schema(cmd)) + } + DdlStatement::CreateFunction(cmd) => { + Box::pin(self.create_function(cmd)) + } + DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)), } - DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await, - DdlStatement::DropTable(cmd) => self.drop_table(cmd).await, - DdlStatement::DropView(cmd) => self.drop_view(cmd).await, - DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await, - DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await, - DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await, - }, + .await + } // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await