Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parser option enable_options_value_normalization #11330

Merged
merged 17 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ config_namespace! {
/// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
pub enable_ident_normalization: bool, default = true

/// When set to true, the SQL parser will normalize option values (convert each value to lowercase)
pub enable_options_value_normalization: bool, default = true

/// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
/// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
pub dialect: String, default = "generic".to_string()
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ impl SessionState {
ParserOptions {
parse_float_as_decimal: sql_parser_options.parse_float_as_decimal,
enable_ident_normalization: sql_parser_options.enable_ident_normalization,
enable_options_value_normalization: sql_parser_options
.enable_options_value_normalization,
support_varchar_with_length: sql_parser_options.support_varchar_with_length,
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Process CTEs from top to bottom
for cte in with.cte_tables {
// A `WITH` block can't use the same name more than once
let cte_name = self.normalizer.normalize(cte.alias.name.clone());
let cte_name = self.ident_normalizer.normalize(cte.alias.name.clone());
if planner_context.contains_cte(&cte_name) {
return plan_err!(
"WITH query name {cte_name:?} specified more than once"
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// interpret names with '.' as if they were
// compound identifiers, but this is not a compound
// identifier. (e.g. it is "foo.bar" not foo.bar)
let normalize_ident = self.normalizer.normalize(id);
let normalize_ident = self.ident_normalizer.normalize(id);
match schema.field_with_unqualified_name(normalize_ident.as_str()) {
Ok(_) => {
// found a match without a qualified name, this is an inner table column
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if ids[0].value.starts_with('@') {
let var_names: Vec<_> = ids
.into_iter()
.map(|id| self.normalizer.normalize(id))
.map(|id| self.ident_normalizer.normalize(id))
.collect();
let ty = self
.context_provider
Expand All @@ -111,7 +111,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
let ids = ids
.into_iter()
.map(|id| self.normalizer.normalize(id))
.map(|id| self.ident_normalizer.normalize(id))
.collect::<Vec<_>>();

// Currently not supporting more than one nested level
Expand Down
58 changes: 46 additions & 12 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use arrow_schema::*;
use datafusion_common::{
field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
};
use sqlparser::ast::TimezoneInfo;
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
use sqlparser::ast::{TimezoneInfo, Value};

use datafusion_common::TableReference;
use datafusion_common::{
Expand All @@ -38,8 +38,7 @@ use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};

use crate::utils::make_decimal_type;

use crate::utils::{make_decimal_type, value_to_string};
pub use datafusion_expr::planner::ContextProvider;

/// SQL parser options
Expand All @@ -48,6 +47,7 @@ pub struct ParserOptions {
pub parse_float_as_decimal: bool,
pub enable_ident_normalization: bool,
pub support_varchar_with_length: bool,
pub enable_options_value_normalization: bool,
}

impl Default for ParserOptions {
Expand All @@ -56,6 +56,7 @@ impl Default for ParserOptions {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
enable_options_value_normalization: true,
}
}
}
Expand Down Expand Up @@ -86,6 +87,32 @@ impl IdentNormalizer {
}
}

/// Normalizer for SQL option values.
///
/// Turns a [`Value`] into its string form and, when enabled, lowercases the
/// ASCII letters of that string.
#[derive(Debug)]
pub struct ValueNormalizer {
    /// Whether [`ValueNormalizer::normalize`] lowercases the converted string.
    normalize: bool,
}

impl Default for ValueNormalizer {
    /// Returns a normalizer with lowercasing enabled.
    fn default() -> Self {
        Self::new(true)
    }
}

impl ValueNormalizer {
    /// Creates a normalizer; pass `true` to lowercase values, `false` to
    /// keep them exactly as converted.
    pub fn new(normalize: bool) -> Self {
        Self { normalize }
    }

    /// Converts `value` to a string via `value_to_string`.
    ///
    /// Returns `None` when `value_to_string` yields `None` for this value.
    /// Otherwise returns the string, ASCII-lowercased if this normalizer was
    /// created with normalization enabled.
    pub fn normalize(&self, value: Value) -> Option<String> {
        let text = value_to_string(&value)?;
        if self.normalize {
            Some(text.to_ascii_lowercase())
        } else {
            Some(text)
        }
    }
}

/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
Expand Down Expand Up @@ -184,7 +211,8 @@ impl PlannerContext {
pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) normalizer: IdentNormalizer,
pub(crate) ident_normalizer: IdentNormalizer,
pub(crate) value_normalizer: ValueNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand All @@ -195,12 +223,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

/// Create a new query planner
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let normalize = options.enable_ident_normalization;
let ident_normalize = options.enable_ident_normalization;
let options_value_normalize = options.enable_options_value_normalization;

SqlToRel {
context_provider,
options,
normalizer: IdentNormalizer::new(normalize),
ident_normalizer: IdentNormalizer::new(ident_normalize),
value_normalizer: ValueNormalizer::new(options_value_normalize),
}
}

Expand All @@ -214,7 +244,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.any(|x| x.option == ColumnOption::NotNull);
fields.push(Field::new(
self.normalizer.normalize(column.name),
self.ident_normalizer.normalize(column.name),
data_type,
!not_nullable,
));
Expand Down Expand Up @@ -252,8 +282,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let default_expr = self
.sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
.map_err(error_desc)?;
column_defaults
.push((self.normalizer.normalize(column.name.clone()), default_expr));
column_defaults.push((
self.ident_normalizer.normalize(column.name.clone()),
default_expr,
));
}
}
Ok(column_defaults)
Expand All @@ -268,7 +300,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.apply_expr_alias(plan, alias.columns)?;

LogicalPlanBuilder::from(plan)
.alias(TableReference::bare(self.normalizer.normalize(alias.name)))?
.alias(TableReference::bare(
self.ident_normalizer.normalize(alias.name),
))?
.build()
}

Expand All @@ -289,7 +323,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let fields = plan.schema().fields().clone();
LogicalPlanBuilder::from(plan)
.project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
col(field.name()).alias(self.normalizer.normalize(ident))
col(field.name()).alias(self.ident_normalizer.normalize(ident))
}))?
.build()
}
Expand Down Expand Up @@ -415,7 +449,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
None => Ident::new(format!("c{idx}"))
};
Ok(Arc::new(Field::new(
self.normalizer.normalize(field_name),
self.ident_normalizer.normalize(field_name),
data_type,
true,
)))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
JoinConstraint::Using(idents) => {
let keys: Vec<Column> = idents
.into_iter()
.map(|x| Column::from_name(self.normalizer.normalize(x)))
.map(|x| Column::from_name(self.ident_normalizer.normalize(x)))
.collect();
LogicalPlanBuilder::from(left)
.join_using(right, join_type, keys)?
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&[&[plan.schema()]],
&plan.using_columns()?,
)?;
let name = self.normalizer.normalize(alias);
let name = self.ident_normalizer.normalize(alias);
// avoiding adding an alias if the column name is the same.
let expr = match &col {
Expr::Column(column) if column.name.eq(&name) => col,
Expand Down
108 changes: 37 additions & 71 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,6 @@ fn ident_to_string(ident: &Ident) -> String {
normalize_ident(ident.to_owned())
}

/// Converts a SQL literal [`Value`] into a plain `String`, when supported.
///
/// Returns `Some` for single-quoted strings, dollar-quoted strings, numbers
/// and booleans. Returns `None` for every other variant matched below
/// (the remaining string-literal kinds, `Null` and placeholders), which lets
/// callers report an "unsupported value" error.
fn value_to_string(value: &Value) -> Option<String> {
match value {
Value::SingleQuotedString(s) => Some(s.to_string()),
Value::DollarQuotedString(s) => Some(s.to_string()),
// Numbers and booleans use the value's own `to_string` output.
Value::Number(_, _) | Value::Boolean(_) => Some(value.to_string()),
// Every variant is listed explicitly (no `_` catch-all) and maps to `None`.
Value::DoubleQuotedString(_)
| Value::EscapedStringLiteral(_)
| Value::NationalStringLiteral(_)
| Value::SingleQuotedByteStringLiteral(_)
| Value::DoubleQuotedByteStringLiteral(_)
| Value::TripleSingleQuotedString(_)
| Value::TripleDoubleQuotedString(_)
| Value::TripleSingleQuotedByteStringLiteral(_)
| Value::TripleDoubleQuotedByteStringLiteral(_)
| Value::SingleQuotedRawStringLiteral(_)
| Value::DoubleQuotedRawStringLiteral(_)
| Value::TripleSingleQuotedRawStringLiteral(_)
| Value::TripleDoubleQuotedRawStringLiteral(_)
| Value::HexStringLiteral(_)
| Value::Null
| Value::Placeholder(_) => None,
}
}

fn object_name_to_string(object_name: &ObjectName) -> String {
object_name
.0
Expand Down Expand Up @@ -881,25 +857,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
};

let mut options = HashMap::new();
for (key, value) in statement.options {
let value_string = match value_to_string(&value) {
None => {
return plan_err!("Unsupported Value in COPY statement {}", value);
}
Some(v) => v,
};

if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key);
options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
} else {
options.insert(key.to_lowercase(), value_string.to_lowercase());
}
}
let options_map = self.parse_options_map(statement.options, true)?;

