perf: Cache schema resolve back to DSL #17610

Merged · 3 commits · Jul 13, 2024
Changes from all commits
crates/polars-lazy/src/scan/ndjson.rs (5 changes: 3 additions & 2 deletions)

@@ -1,5 +1,6 @@
 use std::num::NonZeroUsize;
 use std::path::{Path, PathBuf};
+use std::sync::RwLock;
 
 use polars_core::prelude::*;
 use polars_io::RowIndex;
@@ -123,7 +124,7 @@ impl LazyFileListReader for LazyJsonLineReader {
 
         Ok(LazyFrame::from(DslPlan::Scan {
             paths,
-            file_info: None,
+            file_info: Arc::new(RwLock::new(None)),
             hive_parts: None,
             predicate: None,
             file_options,
@@ -157,7 +158,7 @@ impl LazyFileListReader for LazyJsonLineReader {
 
         Ok(LazyFrame::from(DslPlan::Scan {
             paths: self.paths,
-            file_info: None,
+            file_info: Arc::new(RwLock::new(None)),
             hive_parts: None,
             predicate: None,
             file_options,
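
The shape of the change is the same in every `DslPlan::Scan` constructor: the plain `Option<FileInfo>` field becomes a shared, thread-safe cache slot, empty where the schema is not yet known and pre-filled where it already is. A minimal sketch of the two initializations, with a placeholder `FileInfo` type (illustrative only, not the Polars definition):

use std::sync::{Arc, RwLock};

// Placeholder for Polars' FileInfo (schema, row-count estimates, ...).
#[derive(Clone, Debug)]
struct FileInfo;

// Schema not yet known (the NDJSON scans above): start with an empty slot.
fn unresolved() -> Arc<RwLock<Option<FileInfo>>> {
    Arc::new(RwLock::new(None))
}

// Schema already known at build time: pre-fill the slot.
fn resolved(file_info: FileInfo) -> Arc<RwLock<Option<FileInfo>>> {
    Arc::new(RwLock::new(Some(file_info)))
}

fn main() {
    assert!(unresolved().read().unwrap().is_none());
    assert!(resolved(FileInfo).read().unwrap().is_some());
}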

crates/polars-plan/src/plans/builder_dsl.rs (10 changes: 6 additions & 4 deletions)

@@ -1,3 +1,5 @@
+use std::sync::RwLock;
+
 use polars_core::prelude::*;
 #[cfg(any(feature = "parquet", feature = "ipc", feature = "csv"))]
 use polars_io::cloud::CloudOptions;
@@ -57,7 +59,7 @@ impl DslBuilder {
 
         Ok(DslPlan::Scan {
             paths: Arc::new([]),
-            file_info: Some(file_info),
+            file_info: Arc::new(RwLock::new(Some(file_info))),
             hive_parts: None,
             predicate: None,
             file_options,
@@ -103,7 +105,7 @@ impl DslBuilder {
         };
         Ok(DslPlan::Scan {
             paths,
-            file_info: None,
+            file_info: Arc::new(RwLock::new(None)),
             hive_parts: None,
             predicate: None,
             file_options: options,
@@ -137,7 +139,7 @@ impl DslBuilder {
 
         Ok(DslPlan::Scan {
             paths,
-            file_info: None,
+            file_info: Arc::new(RwLock::new(None)),
             hive_parts: None,
             file_options: FileScanOptions {
                 with_columns: None,
@@ -192,7 +194,7 @@ impl DslBuilder {
         };
         Ok(DslPlan::Scan {
             paths,
-            file_info: None,
+            file_info: Arc::new(RwLock::new(None)),
             hive_parts: None,
             file_options: options,
             predicate: None,

crates/polars-plan/src/plans/conversion/dsl_to_ir.rs (68 changes: 44 additions & 24 deletions)

@@ -117,9 +117,23 @@ pub fn to_alp_impl(
             FileScan::Anonymous { .. } => paths,
         };
 
-        let mut file_info = if let Some(file_info) = file_info {
-            file_info
+        let file_info_read = file_info.read().unwrap();
+
+        // leading `_` as clippy doesn't understand that you don't want to read from a lock guard
+        // if you want to keep it alive.
+        let mut _file_info_write: Option<_>;
+        let mut resolved_file_info = if let Some(file_info) = &*file_info_read {
+            _file_info_write = None;
+            let out = file_info.clone();
+            drop(file_info_read);
+            out
         } else {
+            // Lock so that we don't resolve the same schema in parallel.
+            drop(file_info_read);
+
+            // Set write lock and keep that lock until all fields in `file_info` are resolved.
+            _file_info_write = Some(file_info.write().unwrap());
+
             match &mut scan_type {
                 #[cfg(feature = "parquet")]
                 FileScan::Parquet {
@@ -166,7 +180,7 @@ pub fn to_alp_impl(
         let hive_parts = if hive_parts.is_some() {
             hive_parts
         } else if file_options.hive_options.enabled.unwrap()
-            && file_info.reader_schema.is_some()
+            && resolved_file_info.reader_schema.is_some()
         {
             #[allow(unused_assignments)]
             let mut owned = None;
@@ -175,7 +189,7 @@ pub fn to_alp_impl(
                 paths.as_ref(),
                 file_options.hive_options.hive_start_idx,
                 file_options.hive_options.schema.clone(),
-                match file_info.reader_schema.as_ref().unwrap() {
+                match resolved_file_info.reader_schema.as_ref().unwrap() {
                    Either::Left(v) => {
                        owned = Some(Schema::from(v));
                        owned.as_ref().unwrap()
@@ -188,47 +202,53 @@ pub fn to_alp_impl(
             None
         };
 
-        if let Some(ref hive_parts) = hive_parts {
-            let hive_schema = hive_parts[0].schema();
-            file_info.update_schema_with_hive_schema(hive_schema.clone());
-        }
+        // Only if we have a writing file handle we must resolve hive partitions
+        // update schema's etc.
+        if let Some(lock) = &mut _file_info_write {
+            if let Some(ref hive_parts) = hive_parts {
+                let hive_schema = hive_parts[0].schema();
+                resolved_file_info.update_schema_with_hive_schema(hive_schema.clone());
+            }
 
-        if let Some(ref file_path_col) = file_options.include_file_paths {
-            let schema = Arc::make_mut(&mut file_info.schema);
+            if let Some(ref file_path_col) = file_options.include_file_paths {
+                let schema = Arc::make_mut(&mut resolved_file_info.schema);
 
-            if schema.contains(file_path_col) {
-                polars_bail!(
-                    Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
-                    file_path_col
-                );
-            }
+                if schema.contains(file_path_col) {
+                    polars_bail!(
                        Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                        file_path_col
                    );
+                }
 
-            schema.insert_at_index(
-                schema.len(),
-                file_path_col.as_ref().into(),
-                DataType::String,
-            )?;
-        }
+                schema.insert_at_index(
+                    schema.len(),
+                    file_path_col.as_ref().into(),
+                    DataType::String,
+                )?;
+            }
+
+            **lock = Some(resolved_file_info.clone());
+        }
 
-        file_options.with_columns = if file_info.reader_schema.is_some() {
+        file_options.with_columns = if resolved_file_info.reader_schema.is_some() {
             maybe_init_projection_excluding_hive(
-                file_info.reader_schema.as_ref().unwrap(),
+                resolved_file_info.reader_schema.as_ref().unwrap(),
                 hive_parts.as_ref().map(|x| &x[0]),
             )
         } else {
             None
         };
 
         if let Some(row_index) = &file_options.row_index {
-            let schema = Arc::make_mut(&mut file_info.schema);
+            let schema = Arc::make_mut(&mut resolved_file_info.schema);
             *schema = schema
                 .new_inserting_at_index(0, row_index.name.as_ref().into(), IDX_DTYPE)
                 .unwrap();
         }
 
         IR::Scan {
             paths,
-            file_info,
+            file_info: resolved_file_info,
             hive_parts,
             output_schema: None,
             predicate: predicate.map(|expr| to_expr_ir(expr, expr_arena)),
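
The conversion above is a read-mostly, fill-once cache: take the read lock and clone on a hit; on a miss, drop the read guard, hold the write lock across the whole resolution so that concurrent conversions do not resolve the same schema in parallel, and publish the result before releasing it. Below is a self-contained sketch of that locking discipline, where `resolve_schema` is a made-up stand-in for the per-scan-type schema inference; the re-check under the write lock is the classic double-checked refinement, an extra guard the diff itself does not take since it commits to resolving once it holds the write lock:

use std::sync::{Arc, RwLock};

#[derive(Clone, Debug)]
struct FileInfo {
    schema: Vec<String>,
}

// Made-up stand-in for the expensive, per-scan-type schema inference.
fn resolve_schema() -> FileInfo {
    FileInfo { schema: vec!["a".to_string(), "b".to_string()] }
}

fn resolve_cached(cache: &Arc<RwLock<Option<FileInfo>>>) -> FileInfo {
    // Fast path: read lock, clone the cached value on a hit.
    if let Some(info) = cache.read().unwrap().as_ref() {
        return info.clone();
    }
    // Slow path: the read guard is dropped here; hold the write lock for the
    // whole resolution so concurrent callers cannot resolve in parallel.
    let mut guard = cache.write().unwrap();
    // Re-check: another thread may have filled the slot while we waited.
    if let Some(info) = guard.as_ref() {
        return info.clone();
    }
    let info = resolve_schema();
    *guard = Some(info.clone()); // publish before the lock is released
    info
}

fn main() {
    let cache = Arc::new(RwLock::new(None));
    let a = resolve_cached(&cache); // resolves and fills the slot
    let b = resolve_cached(&cache); // pure cache hit
    assert_eq!(a.schema, b.schema);
}

Holding the write lock for the entire resolution trades a little concurrency for the guarantee that the expensive inference never runs twice at the same time.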

crates/polars-plan/src/plans/conversion/mod.rs (3 changes: 2 additions & 1 deletion)

@@ -8,6 +8,7 @@ mod scans;
 mod stack_opt;
 
 use std::borrow::Cow;
+use std::sync::RwLock;
 
 pub use dsl_to_ir::*;
 pub use expr_to_ir::*;
@@ -52,7 +53,7 @@ impl IR {
                 file_options: options,
             } => DslPlan::Scan {
                 paths,
-                file_info: Some(file_info),
+                file_info: Arc::new(RwLock::new(Some(file_info))),
                 hive_parts,
                 predicate: predicate.map(|e| e.to_expr(expr_arena)),
                 scan_type,

crates/polars-plan/src/plans/mod.rs (8 changes: 6 additions & 2 deletions)

@@ -1,7 +1,7 @@
 use std::fmt;
 use std::fmt::Debug;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 
 use hive::HivePartitions;
 use polars_core::prelude::*;
@@ -80,7 +80,11 @@ pub enum DslPlan {
     Scan {
         paths: Arc<[PathBuf]>,
         // Option as this is mostly materialized on the IR phase.
-        file_info: Option<FileInfo>,
+        // During conversion we update the value in the DSL as well
+        // This is to cater to use cases where parts of a `LazyFrame`
+        // are used as base of different queries in a loop. That way
+        // the expensive schema resolving is cached.
+        file_info: Arc<RwLock<Option<FileInfo>>>,
         hive_parts: Option<Arc<[HivePartitions]>>,
         predicate: Option<Expr>,
         file_options: FileScanOptions,
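
Because the slot sits behind an `Arc`, every clone of a plan shares it: the first DSL-to-IR conversion pays for schema resolution, and later conversions of the base plan or of any clone hit the cache. A toy demonstration of that sharing, with stand-in types rather than the Polars API:

use std::sync::{Arc, RwLock};

#[derive(Clone)]
struct Plan {
    // Shared slot: `Plan::clone` clones the Arc, not the RwLock contents.
    file_info: Arc<RwLock<Option<String>>>,
}

fn main() {
    let base = Plan { file_info: Arc::new(RwLock::new(None)) };

    // Derive several queries from the same base, as in a loop over filters.
    let queries: Vec<Plan> = (0..3).map(|_| base.clone()).collect();

    // The first conversion resolves the schema and publishes it...
    *queries[0].file_info.write().unwrap() = Some("resolved schema".to_string());

    // ...and the base and every other clone see the cached value.
    assert!(base.file_info.read().unwrap().is_some());
    assert!(queries[2].file_info.read().unwrap().is_some());
}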