Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: remove uncessary #cfg test #8036

Merged
merged 3 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl FileFormat for ArrowFormat {
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See https://github.com/apache/arrow-rs/issues/5021
/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See <https://github.com/apache/arrow-rs/issues/5021>
async fn infer_schema_from_file_stream(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
Expand Down
60 changes: 27 additions & 33 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,33 @@ use datafusion_physical_expr::expressions::{col, Sum};
use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
use super::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn aggregate_test() {
let test_cases = vec![
vec!["a"],
vec!["b", "a"],
vec!["c", "a"],
vec!["c", "b", "a"],
vec!["d", "a"],
vec!["d", "b", "a"],
vec!["d", "c", "a"],
vec!["d", "c", "b", "a"],
];
let n = 300;
let distincts = vec![10, 20];
for distinct in distincts {
let mut handles = Vec::new();
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
let job = tokio::spawn(run_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
group_by_columns,
));
handles.push(job);
}
for job in handles {
job.await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewing this with whitespace blind diff shows what is going on pretty clearly

async fn aggregate_test() {
let test_cases = vec![
vec!["a"],
vec!["b", "a"],
vec!["c", "a"],
vec!["c", "b", "a"],
vec!["d", "a"],
vec!["d", "b", "a"],
vec!["d", "c", "a"],
vec!["d", "c", "b", "a"],
];
let n = 300;
let distincts = vec![10, 20];
for distinct in distincts {
let mut handles = Vec::new();
for i in 0..n {
let test_idx = i % test_cases.len();
let group_by_columns = test_cases[test_idx].clone();
let job = tokio::spawn(run_aggregate_test(
make_staggered_batches::<true>(1000, distinct, i as u64),
group_by_columns,
));
handles.push(job);
}
for job in handles {
job.await.unwrap();
}
}
}
Expand Down
192 changes: 93 additions & 99 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,108 +44,102 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
use super::*;

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};
use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
// make_staggered_batches gives result sorted according to a, b, c
// In the test cases first entry represents partition by columns
// Second entry represents order by columns.
// Third entry represents search mode.
// In sorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
// In Linear and PartiallySorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// SortExec(required by window function)
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
let test_cases = vec![
(vec!["a"], vec!["a"], Sorted),
(vec!["a"], vec!["b"], Sorted),
(vec!["a"], vec!["a", "b"], Sorted),
(vec!["a"], vec!["b", "c"], Sorted),
(vec!["a"], vec!["a", "b", "c"], Sorted),
(vec!["b"], vec!["a"], Linear),
(vec!["b"], vec!["a", "b"], Linear),
(vec!["b"], vec!["a", "c"], Linear),
(vec!["b"], vec!["a", "b", "c"], Linear),
(vec!["c"], vec!["a"], Linear),
(vec!["c"], vec!["a", "b"], Linear),
(vec!["c"], vec!["a", "c"], Linear),
(vec!["c"], vec!["a", "b", "c"], Linear),
(vec!["b", "a"], vec!["a"], Sorted),
(vec!["b", "a"], vec!["b"], Sorted),
(vec!["b", "a"], vec!["c"], Sorted),
(vec!["b", "a"], vec!["a", "b"], Sorted),
(vec!["b", "a"], vec!["b", "c"], Sorted),
(vec!["b", "a"], vec!["a", "c"], Sorted),
(vec!["b", "a"], vec!["a", "b", "c"], Sorted),
(vec!["c", "b"], vec!["a"], Linear),
(vec!["c", "b"], vec!["a", "b"], Linear),
(vec!["c", "b"], vec!["a", "c"], Linear),
(vec!["c", "b"], vec!["a", "b", "c"], Linear),
(vec!["c", "a"], vec!["a"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b", "c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "c"], PartiallySorted(vec![1])),
(
vec!["c", "a"],
vec!["a", "b", "c"],
PartiallySorted(vec![1]),
),
(vec!["c", "b", "a"], vec!["a"], Sorted),
(vec!["c", "b", "a"], vec!["b"], Sorted),
(vec!["c", "b", "a"], vec!["c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b"], Sorted),
(vec!["c", "b", "a"], vec!["b", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b", "c"], Sorted),
];
let n = 300;
let n_distincts = vec![10, 20];
for n_distinct in n_distincts {
let mut handles = Vec::new();
for i in 0..n {
let idx = i % test_cases.len();
let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone();
let job = tokio::spawn(run_window_test(
make_staggered_batches::<true>(1000, n_distinct, i as u64),
i as u64,
pb_cols,
ob_cols,
search_mode,
));
handles.push(job);
}
for job in handles {
job.await.unwrap()?;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
// make_staggered_batches gives result sorted according to a, b, c
// In the test cases first entry represents partition by columns
// Second entry represents order by columns.
// Third entry represents search mode.
// In sorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
// In Linear and PartiallySorted mode physical plans are in the form for WindowAggExec
//```
// WindowAggExec
// SortExec(required by window function)
// MemoryExec]
// ```
// and in the form for BoundedWindowAggExec
// ```
// BoundedWindowAggExec
// MemoryExec
// ```
let test_cases = vec![
(vec!["a"], vec!["a"], Sorted),
(vec!["a"], vec!["b"], Sorted),
(vec!["a"], vec!["a", "b"], Sorted),
(vec!["a"], vec!["b", "c"], Sorted),
(vec!["a"], vec!["a", "b", "c"], Sorted),
(vec!["b"], vec!["a"], Linear),
(vec!["b"], vec!["a", "b"], Linear),
(vec!["b"], vec!["a", "c"], Linear),
(vec!["b"], vec!["a", "b", "c"], Linear),
(vec!["c"], vec!["a"], Linear),
(vec!["c"], vec!["a", "b"], Linear),
(vec!["c"], vec!["a", "c"], Linear),
(vec!["c"], vec!["a", "b", "c"], Linear),
(vec!["b", "a"], vec!["a"], Sorted),
(vec!["b", "a"], vec!["b"], Sorted),
(vec!["b", "a"], vec!["c"], Sorted),
(vec!["b", "a"], vec!["a", "b"], Sorted),
(vec!["b", "a"], vec!["b", "c"], Sorted),
(vec!["b", "a"], vec!["a", "c"], Sorted),
(vec!["b", "a"], vec!["a", "b", "c"], Sorted),
(vec!["c", "b"], vec!["a"], Linear),
(vec!["c", "b"], vec!["a", "b"], Linear),
(vec!["c", "b"], vec!["a", "c"], Linear),
(vec!["c", "b"], vec!["a", "b", "c"], Linear),
(vec!["c", "a"], vec!["a"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "b"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["b", "c"], PartiallySorted(vec![1])),
(vec!["c", "a"], vec!["a", "c"], PartiallySorted(vec![1])),
(
vec!["c", "a"],
vec!["a", "b", "c"],
PartiallySorted(vec![1]),
),
(vec!["c", "b", "a"], vec!["a"], Sorted),
(vec!["c", "b", "a"], vec!["b"], Sorted),
(vec!["c", "b", "a"], vec!["c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b"], Sorted),
(vec!["c", "b", "a"], vec!["b", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "c"], Sorted),
(vec!["c", "b", "a"], vec!["a", "b", "c"], Sorted),
];
let n = 300;
let n_distincts = vec![10, 20];
for n_distinct in n_distincts {
let mut handles = Vec::new();
for i in 0..n {
let idx = i % test_cases.len();
let (pb_cols, ob_cols, search_mode) = test_cases[idx].clone();
let job = tokio::spawn(run_window_test(
make_staggered_batches::<true>(1000, n_distinct, i as u64),
i as u64,
pb_cols,
ob_cols,
search_mode,
));
handles.push(job);
}
for job in handles {
job.await.unwrap()?;
}
Ok(())
}
Ok(())
}

fn get_random_function(
Expand Down