Skip to content

Commit

Permalink
feat(rust!, python): Add context trace to LazyFrame conversion erro…
Browse files Browse the repository at this point in the history
…rs (#15761)
  • Loading branch information
ritchie46 authored Apr 19, 2024
1 parent 27e69b5 commit 0423fa3
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 87 deletions.
89 changes: 82 additions & 7 deletions crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ mod warning;
use std::borrow::Cow;
use std::collections::TryReserveError;
use std::error::Error;
use std::fmt::{self, Display, Formatter};
use std::fmt::{self, Display, Formatter, Write};
use std::ops::Deref;
use std::sync::Arc;
use std::{env, io};

pub use warning::*;
Expand Down Expand Up @@ -56,8 +57,14 @@ pub enum PolarsError {
Duplicate(ErrString),
#[error("invalid operation: {0}")]
InvalidOperation(ErrString),
#[error(transparent)]
Io(#[from] io::Error),
#[error("{}", match msg {
Some(msg) => format!("{}", msg),
None => format!("{}", error)
})]
IO {
error: Arc<io::Error>,
msg: Option<ErrString>,
},
#[error("no data: {0}")]
NoData(ErrString),
#[error("{0}")]
Expand All @@ -72,6 +79,20 @@ pub enum PolarsError {
StringCacheMismatch(ErrString),
#[error("field not found: {0}")]
StructFieldNotFound(ErrString),
#[error("{error}: {msg}")]
Context {
error: Box<PolarsError>,
msg: ErrString,
},
}

impl From<io::Error> for PolarsError {
fn from(value: io::Error) -> Self {
PolarsError::IO {
error: Arc::new(value),
msg: None,
}
}
}

#[cfg(feature = "regex")]
Expand All @@ -84,10 +105,11 @@ impl From<regex::Error> for PolarsError {
#[cfg(feature = "object_store")]
impl From<object_store::Error> for PolarsError {
fn from(err: object_store::Error) -> Self {
PolarsError::Io(std::io::Error::new(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("object-store error: {err:?}"),
))
)
.into()
}
}

