Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rust): bump arrow v51 and datafusion v37.1 #2395

Merged
merged 11 commits into from
Apr 26, 2024
45 changes: 21 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
[workspace]
members = [
"crates/*",
"delta-inspect",
"python",
]
members = ["crates/*", "delta-inspect", "python"]
exclude = ["proofs"]
resolver = "2"

Expand Down Expand Up @@ -31,28 +27,29 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
arrow = { version = "50" }
arrow-arith = { version = "50" }
arrow-array = { version = "50", features = ["chrono-tz"]}
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
arrow-json = { version = "50" }
arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
arrow = { version = "51" }
arrow-arith = { version = "51" }
arrow-array = { version = "51", features = ["chrono-tz"] }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51" }
arrow-ipc = { version = "51" }
arrow-json = { version = "51" }
arrow-ord = { version = "51" }
arrow-row = { version = "51" }
arrow-schema = { version = "51" }
arrow-select = { version = "51" }
object_store = { version = "0.9" }
parquet = { version = "50" }
parquet = { version = "51" }

# datafusion
datafusion = { version = "36" }
datafusion-expr = { version = "36" }
datafusion-common = { version = "36" }
datafusion-proto = { version = "36" }
datafusion-sql = { version = "36" }
datafusion-physical-expr = { version = "36" }
datafusion-functions = { version = "36" }
datafusion = { version = "37.1" }
datafusion-expr = { version = "37.1" }
datafusion-common = { version = "37.1" }
datafusion-proto = { version = "37.1" }
datafusion-sql = { version = "37.1" }
datafusion-physical-expr = { version = "37.1" }
datafusion-functions = { version = "37.1" }
datafusion-functions-array = { version = "37.1" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
datafusion-functions = { workspace = true, optional = true }
datafusion-functions-array = { workspace = true, optional = true }

# serde
serde = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -123,6 +124,7 @@ datafusion = [
"datafusion-physical-expr",
"datafusion-sql",
"datafusion-functions",
"datafusion-functions-array",
"sqlparser",
]
datafusion-ext = ["datafusion"]
Expand Down
13 changes: 7 additions & 6 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ impl SchemaProvider for ListingSchemaProvider {
self.tables.iter().map(|t| t.key().clone()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let location = self.tables.get(name).map(|t| t.clone())?;
let provider = open_table_with_storage_options(location, self.storage_options.0.clone())
.await
.ok()?;
Some(Arc::new(provider) as Arc<dyn TableProvider>)
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
}

fn register_table(
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/data_catalog/unity/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use datafusion_common::DataFusionError;
use tracing::error;

use super::models::{GetTableResponse, ListCatalogsResponse, ListTableSummariesResponse};
Expand Down Expand Up @@ -180,25 +181,24 @@ impl SchemaProvider for UnitySchemaProvider {
self.table_names.clone()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let maybe_table = self
.client
.get_table(&self.catalog_name, &self.schema_name, name)
.await
.ok()?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;

match maybe_table {
GetTableResponse::Success(table) => {
let table = DeltaTableBuilder::from_uri(table.storage_location)
.with_storage_options(self.storage_options.clone())
.load()
.await
.ok()?;
Some(Arc::new(table))
.await?;
Ok(Some(Arc::new(table)))
}
GetTableResponse::Error(err) => {
error!("failed to fetch table from unity catalog: {}", err.message);
None
Err(DataFusionError::External(Box::new(err)))
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion crates/core/src/data_catalog/unity/models.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
//! Api models for databricks unity catalog APIs

use core::fmt;
use std::collections::HashMap;

use serde::Deserialize;

/// Error response from unity API
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
pub struct ErrorResponse {
/// The error code
pub error_code: String,
/// The error message
pub message: String,
}
impl fmt::Display for ErrorResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "[{}] {}", self.error_code, self.message)
}
}
impl std::error::Error for ErrorResponse {}

/// List catalogs response
#[derive(Deserialize)]
Expand Down
93 changes: 73 additions & 20 deletions crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! Utility functions for Datafusion's Expressions

use std::{
fmt::{self, format, Display, Error, Formatter, Write},
fmt::{self, Display, Error, Formatter, Write},
sync::Arc,
};

Expand Down Expand Up @@ -76,6 +76,18 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
unimplemented!()
}

fn udfs_names(&self) -> Vec<String> {
unimplemented!()
}

fn udafs_names(&self) -> Vec<String> {
unimplemented!()
}

fn udwfs_names(&self) -> Vec<String> {
unimplemented!()
}
}

/// Parse a string predicate into an `Expr`
Expand Down Expand Up @@ -416,8 +428,13 @@ mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
};
use datafusion_functions::core::arrow_cast;
use datafusion_functions::encoding::expr_fn::decode;
use datafusion_functions_array::expr_fn::cardinality;

