feat: Decompress in CSV / NDJSON scan #17841

Merged
merged 8 commits on Jul 26, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/polars-io/src/csv/read/mod.rs
@@ -30,4 +30,3 @@ pub use parser::count_rows;
pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
pub use reader::CsvReader;
pub use schema_inference::infer_file_schema;
pub use utils::is_compressed;
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/read_impl.rs
@@ -21,10 +21,10 @@ use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
use super::utils::decompress;
use super::utils::get_file_chunks;
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
use super::utils::is_compressed;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
use crate::utils::is_compressed;
use crate::utils::update_row_counts;
use crate::RowIndex;

2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/reader.rs
@@ -91,7 +91,7 @@ impl CsvReadOptions {
);

let path = resolve_homedir(self.path.as_ref().unwrap());
let reader = polars_utils::open_file(path)?;
let reader = polars_utils::open_file(&path)?;
let options = self;

Ok(CsvReader {
17 changes: 1 addition & 16 deletions crates/polars-io/src/csv/read/utils.rs
@@ -45,22 +45,6 @@ pub(crate) fn get_file_chunks(
offsets
}

// magic numbers
const GZIP: [u8; 2] = [31, 139];
const ZLIB0: [u8; 2] = [0x78, 0x01];
const ZLIB1: [u8; 2] = [0x78, 0x9C];
const ZLIB2: [u8; 2] = [0x78, 0xDA];
const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];

/// check if csv file is compressed
pub fn is_compressed(bytes: &[u8]) -> bool {
bytes.starts_with(&ZLIB0)
|| bytes.starts_with(&ZLIB1)
|| bytes.starts_with(&ZLIB2)
|| bytes.starts_with(&GZIP)
|| bytes.starts_with(&ZSTD)
}

#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
fn decompress_impl<R: Read>(
decoder: &mut R,
@@ -145,6 +129,7 @@ pub(crate) fn decompress(
quote_char: Option<u8>,
eol_char: u8,
) -> Option<Vec<u8>> {
use crate::utils::compression::magic::*;
if bytes.starts_with(&GZIP) {
let mut decoder = flate2::read::MultiGzDecoder::new(bytes);
decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
19 changes: 19 additions & 0 deletions crates/polars-io/src/utils/compression.rs
@@ -0,0 +1,19 @@
// magic numbers
pub mod magic {
pub const GZIP: [u8; 2] = [31, 139];
pub const ZLIB0: [u8; 2] = [0x78, 0x01];
pub const ZLIB1: [u8; 2] = [0x78, 0x9C];
pub const ZLIB2: [u8; 2] = [0x78, 0xDA];
pub const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
}

/// check if csv file is compressed
pub fn is_compressed(bytes: &[u8]) -> bool {
use magic::*;

bytes.starts_with(&ZLIB0)
|| bytes.starts_with(&ZLIB1)
|| bytes.starts_with(&ZLIB2)
|| bytes.starts_with(&GZIP)
|| bytes.starts_with(&ZSTD)
}
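
The detection is a cheap prefix check on the raw bytes, so callers can probe before committing to a decompression path. Below is a minimal stand-alone sketch of the same technique; `detect` is a hypothetical helper written for illustration, not part of this PR:

/// Hypothetical helper mirroring the magic-number constants added above.
fn detect(bytes: &[u8]) -> Option<&'static str> {
    const GZIP: [u8; 2] = [0x1F, 0x8B]; // decimal 31, 139
    const ZSTD: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
    if bytes.starts_with(&GZIP) {
        Some("gzip")
    } else if bytes.starts_with(&ZSTD) {
        Some("zstd")
    } else if bytes.len() >= 2 && bytes[0] == 0x78 && matches!(bytes[1], 0x01 | 0x9C | 0xDA) {
        // The three zlib second bytes correspond to different compression levels.
        Some("zlib")
    } else {
        None
    }
}

fn main() {
    assert_eq!(detect(&[0x1F, 0x8B, 0x08, 0x00]), Some("gzip"));
    assert_eq!(detect(b"a,b\n1,2"), None); // plain CSV passes through untouched
}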
2 changes: 2 additions & 0 deletions crates/polars-io/src/utils/mod.rs
@@ -1,5 +1,7 @@
pub mod compression;
mod other;

pub use compression::is_compressed;
pub use other::*;

pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
47 changes: 47 additions & 0 deletions crates/polars-io/src/utils/other.rs
@@ -6,6 +6,7 @@ use once_cell::sync::Lazy;
use polars_core::prelude::*;
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_error::to_compute_err;
use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};
@@ -41,6 +42,52 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
}
}

/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given for ownership of the decompressed data.
///
/// # Safety
/// The `out` vec outlives `bytes` (declare `out` first).
pub unsafe fn maybe_decompress_bytes<'a>(
bytes: &'a [u8],
out: &'a mut Vec<u8>,
) -> PolarsResult<&'a [u8]> {
assert!(out.is_empty());
use crate::prelude::is_compressed;
let is_compressed = bytes.len() >= 4 && is_compressed(bytes);

if is_compressed {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
{
use crate::utils::compression::magic::*;

if bytes.starts_with(&GZIP) {
flate2::read::MultiGzDecoder::new(bytes)
.read_to_end(out)
.map_err(to_compute_err)?;
} else if bytes.starts_with(&ZLIB0)
|| bytes.starts_with(&ZLIB1)
|| bytes.starts_with(&ZLIB2)
{
flate2::read::ZlibDecoder::new(bytes)
.read_to_end(out)
.map_err(to_compute_err)?;
} else if bytes.starts_with(&ZSTD) {
zstd::Decoder::new(bytes)?.read_to_end(out)?;
} else {
polars_bail!(ComputeError: "unimplemented compression format")
}

Ok(out)
}
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
{
panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
}
} else {
Ok(bytes)
}
}

/// Compute `remaining_rows_to_read` to be taken per file up front, so we can actually read
/// concurrently/parallel
///
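
The safety contract on `maybe_decompress_bytes` is purely about declaration order: the returned slice may borrow from either `bytes` or `out`, so `out` must be declared before the mapped bytes to outlive them. A hedged sketch of the intended call pattern, assuming the function is re-exported from `polars_io::utils` and the `decompress` feature is enabled (the helper and path argument are illustrative):

fn read_possibly_compressed(path: &std::path::Path) -> polars_error::PolarsResult<Vec<u8>> {
    let file = std::fs::File::open(path)?;
    // Declare `out` before the mapped bytes so it outlives them,
    // per the safety contract of `maybe_decompress_bytes`.
    let mut out = Vec::new();
    let mmap = unsafe { memmap::Mmap::map(&file)? };
    let bytes = unsafe { polars_io::utils::maybe_decompress_bytes(mmap.as_ref(), &mut out)? };
    Ok(bytes.to_vec())
}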
1 change: 1 addition & 0 deletions crates/polars-mem-engine/Cargo.toml
@@ -11,6 +11,7 @@ description = "In memory engine of the Polars project."
[dependencies]
arrow = { workspace = true }
futures = { workspace = true, optional = true }
memmap = { workspace = true }
polars-core = { workspace = true, features = ["lazy"] }
polars-error = { workspace = true }
polars-expr = { workspace = true }
27 changes: 18 additions & 9 deletions crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -65,14 +65,18 @@ impl CsvExec {
let mut df = if run_async {
#[cfg(feature = "cloud")]
{
let file = polars_io::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()?;
let owned = &mut vec![];
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };

options
.into_reader_with_file_handle(
polars_io::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()?,
)
.into_reader_with_file_handle(std::io::Cursor::new(unsafe {
maybe_decompress_bytes(mmap.as_ref(), owned)
}?))
._with_predicate(predicate.clone())
.finish()
}
@@ -81,9 +85,14 @@
panic!("required feature `cloud` is not enabled")
}
} else {
let file = polars_utils::open_file(path)?;
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let owned = &mut vec![];

options
.try_into_reader_with_file_path(Some(path.clone()))
.unwrap()
.into_reader_with_file_handle(std::io::Cursor::new(unsafe {
maybe_decompress_bytes(mmap.as_ref(), owned)
}?))
._with_predicate(predicate.clone())
.finish()
}?;
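
The executor now maps the file into memory and routes the bytes through `maybe_decompress_bytes` before constructing the reader, so compressed files work in the lazy scan path rather than only in the eager reader. A hedged end-to-end sketch using the public lazy API (the file name is illustrative; assumes the `csv`, `lazy`, and `decompress` features are enabled):

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // `sales.csv.gz` is an illustrative gzip-compressed CSV file.
    let df = LazyCsvReader::new("sales.csv.gz").finish()?.collect()?;
    println!("{df}");
    Ok(())
}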
46 changes: 27 additions & 19 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
@@ -69,32 +69,40 @@ impl JsonExec {
return None;
}

let reader = if run_async {
JsonLineReader::new({
#[cfg(feature = "cloud")]
let file = if run_async {
#[cfg(feature = "cloud")]
{
match polars_io::file_cache::FILE_CACHE
.get_entry(p.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()
{
match polars_io::file_cache::FILE_CACHE
.get_entry(p.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()
{
Ok(v) => v,
Err(e) => return Some(Err(e)),
}
Ok(v) => v,
Err(e) => return Some(Err(e)),
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
})
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
} else {
match JsonLineReader::from_path(p) {
Ok(r) => r,
match polars_utils::open_file(p.as_ref()) {
Ok(v) => v,
Err(e) => return Some(Err(e)),
}
};

let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let owned = &mut vec![];
let curs = std::io::Cursor::new(
match unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) } {
Ok(v) => v,
Err(e) => return Some(Err(e)),
},
);
let reader = JsonLineReader::new(curs);

let row_index = self.file_scan_options.row_index.as_mut();

let df = reader
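
The NDJSON path mirrors the CSV one: open the file directly, mmap it, and hand a `Cursor` over the possibly-decompressed bytes to `JsonLineReader`. A hedged sketch of the user-visible effect (file name illustrative; assumes the `json`, `lazy`, and `decompress` features and that `LazyJsonLineReader::new` accepts a path):

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // `events.ndjson.gz` is an illustrative gzip-compressed NDJSON file.
    let df = LazyJsonLineReader::new("events.ndjson.gz").finish()?.collect()?;
    println!("{df}");
    Ok(())
}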
@@ -64,7 +64,7 @@ impl Source for GroupBySource
if partition_dir.exists() {
for file in std::fs::read_dir(partition_dir).expect("should be there") {
let spilled = file.unwrap().path();
let file = polars_utils::open_file(spilled)?;
let file = polars_utils::open_file(&spilled)?;
let reader = IpcReader::new(file);
let spilled = reader.finish().unwrap();
if spilled.n_chunks() > 1 {
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
@@ -32,6 +32,7 @@ ciborium = { workspace = true, optional = true }
either = { workspace = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true }
memmap = { workspace = true }
once_cell = { workspace = true }
percent-encoding = { workspace = true }
pyo3 = { workspace = true, optional = true }
36 changes: 19 additions & 17 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -135,7 +135,6 @@ pub(super) fn csv_file_info(
use std::io::{Read, Seek};

use polars_core::{config, POOL};
use polars_io::csv::read::is_compressed;
use polars_io::csv::read::schema_inference::SchemaInferenceResult;
use polars_io::utils::get_reader_bytes;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
@@ -173,7 +172,7 @@
};

let infer_schema_func = |i| {
let mut file = if run_async {
let file = if run_async {
#[cfg(feature = "cloud")]
{
let entry: &Arc<polars_io::file_cache::FileCacheEntry> =
@@ -185,24 +184,22 @@
panic!("required feature `cloud` is not enabled")
}
} else {
polars_utils::open_file(paths.get(i).unwrap())?
let p: &PathBuf = &paths[i];
polars_utils::open_file(p.as_ref())?
};

let mut magic_nr = [0u8; 4];
let res_len = file.read(&mut magic_nr)?;
if res_len < 2 {
if csv_options.raise_if_empty {
polars_bail!(NoData: "empty CSV")
}
} else {
polars_ensure!(
!is_compressed(&magic_nr),
ComputeError: "cannot scan compressed csv; use `read_csv` for compressed data",
);
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let owned = &mut vec![];

let mut curs =
std::io::Cursor::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?);

if curs.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
polars_bail!(NoData: "empty CSV")
}
curs.rewind()?;

file.rewind()?;
let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");
let reader_bytes = get_reader_bytes(&mut curs).expect("could not mmap file");

// this needs a way to estimated bytes/rows.
let si_result =
@@ -323,7 +320,12 @@ pub(super) fn ndjson_file_info(
} else {
polars_utils::open_file(first_path)?
};
let mut reader = std::io::BufReader::new(f);

let owned = &mut vec![];
let mmap = unsafe { memmap::Mmap::map(&f).unwrap() };

let mut reader =
std::io::BufReader::new(unsafe { maybe_decompress_bytes(mmap.as_ref(), owned) }?);

let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
if file_options.row_index.is_none() {
14 changes: 4 additions & 10 deletions crates/polars-utils/src/io.rs
@@ -15,16 +15,10 @@ pub fn _limit_path_len_io_err(path: &Path, err: io::Error) -> PolarsError {
io::Error::new(err.kind(), msg).into()
}

pub fn open_file<P>(path: P) -> PolarsResult<File>
where
P: AsRef<Path>,
{
File::open(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
pub fn open_file(path: &Path) -> PolarsResult<File> {
File::open(path).map_err(|err| _limit_path_len_io_err(path, err))
}

pub fn create_file<P>(path: P) -> PolarsResult<File>
where
P: AsRef<Path>,
{
File::create(&path).map_err(|err| _limit_path_len_io_err(path.as_ref(), err))
pub fn create_file(path: &Path) -> PolarsResult<File> {
File::create(path).map_err(|err| _limit_path_len_io_err(path, err))
}
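
Dropping the `AsRef<Path>` generic means one monomorphized copy of each helper instead of one per caller path type; the trade-off is that call sites now convert explicitly, as the updated callers in this diff do. A brief sketch of the two common conversions (path strings illustrative):

use std::path::PathBuf;

fn example() -> polars_error::PolarsResult<()> {
    let owned = PathBuf::from("data/simple.parquet");
    // A `&PathBuf` deref-coerces to `&Path`...
    let _file = polars_utils::open_file(&owned)?;
    // ...while string literals need an explicit `.as_ref()`, as in the test below.
    let _file = polars_utils::open_file("data/simple.parquet".as_ref())?;
    Ok(())
}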
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/write/mod.rs
@@ -291,7 +291,7 @@ async fn test_column_async(column: &str, compression: CompressionOptions) -> Par
#[test]
fn test_parquet() {
// In CI: This test will be skipped because the file does not exist.
if let Ok(r) = polars_utils::open_file("data/simple.parquet") {
if let Ok(r) = polars_utils::open_file("data/simple.parquet".as_ref()) {
let reader = ParquetReader::new(r);
let df = reader.finish().unwrap();
assert_eq!(df.get_column_names(), ["a", "b"]);