Skip to content

Commit

Permalink
refactor(rust): Add the capability to run new streaming engine on tes…
Browse files Browse the repository at this point in the history
…t suite (pola-rs#17706)

Co-authored-by: ritchie <ritchie46@gmail.com>
  • Loading branch information
orlp and ritchie46 authored Jul 19, 2024
1 parent b331538 commit 8928f7a
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 61 deletions.
9 changes: 9 additions & 0 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,15 @@ impl RuntimeManager {
{
self.rt.spawn(future)
}

// See [`tokio::runtime::Runtime::spawn_blocking`].
pub fn spawn_blocking<F, R>(&self, f: F) -> tokio::task::JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.rt.spawn_blocking(f)
}
}

static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new);
Expand Down
59 changes: 48 additions & 11 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,20 +730,57 @@ impl LazyFrame {
pub fn collect(self) -> PolarsResult<DataFrame> {
#[cfg(feature = "new_streaming")]
{
if self.opt_state.new_streaming {
let alp_plan = self.to_alp_optimized()?;
let lp_top = alp_plan.lp_top;
let mut ir_arena = alp_plan.lp_arena;
let expr_arena = alp_plan.expr_arena;

let lp_top = ir_arena.add(IR::Sink {
input: lp_top,
payload: SinkType::Memory,
});
let force_new_streaming = self.opt_state.new_streaming;
let mut alp_plan = self.to_alp_optimized()?;
let stream_lp_top = alp_plan.lp_arena.add(IR::Sink {
input: alp_plan.lp_top,
payload: SinkType::Memory,
});

if force_new_streaming {
return polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena,
&alp_plan.expr_arena,
);
}

return polars_stream::run_query(lp_top, ir_arena, expr_arena);
if std::env::var("POLARS_AUTO_NEW_STREAMING")
.as_deref()
.unwrap_or("")
== "1"
{
let f = || {
polars_stream::run_query(
stream_lp_top,
alp_plan.lp_arena.clone(),
&alp_plan.expr_arena,
)
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(r) => return r,
Err(e) => {
// Fallback to normal engine if error is due to not being implemented,
// otherwise propagate error.
if e.downcast_ref::<&str>() != Some(&"not yet implemented") {
if polars_core::config::verbose() {
eprintln!("caught unimplemented error in new streaming engine, falling back to normal engine");
}
std::panic::resume_unwind(e);
}
},
}
}

let mut physical_plan = create_physical_plan(
alp_plan.lp_top,
&mut alp_plan.lp_arena,
&alp_plan.expr_arena,
)?;
let mut state = ExecutionState::new();
physical_plan.execute(&mut state)
}
#[cfg(not(feature = "new_streaming"))]
self._collect_post_opt(|_, _, _| Ok(()))
}

Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl Expr {
options: FunctionOptions {
collect_groups: ApplyOptions::ElementWise,
fmt_str: "map",
flags: FunctionFlags::default() | FunctionFlags::OPTIONAL_RE_ENTRANT,
..Default::default()
},
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/dsl/python_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl Expr {
},
})
});
let mut flags = FunctionFlags::default();
let mut flags = FunctionFlags::default() | FunctionFlags::OPTIONAL_RE_ENTRANT;
if returns_scalar {
flags |= FunctionFlags::RETURNS_SCALAR;
}
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-plan/src/plans/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ bitflags!(
/// head_1(x) -> {1}
/// sum(x) -> {4}
const RETURNS_SCALAR = 1 << 5;
/// This can happen with UDF's that use Polars within the UDF.
/// This can lead to recursively entering the engine and sometimes deadlocks.
/// This flag must be set to handle that.
const OPTIONAL_RE_ENTRANT = 1 << 6;
}
);

Expand Down
35 changes: 23 additions & 12 deletions crates/polars-stream/src/async_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::cell::{Cell, UnsafeCell};
use std::future::Future;
use std::marker::PhantomData;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, Weak};

