From faf2c3548e9d1d83ccdee59d0817743b88810815 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 7 Jul 2023 04:06:38 -0400 Subject: [PATCH] Fix build on main due to logical conflict --- .../core/src/physical_plan/streaming.rs | 72 ++++++++----------- 1 file changed, 29 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs index 22ae2af205a9..97b244d1acf5 100644 --- a/datafusion/core/src/physical_plan/streaming.rs +++ b/datafusion/core/src/physical_plan/streaming.rs @@ -30,9 +30,7 @@ use log::debug; use crate::datasource::physical_plan::{OutputOrderingDisplay, ProjectSchemaDisplay}; use crate::physical_plan::stream::RecordBatchStreamAdapter; -use crate::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, -}; +use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use datafusion_execution::TaskContext; use super::{DisplayAs, DisplayFormatType}; @@ -106,7 +104,34 @@ impl DisplayAs for StreamingTableExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "StreamingTableExec") + write!( + f, + "StreamingTableExec: partition_sizes={:?}", + self.partitions.len(), + )?; + if !self.projected_schema.fields().is_empty() { + write!( + f, + ", projection={}", + ProjectSchemaDisplay(&self.projected_schema) + )?; + } + if self.infinite { + write!(f, ", infinite_source=true")?; + } + + self.projected_output_ordering + .as_deref() + .map_or(Ok(()), |ordering| { + if !ordering.is_empty() { + write!( + f, + ", output_ordering={}", + OutputOrderingDisplay(ordering) + )?; + } + Ok(()) + }) } } } @@ -164,45 +189,6 @@ impl ExecutionPlan for StreamingTableExec { }) } - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "StreamingTableExec: partition_sizes={:?}", - self.partitions.len(), - )?; - if !self.projected_schema.fields().is_empty() { - write!( - f, - ", projection={}", - ProjectSchemaDisplay(&self.projected_schema) - )?; - } - if self.infinite { - write!(f, ", infinite_source=true")?; - } - - self.projected_output_ordering - .as_deref() - .map_or(Ok(()), |ordering| { - if !ordering.is_empty() { - write!( - f, - ", output_ordering={}", - OutputOrderingDisplay(ordering) - )?; - } - Ok(()) - }) - } - } - } - fn statistics(&self) -> Statistics { Default::default() }