let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
if let Ok(ext_file_type) = self.context_provider.get_file_type(stored_as) {
Expand Down Expand Up @@ -946,7 +904,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
output_url: statement.target,
file_type,
partition_by,
options,
options: options_map,
}))
}

Expand Down Expand Up @@ -1007,29 +965,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let inline_constraints = calc_inline_constraints_from_columns(&columns);
all_constraints.extend(inline_constraints);

let mut options_map = HashMap::<String, String>::new();
for (key, value) in options {
if options_map.contains_key(&key) {
xinlifoobar marked this conversation as resolved.
Show resolved Hide resolved
return plan_err!("Option {key} is specified multiple times");
}

let Some(value_string) = value_to_string(&value) else {
return plan_err!(
"Unsupported Value in CREATE EXTERNAL TABLE statement {}",
value
);
};

if !(&key.contains('.')) {
// If a config does not belong to any namespace, we assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key.to_lowercase());
options_map.insert(renamed_key, value_string.to_lowercase());
} else {
options_map.insert(key.to_lowercase(), value_string.to_lowercase());
}
}
let options_map = self.parse_options_map(options, false)?;

let compression = options_map
.get("format.compression")
Expand Down Expand Up @@ -1081,6 +1017,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}

/// Builds the option map for a statement from its `(key, value)` pairs.
///
/// Each key is lowercased. A key that contains no `.` is treated as a format
/// option and is stored under the `format.` prefix. Each value is converted
/// to a string through `self.value_normalizer`.
///
/// When `allow_duplicates` is `true`, a later occurrence of a key replaces
/// the earlier one. When it is `false`, a repeated key is an error.
///
/// # Errors
///
/// Returns a plan error if:
/// * `allow_duplicates` is `false` and two options resolve to the same
///   normalized key, or
/// * a value cannot be converted to a string by the value normalizer.
fn parse_options_map(
    &self,
    options: Vec<(String, Value)>,
    allow_duplicates: bool,
) -> Result<HashMap<String, String>> {
    let mut options_map = HashMap::with_capacity(options.len());
    for (key, value) in options {
        let normalized_key = if key.contains('.') {
            key.to_lowercase()
        } else {
            // If config does not belong to any namespace, assume it is
            // a format option and apply the format prefix for backwards
            // compatibility.
            format!("format.{key}").to_lowercase()
        };

        // The duplicate check must use the normalized key, because that is
        // what the map stores. Checking the raw key would miss duplicates
        // such as `compression` given twice (stored as `format.compression`)
        // or keys that differ only in case.
        if !allow_duplicates && options_map.contains_key(&normalized_key) {
            return plan_err!("Option {key} is specified multiple times");
        }

        // `normalize` consumes its argument, so clone to keep `value`
        // available for the error message.
        let Some(value_string) = self.value_normalizer.normalize(value.clone()) else {
            return plan_err!("Unsupported Value {}", value);
        };

        options_map.insert(normalized_key, value_string);
    }

    Ok(options_map)
}