use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
Expand Down Expand Up @@ -539,13 +556,24 @@ mod test {

// String expression that we output must be parsable for conflict resolution.
let tests = vec![
simple!(
Expr::Cast(Cast {
ParseTest {
expr: Expr::Cast(Cast {
expr: Box::new(lit(1_i64)),
data_type: ArrowDataType::Int32
}),
"arrow_cast(1, 'Int32')".to_string()
),
expected: "arrow_cast(1, 'Int32')".to_string(),
override_expected_expr: Some(
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Int64(Some(1))),
lit(ScalarValue::Utf8(Some("Int32".into())))
]
}
)
),
},
simple!(
Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)),
"Value3 = 3".to_string()
Expand Down Expand Up @@ -624,9 +652,8 @@ mod test {
substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")),
"substr(modified, 0, 4) = '2021'".to_string()
),
simple!(
col("value")
.cast_to(
ParseTest {
expr: col("value").cast_to(
&arrow_schema::DataType::Utf8,
&table
.snapshot()
Expand All @@ -640,8 +667,23 @@ mod test {
)
.unwrap()
.eq(lit("1")),
"arrow_cast(value, 'Utf8') = '1'".to_string()
),
expected: "arrow_cast(value, 'Utf8') = '1'".to_string(),
override_expected_expr: Some(
datafusion_expr::Expr::BinaryExpr(BinaryExpr {
left: Box::new(datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
col("value"),
lit(ScalarValue::Utf8(Some("Utf8".into())))
]
}
)),
op: datafusion_expr::Operator::Eq,
right: Box::new(lit(ScalarValue::Utf8(Some("1".into()))))
})
),
},
simple!(
col("_struct").field("a").eq(lit(20_i64)),
"_struct['a'] = 20".to_string()
Expand All @@ -662,11 +704,16 @@ mod test {
expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))),
expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(),
override_expected_expr: Some(col("_timestamp_ntz").gt(
datafusion_expr::Expr::Cast( Cast {
expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
}
))),
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, None)".into())))
]
}
)
)),
},
ParseTest {
expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond(
Expand All @@ -675,10 +722,16 @@ mod test {
))),
expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))')".to_string(),
override_expected_expr: Some(col("_timestamp").gt(
datafusion_expr::Expr::Cast( Cast {
expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
}))),
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, Some(\"UTC\"))".into())))
]
}
)
)),
},
];

Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/delta_datafusion/find_files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::create_physical_expr;
use lazy_static::lazy_static;

use crate::delta_datafusion::find_files::logical::FindFilesNode;
Expand All @@ -29,6 +28,8 @@ use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;
use crate::DeltaTableError;

use super::create_physical_expr_fix;

pub mod logical;
pub mod physical;

Expand Down Expand Up @@ -160,8 +161,8 @@ async fn scan_table_by_files(
let input_schema = scan.logical_schema.as_ref().to_owned();
let input_dfschema = input_schema.clone().try_into()?;

let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
let predicate_expr = create_physical_expr_fix(
Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
state.execution_props(),
)?;
Expand Down
20 changes: 12 additions & 8 deletions crates/core/src/delta_datafusion/find_files/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use arrow_schema::SchemaRef;
use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::Expr;
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::stream::BoxStream;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};

Expand All @@ -28,6 +30,7 @@ pub struct FindFilesExec {
predicate: Expr,
state: DeltaTableState,
log_store: LogStoreRef,
plan_properties: PlanProperties,
}

impl FindFilesExec {
Expand All @@ -36,6 +39,11 @@ impl FindFilesExec {
predicate,
log_store,
state,
plan_properties: PlanProperties::new(
EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()),
Partitioning::RoundRobinBatch(num_cpus::get()),
ExecutionMode::Bounded,
),
})
}
}
Expand Down Expand Up @@ -85,12 +93,8 @@ impl ExecutionPlan for FindFilesExec {
ONLY_FILES_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::RoundRobinBatch(num_cpus::get())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
Loading
Loading