Skip to content

Commit

Permalink
Port struct to datafusion-functions (#9546)
Browse files Browse the repository at this point in the history
* initial port

* fix clippy

* update cargo in cli

* remove dependency

* resolve conflict

* cargo update in CLI
  • Loading branch information
yyy1000 authored Mar 11, 2024
1 parent d927882 commit 75ad221
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 60 deletions.
16 changes: 8 additions & 8 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 2 additions & 18 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::signature::TIMEZONE_WILDCARD;
use crate::type_coercion::functions::data_types;
use crate::{FuncMonotonicity, Signature, TypeSignature, Volatility};

use arrow::datatypes::{DataType, Field, Fields, TimeUnit};
use arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion_common::{plan_err, DataFusionError, Result};

use strum::IntoEnumIterator;
Expand Down Expand Up @@ -151,10 +151,6 @@ pub enum BuiltinScalarFunction {
/// array_resize
ArrayResize,

// struct functions
/// struct
Struct,

// string functions
/// ascii
Ascii,
Expand Down Expand Up @@ -390,7 +386,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Translate => Volatility::Immutable,
BuiltinScalarFunction::Trim => Volatility::Immutable,
BuiltinScalarFunction::Upper => Volatility::Immutable,
BuiltinScalarFunction::Struct => Volatility::Immutable,
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
Expand Down Expand Up @@ -598,14 +593,7 @@ impl BuiltinScalarFunction {
_ => Ok(Float64),
},

BuiltinScalarFunction::Struct => {
let return_fields = input_expr_types
.iter()
.enumerate()
.map(|(pos, dt)| Field::new(format!("c{pos}"), dt.clone(), true))
.collect::<Vec<Field>>();
Ok(Struct(Fields::from(return_fields)))
}


BuiltinScalarFunction::Atan2 => match &input_expr_types[0] {
Float32 => Ok(Float32),
Expand Down Expand Up @@ -713,7 +701,6 @@ impl BuiltinScalarFunction {
Signature::variadic_any(self.volatility())
}

BuiltinScalarFunction::Struct => Signature::variadic_any(self.volatility()),
BuiltinScalarFunction::Concat
| BuiltinScalarFunction::ConcatWithSeparator => {
Signature::variadic(vec![Utf8], self.volatility())
Expand Down Expand Up @@ -1146,9 +1133,6 @@ impl BuiltinScalarFunction {
&["array_intersect", "list_intersect"]
}
BuiltinScalarFunction::OverLay => &["overlay"],

// struct functions
BuiltinScalarFunction::Struct => &["struct"],
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,13 +827,6 @@ scalar_expr!(Levenshtein, levenshtein, string1 string2, "Returns the Levenshtein
scalar_expr!(SubstrIndex, substr_index, string delimiter count, "Returns the substring from str before count occurrences of the delimiter");
scalar_expr!(FindInSet, find_in_set, str strlist, "Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings");

scalar_expr!(
Struct,
struct_fun,
val,
"returns a vector of fields from the struct"
);

/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
pub fn case(expr: Expr) -> CaseBuilder {
CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
Expand Down
5 changes: 4 additions & 1 deletion datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@
mod nullif;
mod nvl;
mod nvl2;
pub mod r#struct;

// create UDFs
make_udf_function!(nullif::NullIfFunc, NULLIF, nullif);
make_udf_function!(nvl::NVLFunc, NVL, nvl);
make_udf_function!(nvl2::NVL2Func, NVL2, nvl2);
make_udf_function!(r#struct::StructFunc, STRUCT, r#struct);

// Export the functions out of this package, both as expr_fn as well as a list of functions
export_functions!(
(nullif, arg_1 arg_2, "returns NULL if value1 equals value2; otherwise it returns value1. This can be used to perform the inverse operation of the COALESCE expression."),
(nvl, arg_1 arg_2, "returns value2 if value1 is NULL; otherwise it returns value1"),
(nvl2, arg_1 arg_2 arg_3, "Returns value2 if value1 is not NULL; otherwise, it returns value3.")
(nvl2, arg_1 arg_2 arg_3, "Returns value2 if value1 is not NULL; otherwise, it returns value3."),
(r#struct, args, "Returns a struct with the given arguments")
);
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//! Struct expressions

use arrow::array::*;
use arrow::datatypes::Field;
use arrow::datatypes::{DataType, Field, Fields};
use arrow_array::{ArrayRef, StructArray};
use datafusion_common::{exec_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::sync::Arc;

fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
Expand All @@ -47,10 +47,9 @@ fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {

Ok(Arc::new(StructArray::from(vec)))
}

/// put values in a struct array.
pub fn struct_expr(values: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = values
fn struct_expr(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = args
.iter()
.map(|x| {
Ok(match x {
Expand All @@ -61,10 +60,55 @@ pub fn struct_expr(values: &[ColumnarValue]) -> Result<ColumnarValue> {
.collect::<Result<Vec<ArrayRef>>>()?;
Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?))
}
/// Scalar UDF implementation registered under the name `struct`.
///
/// The only state it carries is the [`Signature`] it reports through
/// `ScalarUDFImpl::signature`.
#[derive(Debug)]
pub struct StructFunc {
/// Signature built once in `new()` and handed out by reference afterwards.
signature: Signature,
}

impl StructFunc {
    /// Creates a `StructFunc` carrying a `variadic_any` signature with
    /// `Volatility::Immutable`.
    pub fn new() -> Self {
        let signature = Signature::variadic_any(Volatility::Immutable);
        Self { signature }
    }
}

impl Default for StructFunc {
fn default() -> Self {
Self::new()
}
}

impl ScalarUDFImpl for StructFunc {
    /// Exposes this implementation as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of the function: `struct`.
    fn name(&self) -> &str {
        "struct"
    }

    /// Borrows the signature stored on this instance.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Computes the struct type returned for the given argument types.
    ///
    /// Each argument contributes one field, in order, named `c0`, `c1`, ...
    /// after its position; every field is created with the nullable flag set
    /// to `true`.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        let mut fields: Vec<Field> = Vec::with_capacity(arg_types.len());
        for (pos, arg_type) in arg_types.iter().enumerate() {
            let field_name = format!("c{pos}");
            fields.push(Field::new(field_name, arg_type.clone(), true));
        }
        Ok(DataType::Struct(Fields::from(fields)))
    }

    /// Evaluates the function by delegating to `struct_expr`.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        struct_expr(args)
    }
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_array::Int64Array;
use datafusion_common::cast::as_struct_array;
use datafusion_common::ScalarValue;

Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
use crate::sort_properties::SortProperties;
use crate::{
array_expressions, conditional_expressions, datetime_expressions, math_expressions,
string_expressions, struct_expressions, PhysicalExpr, ScalarFunctionExpr,
string_expressions, PhysicalExpr, ScalarFunctionExpr,
};
use arrow::{
array::ArrayRef,
Expand Down Expand Up @@ -362,8 +362,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayUnion => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_union)(args)
}),
// struct functions
BuiltinScalarFunction::Struct => Arc::new(struct_expressions::struct_expr),

// string functions
BuiltinScalarFunction::Ascii => Arc::new(|args| match args[0].data_type() {
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ mod scalar_function;
mod sort_expr;
pub mod sort_properties;
pub mod string_expressions;
pub mod struct_expressions;
pub mod tree_node;
pub mod udf;
#[cfg(feature = "unicode_expressions")]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ enum ScalarFunction {
Upper = 62;
Coalesce = 63;
Power = 64;
StructFun = 65;
// 65 was StructFun
FromUnixtime = 66;
Atan2 = 67;
// 68 was DateBin
Expand Down
3 changes: 0 additions & 3 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use datafusion_expr::{
logical_plan::{PlanType, StringifiedPlan},
lower, lpad, ltrim, md5, nanvl, now, octet_length, overlay, pi, power, radians,
random, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384,
sha512, signum, sin, sinh, split_part, sqrt, starts_with, strpos, struct_fun, substr,
sha512, signum, sin, sinh, split_part, sqrt, starts_with, strpos, substr,
substr_index, substring, tan, tanh, to_hex, translate, trim, trunc, upper, uuid,
AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction,
Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
Expand Down Expand Up @@ -534,7 +534,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Coalesce => Self::Coalesce,
ScalarFunction::Pi => Self::Pi,
ScalarFunction::Power => Self::Power,
ScalarFunction::StructFun => Self::Struct,
ScalarFunction::FromUnixtime => Self::FromUnixtime,
ScalarFunction::Atan2 => Self::Atan2,
ScalarFunction::Nanvl => Self::Nanvl,
Expand Down Expand Up @@ -1755,9 +1754,6 @@ pub fn parse_expr(
parse_expr(&args[0], registry, codec)?,
parse_expr(&args[1], registry, codec)?,
)),
ScalarFunction::StructFun => {
Ok(struct_fun(parse_expr(&args[0], registry, codec)?))
}
}
}
ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::Coalesce => Self::Coalesce,
BuiltinScalarFunction::Pi => Self::Pi,
BuiltinScalarFunction::Power => Self::Power,
BuiltinScalarFunction::Struct => Self::StructFun,
BuiltinScalarFunction::FromUnixtime => Self::FromUnixtime,
BuiltinScalarFunction::Atan2 => Self::Atan2,
BuiltinScalarFunction::Nanvl => Self::Nanvl,
Expand Down
10 changes: 8 additions & 2 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.sql_expr_to_logical_expr(value, input_schema, planner_context)
})
.collect::<Result<Vec<_>>>()?;
Ok(Expr::ScalarFunction(ScalarFunction::new(
BuiltinScalarFunction::Struct,
let struct_func = self
.context_provider
.get_function_meta("struct")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'struct' function")
})?;
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
struct_func,
args,
)))
}
Expand Down

0 comments on commit 75ad221

Please sign in to comment.