use crossbeam_deque::{Injector, Steal, Stealer, Worker as WorkQueue};
Expand Down Expand Up @@ -44,6 +44,7 @@ pub enum TaskPriority {
/// Metadata associated with a task to help schedule it and clean it up.
struct TaskMetadata {
priority: TaskPriority,
freshly_spawned: AtomicBool,

task_key: TaskKey,
completed_tasks: Weak<Mutex<Vec<TaskKey>>>,
Expand Down Expand Up @@ -82,12 +83,29 @@ struct Executor {
impl Executor {
fn schedule_task(&self, task: ReadyTask) {
let thread = TLS_THREAD_ID.get();
let priority = task.metadata().priority;
if let Some(ttl) = self.thread_task_lists.get(thread) {
let meta = task.metadata();
let opt_ttl = self.thread_task_lists.get(thread);

let mut use_global_queue = opt_ttl.is_none();
if meta.freshly_spawned.load(Ordering::Relaxed) {
use_global_queue = true;
meta.freshly_spawned.store(false, Ordering::Relaxed);
}

if use_global_queue {
// Scheduled from an unknown thread, add to global queue.
if meta.priority == TaskPriority::High {
self.global_high_prio_task_queue.push(task);
} else {
self.global_low_prio_task_queue.push(task);
}
self.park_group.unpark_one();
} else {
let ttl = opt_ttl.unwrap();
// SAFETY: this slot may only be accessed from the local thread, which we are.
let slot = unsafe { &mut *ttl.local_slot.get() };

if priority == TaskPriority::High {
if meta.priority == TaskPriority::High {
// Insert new task into thread local slot, taking out the old task.
let Some(task) = slot.replace(task) else {
// We pushed a task into our local slot which was empty. Since
Expand All @@ -107,14 +125,6 @@ impl Executor {
self.park_group.unpark_one();
}
}
} else {
// Scheduled from an unknown thread, add to global queue.
if priority == TaskPriority::High {
self.global_high_prio_task_queue.push(task);
} else {
self.global_low_prio_task_queue.push(task);
}
self.park_group.unpark_one();
}
}

Expand Down Expand Up @@ -288,6 +298,7 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {
TaskMetadata {
task_key,
priority,
freshly_spawned: AtomicBool::new(true),
completed_tasks: Arc::downgrade(&self.completed_tasks),
},
)
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-stream/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ pub fn execute_graph(
break;
}
run_subgraph(graph, &nodes, &pipes, num_pipelines)?;
if polars_core::config::verbose() {
eprintln!("polars-stream: done running graph phase");
}
}

// Ensure everything is done.
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-stream/src/morsel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl Morsel {
}

#[allow(unused)]
pub fn into_df(self) -> DataFrame {
self.df
pub fn into_inner(self) -> (DataFrame, MorselSeq, Option<WaitToken>) {
(self.df, self.seq, self.consume_token)
}

pub fn df(&self) -> &DataFrame {
Expand Down
74 changes: 44 additions & 30 deletions crates/polars-stream/src/nodes/select.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
use std::sync::Arc;

use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_expr::prelude::PhysicalExpr;

use super::compute_node_prelude::*;

pub struct SelectNode {
selectors: Vec<Arc<dyn PhysicalExpr>>,
selector_reentrant: Vec<bool>,
schema: Arc<Schema>,
extend_original: bool,
}

impl SelectNode {
pub fn new(
selectors: Vec<Arc<dyn PhysicalExpr>>,
selector_reentrant: Vec<bool>,
schema: Arc<Schema>,
extend_original: bool,
) -> Self {
Self {
selectors,
selector_reentrant,
schema,
extend_original,
}
Expand Down Expand Up @@ -52,38 +54,50 @@ impl ComputeNode for SelectNode {
let slf = &*self;
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
// Select columns.
let mut selected: Vec<Series> = slf
.selectors
.iter()
.map(|s| s.evaluate(&df, state))
.collect::<PolarsResult<_>>()?;

// Extend or create new dataframe.
let ret = if slf.extend_original {
let mut out = df.clone();
out._add_columns(selected, &slf.schema)?;
out
let (df, seq, consume_token) = morsel.into_inner();
let mut selected = Vec::new();
for (selector, reentrant) in slf.selectors.iter().zip(&slf.selector_reentrant) {
// We need spawn_blocking because evaluate could contain Python UDFs which
// recursively call the executor again.
let s = if *reentrant {
let df = df.clone();
let selector = selector.clone();
let state = state.clone();
polars_io::pl_async::get_runtime()
.spawn_blocking(move || selector.evaluate(&df, &state))
.await
.unwrap()?
} else {
// Broadcast scalars.
let max_non_unit_length = selected
.iter()
.map(|s| s.len())
.filter(|l| *l != 1)
.max()
.unwrap_or(1);
for s in &mut selected {
if s.len() != max_non_unit_length {
assert!(s.len() == 1, "got series of incompatible lengths");
*s = s.new_from_index(0, max_non_unit_length);
}
}
unsafe { DataFrame::new_no_checks(selected) }
selector.evaluate(&df, state)?
};
selected.push(s);
}

PolarsResult::Ok(ret)
})?;
let ret = if slf.extend_original {
let mut out = df;
out._add_columns(selected, &slf.schema)?;
out
} else {
// Broadcast scalars.
let max_non_unit_length = selected
.iter()
.map(|s| s.len())
.filter(|l| *l != 1)
.max()
.unwrap_or(1);
for s in &mut selected {
if s.len() != max_non_unit_length {
assert!(s.len() == 1, "got series of incompatible lengths");
*s = s.new_from_index(0, max_non_unit_length);
}
}
unsafe { DataFrame::new_no_checks(selected) }
};

let mut morsel = Morsel::new(ret, seq);
if let Some(token) = consume_token {
morsel.set_consume_token(token);
}

if send.send(morsel).await.is_err() {
break;
Expand Down
23 changes: 21 additions & 2 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use polars_error::PolarsResult;
use polars_expr::reduce::can_convert_into_reduction;
use polars_plan::plans::{AExpr, Context, IR};
use polars_plan::prelude::SinkType;
use polars_plan::prelude::{ArenaExprIter, FunctionFlags, SinkType};
use polars_utils::arena::{Arena, Node};
use slotmap::SlotMap;

Expand All @@ -13,11 +13,20 @@ fn is_streamable(node: Node, arena: &Arena<AExpr>) -> bool {
polars_plan::plans::is_streamable(node, arena, Context::Default)
}

fn has_potential_recurring_entrance(node: Node, arena: &Arena<AExpr>) -> bool {
arena.iter(node).any(|(_n, ae)| match ae {
AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => {
options.flags.contains(FunctionFlags::OPTIONAL_RE_ENTRANT)
},
_ => false,
})
}

#[recursive::recursive]
pub fn lower_ir(
node: Node,
ir_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
expr_arena: &Arena<AExpr>,
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
) -> PolarsResult<PhysNodeKey> {
let ir_node = ir_arena.get(node);
Expand All @@ -35,12 +44,17 @@ pub fn lower_ir(
schema,
..
} if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
let selector_reentrant = expr
.iter()
.map(|e| has_potential_recurring_entrance(e.node(), expr_arena))
.collect();
let selectors = expr.clone();
let output_schema = schema.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::Select {
input,
selectors,
selector_reentrant,
output_schema,
extend_original: false,
}))
Expand Down Expand Up @@ -76,12 +90,17 @@ pub fn lower_ir(
schema,
..
} if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
let selector_reentrant = exprs
.iter()
.map(|e| has_potential_recurring_entrance(e.node(), expr_arena))
.collect();
let selectors = exprs.clone();
let output_schema = schema.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::Select {
input,
selectors,
selector_reentrant,
output_schema,
extend_original: true,
}))
Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum PhysNode {
Select {
input: PhysNodeKey,
selectors: Vec<ExprIR>,
selector_reentrant: Vec<bool>,
extend_original: bool,
output_schema: Arc<Schema>,
},
Expand Down
Loading

0 comments on commit 8928f7a

Please sign in to comment.