/// Generate a plan for EXPLAIN ... that will print out a plan
///
/// Note this is the sqlparser explain statement, not the
Expand Down Expand Up @@ -1188,7 +1154,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// parse value string from Expr
let value_string = match &value[0] {
SQLExpr::Identifier(i) => ident_to_string(i),
SQLExpr::Value(v) => match value_to_string(v) {
SQLExpr::Value(v) => match crate::utils::value_to_string(v) {
xinlifoobar marked this conversation as resolved.
Show resolved Hide resolved
None => {
return plan_err!("Unsupported Value {}", value[0]);
}
Expand Down Expand Up @@ -1349,8 +1315,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
None => {
// If the target table has an alias, use it to qualify the column name
if let Some(alias) = &table_alias {
Expr::Column(Column::new(
Some(self.normalizer.normalize(alias.name.clone())),
datafusion_expr::Expr::Column(Column::new(
Some(self.ident_normalizer.normalize(alias.name.clone())),
field.name(),
))
} else {
Expand Down Expand Up @@ -1405,7 +1371,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut value_indices = vec![None; table_schema.fields().len()];
let fields = columns
.into_iter()
.map(|c| self.normalizer.normalize(c))
.map(|c| self.ident_normalizer.normalize(c))
.enumerate()
.map(|(i, c)| {
let column_index = table_schema
Expand Down
Loading