Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Feb 21, 2024
1 parent 33f1866 commit d0be310
Showing 1 changed file with 4 additions and 17 deletions.
21 changes: 4 additions & 17 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ use crate::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
};

use arrow::array::ArrayRef;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -512,21 +511,8 @@ impl LimitStream {
self.fetch = 0;
self.input = None; // clear input so it can be dropped early

let limited_columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|col| col.slice(0, col.len().min(batch_rows)))
.collect();
let options =
RecordBatchOptions::new().with_row_count(Option::from(batch_rows));
Some(
RecordBatch::try_new_with_options(
batch.schema(),
limited_columns,
&options,
)
.unwrap(),
)
// It is guaranteed that batch_rows is <= batch.num_rows
Some(batch.slice(0, batch_rows))
}
}
}
Expand Down Expand Up @@ -575,6 +561,7 @@ mod tests {
use crate::{common, test};

use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use arrow_array::RecordBatchOptions;
use arrow_schema::Schema;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalExpr;
Expand Down

0 comments on commit d0be310

Please sign in to comment.