Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add array_reverse function to datafusion-function-* crate #9630

Merged
merged 2 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ pub enum BuiltinScalarFunction {
ArrayReplaceN,
/// array_replace_all
ArrayReplaceAll,
/// array_reverse
ArrayReverse,
/// array_intersect
ArrayIntersect,
/// array_union
Expand Down Expand Up @@ -289,7 +287,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplace => Volatility::Immutable,
BuiltinScalarFunction::ArrayReplaceN => Volatility::Immutable,
BuiltinScalarFunction::ArrayReplaceAll => Volatility::Immutable,
BuiltinScalarFunction::ArrayReverse => Volatility::Immutable,
BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
Expand Down Expand Up @@ -359,7 +356,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplace => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReplaceN => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReverse => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayIntersect => {
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
(DataType::Null, DataType::Null) | (DataType::Null, _) => {
Expand Down Expand Up @@ -557,7 +553,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => {
Signature::any(3, self.volatility())
}
BuiltinScalarFunction::ArrayReverse => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()),

Expand Down Expand Up @@ -882,7 +877,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => {
&["array_replace_all", "list_replace_all"]
}
BuiltinScalarFunction::ArrayReverse => &["array_reverse", "list_reverse"],
BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
BuiltinScalarFunction::ArrayIntersect => {
&["array_intersect", "list_intersect"]
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,12 +638,6 @@ scalar_expr!(
array from to,
"replaces all occurrences of the specified element with another specified element."
);
scalar_expr!(
ArrayReverse,
array_reverse,
array,
"reverses the order of elements in the array."
);
scalar_expr!(ArrayUnion, array_union, array1 array2, "returns an array of the elements in the union of array1 and array2 without duplicates.");

scalar_expr!(
Expand Down
69 changes: 69 additions & 0 deletions datafusion/functions-array/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,3 +1202,72 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
None,
)?))
}

/// Implements the `array_reverse` SQL function.
///
/// Exactly one argument is expected. `List` and `LargeList` inputs are handed
/// to `general_array_reverse` with the matching offset width, a `Null` input
/// is returned unchanged, and any other data type yields an execution error.
pub fn array_reverse(arg: &[ArrayRef]) -> Result<ArrayRef> {
    // Anything other than a single argument is rejected up front.
    let [input] = arg else {
        return exec_err!("array_reverse needs one argument");
    };

    match input.data_type() {
        DataType::List(field) => {
            let list = as_list_array(input)?;
            general_array_reverse::<i32>(list, field)
        }
        DataType::LargeList(field) => {
            let list = as_large_list_array(input)?;
            general_array_reverse::<i64>(list, field)
        }
        DataType::Null => Ok(input.clone()),
        array_type => exec_err!("array_reverse does not support type '{array_type:?}'."),
    }
}

/// Reverses the elements of every list in `array`, keeping the rows in order.
///
/// `field` is the child field of the list type and is reused for the output
/// array. Null rows stay null and take up no child slots in the result, so an
/// input whose child values array is empty (for example an all-null column)
/// is handled without touching the child data.
///
/// # Errors
///
/// Returns an error if the rebuilt offsets, values and null mask cannot be
/// assembled into a `GenericListArray`.
fn general_array_reverse<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    field: &FieldRef,
) -> Result<ArrayRef>
where
    O: TryFrom<i64>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());
    let mut offsets = vec![O::usize_as(0)];
    let mut nulls = vec![];
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], false, capacity);

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        // `offsets` always holds one more entry than the rows processed so
        // far, so this is the end offset of the previous output row.
        let previous_end = offsets[row_index];

        // A null row contributes an empty range. Copying a placeholder child
        // element here would panic whenever the child array is empty.
        if array.is_null(row_index) {
            nulls.push(false);
            offsets.push(previous_end);
            continue;
        }
        nulls.push(true);

        let start = offset_window[0].as_usize();
        let end = offset_window[1].as_usize();

        // Copy the row's child elements one at a time, last to first.
        for index in (start..end).rev() {
            mutable.extend(0, index, index + 1);
        }
        offsets.push(previous_end + O::usize_as(end - start));
    }

    let data = mutable.freeze();
    Ok(Arc::new(GenericListArray::<O>::try_new(
        field.clone(),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow_array::make_array(data),
        Some(nulls.into()),
    )?))
}
2 changes: 2 additions & 0 deletions datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub mod expr_fn {
pub use super::udf::array_ndims;
pub use super::udf::array_repeat;
pub use super::udf::array_resize;
pub use super::udf::array_reverse;
pub use super::udf::array_sort;
pub use super::udf::array_to_string;
pub use super::udf::cardinality;
Expand Down Expand Up @@ -100,6 +101,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
udf::array_distinct_udf(),
udf::array_repeat_udf(),
udf::array_resize_udf(),
udf::array_reverse_udf(),
];
functions.into_iter().try_for_each(|udf| {
let existing_udf = registry.register_udf(udf)?;
Expand Down
49 changes: 49 additions & 0 deletions datafusion/functions-array/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,3 +882,52 @@ impl ScalarUDFImpl for crate::udf::ArrayDistinct {
&self.aliases
}
}

// Declares the UDF plumbing for `ArrayReverse`. The arguments are the UDF
// struct, the expression-helper name, its argument name, the doc string, and
// the accessor name. NOTE(review): presumably this expands to the
// `array_reverse` helper and the `array_reverse_udf()` accessor — confirm
// against the `make_udf_function!` definition.
make_udf_function!(
ArrayReverse,
array_reverse,
array,
"reverses the order of elements in the array.",
array_reverse_udf
);

/// Scalar UDF that reverses the order of elements in an array.
#[derive(Debug)]
pub(super) struct ArrayReverse {
/// Signature handed out by `ScalarUDFImpl::signature`.
signature: Signature,
/// Names handed out by `ScalarUDFImpl::aliases`.
aliases: Vec<String>,
}

impl crate::udf::ArrayReverse {
    /// Builds the UDF with a single-argument, immutable signature that accepts
    /// any type, and with `array_reverse` / `list_reverse` as its aliases.
    pub fn new() -> Self {
        let signature = Signature::any(1, Volatility::Immutable);
        let aliases = ["array_reverse", "list_reverse"]
            .iter()
            .map(|alias| String::from(*alias))
            .collect();
        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for crate::udf::ArrayReverse {
    /// Exposes the UDF as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Primary name of the function.
    ///
    /// This was previously misspelled as `array_reserse`, which left the
    /// function reachable only through its aliases.
    fn name(&self) -> &str {
        "array_reverse"
    }

    /// Signature built in `new`.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The result has the same type as the single input argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    /// Converts the arguments to arrays and runs the `array_reverse` kernel.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        let args = ColumnarValue::values_to_arrays(args)?;
        crate::kernels::array_reverse(&args).map(ColumnarValue::Array)
    }

    /// Alias names built in `new`.
    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
69 changes: 0 additions & 69 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,72 +990,3 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
None,
)?))
}

