
Commit

cargo fmt
devinjdangelo committed Sep 28, 2023
1 parent 271f838 commit 78cd23b
Showing 1 changed file with 19 additions and 18 deletions.
datafusion/core/src/datasource/file_format/parquet.rs (19 additions, 18 deletions)
@@ -19,7 +19,9 @@
 
 use arrow_array::RecordBatch;
 
-use parquet::arrow::arrow_writer::{ArrowColumnWriter, get_column_writers, ArrowLeafColumn, compute_leaves};
+use parquet::arrow::arrow_writer::{
+    compute_leaves, get_column_writers, ArrowColumnWriter, ArrowLeafColumn,
+};
 use parquet::column::writer::ColumnCloseResult;
 use parquet::file::writer::SerializedFileWriter;
 use rand::distributions::DistString;
@@ -885,22 +887,19 @@ fn spawn_column_parallel_row_group_writer(
     parquet_props: Arc<WriterProperties>,
     max_buffer_size: usize,
 ) -> Result<(Vec<ColumnJoinHandle>, Vec<ColSender>)> {
-
     let schema_desc = arrow_to_parquet_schema(&schema)?;
-    let col_writers =
-        get_column_writers(&schema_desc, &parquet_props, &schema)?;
+    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
     let num_columns = col_writers.len();
 
     let mut col_writer_handles = Vec::with_capacity(num_columns);
     let mut col_array_channels = Vec::with_capacity(num_columns);
     for writer in col_writers.into_iter() {
         // Buffer size of this channel limits the number of arrays queued up for column level serialization
-        let (send_array, recieve_array) = mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
+        let (send_array, recieve_array) =
+            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
         col_array_channels.push(send_array);
-        col_writer_handles.push(tokio::spawn(column_serializer_task(
-            recieve_array,
-            writer,
-        )))
+        col_writer_handles
+            .push(tokio::spawn(column_serializer_task(recieve_array, writer)))
     }
 
     Ok((col_writer_handles, col_array_channels))
@@ -929,8 +928,9 @@ async fn send_arrays_to_col_writers(
         .iter()
         .zip(rb.columns())
         .zip(schema.fields())
-        .map(|((a,b),c)| (a,b,c)) {
-        for c in compute_leaves(field, array)?{
+        .map(|((a, b), c)| (a, b, c))
+    {
+        for c in compute_leaves(field, array)? {
             tx.send(c).await.map_err(|_| {
                 DataFusionError::Internal("Unable to send array to writer!".into())
             })?;
@@ -1031,12 +1031,14 @@ fn spawn_parquet_parallel_serialization_task(
        for mut rx in rb_recievers {
            while let Some(rb) = rx.recv().await {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
-                    send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone()).await?;
+                    send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                        .await?;
                    current_rg_rows += rb.num_rows();
                } else {
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
-                    send_arrays_to_col_writers(&col_array_channels, &a, schema.clone()).await?;
+                    send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                        .await?;
 
                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
                    // on a separate task, so that we can immediately start on the next RG before waiting
@@ -1060,18 +1062,17 @@
                        writer_props.clone(),
                        max_buffer_rb,
                    )?;
-                    send_arrays_to_col_writers(&col_array_channels, &b, schema.clone()).await?;
+                    send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
+                        .await?;
                    current_rg_rows = b.num_rows();
                }
            }
        }
 
        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
        drop(col_array_channels);
-        let finalize_rg_task = spawn_rg_join_and_finalize_task(
-            column_writer_handles,
-            current_rg_rows,
-        );
+        let finalize_rg_task =
+            spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
 
        serialize_tx.send(finalize_rg_task).await.map_err(|_| {
            DataFusionError::Internal("Unable to send closed RG to concat task!".into())
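For context, the code touched here fans record batches out to one serializer task per column, each fed through a bounded mpsc channel (see spawn_column_parallel_row_group_writer and column_serializer_task above). Below is a minimal, self-contained sketch of that channel-per-column pattern using tokio; the names and types are illustrative only and are not the DataFusion API, and it assumes a tokio dependency with the rt and macros features.

// Sketch only: hypothetical stand-ins, not DataFusion's implementation.
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Stand-in for one chunk of leaf-column data handed to a column writer.
#[derive(Debug)]
struct LeafColumn(Vec<u8>);

// Each column gets a task that drains its channel; a real implementation
// would feed an ArrowColumnWriter here instead of counting bytes.
fn spawn_column_task(mut rx: mpsc::Receiver<LeafColumn>) -> JoinHandle<usize> {
    tokio::spawn(async move {
        let mut bytes = 0;
        while let Some(chunk) = rx.recv().await {
            bytes += chunk.0.len();
        }
        bytes
    })
}

#[tokio::main]
async fn main() {
    let num_columns = 3;
    let max_buffer = 2; // channel capacity bounds how many chunks queue per column

    let mut senders = Vec::with_capacity(num_columns);
    let mut handles = Vec::with_capacity(num_columns);
    for _ in 0..num_columns {
        let (tx, rx) = mpsc::channel::<LeafColumn>(max_buffer);
        senders.push(tx);
        handles.push(spawn_column_task(rx));
    }

    // Fan one "record batch" out to the per-column tasks.
    for tx in &senders {
        tx.send(LeafColumn(vec![0u8; 16])).await.unwrap();
    }

    // Dropping the senders closes the channels so the tasks can finish.
    drop(senders);
    for handle in handles {
        println!("column serialized {} bytes", handle.await.unwrap());
    }
}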
