
Commit

Merge remote-tracking branch 'upstream/main' into apache_main
alihandroid committed Jul 26, 2024
2 parents f9becb6 + fab7e23 commit 0d01e76
Showing 77 changed files with 961 additions and 566 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -154,4 +154,5 @@ rpath = false
large_futures = "warn"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] }
unused_imports = "deny"
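
Note on the new lint entry: the check-cfg value registers cfg(tarpaulin) as a known configuration name, so code gated on it no longer trips the unexpected_cfgs warning on recent Rust toolchains. A minimal sketch of the kind of item this covers (the function name is made up for illustration):

#[cfg(tarpaulin)]
fn coverage_only_setup() {
    // compiled only when the tarpaulin coverage tool sets its cfg
}

#[cfg(not(tarpaulin))]
fn coverage_only_setup() {
    // regular build path; no unexpected_cfgs warning thanks to check-cfg
}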
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
@@ -39,6 +39,7 @@ path = "src/lib.rs"
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
5 changes: 4 additions & 1 deletion datafusion/common/src/config.rs
@@ -210,6 +210,9 @@ config_namespace! {
/// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
pub enable_ident_normalization: bool, default = true

/// When set to true, SQL parser will normalize options value (convert value to lowercase)
pub enable_options_value_normalization: bool, default = true

/// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
/// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
pub dialect: String, default = "generic".to_string()
@@ -1207,7 +1210,7 @@ impl ConfigField for TableOptions {
/// # Parameters
///
/// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
/// for CSV format.
/// for CSV format.
/// * `value`: The value to set for the specified configuration key.
///
/// # Returns
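
A usage sketch for the new enable_options_value_normalization flag, assuming it sits in the sql_parser namespace next to enable_ident_normalization shown above (the key path is an assumption, not taken from this diff):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Assumed key path; adjust if the option lands in a different namespace.
    let config = SessionConfig::new().set_bool(
        "datafusion.sql_parser.enable_options_value_normalization",
        false,
    );
    let _ctx = SessionContext::new_with_config(config);
}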
20 changes: 18 additions & 2 deletions datafusion/common/src/hash_utils.rs
@@ -17,22 +17,27 @@

//! Functionality used both on logical and physical plans

#[cfg(not(feature = "force_hash_collisions"))]
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::row::Rows;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::IntervalDayTime;
use arrow_buffer::IntervalMonthDayNano;

#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
as_boolean_array, as_fixed_size_list_array, as_generic_binary_array,
as_large_list_array, as_list_array, as_map_array, as_primitive_array,
as_string_array, as_struct_array,
};
use crate::error::{Result, _internal_err};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
use crate::error::_internal_err;

// Combines two hashes into one hash
#[inline]
@@ -41,6 +46,7 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
hash.wrapping_mul(37).wrapping_add(r)
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
if mul_col {
hashes_buffer.iter_mut().for_each(|hash| {
@@ -90,6 +96,7 @@ hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));
/// Builds hash values of PrimitiveArray and writes them into `hashes_buffer`
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T>(
array: &PrimitiveArray<T>,
random_state: &RandomState,
@@ -135,6 +142,7 @@ fn hash_array_primitive<T>(
/// Hashes one array into the `hashes_buffer`
/// If `rehash==true` this combines the previous hash value in the buffer
/// with the new hash using `combine_hashes`
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array<T>(
array: T,
random_state: &RandomState,
@@ -180,6 +188,7 @@ fn hash_array<T>(
}

/// Hash the values in a dictionary array
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
random_state: &RandomState,
@@ -210,6 +219,7 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
array: &StructArray,
random_state: &RandomState,
@@ -270,6 +280,7 @@ fn hash_map_array(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_array<OffsetSize>(
array: &GenericListArray<OffsetSize>,
random_state: &RandomState,
@@ -303,6 +314,7 @@ where
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,
random_state: &RandomState,
@@ -488,7 +500,11 @@ pub fn create_row_hashes_v2<'a>(

#[cfg(test)]
mod tests {
use arrow::{array::*, datatypes::*};
use std::sync::Arc;

use arrow::array::*;
#[cfg(not(feature = "force_hash_collisions"))]
use arrow::datatypes::*;

use super::*;

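
Taken together, these cfg attributes compile the real per-type hashing helpers only when force_hash_collisions is off; when the feature is on, a degenerate create_hashes takes over so join and aggregate code can be tested under worst-case collisions. A minimal sketch of the collision-forcing variant (its body is not visible in this diff, so treat the exact code as an assumption):

/// Test-only stand-in: every row hashes to the same value, forcing the
/// collision-handling paths in hash joins and hash aggregates to run.
#[cfg(feature = "force_hash_collisions")]
pub fn create_hashes<'a>(
    _arrays: &[ArrayRef],
    _random_state: &RandomState,
    hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
    for hash in hashes_buffer.iter_mut() {
        *hash = 0;
    }
    Ok(hashes_buffer)
}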
8 changes: 4 additions & 4 deletions datafusion/common/src/tree_node.rs
@@ -43,14 +43,14 @@ macro_rules! handle_transform_recursion {
/// There are three categories of TreeNode APIs:
///
/// 1. "Inspecting" APIs to traverse a tree of `&TreeNodes`:
/// [`apply`], [`visit`], [`exists`].
/// [`apply`], [`visit`], [`exists`].
///
/// 2. "Transforming" APIs that traverse and consume a tree of `TreeNode`s
/// producing possibly changed `TreeNode`s: [`transform`], [`transform_up`],
/// [`transform_down`], [`transform_down_up`], and [`rewrite`].
/// producing possibly changed `TreeNode`s: [`transform`], [`transform_up`],
/// [`transform_down`], [`transform_down_up`], and [`rewrite`].
///
/// 3. Internal APIs used to implement the `TreeNode` API: [`apply_children`],
/// and [`map_children`].
/// and [`map_children`].
///
/// | Traversal Order | Inspecting | Transforming |
/// | --- | --- | --- |
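
For readers new to the TreeNode API, a small sketch of a category-2 (transforming) traversal over an Expr tree; it assumes the current Transformed-based closure signature:

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{lit, Expr};

fn double_ones(expr: Expr) -> datafusion_common::Result<Expr> {
    // Bottom-up rewrite: replace every literal 1 with literal 2.
    Ok(expr
        .transform_up(|e| {
            Ok(if e == lit(1) {
                Transformed::yes(lit(2))
            } else {
                Transformed::no(e)
            })
        })?
        .data)
}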
2 changes: 1 addition & 1 deletion datafusion/common/src/utils/memory.rs
@@ -24,7 +24,7 @@ use crate::{DataFusionError, Result};
/// # Parameters
/// - `num_elements`: The number of elements expected in the hash table.
/// - `fixed_size`: A fixed overhead size associated with the collection
/// (e.g., HashSet or HashTable).
/// (e.g., HashSet or HashTable).
/// - `T`: The type of elements stored in the hash table.
///
/// # Details
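
A usage sketch for the estimator documented here; the estimate_memory_size name is assumed from the surrounding utils::memory module rather than shown in this hunk:

use std::collections::HashSet;

use datafusion_common::utils::memory::estimate_memory_size;

fn main() -> datafusion_common::Result<()> {
    // Estimate the memory of a HashSet<u32> expected to hold 1_000 elements,
    // counting the set struct itself as the fixed overhead.
    let fixed_size = std::mem::size_of::<HashSet<u32>>();
    let estimate = estimate_memory_size::<u32>(1_000, fixed_size)?;
    println!("estimated {estimate} bytes");
    Ok(())
}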
2 changes: 1 addition & 1 deletion datafusion/common/src/utils/mod.rs
@@ -335,7 +335,7 @@ pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
/// This function finds the longest prefix of the form 0, 1, 2, ... within the
/// collection `sequence`. Examples:
/// - For 0, 1, 2, 4, 5; we would produce 3, meaning 0, 1, 2 is the longest satisfying
/// prefix.
/// prefix.
/// - For 1, 2, 3, 4; we would produce 0, meaning there is no such prefix.
pub fn longest_consecutive_prefix<T: Borrow<usize>>(
sequence: impl IntoIterator<Item = T>,
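
The two examples in the doc comment translate directly into assertions; a quick sketch:

use datafusion_common::utils::longest_consecutive_prefix;

fn main() {
    // Longest 0, 1, 2, ... prefix, as described above.
    assert_eq!(longest_consecutive_prefix([0_usize, 1, 2, 4, 5]), 3);
    assert_eq!(longest_consecutive_prefix([1_usize, 2, 3, 4]), 0);
}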
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -62,7 +62,7 @@ default = [
]
encoding_expressions = ["datafusion-functions/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"]
math_expressions = ["datafusion-functions/math_expressions"]
parquet = ["datafusion-common/parquet", "dep:parquet"]
pyarrow = ["datafusion-common/pyarrow", "parquet"]
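
With this change, enabling the feature on the top-level crate also enables it in the sub-crates that contain the cfg-gated hash code, so a command such as "cargo test -p datafusion --features force_hash_collisions" now exercises the collision path in datafusion-common and datafusion-physical-plan as well.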
2 changes: 1 addition & 1 deletion datafusion/core/benches/sort.rs
@@ -21,7 +21,7 @@
//! 1. Creates a list of tuples (sorted if necessary)
//!
//! 2. Divides those tuples across some number of streams of [`RecordBatch`]
//! preserving any ordering
//! preserving any ordering
//!
//! 3. Times how long it takes for a given sort plan to process the input
//!
4 changes: 2 additions & 2 deletions datafusion/core/src/catalog/mod.rs
@@ -141,12 +141,12 @@ pub trait CatalogList: CatalogProviderList {}
/// Here are some examples of how to implement custom catalogs:
///
/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
/// that treats files and directories on a filesystem as tables.
/// that treats files and directories on a filesystem as tables.
///
/// * The [`catalog.rs`]: a simple directory based catalog.
///
/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
/// read from Delta Lake tables
/// read from Delta Lake tables
///
/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html
/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -114,15 +114,15 @@ impl Default for DataFrameWriteOptions {
/// The typical workflow using DataFrames looks like
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
/// and [`read_parquet`].
/// and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
/// [`select`], [`aggregate`], and [`limit`]
/// [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
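
A compact sketch of that three-step workflow; the file path and column names are placeholders:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("example.csv", CsvReadOptions::new()) // 1. create
        .await?
        .filter(col("a").gt(lit(10)))?                  // 2. build lazily
        .select(vec![col("a"), col("b")])?
        .limit(0, Some(100))?;
    let _batches = df.collect().await?;                 // 3. execute
    Ok(())
}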
35 changes: 19 additions & 16 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -344,22 +344,25 @@ impl FileFormat for CsvFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = CsvExec::new(
conf,
// If format options does not specify whether there is a header,
// we consult configuration options.
self.options
.has_header
.unwrap_or(state.config_options().catalog.has_header),
self.options.delimiter,
self.options.quote,
self.options.escape,
self.options.comment,
self.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values),
self.options.compression.into(),
);
// Consult configuration options for default values
let has_header = self
.options
.has_header
.unwrap_or(state.config_options().catalog.has_header);
let newlines_in_values = self
.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values);

let exec = CsvExec::builder(conf)
.with_has_header(has_header)
.with_delimeter(self.options.delimiter)
.with_quote(self.options.quote)
.with_escape(self.options.escape)
.with_comment(self.options.comment)
.with_newlines_in_values(newlines_in_values)
.with_file_compression_type(self.options.compression.into())
.build();
Ok(Arc::new(exec))
}

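
Design note: the builder replaces CsvExec::new's long positional argument list, so each option is named at the call site and new options can be added without breaking existing callers. A rough sketch of a minimal call, assuming unset options fall back to builder defaults:

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig};

// Options that are not set explicitly are assumed to take builder defaults.
fn minimal_csv_exec(conf: FileScanConfig) -> CsvExec {
    CsvExec::builder(conf)
        .with_has_header(true)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build()
}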
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
@@ -49,7 +49,7 @@ use object_store::{ObjectMeta, ObjectStore};
/// This means that if this function returns true:
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| {
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
@@ -287,17 +287,17 @@ impl ListingOptions {
///# Notes
///
/// - If only one level (e.g. `year` in the example above) is
/// specified, the other levels are ignored but the files are
/// still read.
/// specified, the other levels are ignored but the files are
/// still read.
///
/// - Files that don't follow this partitioning scheme will be
/// ignored.
/// ignored.
///
/// - Since the columns have the same value for all rows read from
/// each individual file (such as dates), they are typically
/// dictionary encoded for efficiency. You may use
/// [`wrap_partition_type_in_dict`] to request a
/// dictionary-encoded type.
/// each individual file (such as dates), they are typically
/// dictionary encoded for efficiency. You may use
/// [`wrap_partition_type_in_dict`] to request a
/// dictionary-encoded type.
///
/// - The partition columns are solely extracted from the file path. In particular, they are NOT part of the parquet files themselves.
///
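
A short sketch of declaring Hive-style partition columns as described above; the year/month layout and Parquet format are illustrative:

use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

// Matches a layout like /data/year=2024/month=07/part-0.parquet; the partition
// values come from the path, not from the parquet files themselves.
fn partitioned_listing_options() -> ListingOptions {
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_table_partition_cols(vec![
            ("year".to_string(), DataType::Utf8),
            ("month".to_string(), DataType::Utf8),
        ])
}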
Diffs for the remaining 63 changed files are not shown here.
