From 3eb198b5171ebdd41a0279d1da059544429945ca Mon Sep 17 00:00:00 2001 From: Michael J Ward Date: Sun, 4 Aug 2024 18:07:34 -0500 Subject: [PATCH] Add PyExpr to_variant conversions (#793) * make PyExpr::to_variant arms explicit * update PyInList to wrap expr::InList * update PyExists to wrap expr::Exists * update PyInSubquery to wrap expr::InSubquery * update Placeholder to wrap expr::Placeholder * make PyLogicalPlan::to_variant match arms explicit * add PySortExpr wrapper * add PyUnnestExpr wrapper * update PyAlias to wrap upstream Alias * return not implemented error for unimplemented variants in PyExpr::to_variant * added to_variant python test from the GH issue * remove unused import * return unsupported_variants for unimplemented variants in PyLogicalPlan::to_variant --- python/datafusion/tests/test_expr.py | 28 +++++++++++ src/expr.rs | 51 +++++++++++++++++--- src/expr/alias.rs | 32 +++++++------ src/expr/exists.rs | 15 +++--- src/expr/in_list.rs | 22 ++++----- src/expr/in_subquery.rs | 22 ++++----- src/expr/placeholder.rs | 21 ++++---- src/expr/sort_expr.rs | 71 ++++++++++++++++++++++++++++ src/expr/unnest_expr.rs | 67 ++++++++++++++++++++++++++ src/sql/logical.rs | 15 ++++-- 10 files changed, 273 insertions(+), 71 deletions(-) create mode 100644 src/expr/sort_expr.rs create mode 100644 src/expr/unnest_expr.rs diff --git a/python/datafusion/tests/test_expr.py b/python/datafusion/tests/test_expr.py index c9f0e98d..1a41120a 100644 --- a/python/datafusion/tests/test_expr.py +++ b/python/datafusion/tests/test_expr.py @@ -139,3 +139,31 @@ def test_relational_expr(test_ctx): assert df.filter(col("b") != "beta").count() == 2 assert df.filter(col("a") == "beta").count() == 0 + + +def test_expr_to_variant(): + # Taken from https://github.com/apache/datafusion-python/issues/781 + from datafusion import SessionContext + from datafusion.expr import Filter + + + def traverse_logical_plan(plan): + cur_node = plan.to_variant() + if isinstance(cur_node, Filter): + return 
cur_node.predicate().to_variant() + if hasattr(plan, 'inputs'): + for input_plan in plan.inputs(): + res = traverse_logical_plan(input_plan) + if res is not None: + return res + + ctx = SessionContext() + data = {'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie']} + ctx.from_pydict(data, name='table1') + query = "SELECT * FROM table1 t1 WHERE t1.name IN ('dfa', 'ad', 'dfre', 'vsa')" + logical_plan = ctx.sql(query).optimized_logical_plan() + variant = traverse_logical_plan(logical_plan) + assert variant is not None + assert variant.expr().to_variant().qualified_name() == 'table1.name' + assert str(variant.list()) == '[Expr(Utf8("dfa")), Expr(Utf8("ad")), Expr(Utf8("dfre")), Expr(Utf8("vsa"))]' + assert not variant.negated() diff --git a/src/expr.rs b/src/expr.rs index aab0daa6..04bfc85c 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -33,7 +33,7 @@ use datafusion_expr::{ }; use crate::common::data_type::{DataTypeMap, RexType}; -use crate::errors::{py_runtime_err, py_type_err, DataFusionError}; +use crate::errors::{py_runtime_err, py_type_err, py_unsupported_variant_err, DataFusionError}; use crate::expr::aggregate_expr::PyAggregateFunction; use crate::expr::binary_expr::PyBinaryExpr; use crate::expr::column::PyColumn; @@ -84,11 +84,13 @@ pub mod scalar_subquery; pub mod scalar_variable; pub mod signature; pub mod sort; +pub mod sort_expr; pub mod subquery; pub mod subquery_alias; pub mod table_scan; pub mod union; pub mod unnest; +pub mod unnest_expr; pub mod window; /// A PyExpr that can be used on a DataFrame @@ -119,8 +121,9 @@ pub fn py_expr_list(expr: &[Expr]) -> PyResult> { impl PyExpr { /// Return the specific expression fn to_variant(&self, py: Python) -> PyResult { - Python::with_gil(|_| match &self.expr { - Expr::Alias(alias) => Ok(PyAlias::new(&alias.expr, &alias.name).into_py(py)), + Python::with_gil(|_| { + match &self.expr { + Expr::Alias(alias) => Ok(PyAlias::from(alias.clone()).into_py(py)), Expr::Column(col) => 
Ok(PyColumn::from(col.clone()).into_py(py)), Expr::ScalarVariable(data_type, variables) => { Ok(PyScalarVariable::new(data_type, variables).into_py(py)) @@ -141,10 +144,44 @@ impl PyExpr { Expr::AggregateFunction(expr) => { Ok(PyAggregateFunction::from(expr.clone()).into_py(py)) } - other => Err(py_runtime_err(format!( - "Cannot convert this Expr to a Python object: {:?}", - other + Expr::SimilarTo(value) => Ok(PySimilarTo::from(value.clone()).into_py(py)), + Expr::Between(value) => Ok(between::PyBetween::from(value.clone()).into_py(py)), + Expr::Case(value) => Ok(case::PyCase::from(value.clone()).into_py(py)), + Expr::Cast(value) => Ok(cast::PyCast::from(value.clone()).into_py(py)), + Expr::TryCast(value) => Ok(cast::PyTryCast::from(value.clone()).into_py(py)), + Expr::Sort(value) => Ok(sort_expr::PySortExpr::from(value.clone()).into_py(py)), + Expr::ScalarFunction(value) => Err(py_unsupported_variant_err(format!( + "Converting Expr::ScalarFunction to a Python object is not implemented: {:?}", + value ))), + Expr::WindowFunction(value) => Err(py_unsupported_variant_err(format!( + "Converting Expr::WindowFunction to a Python object is not implemented: {:?}", + value + ))), + Expr::InList(value) => Ok(in_list::PyInList::from(value.clone()).into_py(py)), + Expr::Exists(value) => Ok(exists::PyExists::from(value.clone()).into_py(py)), + Expr::InSubquery(value) => { + Ok(in_subquery::PyInSubquery::from(value.clone()).into_py(py)) + } + Expr::ScalarSubquery(value) => { + Ok(scalar_subquery::PyScalarSubquery::from(value.clone()).into_py(py)) + } + Expr::Wildcard { qualifier } => Err(py_unsupported_variant_err(format!( + "Converting Expr::Wildcard to a Python object is not implemented : {:?}", + qualifier + ))), + Expr::GroupingSet(value) => { + Ok(grouping_set::PyGroupingSet::from(value.clone()).into_py(py)) + } + Expr::Placeholder(value) => { + Ok(placeholder::PyPlaceholder::from(value.clone()).into_py(py)) + } + Expr::OuterReferenceColumn(data_type, column) => 
Err(py_unsupported_variant_err(format!( + "Converting Expr::OuterReferenceColumn to a Python object is not implemented: {:?} - {:?}", + data_type, column + ))), + Expr::Unnest(value) => Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_py(py)), + } }) } @@ -599,6 +636,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -606,6 +644,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/expr/alias.rs b/src/expr/alias.rs index 2ce65634..3208800a 100644 --- a/src/expr/alias.rs +++ b/src/expr/alias.rs @@ -19,13 +19,24 @@ use crate::expr::PyExpr; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; -use datafusion_expr::Expr; +use datafusion_expr::expr::Alias; #[pyclass(name = "Alias", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyAlias { - expr: PyExpr, - alias_name: String, + alias: Alias, +} + +impl From for PyAlias { + fn from(alias: Alias) -> Self { + Self { alias } + } +} + +impl From for Alias { + fn from(py_alias: PyAlias) -> Self { + py_alias.alias + } } impl Display for PyAlias { @@ -35,29 +46,20 @@ impl Display for PyAlias { "Alias \nExpr: `{:?}` \nAlias Name: `{}`", - &self.expr, &self.alias_name + &self.alias.expr, &self.alias.name ) } } -impl PyAlias { - pub fn new(expr: &Expr, alias_name: &String) -> Self { - Self { - expr: expr.clone().into(), - alias_name: alias_name.to_owned(), - } - } -} - #[pymethods] impl PyAlias { /// Retrieve the "name" of the alias fn alias(&self) -> PyResult { - Ok(self.alias_name.clone()) + Ok(self.alias.name.clone()) } fn expr(&self) -> PyResult { - Ok(self.expr.clone()) + Ok((*self.alias.expr.clone()).into()) } /// Get a String representation of this column diff --git 
a/src/expr/exists.rs b/src/expr/exists.rs index 7df9a6e8..fd2aa8c2 100644 --- a/src/expr/exists.rs +++ b/src/expr/exists.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_expr::Subquery; +use datafusion_expr::expr::Exists; use pyo3::prelude::*; use super::subquery::PySubquery; @@ -23,23 +23,22 @@ use super::subquery::PySubquery; #[pyclass(name = "Exists", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyExists { - subquery: Subquery, - negated: bool, + exists: Exists, } -impl PyExists { - pub fn new(subquery: Subquery, negated: bool) -> Self { - Self { subquery, negated } +impl From for PyExists { + fn from(exists: Exists) -> Self { + PyExists { exists } } } #[pymethods] impl PyExists { fn subquery(&self) -> PySubquery { - self.subquery.clone().into() + self.exists.subquery.clone().into() } fn negated(&self) -> bool { - self.negated + self.exists.negated } } diff --git a/src/expr/in_list.rs b/src/expr/in_list.rs index 840eee2c..c1a99a3c 100644 --- a/src/expr/in_list.rs +++ b/src/expr/in_list.rs @@ -16,38 +16,32 @@ // under the License. 
use crate::expr::PyExpr; -use datafusion_expr::Expr; +use datafusion_expr::expr::InList; use pyo3::prelude::*; #[pyclass(name = "InList", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyInList { - expr: Box, - list: Vec, - negated: bool, + in_list: InList, } -impl PyInList { - pub fn new(expr: Box, list: Vec, negated: bool) -> Self { - Self { - expr, - list, - negated, - } +impl From for PyInList { + fn from(in_list: InList) -> Self { + PyInList { in_list } } } #[pymethods] impl PyInList { fn expr(&self) -> PyExpr { - (*self.expr).clone().into() + (*self.in_list.expr).clone().into() } fn list(&self) -> Vec { - self.list.iter().map(|e| e.clone().into()).collect() + self.in_list.list.iter().map(|e| e.clone().into()).collect() } fn negated(&self) -> bool { - self.negated + self.in_list.negated } } diff --git a/src/expr/in_subquery.rs b/src/expr/in_subquery.rs index 6cee4a1f..7dfafdbf 100644 --- a/src/expr/in_subquery.rs +++ b/src/expr/in_subquery.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use datafusion_expr::{Expr, Subquery}; +use datafusion_expr::expr::InSubquery; use pyo3::prelude::*; use super::{subquery::PySubquery, PyExpr}; @@ -23,32 +23,26 @@ use super::{subquery::PySubquery, PyExpr}; #[pyclass(name = "InSubquery", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyInSubquery { - expr: Box, - subquery: Subquery, - negated: bool, + in_subquery: InSubquery, } -impl PyInSubquery { - pub fn new(expr: Box, subquery: Subquery, negated: bool) -> Self { - Self { - expr, - subquery, - negated, - } +impl From for PyInSubquery { + fn from(in_subquery: InSubquery) -> Self { + PyInSubquery { in_subquery } } } #[pymethods] impl PyInSubquery { fn expr(&self) -> PyExpr { - (*self.expr).clone().into() + (*self.in_subquery.expr).clone().into() } fn subquery(&self) -> PySubquery { - self.subquery.clone().into() + self.in_subquery.subquery.clone().into() } fn negated(&self) -> bool { - self.negated + self.in_subquery.negated } } diff --git a/src/expr/placeholder.rs b/src/expr/placeholder.rs index e37c8b56..ca75ce37 100644 --- a/src/expr/placeholder.rs +++ b/src/expr/placeholder.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use datafusion::arrow::datatypes::DataType; +use datafusion_expr::expr::Placeholder; use pyo3::prelude::*; use crate::common::data_type::PyDataType; @@ -23,26 +23,25 @@ use crate::common::data_type::PyDataType; #[pyclass(name = "Placeholder", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyPlaceholder { - id: String, - data_type: Option, + placeholder: Placeholder, } -impl PyPlaceholder { - pub fn new(id: String, data_type: DataType) -> Self { - Self { - id, - data_type: Some(data_type), - } +impl From for PyPlaceholder { + fn from(placeholder: Placeholder) -> Self { + PyPlaceholder { placeholder } } } #[pymethods] impl PyPlaceholder { fn id(&self) -> String { - self.id.clone() + self.placeholder.id.clone() } fn data_type(&self) -> Option { - self.data_type.as_ref().map(|e| e.clone().into()) + self.placeholder + .data_type + .as_ref() + .map(|e| e.clone().into()) } } diff --git a/src/expr/sort_expr.rs b/src/expr/sort_expr.rs new file mode 100644 index 00000000..6a8a0cf0 --- /dev/null +++ b/src/expr/sort_expr.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::expr::PyExpr; +use datafusion_expr::SortExpr; +use pyo3::prelude::*; +use std::fmt::{self, Display, Formatter}; + +#[pyclass(name = "SortExpr", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PySortExpr { + sort: SortExpr, +} + +impl From for SortExpr { + fn from(sort: PySortExpr) -> Self { + sort.sort + } +} + +impl From for PySortExpr { + fn from(sort: SortExpr) -> PySortExpr { + PySortExpr { sort } + } +} + +impl Display for PySortExpr { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Sort + Expr: {:?} + Asc: {:?} + NullsFirst: {:?}", + &self.sort.expr, &self.sort.asc, &self.sort.nulls_first + ) + } +} + +#[pymethods] +impl PySortExpr { + fn expr(&self) -> PyResult { + Ok((*self.sort.expr).clone().into()) + } + + fn ascending(&self) -> PyResult { + Ok(self.sort.asc) + } + + fn nulls_first(&self) -> PyResult { + Ok(self.sort.nulls_first) + } + + fn __repr__(&self) -> String { + format!("{}", self) + } +} diff --git a/src/expr/unnest_expr.rs b/src/expr/unnest_expr.rs new file mode 100644 index 00000000..a2f8230c --- /dev/null +++ b/src/expr/unnest_expr.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion_expr::expr::Unnest; +use pyo3::prelude::*; +use std::fmt::{self, Display, Formatter}; + +use super::PyExpr; + +#[pyclass(name = "UnnestExpr", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyUnnestExpr { + unnest: Unnest, +} + +impl From for PyUnnestExpr { + fn from(unnest: Unnest) -> PyUnnestExpr { + PyUnnestExpr { unnest } + } +} + +impl From for Unnest { + fn from(unnest: PyUnnestExpr) -> Self { + unnest.unnest + } +} + +impl Display for PyUnnestExpr { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Unnest + Expr: {:?}", + &self.unnest.expr, + ) + } +} + +#[pymethods] +impl PyUnnestExpr { + /// Retrieves the expression that is being unnested + fn expr(&self) -> PyResult { + Ok((*self.unnest.expr).clone().into()) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("UnnestExpr({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("UnnestExpr".to_string()) + } +} diff --git a/src/sql/logical.rs b/src/sql/logical.rs index b1446b92..c4471f50 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -81,9 +81,18 @@ impl PyLogicalPlan { LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py), LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py), LogicalPlan::Window(plan) => PyWindow::from(plan.clone()).to_variant(py), - other => Err(py_unsupported_variant_err(format!( - "Cannot convert this plan to a LogicalNode: {:?}", - other + LogicalPlan::Repartition(_) + | LogicalPlan::Union(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Values(_) + | LogicalPlan::Prepare(_) + | LogicalPlan::Dml(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::RecursiveQuery(_) => Err(py_unsupported_variant_err(format!( + "Conversion of variant not implemented: {:?}", + self.plan ))), } }