fix: Fix sink_ipc_cloud panicking with runtime error
Fixes #13614

When writing to ObjectStore-compatible storage using the IPC format,
the `block_on` calls inside the constructed `CloudWriter` can end up
being invoked while already inside another `block_on` call. Tokio
forbids this kind of nested blocking and panics at runtime.

This PR resolves the issue by using `block_on_potential_spawn`
in the affected places instead.
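
For illustration, a minimal standalone program (not part of this commit)
that reproduces the kind of nesting Tokio rejects:

    use tokio::runtime::Runtime;

    fn main() {
        let outer = Runtime::new().unwrap();
        outer.block_on(async {
            // This thread is now driving async tasks. Blocking it again
            // with a second `block_on` makes Tokio panic with
            // "Cannot start a runtime from within a runtime".
            let inner = Runtime::new().unwrap();
            inner.block_on(async { println!("never reached") });
        });
    }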

This fix was originally written by @Qqwy in another PR:
#14262

Co-Authored-By: Qqwy / Marten <qqwy@gmx.com>
philss and Qqwy committed Aug 8, 2024
1 parent 3dda47e commit 57bd7bc
Showing 5 changed files with 97 additions and 53 deletions.
10 changes: 5 additions & 5 deletions crates/polars-io/src/cloud/adaptors.rs
@@ -27,7 +27,7 @@ impl CloudWriter {
     /// Creates a new (current-thread) Tokio runtime
     /// which bridges the sync writing process with the async ObjectStore multipart uploading.
     /// TODO: Naming?
-    pub async fn new_with_object_store(
+    pub fn new_with_object_store(
         object_store: Arc<dyn ObjectStore>,
         path: Path,
     ) -> PolarsResult<Self> {
@@ -42,7 +42,7 @@ impl CloudWriter {
     pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
         let (cloud_location, object_store) =
             crate::cloud::build_object_store(uri, cloud_options, false).await?;
-        Self::new_with_object_store(object_store, cloud_location.prefix.into()).await
+        Self::new_with_object_store(object_store, cloud_location.prefix.into())
     }
 
     async fn abort(&mut self) -> PolarsResult<()> {
@@ -56,7 +56,7 @@ impl std::io::Write for CloudWriter {
         // We extend the lifetime for the duration of this function. This is safe as we block the
         // async runtime here
         let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
-        get_runtime().block_on(async {
+        get_runtime().block_on_potential_spawn(async {
             let res = self.writer.write_all(buf).await;
             if res.is_err() {
                 let _ = self.abort().await;
@@ -66,7 +66,7 @@ impl std::io::Write for CloudWriter {
     }
 
     fn flush(&mut self) -> std::io::Result<()> {
-        get_runtime().block_on(async {
+        get_runtime().block_on_potential_spawn(async {
             let res = self.writer.flush().await;
             if res.is_err() {
                 let _ = self.abort().await;
@@ -78,7 +78,7 @@ impl std::io::Write for CloudWriter {
 
 impl Drop for CloudWriter {
     fn drop(&mut self) {
-        let _ = get_runtime().block_on(self.writer.shutdown());
+        let _ = get_runtime().block_on_potential_spawn(self.writer.shutdown());
     }
 }
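
`block_on_potential_spawn` is provided by `polars_io::pl_async`. The
sketch below is not the Polars implementation; it only illustrates,
under assumed names and shapes, the general technique such a helper can
use: check for an ambient Tokio runtime before blocking, and route
through `block_in_place` when one is present:

    use std::future::Future;
    use tokio::runtime::{Handle, Runtime};

    // Illustrative sketch (assumed shape, not the actual Polars code).
    fn block_on_potential_spawn<F: Future>(rt: &Runtime, fut: F) -> F::Output {
        if Handle::try_current().is_ok() {
            // Already inside a Tokio runtime: `block_in_place` tells the
            // ambient (multi-threaded) runtime this thread is about to
            // block, so it can move its other tasks off this worker
            // first, avoiding the nested-`block_on` panic.
            tokio::task::block_in_place(|| rt.block_on(fut))
        } else {
            // No ambient runtime: plain `block_on` is safe.
            rt.block_on(fut)
        }
    }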
39 changes: 20 additions & 19 deletions crates/polars-pipe/src/executors/sinks/output/ipc.rs
@@ -42,34 +42,35 @@ pub struct IpcCloudSink {}
 #[cfg(feature = "cloud")]
 impl IpcCloudSink {
     #[allow(clippy::new_ret_no_self)]
-    #[tokio::main(flavor = "current_thread")]
-    pub async fn new(
+    pub fn new(
         uri: &str,
         cloud_options: Option<&polars_io::cloud::CloudOptions>,
         ipc_options: IpcWriterOptions,
         schema: &Schema,
     ) -> PolarsResult<FilesSink> {
-        let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
-        let writer = IpcWriter::new(cloud_writer)
-            .with_compression(ipc_options.compression)
-            .batched(schema)?;
+        polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
+            let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
+            let writer = IpcWriter::new(cloud_writer)
+                .with_compression(ipc_options.compression)
+                .batched(schema)?;
 
-        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
+            let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
 
-        let morsels_per_sink = morsels_per_sink();
-        let backpressure = morsels_per_sink * 2;
-        let (sender, receiver) = bounded(backpressure);
+            let morsels_per_sink = morsels_per_sink();
+            let backpressure = morsels_per_sink * 2;
+            let (sender, receiver) = bounded(backpressure);
 
-        let io_thread_handle = Arc::new(Some(init_writer_thread(
-            receiver,
-            writer,
-            ipc_options.maintain_order,
-            morsels_per_sink,
-        )));
+            let io_thread_handle = Arc::new(Some(init_writer_thread(
+                receiver,
+                writer,
+                ipc_options.maintain_order,
+                morsels_per_sink,
+            )));
 
-        Ok(FilesSink {
-            sender,
-            io_thread_handle,
+            Ok(FilesSink {
+                sender,
+                io_thread_handle,
+            })
         })
     }
 }
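
The removed `#[tokio::main(flavor = "current_thread")]` attribute is
where the second runtime came from: it rewrites the annotated `async fn`
into a synchronous function that builds a fresh current-thread runtime
and calls `block_on` on the body. A simplified, runnable illustration of
that expansion (not the literal macro output):

    // Roughly what `#[tokio::main(flavor = "current_thread")]` does
    // when applied to `async fn run() -> i32 { 42 }` (illustrative only):
    fn run() -> i32 {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed building the runtime")
            .block_on(async { 42 })
    }

    fn main() {
        assert_eq!(run(), 42);
    }

Inside such a `block_on`, any `CloudWriter` operation would in turn call
`block_on` on the shared Polars runtime, producing exactly the nesting
Tokio panics on; wrapping the body in `block_on_potential_spawn` on the
shared runtime removes the throwaway runtime altogether.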
59 changes: 30 additions & 29 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
@@ -144,40 +144,41 @@ pub struct ParquetCloudSink {}
 #[cfg(feature = "cloud")]
 impl ParquetCloudSink {
     #[allow(clippy::new_ret_no_self)]
-    #[tokio::main(flavor = "current_thread")]
-    pub async fn new(
+    pub fn new(
         uri: &str,
         cloud_options: Option<&polars_io::cloud::CloudOptions>,
         parquet_options: ParquetWriteOptions,
         schema: &Schema,
     ) -> PolarsResult<FilesSink> {
-        let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
-        let writer = ParquetWriter::new(cloud_writer)
-            .with_compression(parquet_options.compression)
-            .with_data_page_size(parquet_options.data_page_size)
-            .with_statistics(parquet_options.statistics)
-            .with_row_group_size(parquet_options.row_group_size)
-            // This is important! Otherwise we will deadlock
-            // See: #7074
-            .set_parallel(false)
-            .batched(schema)?;
-
-        let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
-
-        let morsels_per_sink = morsels_per_sink();
-        let backpressure = morsels_per_sink * 2;
-        let (sender, receiver) = bounded(backpressure);
-
-        let io_thread_handle = Arc::new(Some(init_writer_thread(
-            receiver,
-            writer,
-            true,
-            morsels_per_sink,
-        )));
-
-        Ok(FilesSink {
-            sender,
-            io_thread_handle,
+        polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
+            let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
+            let writer = ParquetWriter::new(cloud_writer)
+                .with_compression(parquet_options.compression)
+                .with_data_page_size(parquet_options.data_page_size)
+                .with_statistics(parquet_options.statistics)
+                .with_row_group_size(parquet_options.row_group_size)
+                // This is important! Otherwise we will deadlock
+                // See: #7074
+                .set_parallel(false)
+                .batched(schema)?;
+
+            let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;
+
+            let morsels_per_sink = morsels_per_sink();
+            let backpressure = morsels_per_sink * 2;
+            let (sender, receiver) = bounded(backpressure);
+
+            let io_thread_handle = Arc::new(Some(init_writer_thread(
+                receiver,
+                writer,
+                true,
+                morsels_per_sink,
+            )));
+
+            Ok(FilesSink {
+                sender,
+                io_thread_handle,
+            })
         })
     }
 }
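
The Parquet cloud sink gets the same restructuring as the IPC sink
above. For symmetry, a hypothetical usage sketch; `sink_parquet_cloud`
and its argument shape are assumed here by analogy with `sink_ipc_cloud`
(see the new example below), not taken from this diff:

    use cloud::AmazonS3ConfigKey as Key;
    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // Same LocalStack-style options as the IPC example below.
        let cloud_options = Some(cloud::CloudOptions::default().with_aws([
            (Key::AccessKeyId, "test".to_string()),
            (Key::SecretAccessKey, "test".to_string()),
            (Key::Endpoint, "http://localhost:4566".to_string()),
            (Key::Region, "us-east-1".to_string()),
        ]));

        let df = df!("foo" => &[1, 2, 3])?;

        // `sink_parquet_cloud` is assumed; the IPC example shows the
        // entry point this commit actually exercises.
        df.lazy().sink_parquet_cloud(
            "s3://test-bucket/test-writes/example.parquet".to_string(),
            cloud_options,
            Default::default(),
        )
    }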
12 changes: 12 additions & 0 deletions examples/write_ipc_cloud/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "write_ipc_cloud"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+aws-creds = "0.36.0"
+polars = { path = "../../crates/polars", features = ["lazy", "aws", "ipc", "cloud_write", "streaming"] }
+
+[workspace]
30 changes: 30 additions & 0 deletions examples/write_ipc_cloud/src/main.rs
@@ -0,0 +1,30 @@
+use cloud::AmazonS3ConfigKey as Key;
+use polars::prelude::*;
+
+const TEST_S3_LOCATION: &str = "s3://test-bucket/test-writes/polars_write_example_cloud.ipc";
+
+fn main() -> PolarsResult<()> {
+    let cloud_options = cloud::CloudOptions::default().with_aws([
+        (Key::AccessKeyId, "test".to_string()),
+        (Key::SecretAccessKey, "test".to_string()),
+        (Key::Endpoint, "http://localhost:4566".to_string()),
+        (Key::Region, "us-east-1".to_string()),
+    ]);
+    let cloud_options = Some(cloud_options);
+
+    let df = df!(
+        "foo" => &[1, 2, 3],
+        "bar" => &[None, Some("bak"), Some("baz")],
+    )
+    .unwrap();
+
+    df.lazy()
+        .sink_ipc_cloud(
+            TEST_S3_LOCATION.to_string(),
+            cloud_options,
+            Default::default(),
+        )
+        .unwrap();
+
+    Ok(())
+}
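
A note on running this example: port 4566 is LocalStack's default edge
endpoint and the "test"/"test" credentials are the usual LocalStack
placeholders, so the example targets a local S3 emulator rather than
real AWS; the `test-bucket` bucket must exist before the sink runs.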
