Skip to content

Commit

Permalink
feat: Raise more informative error message for directories containing…
Browse files Browse the repository at this point in the history
… files with mixed extensions (pola-rs#17480)
  • Loading branch information
nameexhaustion authored and Henry Harbeck committed Jul 8, 2024
1 parent 18777f5 commit acba082
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 26 deletions.
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;
let Some(path) = paths.first() else {
polars_bail!(ComputeError: "no paths specified for this reader");
};
Expand Down Expand Up @@ -262,7 +262,7 @@ impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(self) -> PolarsResult<LazyFrame> {
// `expand_paths` respects globs
let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;

let mut lf: LazyFrame =
DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
Expand Down
58 changes: 53 additions & 5 deletions crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ pub(super) fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
memchr::memchr3(b'*', b'?', b'[', path)
}

/// Checks if `expanded_paths` were expanded from a single directory
pub(super) fn expanded_from_single_directory<P: AsRef<std::path::Path>>(
paths: &[P],
expanded_paths: &[P],
) -> bool {
// Single input that isn't a glob
paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().to_str().unwrap().as_bytes()).is_none()
// And isn't a file
&& {
(
// For local paths, we can just use `is_dir`
!is_cloud_url(paths[0].as_ref()) && paths[0].as_ref().is_dir()
)
|| (
// Otherwise we check the output path is different from the input path, so that we also
// handle the case of a directory containing a single file.
!expanded_paths.is_empty() && (paths[0].as_ref() != expanded_paths[0].as_ref())
)
}
}

/// Recursively traverses directories and expands globs if `glob` is `true`.
/// Returns the expanded paths and the index at which to start parsing hive
/// partitions from the path.
Expand Down Expand Up @@ -228,10 +249,31 @@ fn expand_paths(
}
}

Ok((
out_paths.into_iter().collect::<Arc<[_]>>(),
*expand_start_idx,
))
let out_paths = if expanded_from_single_directory(paths, out_paths.as_ref()) {
// Require all file extensions to be the same when expanding a single directory.
let ext = out_paths[0].extension();

(0..out_paths.len())
.map(|i| {
let path = out_paths[i].clone();

if path.extension() != ext {
polars_bail!(
InvalidOperation: r#"directory contained paths with different file extensions: \
first path: {}, second path: {}. Please use a glob pattern to explicitly specify
which files to read (e.g. "dir/**/*", "dir/**/*.parquet")"#,
out_paths[i - 1].to_str().unwrap(), path.to_str().unwrap()
);
};

Ok(path)
})
.collect::<PolarsResult<Arc<[_]>>>()?
} else {
Arc::<[_]>::from(out_paths)
};

Ok((out_paths, *expand_start_idx))
}

/// Reads [LazyFrame] from a filesystem or a cloud storage.
Expand All @@ -245,7 +287,7 @@ pub trait LazyFileListReader: Clone {
return self.finish_no_glob();
}

let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;

let lfs = paths
.iter()
Expand Down Expand Up @@ -348,4 +390,10 @@ pub trait LazyFileListReader: Clone {
check_directory_level,
)
}

/// Expand paths without performing any directory level or file extension
/// checks.
fn expand_paths_default(&self) -> PolarsResult<Arc<[PathBuf]>> {
self.expand_paths(false).map(|x| x.0)
}
}
11 changes: 2 additions & 9 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::path::{Path, PathBuf};

use file_list_reader::get_glob_start_idx;
use file_list_reader::expanded_from_single_directory;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::utils::is_cloud_url;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
Expand Down Expand Up @@ -55,13 +54,7 @@ impl LazyFileListReader for LazyIpcReader {
self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
self.args.hive_options.enabled =
Some(self.args.hive_options.enabled.unwrap_or_else(|| {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& {
(!is_cloud_url(&paths[0]) && paths[0].is_dir())
|| (paths[0] != self.paths[0])
}
expanded_from_single_directory(self.paths.as_ref(), paths.as_ref())
}));
self.args.hive_options.hive_start_idx = hive_start_idx;
let args = self.args;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl LazyFileListReader for LazyJsonLineReader {
return self.finish_no_glob();
}

let paths = self.expand_paths(false)?.0;
let paths = self.expand_paths_default()?;

let file_options = FileScanOptions {
n_rows: self.n_rows,
Expand Down
11 changes: 2 additions & 9 deletions crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::path::{Path, PathBuf};

use file_list_reader::expanded_from_single_directory;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::read::ParallelStrategy;
use polars_io::utils::is_cloud_url;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
use crate::scan::file_list_reader::get_glob_start_idx;

#[derive(Clone)]
pub struct ScanArgsParquet {
Expand Down Expand Up @@ -63,13 +62,7 @@ impl LazyFileListReader for LazyParquetReader {
self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
self.args.hive_options.enabled =
Some(self.args.hive_options.enabled.unwrap_or_else(|| {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& {
(!is_cloud_url(&paths[0]) && paths[0].is_dir())
|| (paths[0] != self.paths[0])
}
expanded_from_single_directory(self.paths.as_ref(), paths.as_ref())
}));
self.args.hive_options.hive_start_idx = hive_start_idx;

Expand Down
27 changes: 27 additions & 0 deletions py-polars/tests/unit/io/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,30 @@ def test_path_expansion_excludes_empty_files_17362(tmp_path: Path) -> None:

assert_frame_equal(pl.scan_parquet(tmp_path).collect(), df)
assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)


@pytest.mark.write_disk()
def test_scan_single_dir_differing_file_extensions_raises_17436(tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)

df = pl.DataFrame({"x": 1})
df.write_parquet(tmp_path / "data.parquet")
df.write_ipc(tmp_path / "data.ipc")

with pytest.raises(
pl.exceptions.InvalidOperationError, match="different file extensions"
):
pl.scan_parquet(tmp_path).collect()

for lf in [
pl.scan_parquet(tmp_path / "*.parquet"),
pl.scan_ipc(tmp_path / "*.ipc"),
]:
assert_frame_equal(lf.collect(), df)

# Ensure passing a glob doesn't trigger file extension checking
with pytest.raises(
pl.exceptions.ComputeError,
match="parquet: File out of specification: The file must end with PAR1",
):
pl.scan_parquet(tmp_path / "*").collect()

0 comments on commit acba082

Please sign in to comment.