Expand Down Expand Up @@ -127,21 +149,74 @@ impl From<TryReserveError> for PolarsError {
pub type PolarsResult<T> = Result<T, PolarsError>;

impl PolarsError {
pub fn wrap_msg(&self, func: &dyn Fn(&str) -> String) -> Self {
pub fn context_trace(self) -> Self {
use PolarsError::*;
match self {
Context { error, msg } => {
let mut current_error = &*error;
let material_error = error.get_err();

let mut messages = vec![&msg];

while let PolarsError::Context { msg, error } = current_error {
current_error = error;
messages.push(msg)
}

let mut bt = String::new();
let first_message = messages.pop().unwrap();

let mut count = 0;
while let Some(msg) = messages.pop() {
count += 1;
writeln!(&mut bt, "\t[{count}] {}", msg).unwrap();
}
material_error.wrap_msg(move |msg| format!("{first_message}\nThe reason: {msg}:\n\nThis error occurred with the following context stack:\n{bt}"))
},
err => err,
}
}

fn wrap_msg<F: FnOnce(&str) -> String>(&self, func: F) -> Self {
use PolarsError::*;
match self {
ColumnNotFound(msg) => ColumnNotFound(func(msg).into()),
ComputeError(msg) => ComputeError(func(msg).into()),
Duplicate(msg) => Duplicate(func(msg).into()),
InvalidOperation(msg) => InvalidOperation(func(msg).into()),
Io(err) => ComputeError(func(&format!("IO: {err}")).into()),
IO { error, msg } => {
let msg = match msg {
Some(msg) => func(msg),
None => func(&format!("{}", error)),
};
IO {
error: error.clone(),
msg: Some(msg.into()),
}
},
NoData(msg) => NoData(func(msg).into()),
OutOfBounds(msg) => OutOfBounds(func(msg).into()),
SchemaFieldNotFound(msg) => SchemaFieldNotFound(func(msg).into()),
SchemaMismatch(msg) => SchemaMismatch(func(msg).into()),
ShapeMismatch(msg) => ShapeMismatch(func(msg).into()),
StringCacheMismatch(msg) => StringCacheMismatch(func(msg).into()),
StructFieldNotFound(msg) => StructFieldNotFound(func(msg).into()),
_ => unreachable!(),
}
}

fn get_err(&self) -> &Self {
use PolarsError::*;
match self {
Context { error, .. } => error.get_err(),
err => err,
}
}

pub fn context(self, msg: ErrString) -> Self {
PolarsError::Context {
msg,
error: Box::new(self),
}
}
}
Expand Down
107 changes: 75 additions & 32 deletions crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ fn empty_df() -> IR {
}
}

macro_rules! failed_input {
($($t:tt)*) => {
failed_input_args!(stringify!($($t)*))
}
}
macro_rules! failed_input_args {
($name:expr) => {
format!("'{}' input failed to resolve", $name).into()
};
}

macro_rules! failed_here {
($($t:tt)*) => {
format!("'{}' failed", stringify!($($t)*)).into()
}
}

/// converts LogicalPlan to IR
/// it adds expressions & lps to the respective arenas as it traverses the plan
/// finally it returns the top node of the logical plan
Expand Down Expand Up @@ -51,11 +68,9 @@ pub fn to_alp(
metadata,
..
} => {
let (file_info, md) = scans::parquet_file_info(
&paths,
&file_options,
cloud_options.as_ref(),
)?;
let (file_info, md) =
scans::parquet_file_info(&paths, &file_options, cloud_options.as_ref())
.map_err(|e| e.context(failed_here!(parquet scan)))?;
*metadata = md;
file_info
},
Expand All @@ -66,13 +81,15 @@ pub fn to_alp(
..
} => {
let (file_info, md) =
scans::ipc_file_info(&paths, &file_options, cloud_options.as_ref())?;
scans::ipc_file_info(&paths, &file_options, cloud_options.as_ref())
.map_err(|e| e.context(failed_here!(ipc scan)))?;
*metadata = Some(md);
file_info
},
#[cfg(feature = "csv")]
FileScan::Csv { options, .. } => {
scans::csv_file_info(&paths, &file_options, options)?
scans::csv_file_info(&paths, &file_options, options)
.map_err(|e| e.context(failed_here!(csv scan)))?
},
// FileInfo should be set.
FileScan::Anonymous { .. } => unreachable!(),
Expand Down Expand Up @@ -104,7 +121,8 @@ pub fn to_alp(
let inputs = inputs
.into_iter()
.map(|lp| to_alp(lp, expr_arena, lp_arena))
.collect::<PolarsResult<_>>()?;
.collect::<PolarsResult<_>>()
.map_err(|e| e.context(failed_input!(vertical concat)))?;
IR::Union { inputs, options }
},
DslPlan::HConcat {
Expand All @@ -115,21 +133,25 @@ pub fn to_alp(
let inputs = inputs
.into_iter()
.map(|lp| to_alp(lp, expr_arena, lp_arena))
.collect::<PolarsResult<_>>()?;
.collect::<PolarsResult<_>>()
.map_err(|e| e.context(failed_input!(horizontal concat)))?;
IR::HConcat {
inputs,
schema,
options,
}
},
DslPlan::Filter { input, predicate } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let predicate = expand_filter(predicate, input, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(filter)))?;
let predicate = expand_filter(predicate, input, lp_arena)
.map_err(|e| e.context(failed_here!(filter)))?;
let predicate = to_expr_ir(predicate, expr_arena);
IR::Filter { input, predicate }
},
DslPlan::Slice { input, offset, len } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(slice)))?;
IR::Slice { input, offset, len }
},
DslPlan::DataFrameScan {
Expand All @@ -150,9 +172,11 @@ pub fn to_alp(
input,
options,
} => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(select)))?;
let schema = lp_arena.get(input).schema(lp_arena);
let (exprs, schema) = prepare_projection(expr, &schema)?;
let (exprs, schema) =
prepare_projection(expr, &schema).map_err(|e| e.context(failed_here!(select)))?;

if exprs.is_empty() {
lp_arena.replace(input, empty_df());
Expand All @@ -174,8 +198,10 @@ pub fn to_alp(
slice,
sort_options,
} => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let by_column = expand_expressions(input, by_column, lp_arena, expr_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(sort)))?;
let by_column = expand_expressions(input, by_column, lp_arena, expr_arena)
.map_err(|e| e.context(failed_here!(sort)))?;
IR::Sort {
input,
by_column,
Expand All @@ -188,7 +214,8 @@ pub fn to_alp(
id,
cache_hits,
} => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(cache)))?;
IR::Cache {
input,
id,
Expand All @@ -203,10 +230,12 @@ pub fn to_alp(
maintain_order,
options,
} => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(group_by)))?;

