Enable creating and inserting to empty external tables via SQL #7276

Merged (2 commits) on Aug 14, 2023
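For context, a minimal sketch of the end-to-end flow this PR enables, adapted from the helper_test_insert_into_sql test added below. The table name, schema, and location are illustrative, the target directory is assumed to already exist, and the snippet assumes the datafusion and tokio crates:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Create an external table over an (initially empty) local directory.
    // The insert_mode option is parsed by the new ListingTableInsertMode::from_str.
    ctx.sql(
        "CREATE EXTERNAL TABLE foo(a VARCHAR, b VARCHAR, c INT) \
         STORED AS CSV \
         LOCATION '/tmp/foo_table/' \
         OPTIONS (insert_mode 'append_new_files')",
    )
    .await?
    .collect()
    .await?;

    // Insert via SQL; with append_new_files each INSERT writes new files
    // under the table location instead of appending to an existing file.
    ctx.sql("INSERT INTO foo VALUES ('foo', 'bar', 1), ('foo', 'bar', 2), ('foo', 'bar', 3)")
        .await?
        .collect()
        .await?;

    // Read the rows back.
    ctx.sql("SELECT * FROM foo").await?.show().await?;
    Ok(())
}
```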
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -545,12 +545,11 @@ impl DataSink for CsvSink {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = true;
let header = self.has_header;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
@@ -567,6 +566,8 @@ impl DataSink for CsvSink {
object_store.clone(),
)
.await?;

serializers.push(Box::new(serializer));
writers.push(writer);
}
}
194 changes: 194 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -217,6 +217,22 @@ pub enum ListingTableInsertMode {
///Throw an error if insert into is attempted on this table
Error,
}

impl FromStr for ListingTableInsertMode {
type Err = DataFusionError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s_lower = s.to_lowercase();
match s_lower.as_str() {
"append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
"append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
"error" => Ok(ListingTableInsertMode::Error),
_ => Err(DataFusionError::Plan(format!(
"Unknown or unsupported insert mode {s}. Supported options are \
append_to_file, append_new_files, and error."
))),
}
}
}
/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
@@ -1607,6 +1623,124 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_csv_defaults() -> Result<()> {
helper_test_insert_into_sql(
"csv",
FileCompressionType::UNCOMPRESSED,
"OPTIONS (insert_mode 'append_new_files')",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
helper_test_insert_into_sql(
"csv",
FileCompressionType::UNCOMPRESSED,
"WITH HEADER ROW \
OPTIONS (insert_mode 'append_new_files')",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_json_defaults() -> Result<()> {
helper_test_insert_into_sql(
"json",
FileCompressionType::UNCOMPRESSED,
"OPTIONS (insert_mode 'append_new_files')",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
helper_test_insert_into_sql(
"parquet",
FileCompressionType::UNCOMPRESSED,
"",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert(
"datafusion.execution.parquet.compression".into(),
"zstd(5)".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_enabled".into(),
"false".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_page_size_limit".into(),
"100".into(),
);
config_map.insert(
"datafusion.execution.parquet.staistics_enabled".into(),
"none".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_statistics_size".into(),
"10".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_row_group_size".into(),
"5".into(),
);
config_map.insert(
"datafusion.execution.parquet.created_by".into(),
"datafusion test".into(),
);
config_map.insert(
"datafusion.execution.parquet.column_index_truncate_length".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.data_page_row_count_limit".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_enabled".into(),
"true".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_fpp".into(),
"0.01".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_ndv".into(),
"1000".into(),
);
config_map.insert(
"datafusion.execution.parquet.writer_version".into(),
"2.0".into(),
);
config_map.insert(
"datafusion.execution.parquet.write_batch_size".into(),
"5".into(),
);
helper_test_insert_into_sql(
"parquet",
FileCompressionType::UNCOMPRESSED,
"",
Some(config_map),
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
@@ -2096,4 +2230,64 @@ mod tests {
// Return Ok if the function completed successfully
Ok(())
}

/// Tests insert into via end-to-end SQL:
/// create external table + insert into statements
async fn helper_test_insert_into_sql(
Contributor:

I think we could use sqllogictest to test this as well --

Perhaps add to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/sqllogictests/test_files/insert.slt

I can't remember how temporary directories work in sqllogictests though

Contributor Author:

I actually worked on this in #7283 which I just opened for copy.slt. Once those updates are in, I can cut another PR to add additional tests to insert.slt

file_type: &str,
// TODO test with create statement options such as compression
_file_compression_type: FileCompressionType,
external_table_options: &str,
session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
// Create the initial context
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(cfg)?;
SessionContext::with_config(config)
}
None => SessionContext::new(),
};

// create table
let tmp_dir = TempDir::new()?;
let tmp_path = tmp_dir.into_path();
let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
session_ctx
.sql(&format!(
"create external table foo(a varchar, b varchar, c int) \
stored as {file_type} \
location '{str_path}' \
{external_table_options}"
))
.await?
.collect()
.await?;

// insert data
session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
.await?
.collect()
.await?;

// check count
let batches = session_ctx
.sql("select * from foo")
.await?
.collect()
.await?;

let expected = vec![
Contributor: this is very cool.

"+-----+-----+---+",
"| a | b | c |",
"+-----+-----+---+",
"| foo | bar | 1 |",
"| foo | bar | 2 |",
"| foo | bar | 3 |",
"+-----+-----+---+",
];
assert_batches_eq!(expected, &batches);

Ok(())
}
}
17 changes: 16 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
@@ -40,6 +40,8 @@ use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;

use super::listing::ListingTableInsertMode;

/// A `TableProviderFactory` capable of creating new `ListingTable`s
pub struct ListingTableFactory {}

@@ -131,13 +133,26 @@ impl TableProviderFactory for ListingTableFactory {
// look for 'infinite' as an option
let infinite_source = cmd.unbounded;

let explicit_insert_mode = cmd.options.get("insert_mode");
let insert_mode = match explicit_insert_mode {
Some(mode) => ListingTableInsertMode::from_str(mode),
None => match file_type {
FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles),
FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
},
}?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_infinite_source(infinite_source)
.with_file_sort_order(cmd.order_exprs.clone());
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode);

let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
5 changes: 0 additions & 5 deletions datafusion/sql/src/statement.rs
@@ -607,11 +607,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
options,
} = statement;

// semantic checks
if file_type == "PARQUET" && !columns.is_empty() {
plan_err!("Column definitions can not be specified for PARQUET files.")?;
}

if file_type != "CSV"
&& file_type != "JSON"
&& file_compression_type != CompressionTypeVariant::UNCOMPRESSED
14 changes: 9 additions & 5 deletions datafusion/sql/tests/sql_integration.rs
@@ -1796,11 +1796,15 @@ fn create_external_table_with_compression_type() {
#[test]
fn create_external_table_parquet() {
let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"Plan(\"Column definitions can not be specified for PARQUET files.\")",
format!("{err:?}")
);
let expected = "CreateExternalTable: Bare { table: \"t\" }";
quick_test(sql, expected);
}

#[test]
fn create_external_table_parquet_sort_order() {
let sql = "create external table foo(a varchar, b varchar, c timestamp) stored as parquet location '/tmp/foo' with order (c)";
let expected = "CreateExternalTable: Bare { table: \"foo\" }";
quick_test(sql, expected);
Contributor:

🤔 I tried to run this locally and it doesn't quite work:

❯ create external table t(x integer, y varchar) stored as parquet location '/tmp/foo' with order (x);
0 rows in set. Query took 0.001 seconds.

❯ select * from t;
0 rows in set. Query took 0.002 seconds.

❯ insert into t values (1, 'foo'), (2, 'bar');
This feature is not implemented: Writing to a sorted listing table via insert into is not supported yet. To write to this table in the meantime, register an equivalent table with file_sort_order = vec![]

But it is getting very close.

Contributor Author (@devinjdangelo, Aug 14, 2023):

Yes, that error I added intentionally. Inserting to a sorted table would work, but there is nothing to enforce that the sort order is preserved yet. So, my concern is that a user inserts unsorted data to a sorted table, and then subsequent queries surprisingly return incorrect results.

We could add this as an issue to the streaming writes epic (support inserts to a sorted ListingTable).
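
As a side note, a minimal sketch of the workaround that error message points to: registering an equivalent listing table with an empty file_sort_order so that INSERT INTO is accepted. The path, table name, and file extension are illustrative, and the builder calls assume the ListingTable APIs as of this PR:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_unsorted_copy(ctx: &SessionContext) -> Result<()> {
    // Same files, but with an empty file_sort_order so inserts are allowed.
    // Readers then cannot rely on any output ordering from this table.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_file_sort_order(vec![]);

    let table_path = ListingTableUrl::parse("/tmp/foo")?; // illustrative path
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .infer_schema(&ctx.state())
        .await?;

    ctx.register_table("t_unsorted", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}
```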

}

#[test]