Merge remote-tracking branch 'apache/main' into string-view
alamb committed Jul 15, 2024
2 parents 921afdf + bfd8156 commit e942e23
Showing 71 changed files with 13,334 additions and 659 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dev_pr/labeler.yml
@@ -17,11 +17,11 @@

development-process:
- changed-files:
- - any-glob-to-any-file: ['dev/**.*', '.github/**.*', 'ci/**.*', '.asf.yaml']
+ - any-glob-to-any-file: ['dev/**/*', '.github/**/*', 'ci/**/*', '.asf.yaml']

documentation:
- changed-files:
- - any-glob-to-any-file: ['docs/**.*', 'README.md', './**/README.md', 'DEVELOPERS.md', 'datafusion/docs/**.*']
+ - any-glob-to-any-file: ['docs/**/*', 'README.md', './**/README.md', 'DEVELOPERS.md', 'datafusion/docs/**/*']

sql:
- changed-files:
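The glob fix matters because `**` acts as a recursive wildcard only when it stands alone as a path segment, and `**.*` additionally requires a dot in the file name; so patterns like `dev/**.*` effectively matched only dot-containing names one level below `dev/`, and extensionless files never triggered these labels. `dev/**/*` matches every file at any depth.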
11 changes: 9 additions & 2 deletions datafusion-cli/src/catalog.rs
@@ -29,6 +29,7 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
+ use datafusion::execution::session_state::SessionStateBuilder;

use async_trait::async_trait;
use dirs::home_dir;
@@ -162,6 +163,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
+ let mut builder = SessionStateBuilder::from(state.clone());
let optimized_name = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_name.as_str())?;
let scheme = table_url.scheme();
@@ -178,13 +180,18 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// to any command options so the only choice is to use an empty collection
match scheme {
"s3" | "oss" | "cos" => {
- state = state.add_table_options_extension(AwsOptions::default());
+ if let Some(table_options) = builder.table_options() {
+     table_options.extensions.insert(AwsOptions::default())
+ }
}
"gs" | "gcs" => {
- state = state.add_table_options_extension(GcpOptions::default())
+ if let Some(table_options) = builder.table_options() {
+     table_options.extensions.insert(GcpOptions::default())
+ }
}
_ => {}
};
+ state = builder.build();
let store = get_object_store(
&state,
table_url.scheme(),
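The hunk above is the shape of the new API: instead of mutating `SessionState` in place with `add_table_options_extension`, the code re-enters a `SessionStateBuilder`, edits the table options, and rebuilds. A minimal sketch of that round trip, using only the builder calls visible in this diff (`AwsOptions`/`GcpOptions` live in the datafusion-cli crate, so a comment stands in for the actual `extensions.insert` call):

```rust
use datafusion::execution::session_state::SessionStateBuilder;

fn main() {
    // Stand-in for the session state the schema provider already holds.
    let state = SessionStateBuilder::new().with_default_features().build();

    // Re-enter the builder, tweak table options, and rebuild the state.
    let mut builder = SessionStateBuilder::from(state);
    if let Some(table_options) = builder.table_options() {
        // catalog.rs registers scheme-specific extensions here, e.g.
        // table_options.extensions.insert(AwsOptions::default())
        let _ = table_options;
    }
    let _state = builder.build();
}
```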
4 changes: 2 additions & 2 deletions datafusion-examples/README.md
@@ -71,8 +71,8 @@ cargo run --example dataframe
- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into Datafusion `Expr`.
- - [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from Datafusion `Expr` and `LogicalPlan`
+ - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
+ - [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
9 changes: 4 additions & 5 deletions datafusion-examples/examples/custom_file_format.rs
@@ -22,6 +22,7 @@ use arrow::{
datatypes::UInt64Type,
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
+ use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::{
datasource::{
file_format::{
@@ -32,9 +33,9 @@ use datafusion::{
MemTable,
},
error::Result,
- execution::{context::SessionState, runtime_env::RuntimeEnv},
+ execution::context::SessionState,
physical_plan::ExecutionPlan,
- prelude::{SessionConfig, SessionContext},
+ prelude::SessionContext,
};
use datafusion_common::{GetExt, Statistics};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -176,9 +177,7 @@ impl GetExt for TSVFileFactory {
#[tokio::main]
async fn main() -> Result<()> {
// Create a new context with the default configuration
- let config = SessionConfig::new();
- let runtime = RuntimeEnv::default();
- let mut state = SessionState::new_with_config_rt(config, Arc::new(runtime));
+ let mut state = SessionStateBuilder::new().with_default_features().build();

// Register the custom file format
let file_format = Arc::new(TSVFileFactory::new());
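This one-line replacement is the migration applied throughout the commit: `SessionStateBuilder` supersedes the `SessionState::new_with_config_rt` constructor, with `with_default_features()` standing in for the defaults the old constructor installed implicitly, judging by the mechanical substitutions in the other files. A minimal sketch, assuming only the builder methods that appear in this diff:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Replaces: SessionState::new_with_config_rt(config, Arc::new(runtime))
    let state = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_runtime_env(Arc::new(RuntimeEnv::default()))
        .with_default_features()
        .build();
    let _ctx = SessionContext::new_with_state(state);
}
```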
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
@@ -83,7 +83,7 @@ async fn main() -> Result<()> {
Ok(())
}

- /// Datafusion's `expr_fn` API makes it easy to create [`Expr`]s for the
+ /// DataFusion's `expr_fn` API makes it easy to create [`Expr`]s for the
/// full range of expression types such as aggregates and window functions.
fn expr_fn_demo() -> Result<()> {
// Let's say you want to call the "first_value" aggregate function
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -309,7 +309,7 @@ config_namespace! {
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false

- /// Should Datafusion keep the columns used for partition_by in the output RecordBatches
+ /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
}
}
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
@@ -63,6 +63,9 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
if let Some(v) = &value.timestamp_format {
builder = builder.with_timestamp_format(v.into())
}
+ if let Some(v) = &value.timestamp_tz_format {
+     builder = builder.with_timestamp_tz_format(v.into())
+ }
if let Some(v) = &value.time_format {
builder = builder.with_time_format(v.into())
}
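For context, `timestamp_tz_format` already existed on `CsvOptions`; the hunk above simply stops dropping it during conversion, so timezone-aware timestamp columns get their own format string. A small sketch of setting it; the format string is an arbitrary example, and this assumes the options struct exposes public fields and `Default`, as the surrounding code suggests:

```rust
use datafusion_common::config::CsvOptions;

fn main() {
    let options = CsvOptions {
        // Used for Timestamp(_, Some(tz)) columns when writing CSV.
        timestamp_tz_format: Some("%Y-%m-%dT%H:%M:%S%z".to_string()),
        ..Default::default()
    };
    let _ = options;
}
```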
33 changes: 29 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
@@ -896,9 +896,8 @@
join_type: JoinType,
on_exprs: impl IntoIterator<Item = Expr>,
) -> Result<DataFrame> {
- let expr = on_exprs.into_iter().reduce(Expr::and);
let plan = LogicalPlanBuilder::from(self.plan)
- .join_on(right.plan, join_type, expr)?
+ .join_on(right.plan, join_type, on_exprs)?
.build()?;
Ok(DataFrame {
session_state: self.session_state,
@@ -1472,7 +1471,7 @@ impl DataFrame {
///
/// The method supports case sensitive rename with wrapping column name into one of following symbols ( " or ' or ` )
///
- /// Alternatively setting Datafusion param `datafusion.sql_parser.enable_ident_normalization` to `false` will enable
+ /// Alternatively setting DataFusion param `datafusion.sql_parser.enable_ident_normalization` to `false` will enable
/// case sensitive rename without need to wrap column name into special symbols
///
/// # Example
@@ -1694,7 +1693,7 @@ mod tests {
use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name};

use arrow::array::{self, Int32Array};
- use datafusion_common::{Constraint, Constraints};
+ use datafusion_common::{Constraint, Constraints, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
array_agg, cast, create_udf, expr, lit, BuiltInWindowFunction,
@@ -2555,6 +2554,32 @@ mod tests {
Ok(())
}

+ #[tokio::test]
+ async fn join_on_filter_datatype() -> Result<()> {
+     let left = test_table_with_name("a").await?.select_columns(&["c1"])?;
+     let right = test_table_with_name("b").await?.select_columns(&["c1"])?;
+
+     // JOIN ON untyped NULL
+     let join = left.clone().join_on(
+         right.clone(),
+         JoinType::Inner,
+         Some(Expr::Literal(ScalarValue::Null)),
+     )?;
+     let expected_plan = "CrossJoin:\
+     \n  TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
+     \n  TableScan: b projection=[c1]";
+     assert_eq!(expected_plan, format!("{:?}", join.into_optimized_plan()?));
+
+     // JOIN ON expression must be boolean type
+     let join = left.join_on(right, JoinType::Inner, Some(lit("TRUE")))?;
+     let expected = join.into_optimized_plan().unwrap_err();
+     assert_eq!(
+         expected.strip_backtrace(),
+         "type_coercion\ncaused by\nError during planning: Join condition must be boolean type, but got Utf8"
+     );
+     Ok(())
+ }

#[tokio::test]
async fn join_ambiguous_filter() -> Result<()> {
let left = test_table_with_name("a")
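The `join_on` change above forwards the conjuncts to `LogicalPlanBuilder::join_on` unreduced, instead of folding them with `Expr::and` first; that is what lets the planner see and type-check an untyped NULL condition, as the new test exercises. A usage sketch with hypothetical table and column names:

```rust
use datafusion::common::JoinType;
use datafusion::error::Result;
use datafusion::prelude::*;

fn equi_and_range_join(left: DataFrame, right: DataFrame) -> Result<DataFrame> {
    // Each element is one conjunct of the join condition; they are ANDed
    // during planning rather than in the DataFrame API itself.
    left.join_on(
        right,
        JoinType::Inner,
        [col("a.c1").eq(col("b.c1")), col("a.c2").lt(col("b.c2"))],
    )
}
```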
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -632,6 +632,7 @@ mod tests {
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::{col, lit};

+ use crate::execution::session_state::SessionStateBuilder;
use chrono::DateTime;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -814,7 +815,11 @@
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
- let session_state = SessionState::new_with_config_rt(cfg, runtime);
+ let session_state = SessionStateBuilder::new()
+     .with_config(cfg)
+     .with_runtime_env(runtime)
+     .with_default_features()
+     .build();
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
let path = Path::from("csv/aggregate_test_100.csv");
let csv = CsvFormat::default().with_has_header(true);
32 changes: 16 additions & 16 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -893,12 +893,12 @@ async fn send_arrays_to_col_writers(
let mut next_channel = 0;
for (array, field) in rb.columns().iter().zip(schema.fields()) {
for c in compute_leaves(field, array)? {
- col_array_channels[next_channel]
-     .send(c)
-     .await
-     .map_err(|_| {
-         DataFusionError::Internal("Unable to send array to writer!".into())
-     })?;
+ // Do not surface error from closed channel (means something
+ // else hit an error, and the plan is shutting down).
+ if col_array_channels[next_channel].send(c).await.is_err() {
+     return Ok(());
+ }

next_channel += 1;
}
}
@@ -984,11 +984,11 @@ fn spawn_parquet_parallel_serialization_task(
&pool,
);

- serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-     DataFusionError::Internal(
-         "Unable to send closed RG to concat task!".into(),
-     )
- })?;
+ // Do not surface error from closed channel (means something
+ // else hit an error, and the plan is shutting down).
+ if serialize_tx.send(finalize_rg_task).await.is_err() {
+     return Ok(());
+ }

current_rg_rows = 0;
rb = rb.slice(rows_left, rb.num_rows() - rows_left);
@@ -1013,11 +1013,11 @@
&pool,
);

- serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-     DataFusionError::Internal(
-         "Unable to send closed RG to concat task!".into(),
-     )
- })?;
+ // Do not surface error from closed channel (means something
+ // else hit an error, and the plan is shutting down).
+ if serialize_tx.send(finalize_rg_task).await.is_err() {
+     return Ok(());
+ }
}

Ok(())
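All three hunks in this file adopt the same convention, so it is worth isolating: on these intra-plan channels, a failed `send` can only mean the receiver was dropped because some other task already failed and is tearing the plan down, so the sender exits quietly instead of masking the root cause with an `Internal` error. A standalone sketch of the pattern with a plain tokio channel; the names are illustrative:

```rust
use datafusion_common::DataFusionError;
use tokio::sync::mpsc::Sender;

async fn feed_chunks(
    tx: Sender<Vec<u8>>,
    chunks: Vec<Vec<u8>>,
) -> Result<(), DataFusionError> {
    for chunk in chunks {
        // A closed channel is not an error here: the receiver only
        // disappears when another task failed, and that task owns the
        // real error. Returning Ok(()) keeps the error report clean.
        if tx.send(chunk).await.is_err() {
            return Ok(());
        }
    }
    Ok(())
}
```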
27 changes: 16 additions & 11 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -42,15 +42,20 @@ use tokio::task::JoinSet;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

- /// Serializes a single data stream in parallel and writes to an ObjectStore
- /// concurrently. Data order is preserved. In the event of an error,
- /// the ObjectStore writer is returned to the caller in addition to an error,
- /// so that the caller may handle aborting failed writes.
+ /// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
+ /// Data order is preserved.
+ ///
+ /// In the event of a non-IO error which does not involve the ObjectStore writer,
+ /// the writer is returned to the caller in addition to the error,
+ /// so that failed writes may be aborted.
+ ///
+ /// In the event of an IO error involving the ObjectStore writer,
+ /// the writer is dropped to avoid calling further methods on it which might panic.
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
- ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
+ ) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
@@ -82,7 +87,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
Ok(_) => (),
Err(e) => {
return Err((
- writer,
+ None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
Expand All @@ -93,12 +98,12 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
}
Ok(Err(e)) => {
// Return the writer along with the error
- return Err((writer, e));
+ return Err((Some(writer), e));
}
Err(e) => {
// Handle task panic or cancellation
return Err((
- writer,
+ Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
@@ -109,10 +114,10 @@

match serialize_task.join().await {
Ok(Ok(_)) => (),
- Ok(Err(e)) => return Err((writer, e)),
+ Ok(Err(e)) => return Err((Some(writer), e)),
Err(_) => {
return Err((
- writer,
+ Some(writer),
internal_datafusion_err!("Unknown error writing to object store"),
))
}
@@ -153,7 +158,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
row_count += cnt;
}
Err((writer, e)) => {
- finished_writers.push(writer);
+ finished_writers.extend(writer);
any_errors = true;
triggering_error = Some(e);
}
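One subtlety in the last hunk: with the error payload now `Option<WriterType>`, the caller switches from `push` to `extend`, because `Option` iterates over zero or one items; a writer dropped after an IO error simply contributes nothing to `finished_writers`. A self-contained sketch of that behavior:

```rust
fn main() {
    let mut finished_writers: Vec<&str> = Vec::new();

    // Writer survived a non-IO error: it is handed back for abort handling.
    let returned: Option<&str> = Some("writer-1");
    finished_writers.extend(returned);

    // Writer was dropped after an IO error: extend is a no-op.
    let dropped: Option<&str> = None;
    finished_writers.extend(dropped);

    assert_eq!(finished_writers, ["writer-1"]);
}
```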
25 changes: 19 additions & 6 deletions datafusion/core/src/execution/context/mod.rs
@@ -73,6 +73,7 @@ use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;

+ use crate::execution::session_state::SessionStateBuilder;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
@@ -294,7 +295,11 @@ impl SessionContext {
/// all `SessionContext`'s should be configured with the
/// same `RuntimeEnv`.
pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
- let state = SessionState::new_with_config_rt(config, runtime);
+ let state = SessionStateBuilder::new()
+     .with_config(config)
+     .with_runtime_env(runtime)
+     .with_default_features()
+     .build();
Self::new_with_state(state)
}

@@ -315,7 +320,7 @@
}

/// Creates a new `SessionContext` using the provided [`SessionState`]
- #[deprecated(since = "32.0.0", note = "Use SessionState::new_with_state")]
+ #[deprecated(since = "32.0.0", note = "Use SessionContext::new_with_state")]
pub fn with_state(state: SessionState) -> Self {
Self::new_with_state(state)
}
@@ -1574,6 +1579,7 @@ mod tests {
use datafusion_common_runtime::SpawnedTask;

use crate::catalog::schema::SchemaProvider;
+ use crate::execution::session_state::SessionStateBuilder;
use crate::physical_planner::PhysicalPlanner;
use async_trait::async_trait;
use tempfile::TempDir;
@@ -1707,7 +1713,11 @@
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
.set_str("datafusion.catalog.has_header", "true");
- let session_state = SessionState::new_with_config_rt(cfg, runtime);
+ let session_state = SessionStateBuilder::new()
+     .with_config(cfg)
+     .with_runtime_env(runtime)
+     .with_default_features()
+     .build();
let ctx = SessionContext::new_with_state(session_state);
ctx.refresh_catalogs().await?;

@@ -1733,9 +1743,12 @@
#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
- let session_state =
-     SessionState::new_with_config_rt(SessionConfig::new(), runtime)
-         .with_query_planner(Arc::new(MyQueryPlanner {}));
+ let session_state = SessionStateBuilder::new()
+     .with_config(SessionConfig::new())
+     .with_runtime_env(runtime)
+     .with_default_features()
+     .with_query_planner(Arc::new(MyQueryPlanner {}))
+     .build();
let ctx = SessionContext::new_with_state(session_state);

let df = ctx.sql("SELECT 1").await?;