From dd6e2ee86b0db07d89a256b1c8a2e1009e3f51f2 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 6 May 2024 18:18:06 +0200 Subject: [PATCH] fix: Respect user passed 'reader_schema' in 'scan_csv' (#16080) --- Cargo.lock | 4 +++ Cargo.toml | 2 +- .../src/physical_plan/executors/scan/csv.rs | 6 +++-- .../physical_plan/executors/scan/parquet.rs | 18 ++++++++++--- .../src/physical_plan/planner/lp.rs | 2 +- .../src/executors/sources/parquet.rs | 9 +++++-- crates/polars-plan/Cargo.toml | 2 ++ .../src/logical_plan/conversion/scans.rs | 24 ++++++++++++------ crates/polars-plan/src/logical_plan/schema.rs | 5 ++-- py-polars/tests/unit/io/test_lazy_csv.py | 25 +++++++++++++++++++ 10 files changed, 79 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4332d87aa02..4bb623113671 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1287,6 +1287,9 @@ name = "either" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -3079,6 +3082,7 @@ dependencies = [ "chrono", "chrono-tz", "ciborium", + "either", "futures", "hashbrown 0.14.3", "libloading", diff --git a/Cargo.toml b/Cargo.toml index ab0e1535e01f..3887ac8f39ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ chrono-tz = "0.8.1" ciborium = "0.2" crossbeam-channel = "0.5.8" crossbeam-queue = "0.3" -either = "1.9" +either = "1.11" ethnum = "1.3.2" fallible-streaming-iterator = "0.1.9" futures = "0.3.25" diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs index 06277d5b054e..69a8df57c41c 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/csv.rs @@ -4,7 +4,7 @@ use super::*; pub struct CsvExec { pub path: PathBuf, - pub schema: SchemaRef, + pub file_info: FileInfo, pub options: CsvReaderOptions, pub file_options: FileScanOptions, pub predicate: Option>, @@ -26,7 +26,9 @@ impl CsvExec { CsvReader::from_path(&self.path) .unwrap() .has_header(self.options.has_header) - .with_dtypes(Some(self.schema.clone())) + .with_schema(Some( + self.file_info.reader_schema.clone().unwrap().unwrap_right(), + )) .with_separator(self.options.separator) .with_ignore_errors(self.options.ignore_errors) .with_skip_rows(self.options.skip_rows) diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 429a87574c1f..695cc58c98c0 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -91,7 +91,12 @@ impl ParquetExec { ); let mut reader = ParquetReader::new(file) - .with_schema(self.file_info.reader_schema.clone()) + .with_schema( + self.file_info + .reader_schema + .clone() + .map(|either| either.unwrap_left()), + ) .read_parallel(parallel) .set_low_memory(self.options.low_memory) .use_statistics(self.options.use_statistics) @@ -163,7 +168,9 @@ impl ParquetExec { .file_info .reader_schema .as_ref() - .expect("should be set"); + .expect("should be set") + .as_ref() + .unwrap_left(); let first_metadata = &self.metadata; let cloud_options = self.cloud_options.as_ref(); let with_columns = self @@ -343,7 +350,12 @@ impl ParquetExec { ); return Ok(materialize_empty_df( projection.as_deref(), - self.file_info.reader_schema.as_ref().unwrap(), + self.file_info + .reader_schema + .as_ref() + .unwrap() + .as_ref() + .unwrap_left(), hive_partitions.as_deref(), self.file_options.row_index.as_ref(), )); diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 85260425a844..da98d7aa0216 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -239,7 +239,7 @@ pub fn create_physical_plan( let path = paths[0].clone(); Ok(Box::new(executors::CsvExec { path, - schema: file_info.schema, + file_info, options: csv_options, predicate, file_options, diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index b94028980ede..79e7380c6291 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -113,7 +113,7 @@ impl ParquetSource { file_options, projection, chunk_size, - reader_schema, + reader_schema.map(|either| either.unwrap_left()), hive_partitions, )) } @@ -151,7 +151,12 @@ impl ParquetSource { .map(|v| v.as_slice()); check_projected_arrow_schema( batched_reader.schema().as_ref(), - self.file_info.reader_schema.as_ref().unwrap(), + self.file_info + .reader_schema + .as_ref() + .unwrap() + .as_ref() + .unwrap_left(), with_columns, "schema of all files in a single scan_parquet must be equal", )?; diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index 18279f58fe00..ee6d0a2d43ee 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -28,6 +28,7 @@ bytemuck = { workspace = true } chrono = { workspace = true, optional = true } chrono-tz = { workspace = true, optional = true } ciborium = { workspace = true, optional = true } +either = { workspace = true } futures = { workspace = true, optional = true } hashbrown = { workspace = true } once_cell = { workspace = true } @@ -53,6 +54,7 @@ serde = [ "polars-time/serde", "polars-io/serde", "polars-ops/serde", + "either/serde", ] streaming = [] parquet = ["polars-io/parquet", "polars-parquet"] diff --git a/crates/polars-plan/src/logical_plan/conversion/scans.rs b/crates/polars-plan/src/logical_plan/conversion/scans.rs index 84139ff5e713..7d03fa9b7d56 100644 --- a/crates/polars-plan/src/logical_plan/conversion/scans.rs +++ b/crates/polars-plan/src/logical_plan/conversion/scans.rs @@ -1,6 +1,7 @@ use std::io::Read; use std::path::PathBuf; +use either::Either; #[cfg(feature = "cloud")] use polars_io::pl_async::get_runtime; use polars_io::prelude::*; @@ -66,7 +67,7 @@ pub(super) fn parquet_file_info( let mut file_info = FileInfo::new( schema, - Some(reader_schema), + Some(Either::Left(reader_schema)), (num_rows, num_rows.unwrap_or(0)), ); @@ -110,7 +111,7 @@ pub(super) fn ipc_file_info( metadata.schema.as_ref().into(), file_options.row_index.as_ref(), ), - Some(Arc::clone(&metadata.schema)), + Some(Either::Left(Arc::clone(&metadata.schema))), (None, 0), ); @@ -171,14 +172,23 @@ pub(super) fn csv_file_info( .clone() .unwrap_or_else(|| Arc::new(inferred_schema)); - if let Some(rc) = &file_options.row_index { - let schema = Arc::make_mut(&mut schema); - schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?; - } + let reader_schema = if let Some(rc) = &file_options.row_index { + let reader_schema = schema.clone(); + let mut output_schema = (*reader_schema).clone(); + output_schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE)?; + schema = Arc::new(output_schema); + reader_schema + } else { + schema.clone() + }; let n_bytes = reader_bytes.len(); let estimated_n_rows = (rows_read as f64 / bytes_read as f64 * n_bytes as f64) as usize; csv_options.skip_rows += csv_options.skip_rows_after_header; - Ok(FileInfo::new(schema, None, (None, estimated_n_rows))) + Ok(FileInfo::new( + schema, + Some(Either::Right(reader_schema)), + (None, estimated_n_rows), + )) } diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index 6c4629a80cb0..2ee480c9727b 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::sync::Mutex; use arrow::datatypes::ArrowSchemaRef; +use either::Either; use polars_core::prelude::*; use polars_utils::format_smartstring; #[cfg(feature = "serde")] @@ -43,7 +44,7 @@ pub struct FileInfo { pub schema: SchemaRef, /// Stores the schema used for the reader, as the main schema can contain /// extra hive columns. - pub reader_schema: Option, + pub reader_schema: Option>, /// - known size /// - estimated size pub row_estimation: (Option, usize), @@ -54,7 +55,7 @@ impl FileInfo { /// Constructs a new [`FileInfo`]. pub fn new( schema: SchemaRef, - reader_schema: Option, + reader_schema: Option>, row_estimation: (Option, usize), ) -> Self { Self { diff --git a/py-polars/tests/unit/io/test_lazy_csv.py b/py-polars/tests/unit/io/test_lazy_csv.py index 59e7291ea522..0079f6ad43ac 100644 --- a/py-polars/tests/unit/io/test_lazy_csv.py +++ b/py-polars/tests/unit/io/test_lazy_csv.py @@ -308,3 +308,28 @@ def test_csv_null_values_with_projection_15515() -> None: "SireKey": [None], "BirthDate": [19940315], } + + +@pytest.mark.write_disk() +def test_csv_respect_user_schema_ragged_lines_15254() -> None: + with tempfile.NamedTemporaryFile() as f: + f.write( + b""" +A,B,C +1,2,3 +4,5,6,7,8 +9,10,11 +""".strip() + ) + f.seek(0) + + df = pl.scan_csv( + f.name, schema=dict.fromkeys("ABCDE", pl.String), truncate_ragged_lines=True + ).collect() + assert df.to_dict(as_series=False) == { + "A": ["1", "4", "9"], + "B": ["2", "5", "10"], + "C": ["3", "6", "11"], + "D": [None, "7", None], + "E": [None, "8", None], + }