/// Implements the `array_reverse` SQL function.
///
/// Exactly one argument is expected. `List` and `LargeList` inputs are handed
/// to `general_array_reverse` with the matching offset width, a `Null` input
/// is returned unchanged, and any other data type yields an execution error.
pub fn array_reverse(arg: &[ArrayRef]) -> Result<ArrayRef> {
    // Anything other than a single argument is rejected up front.
    let [input] = arg else {
        return exec_err!("array_reverse needs one argument");
    };

    match input.data_type() {
        DataType::List(field) => {
            let list = as_list_array(input)?;
            general_array_reverse::<i32>(list, field)
        }
        DataType::LargeList(field) => {
            let list = as_large_list_array(input)?;
            general_array_reverse::<i64>(list, field)
        }
        DataType::Null => Ok(input.clone()),
        array_type => exec_err!("array_reverse does not support type '{array_type:?}'."),
    }
}

/// Reverses the elements of every list in `array`, keeping the rows in order.
///
/// `field` is the child field of the list type and is reused for the output
/// array. Null rows stay null and take up no child slots in the result, so an
/// input whose child values array is empty (for example an all-null column)
/// is handled without touching the child data.
///
/// # Errors
///
/// Returns an error if the rebuilt offsets, values and null mask cannot be
/// assembled into a `GenericListArray`.
fn general_array_reverse<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    field: &FieldRef,
) -> Result<ArrayRef>
where
    O: TryFrom<i64>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());
    let mut offsets = vec![O::usize_as(0)];
    let mut nulls = vec![];
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], false, capacity);

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        // `offsets` always holds one more entry than the rows processed so
        // far, so this is the end offset of the previous output row.
        let previous_end = offsets[row_index];

        // A null row contributes an empty range. Copying a placeholder child
        // element here would panic whenever the child array is empty.
        if array.is_null(row_index) {
            nulls.push(false);
            offsets.push(previous_end);
            continue;
        }
        nulls.push(true);

        let start = offset_window[0].as_usize();
        let end = offset_window[1].as_usize();

        // Copy the row's child elements one at a time, last to first.
        for index in (start..end).rev() {
            mutable.extend(0, index, index + 1);
        }
        offsets.push(previous_end + O::usize_as(end - start));
    }

    let data = mutable.freeze();
    Ok(Arc::new(GenericListArray::<O>::try_new(
        field.clone(),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow_array::make_array(data),
        Some(nulls.into()),
    )?))
}
3 changes: 0 additions & 3 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayReplaceAll => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_replace_all)(args)
}),
BuiltinScalarFunction::ArrayReverse => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_reverse)(args)
}),
BuiltinScalarFunction::ArrayIntersect => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_intersect)(args)
}),
Expand Down
22 changes: 11 additions & 11 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -661,23 +661,23 @@ enum ScalarFunction {
ArrayIntersect = 119;
ArrayUnion = 120;
OverLay = 121;
/// 122 is Range
// 122 is Range
ArrayExcept = 123;
// 124 was ArrayPopFront
Levenshtein = 125;
SubstrIndex = 126;
FindInSet = 127;
/// 128 was ArraySort
/// 129 was ArrayDistinct
/// 130 was ArrayResize
// 128 was ArraySort
// 129 was ArrayDistinct
// 130 was ArrayResize
EndsWith = 131;
/// 132 was InStr
/// 133 was MakeDate
ArrayReverse = 134;
/// 135 is RegexpLike
/// 136 was ToChar
/// 137 was ToDate
/// 138 was ToUnixtime
// 132 was InStr
// 133 was MakeDate
// 134 was ArrayReverse
// 135 is RegexpLike
// 136 was ToChar
// 137 was ToDate
// 138 was ToUnixtime
}

message ScalarFunctionNode {
Expand Down
3 changes: 0 additions & 3 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 12 additions & 14 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading