Commit

Remove slice hacks
ritchie46 committed May 12, 2024
1 parent f1cb1db commit e728d95
Showing 6 changed files with 44 additions and 68 deletions.
3 changes: 3 additions & 0 deletions crates/polars-core/src/frame/mod.rs
@@ -2249,6 +2249,9 @@ impl DataFrame {
if offset == 0 && length == self.height() {
return self.clone();
}
+ if length == 0 {
+ return self.clear();
+ }
let col = self
.columns
.iter()
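For illustration, a minimal caller-side sketch of the two fast paths in `DataFrame::slice` after this change, assuming the `polars` crate as a dependency (the example frame is made up):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let df = df!("a" => [1i32, 2, 3], "b" => ["x", "y", "z"])?;

    // Full-range slice: returns a clone of the frame.
    assert_eq!(df.slice(0, df.height()).height(), 3);

    // Zero-length slice: the new guard short-circuits to `clear()`,
    // an empty frame that keeps the schema (same columns, zero rows).
    let empty = df.slice(1, 0);
    assert_eq!(empty.height(), 0);
    assert_eq!(empty.width(), df.width());

    Ok(())
}
```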
22 changes: 12 additions & 10 deletions crates/polars-io/src/csv/read/read_impl/batched_mmap.rs
@@ -170,7 +170,7 @@ impl<'a> CoreReader<'a> {
to_cast: self.to_cast,
ignore_errors: self.ignore_errors,
truncate_ragged_lines: self.truncate_ragged_lines,
- n_rows: self.n_rows,
+ remaining: self.n_rows.unwrap_or(usize::MAX),
encoding: self.encoding,
separator: self.separator,
schema: self.schema,
@@ -197,7 +197,7 @@ pub struct BatchedCsvReaderMmap<'a> {
truncate_ragged_lines: bool,
to_cast: Vec<Field>,
ignore_errors: bool,
- n_rows: Option<usize>,
+ remaining: usize,
encoding: CsvEncoding,
separator: u8,
schema: SchemaRef,
@@ -211,14 +211,9 @@ pub struct BatchedCsvReaderMmap<'a> {

impl<'a> BatchedCsvReaderMmap<'a> {
pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
- if n == 0 {
+ if n == 0 || self.remaining == 0 {
return Ok(None);
}
- if let Some(n_rows) = self.n_rows {
- if self.rows_read >= n_rows as IdxSize {
- return Ok(None);
- }
- }

// get next `n` offset positions.
let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
@@ -274,8 +269,15 @@ impl<'a> BatchedCsvReaderMmap<'a> {
if self.row_index.is_some() {
update_row_counts2(&mut chunks, self.rows_read)
}
- for df in &chunks {
- self.rows_read += df.height() as IdxSize;
+ for df in &mut chunks {
+ let h = df.height();
+
+ if self.remaining < h {
+ *df = df.slice(0, self.remaining)
+ };
+ self.remaining = self.remaining.saturating_sub(h);
+
+ self.rows_read += h as IdxSize;
}
Ok(Some(chunks))
}
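The heart of this file's change is the `remaining` row budget: each parsed chunk is cut down to whatever budget is left and the budget is decremented with `saturating_sub`. A standalone sketch of that bookkeeping, with plain `Vec`s standing in for `DataFrame` chunks and `truncate` standing in for `df.slice(0, remaining)`:

```rust
fn clamp_chunks(chunks: &mut [Vec<u64>], remaining: &mut usize, rows_read: &mut usize) {
    for chunk in chunks.iter_mut() {
        let h = chunk.len();
        if *remaining < h {
            // Mirrors `*df = df.slice(0, self.remaining)` in the reader.
            chunk.truncate(*remaining);
        }
        *remaining = remaining.saturating_sub(h);
        // Mirrors `self.rows_read += h as IdxSize` (pre-slice height).
        *rows_read += h;
    }
}

fn main() {
    // Three parsed chunks of 4 rows each, with a row budget of 6 (`n_rows = Some(6)`).
    let mut chunks = vec![vec![0u64; 4], vec![0u64; 4], vec![0u64; 4]];
    let (mut remaining, mut rows_read) = (6usize, 0usize);
    clamp_chunks(&mut chunks, &mut remaining, &mut rows_read);

    // First chunk kept whole, second truncated to 2 rows, third emptied;
    // once `remaining` hits 0, the next `next_batches` call returns `Ok(None)`.
    let heights: Vec<usize> = chunks.iter().map(Vec::len).collect();
    assert_eq!(heights, vec![4, 2, 0]);
    assert_eq!(remaining, 0);
    assert_eq!(rows_read, 12); // counts parsed rows, like `self.rows_read`
}
```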
26 changes: 13 additions & 13 deletions crates/polars-io/src/csv/read/read_impl/batched_read.rs
@@ -246,7 +246,6 @@ impl<'a> CoreReader<'a> {

Ok(BatchedCsvReaderRead {
chunk_size: self.chunk_size,
- finished: false,
file_chunk_reader: chunk_iter,
file_chunks: vec![],
projection,
@@ -260,7 +259,7 @@ impl<'a> CoreReader<'a> {
to_cast: self.to_cast,
ignore_errors: self.ignore_errors,
truncate_ragged_lines: self.truncate_ragged_lines,
- n_rows: self.n_rows,
+ remaining: self.n_rows.unwrap_or(usize::MAX),
encoding: self.encoding,
separator: self.separator,
schema: self.schema,
@@ -273,7 +272,6 @@ impl<'a> CoreReader<'a> {

pub struct BatchedCsvReaderRead<'a> {
chunk_size: usize,
- finished: bool,
file_chunk_reader: ChunkReader<'a>,
file_chunks: Vec<(SyncPtr<u8>, usize)>,
projection: Vec<usize>,
@@ -287,7 +285,7 @@ pub struct BatchedCsvReaderRead<'a> {
to_cast: Vec<Field>,
ignore_errors: bool,
truncate_ragged_lines: bool,
- n_rows: Option<usize>,
+ remaining: usize,
encoding: CsvEncoding,
separator: u8,
schema: SchemaRef,
@@ -302,14 +300,9 @@ pub struct BatchedCsvReaderRead<'a> {
impl<'a> BatchedCsvReaderRead<'a> {
/// `n` number of batches.
pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
- if n == 0 || self.finished {
+ if n == 0 || self.remaining == 0 {
return Ok(None);
}
- if let Some(n_rows) = self.n_rows {
- if self.rows_read >= n_rows as IdxSize {
- return Ok(None);
- }
- }

// get next `n` offset positions.

@@ -331,7 +324,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
// get the final slice
self.file_chunks
.push(self.file_chunk_reader.get_buf_remaining());
- self.finished = true
+ self.remaining = 0
}

// depleted the offsets iterator, we are done as well.
@@ -380,8 +373,15 @@ impl<'a> BatchedCsvReaderRead<'a> {
if self.row_index.is_some() {
update_row_counts2(&mut chunks, self.rows_read)
}
- for df in &chunks {
- self.rows_read += df.height() as IdxSize;
+ for df in &mut chunks {
+ let h = df.height();
+
+ if self.remaining < h {
+ *df = df.slice(0, self.remaining)
+ };
+ self.remaining = self.remaining.saturating_sub(h);
+
+ self.rows_read += h as IdxSize;
}
Ok(Some(chunks))
}
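Same pattern as the mmap-backed reader above: `n_rows: Option<usize>` becomes a `remaining` budget via `unwrap_or(usize::MAX)`, and the separate `finished` flag disappears because hitting end of file is expressed as `remaining = 0`. A small sketch of the single stop condition that replaces the two old checks:

```rust
// `None` (no row limit) becomes a budget that is never exhausted in practice.
fn row_budget(n_rows: Option<usize>) -> usize {
    n_rows.unwrap_or(usize::MAX)
}

// Replaces both `if n == 0 || self.finished` and the old `rows_read >= n_rows` check.
fn should_stop(n_requested_batches: usize, remaining: usize) -> bool {
    n_requested_batches == 0 || remaining == 0
}

fn main() {
    assert!(!should_stop(4, row_budget(None)));
    assert!(should_stop(4, 0)); // end of file or row budget spent
    assert!(should_stop(0, row_budget(Some(100))));
}
```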
@@ -68,10 +68,12 @@ fn jit_insert_slice(
sink_nodes: &mut Vec<(usize, Node, Rc<RefCell<u32>>)>,
operator_offset: usize,
) {
- // if the join/union has a slice, we add a new slice node
+ // if the join has a slice, we add a new slice node
// note that we take the offset + 1, because we want to
// slice AFTER the join has happened and the join will be an
// operator
+ // NOTE: Don't do this for union, that doesn't work.
+ // TODO! Deal with this in the optimizer.
use IR::*;
let (offset, len) = match lp_arena.get(node) {
Join { options, .. } if options.args.slice.is_some() => {
@@ -80,14 +82,6 @@ fn jit_insert_slice(
};
(offset, len)
},
- Union {
- options:
- UnionOptions {
- slice: Some((offset, len)),
- ..
- },
- ..
- } => (*offset, *len),
_ => return,
};

@@ -178,7 +172,6 @@ pub(super) fn construct(
},
PipelineNode::Union(node) => {
operator_nodes.push(node);
- jit_insert_slice(node, lp_arena, &mut sink_nodes, operator_offset);
let op = get_operator(node, lp_arena, expr_arena, &to_physical_piped_expr)?;
operators.push(op);
},
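`jit_insert_slice` now only fires for joins whose options carry a slice; `PipelineNode::Union` no longer gets a slice operator jitted in behind it. A hedged query-level sketch of the join case that still takes this path, assuming the `polars` crate with the `lazy` and `streaming` features (data and column names are made up):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let left = df!("k" => [1i32, 2, 3], "a" => ["x", "y", "z"])?.lazy();
    let right = df!("k" => [1i32, 2, 2, 3], "b" => [10i32, 20, 21, 30])?.lazy();

    // A join followed by a limit: the slice ends up in the join's options, and
    // the streaming planner still inserts a slice operator right after the join.
    // With this commit it no longer does the same for unions.
    let out = left
        .join(right, [col("k")], [col("k")], JoinArgs::new(JoinType::Inner))
        .slice(0, 2)
        .with_streaming(true)
        .collect()?;
    assert_eq!(out.height(), 2);
    Ok(())
}
```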
34 changes: 1 addition & 33 deletions crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs
@@ -81,26 +81,6 @@ fn insert_file_sink(mut root: Node, lp_arena: &mut Arena<IR>) -> Node {
root
}

- fn insert_slice(
- root: Node,
- offset: i64,
- len: IdxSize,
- lp_arena: &mut Arena<IR>,
- state: &mut Branch,
- ) -> Node {
- let new_loc_child = lp_arena.duplicate(root);
- lp_arena.replace(
- root,
- IR::Slice {
- input: new_loc_child,
- offset,
- len: len as IdxSize,
- },
- );
- state.operators_sinks.push(PipelineNode::Sink(root));
- new_loc_child
- }
-
pub(crate) fn insert_streaming_nodes(
root: Node,
lp_arena: &mut Arena<IR>,
@@ -249,20 +229,8 @@ pub(crate) fn insert_streaming_nodes(
)
}
},
- Scan {
- file_options: options,
- scan_type,
- ..
- } if scan_type.streamable() => {
+ Scan { scan_type, .. } if scan_type.streamable() => {
if state.streamable {
#[cfg(feature = "csv")]
if matches!(scan_type, FileScan::Csv { .. }) {
// the batched csv reader doesn't stop exactly at n_rows
if let Some(n_rows) = options.n_rows {
root = insert_slice(root, 0, n_rows as IdxSize, lp_arena, &mut state);
}
}

state.sources.push(root);
pipeline_trees[current_idx].push(state)
}
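The removed `insert_slice` helper existed only to wrap streaming CSV scans in an extra Slice node because the old batched readers could overshoot `n_rows`; now that the readers clamp chunks themselves, the scan honours the limit directly. A hedged sketch from the query side, assuming the `polars` crate with the `csv`, `lazy` and `streaming` features (the file path is hypothetical):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let lf = LazyCsvReader::new("data.csv") // hypothetical path
        .with_n_rows(Some(1_000))
        .finish()?;

    // The streaming physical plan no longer adds a Slice node above this scan;
    // the batched CSV reader stops after `n_rows` parsed rows on its own.
    let df = lf.with_streaming(true).collect()?;
    assert!(df.height() <= 1_000);
    Ok(())
}
```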
14 changes: 12 additions & 2 deletions crates/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
@@ -209,15 +209,25 @@ impl SlicePushDown {
Ok(lp)
}
(Union {mut inputs, mut options }, Some(state)) => {
- options.slice = Some((state.offset, state.len as usize));
if state.offset == 0 {
for input in &mut inputs {
let input_lp = lp_arena.take(*input);
let input_lp = self.pushdown(input_lp, Some(state), lp_arena, expr_arena)?;
lp_arena.replace(*input, input_lp);
}
}
- Ok(Union {inputs, options})
+
+ // The in-memory union node is slice aware.
+ // We still set this information, but the streaming engine will ignore it.
+ options.slice = Some((state.offset, state.len as usize));
+ let lp = Union {inputs, options};
+
+ if self.streaming {
+ // Ensure the slice node remains.
+ self.no_pushdown_finish_opt(lp, Some(state), lp_arena)
+ } else {
+ Ok(lp)
+ }
},
(Join {
input_left,
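For unions the slice information now lives in two places: `options.slice` for the in-memory union node, which applies it itself, and an explicit Slice node that `no_pushdown_finish_opt` keeps above the union when planning for the streaming engine, which ignores `options.slice`. A hedged sketch that prints both optimized plans, assuming the same `polars` setup as in the earlier examples:

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let a = df!("x" => [1i32, 2, 3])?.lazy();
    let b = df!("x" => [4i32, 5, 6])?.lazy();
    let q = concat([a, b], UnionArgs::default())?.slice(0, 2);

    // In-memory plan: the limit is folded into the union's options.
    println!("{}", q.clone().explain(true)?);

    // Streaming plan: the slice node is kept above the union.
    println!("{}", q.with_streaming(true).explain(true)?);
    Ok(())
}
```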
