From 249e996b666a663d057c2c16ca6c239d9a82da32 Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Tue, 9 Apr 2024 21:27:29 -0700 Subject: [PATCH] Lighten DataFrame size and fix large futures warnings --- clippy.toml | 5 +- datafusion/core/src/dataframe/mod.rs | 173 +++++++++++++------ datafusion/core/src/dataframe/parquet.rs | 7 +- datafusion/core/src/execution/context/mod.rs | 47 +++-- 4 files changed, 161 insertions(+), 71 deletions(-) diff --git a/clippy.toml b/clippy.toml index 5d5e948a3f7e..fd0a7d27d3a5 100644 --- a/clippy.toml +++ b/clippy.toml @@ -6,4 +6,7 @@ disallowed-methods = [ disallowed-types = [ { path = "std::time::Instant", reason = "Use `datafusion_common::instant::Instant` instead for WASM compatibility" }, ] -future-size-threshold = 10000 \ No newline at end of file + +# Lowering the threshold to help prevent stack overflows (default is 16384) +# See: https://rust-lang.github.io/rust-clippy/master/index.html#/large_futures +future-size-threshold = 10000 diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 683cb809a5b1..60d3dd8e36e5 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -156,7 +156,7 @@ impl Default for DataFrameWriteOptions { /// ``` #[derive(Debug, Clone)] pub struct DataFrame { - session_state: SessionState, + session_state: Box, plan: LogicalPlan, } @@ -168,7 +168,7 @@ impl DataFrame { /// `DataFrame` from an existing datasource. pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self { Self { - session_state, + session_state: Box::new(session_state), plan, } } @@ -234,7 +234,10 @@ impl DataFrame { }; let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Expand each list element of a column to multiple rows. @@ -273,7 +276,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .unnest_column_with_options(column, options)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a DataFrame with only rows for which `predicate` evaluates to @@ -298,7 +304,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .filter(predicate)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that aggregates the rows of the current @@ -329,7 +338,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .aggregate(group_expr, aggr_expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new DataFrame that adds the result of evaluating one or more @@ -338,7 +350,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .window(window_exprs)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Returns a new `DataFrame` with a limited number of rows. @@ -363,7 +378,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .limit(skip, fetch)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows. @@ -387,7 +405,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .union(dataframe.plan)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the distinct union of two [`DataFrame`]s. @@ -409,12 +430,13 @@ impl DataFrame { /// # } /// ``` pub fn union_distinct(self, dataframe: DataFrame) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan) - .union_distinct(dataframe.plan)? - .build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan) + .union_distinct(dataframe.plan)? + .build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` with all duplicated rows removed. @@ -432,10 +454,11 @@ impl DataFrame { /// # } /// ``` pub fn distinct(self) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan).distinct()?.build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that has statistics for a DataFrame. @@ -603,15 +626,18 @@ impl DataFrame { describe_record_batch.schema(), vec![vec![describe_record_batch]], )?; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::scan( - UNNAMED_TABLE, - provider_as_source(Arc::new(provider)), - None, - )? - .build()?, - )) + + let plan = LogicalPlanBuilder::scan( + UNNAMED_TABLE, + provider_as_source(Arc::new(provider)), + None, + )? + .build()?; + + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Sort the DataFrame by the specified sorting expressions. @@ -637,7 +663,10 @@ impl DataFrame { /// ``` pub fn sort(self, expr: Vec) -> Result { let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using explicitly specified @@ -691,7 +720,10 @@ impl DataFrame { filter, )? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using the specified @@ -741,7 +773,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .join_on(right.plan, join_type, expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Repartition a DataFrame based on a logical partitioning scheme. @@ -762,7 +797,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .repartition(partitioning_scheme)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return the total number of rows in this `DataFrame`. @@ -867,7 +905,7 @@ impl DataFrame { /// Return a new [`TaskContext`] which would be used to execute this DataFrame pub fn task_ctx(&self) -> TaskContext { - TaskContext::from(&self.session_state) + TaskContext::from(self.session_state.as_ref()) } /// Executes this DataFrame and returns a stream over a single partition @@ -973,7 +1011,7 @@ impl DataFrame { /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`] pub fn into_parts(self) -> (SessionState, LogicalPlan) { - (self.session_state, self.plan) + (*self.session_state, self.plan) } /// Return the [`LogicalPlan`] represented by this DataFrame without running @@ -1027,7 +1065,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .explain(verbose, analyze)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a `FunctionRegistry` used to plan udf's calls @@ -1046,7 +1087,7 @@ impl DataFrame { /// # } /// ``` pub fn registry(&self) -> &dyn FunctionRegistry { - &self.session_state + self.session_state.as_ref() } /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1066,10 +1107,11 @@ impl DataFrame { pub fn intersect(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::intersect(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1089,11 +1131,11 @@ impl DataFrame { pub fn except(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::except(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Execute this `DataFrame` and write the results to `table_name`. @@ -1118,7 +1160,13 @@ impl DataFrame { write_options.overwrite, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to CSV file(s). @@ -1166,7 +1214,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to JSON file(s). @@ -1215,7 +1269,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Add an additional column to the DataFrame. @@ -1261,7 +1321,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Rename one column by applying a new projection. This is a no-op if the column to be @@ -1326,7 +1389,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(self.plan) .project(projection)? .build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Replace all parameters in logical plan with the specified @@ -1388,7 +1454,10 @@ impl DataFrame { /// ``` pub fn with_param_values(self, query_values: impl Into) -> Result { let plan = self.plan.with_param_values(query_values)?; - Ok(Self::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Cache DataFrame as a memory table. @@ -1405,7 +1474,7 @@ impl DataFrame { /// # } /// ``` pub async fn cache(self) -> Result { - let context = SessionContext::new_with_state(self.session_state.clone()); + let context = SessionContext::new_with_state((*self.session_state).clone()); // The schema is consistent with the output let plan = self.clone().create_physical_plan().await?; let schema = plan.schema(); diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 7cc3201bf7e4..0ec46df0ae5d 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -68,7 +68,12 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9b5a5fef8cb9..215bdbc3d579 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -471,24 +471,37 @@ impl SessionContext { /// [`SQLOptions::verify_plan`]. pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result { match plan { - LogicalPlan::Ddl(ddl) => match ddl { - DdlStatement::CreateExternalTable(cmd) => { - self.create_external_table(&cmd).await - } - DdlStatement::CreateMemoryTable(cmd) => { - self.create_memory_table(cmd).await - } - DdlStatement::CreateView(cmd) => self.create_view(cmd).await, - DdlStatement::CreateCatalogSchema(cmd) => { - self.create_catalog_schema(cmd).await + LogicalPlan::Ddl(ddl) => { + // Box::pin avoids allocating the stack space within this function's frame + // for every one of these individual async functions, decreasing the risk of + // stack overflows. + match ddl { + DdlStatement::CreateExternalTable(cmd) => { + Box::pin(async move { self.create_external_table(&cmd).await }) + as std::pin::Pin + Send>> + } + DdlStatement::CreateMemoryTable(cmd) => { + Box::pin(self.create_memory_table(cmd)) + } + DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)), + DdlStatement::CreateCatalogSchema(cmd) => { + Box::pin(self.create_catalog_schema(cmd)) + } + DdlStatement::CreateCatalog(cmd) => { + Box::pin(self.create_catalog(cmd)) + } + DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)), + DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)), + DdlStatement::DropCatalogSchema(cmd) => { + Box::pin(self.drop_schema(cmd)) + } + DdlStatement::CreateFunction(cmd) => { + Box::pin(self.create_function(cmd)) + } + DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)), } - DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await, - DdlStatement::DropTable(cmd) => self.drop_table(cmd).await, - DdlStatement::DropView(cmd) => self.drop_view(cmd).await, - DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await, - DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await, - DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await, - }, + .await + } // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await