let (keys, aggs, schema) =
resolve_group_by(input, keys, aggs, &options, lp_arena, expr_arena)?;
resolve_group_by(input, keys, aggs, &options, lp_arena, expr_arena)
.map_err(|e| e.context(failed_here!(group_by)))?;

let (apply, schema) = if let Some((apply, schema)) = apply {
(Some(apply), schema)
Expand Down Expand Up @@ -240,14 +269,17 @@ pub fn to_alp(
}
}

let input_left = to_alp(owned(input_left), expr_arena, lp_arena)?;
let input_right = to_alp(owned(input_right), expr_arena, lp_arena)?;
let input_left = to_alp(owned(input_left), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(join left)))?;
let input_right = to_alp(owned(input_right), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(join, right)))?;

let schema_left = lp_arena.get(input_left).schema(lp_arena);
let schema_right = lp_arena.get(input_right).schema(lp_arena);

let schema =
det_join_schema(&schema_left, &schema_right, &left_on, &right_on, &options)?;
det_join_schema(&schema_left, &schema_right, &left_on, &right_on, &options)
.map_err(|e| e.context(failed_here!(join schema resolving)))?;

let left_on = to_expr_irs_ignore_alias(left_on, expr_arena);
let right_on = to_expr_irs_ignore_alias(right_on, expr_arena);
Expand All @@ -266,8 +298,10 @@ pub fn to_alp(
exprs,
options,
} => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let (exprs, schema) = resolve_with_columns(exprs, input, lp_arena, expr_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(with_columns)))?;
let (exprs, schema) = resolve_with_columns(exprs, input, lp_arena, expr_arena)
.map_err(|e| e.context(failed_here!(with_columns)))?;
IR::HStack {
input,
exprs,
Expand All @@ -276,11 +310,14 @@ pub fn to_alp(
}
},
DslPlan::Distinct { input, options } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(unique)))?;
IR::Distinct { input, options }
},
DslPlan::MapFunction { input, function } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena).map_err(|e| {
e.context(failed_input_args!(format!("{}", function).to_lowercase()))
})?;
let input_schema = lp_arena.get(input).schema(lp_arena);

match function {
Expand All @@ -295,7 +332,8 @@ pub fn to_alp(
})
.collect::<Vec<_>>();

let (exprs, schema) = resolve_with_columns(exprs, input, lp_arena, expr_arena)?;
let (exprs, schema) = resolve_with_columns(exprs, input, lp_arena, expr_arena)
.map_err(|e| e.context(failed_here!(fill_nan)))?;
IR::HStack {
input,
exprs,
Expand All @@ -315,8 +353,10 @@ pub fn to_alp(
.map(|e| e.is_not_null())
.collect::<Vec<_>>(),
),
}?;
let predicate = rewrite_projections(vec![predicate], &input_schema, &[])?
}
.map_err(|e| e.context(failed_here!(drop_nulls)))?;
let predicate = rewrite_projections(vec![predicate], &input_schema, &[])
.map_err(|e| e.context(failed_here!(drop_nulls)))?
.pop()
.unwrap();
let predicate = to_expr_ir(predicate, expr_arena);
Expand Down Expand Up @@ -427,11 +467,13 @@ pub fn to_alp(
}
},
DslPlan::ExtContext { input, contexts } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(with_context)))?;
let contexts = contexts
.into_iter()
.map(|lp| to_alp(lp, expr_arena, lp_arena))
.collect::<PolarsResult<Vec<_>>>()?;
.collect::<PolarsResult<Vec<_>>>()
.map_err(|e| e.context(failed_here!(with_context)))?;

let mut schema = (**lp_arena.get(input).schema(lp_arena)).clone();
for input in &contexts {
Expand All @@ -450,7 +492,8 @@ pub fn to_alp(
}
},
DslPlan::Sink { input, payload } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let input = to_alp(owned(input), expr_arena, lp_arena)
.map_err(|e| e.context(failed_input!(sink)))?;
IR::Sink { input, payload }
},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-utils/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ where
} else {
format!("{err}: {path}")
};
PolarsError::Io(Error::new(err.kind(), msg))
Error::new(err.kind(), msg).into()
})
}
Loading

0 comments on commit 0423fa3

Please sign in to comment.