diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 5c4c48b15803..f5b708e8894e 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -53,6 +53,9 @@ use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::array::Decimal128Array; +use arrow::array::Decimal256Array; +use arrow::datatypes::i256; +use arrow::datatypes::Decimal256Type; use super::moving_min_max; @@ -183,6 +186,7 @@ impl AggregateExpr for Max { | Float32 | Float64 | Decimal128(_, _) + | Decimal256(_, _) | Date32 | Date64 | Time32(_) @@ -239,6 +243,9 @@ impl AggregateExpr for Max { Decimal128(_, _) => { instantiate_max_accumulator!(self, i128, Decimal128Type) } + Decimal256(_, _) => { + instantiate_max_accumulator!(self, i256, Decimal256Type) + } // It would be nice to have a fast implementation for Strings as well // https://github.com/apache/arrow-datafusion/issues/6906 @@ -318,6 +325,16 @@ macro_rules! min_max_batch { scale ) } + DataType::Decimal256(precision, scale) => { + typed_min_max_batch!( + $VALUES, + Decimal256Array, + Decimal256, + $OP, + precision, + scale + ) + } // all types that have a natural order DataType::Float64 => { typed_min_max_batch!($VALUES, Float64Array, Float64, $OP) @@ -522,6 +539,19 @@ macro_rules! min_max { ); } } + ( + lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss), + rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss) + ) => { + if lhsp.eq(rhsp) && lhss.eq(rhss) { + typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss) + } else { + return internal_err!( + "MIN/MAX is not expected to receive scalars of incompatible types {:?}", + (lhs, rhs) + ); + } + } (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => { typed_min_max!(lhs, rhs, Boolean, $OP) } @@ -880,6 +910,7 @@ impl AggregateExpr for Min { | Float32 | Float64 | Decimal128(_, _) + | Decimal256(_, _) | Date32 | Date64 | Time32(_) @@ -935,6 +966,9 @@ impl AggregateExpr for Min { Decimal128(_, _) => { instantiate_min_accumulator!(self, i128, Decimal128Type) } + Decimal256(_, _) => { + instantiate_min_accumulator!(self, i256, Decimal256Type) + } // This is only reached if groups_accumulator_supported is out of sync _ => internal_err!( "GroupsAccumulator not supported for min({})", diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 1ca41d4fe21c..df90c97faf68 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -20,125 +20,155 @@ use std::cmp::Ordering; use arrow::buffer::ScalarBuffer; use arrow::compute::SortOptions; use arrow::datatypes::ArrowNativeTypeOp; -use arrow::row::{Row, Rows}; +use arrow::row::Rows; use arrow_array::types::ByteArrayType; -use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; +use arrow_array::{ + Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, PrimitiveArray, +}; +use arrow_buffer::{Buffer, OffsetBuffer}; use datafusion_execution::memory_pool::MemoryReservation; -/// A [`Cursor`] for [`Rows`] -pub struct RowCursor { - cur_row: usize, - num_rows: usize, +/// A comparable collection of values for use with [`Cursor`] +/// +/// This is a trait as there are several specialized implementations, such as for +/// single columns or for normalized multi column keys ([`Rows`]) +pub trait CursorValues { + fn len(&self) -> usize; - rows: Rows, + /// Returns true if `l[l_idx] == r[r_idx]` + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool; - /// Tracks for the memory used by in the `Rows` of this - /// cursor. Freed on drop - #[allow(dead_code)] - reservation: MemoryReservation, + /// Returns comparison of `l[l_idx]` and `r[r_idx]` + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering; } -impl std::fmt::Debug for RowCursor { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("SortKeyCursor") - .field("cur_row", &self.cur_row) - .field("num_rows", &self.num_rows) - .finish() - } +/// A comparable cursor, used by sort operations +/// +/// A `Cursor` is a pointer into a collection of rows, stored in +/// [`CursorValues`] +/// +/// ```text +/// +/// ┌───────────────────────┐ +/// │ │ ┌──────────────────────┐ +/// │ ┌─────────┐ ┌─────┐ │ ─ ─ ─ ─│ Cursor │ +/// │ │ 1 │ │ A │ │ │ └──────────────────────┘ +/// │ ├─────────┤ ├─────┤ │ +/// │ │ 2 │ │ A │◀─ ┼ ─ ┘ Cursor tracks an +/// │ └─────────┘ └─────┘ │ offset within a +/// │ ... ... │ CursorValues +/// │ │ +/// │ ┌─────────┐ ┌─────┐ │ +/// │ │ 3 │ │ E │ │ +/// │ └─────────┘ └─────┘ │ +/// │ │ +/// │ CursorValues │ +/// └───────────────────────┘ +/// +/// +/// Store logical rows using +/// one of several formats, +/// with specialized +/// implementations +/// depending on the column +/// types +#[derive(Debug)] +pub struct Cursor { + offset: usize, + values: T, } -impl RowCursor { - /// Create a new SortKeyCursor from `rows` and a `reservation` - /// that tracks its memory. There must be at least one row - /// - /// Panics if the reservation is not for exactly `rows.size()` - /// bytes or if `rows` is empty. - pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { - assert_eq!( - rows.size(), - reservation.size(), - "memory reservation mismatch" - ); - assert!(rows.num_rows() > 0); - Self { - cur_row: 0, - num_rows: rows.num_rows(), - rows, - reservation, - } +impl Cursor { + /// Create a [`Cursor`] from the given [`CursorValues`] + pub fn new(values: T) -> Self { + Self { offset: 0, values } } - /// Returns the current row - fn current(&self) -> Row<'_> { - self.rows.row(self.cur_row) + /// Returns true if there are no more rows in this cursor + pub fn is_finished(&self) -> bool { + self.offset == self.values.len() + } + + /// Advance the cursor, returning the previous row index + pub fn advance(&mut self) -> usize { + let t = self.offset; + self.offset += 1; + t } } -impl PartialEq for RowCursor { +impl PartialEq for Cursor { fn eq(&self, other: &Self) -> bool { - self.current() == other.current() + T::eq(&self.values, self.offset, &other.values, other.offset) } } -impl Eq for RowCursor {} +impl Eq for Cursor {} -impl PartialOrd for RowCursor { +impl PartialOrd for Cursor { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for RowCursor { +impl Ord for Cursor { fn cmp(&self, other: &Self) -> Ordering { - self.current().cmp(&other.current()) + T::compare(&self.values, self.offset, &other.values, other.offset) } } -/// A cursor into a sorted batch of rows. +/// Implements [`CursorValues`] for [`Rows`] /// -/// Each cursor must have at least one row so `advance` can be called at least -/// once prior to calling `is_finished`. -pub trait Cursor: Ord { - /// Returns true if there are no more rows in this cursor - fn is_finished(&self) -> bool; +/// Used for sorting when there are multiple columns in the sort key +#[derive(Debug)] +pub struct RowValues { + rows: Rows, - /// Advance the cursor, returning the previous row index - fn advance(&mut self) -> usize; + /// Tracks for the memory used by in the `Rows` of this + /// cursor. Freed on drop + #[allow(dead_code)] + reservation: MemoryReservation, } -impl Cursor for RowCursor { - #[inline] - fn is_finished(&self) -> bool { - self.num_rows == self.cur_row +impl RowValues { + /// Create a new [`RowValues`] from `rows` and a `reservation` + /// that tracks its memory. There must be at least one row + /// + /// Panics if the reservation is not for exactly `rows.size()` + /// bytes or if `rows` is empty. + pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { + assert_eq!( + rows.size(), + reservation.size(), + "memory reservation mismatch" + ); + assert!(rows.num_rows() > 0); + Self { rows, reservation } } +} - #[inline] - fn advance(&mut self) -> usize { - let t = self.cur_row; - self.cur_row += 1; - t +impl CursorValues for RowValues { + fn len(&self) -> usize { + self.rows.num_rows() } -} -/// An [`Array`] that can be converted into [`FieldValues`] -pub trait FieldArray: Array + 'static { - type Values: FieldValues; + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + l.rows.row(l_idx) == r.rows.row(r_idx) + } - fn values(&self) -> Self::Values; + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + l.rows.row(l_idx).cmp(&r.rows.row(r_idx)) + } } -/// A comparable set of non-nullable values -pub trait FieldValues { - type Value: ?Sized; +/// An [`Array`] that can be converted into [`CursorValues`] +pub trait CursorArray: Array + 'static { + type Values: CursorValues; - fn len(&self) -> usize; - - fn compare(a: &Self::Value, b: &Self::Value) -> Ordering; - - fn value(&self, idx: usize) -> &Self::Value; + fn values(&self) -> Self::Values; } -impl FieldArray for PrimitiveArray { +impl CursorArray for PrimitiveArray { type Values = PrimitiveValues; fn values(&self) -> Self::Values { @@ -149,74 +179,80 @@ impl FieldArray for PrimitiveArray { #[derive(Debug)] pub struct PrimitiveValues(ScalarBuffer); -impl FieldValues for PrimitiveValues { - type Value = T; - +impl CursorValues for PrimitiveValues { fn len(&self) -> usize { self.0.len() } - #[inline] - fn compare(a: &Self::Value, b: &Self::Value) -> Ordering { - T::compare(*a, *b) + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + l.0[l_idx].is_eq(r.0[r_idx]) } - #[inline] - fn value(&self, idx: usize) -> &Self::Value { - &self.0[idx] + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + l.0[l_idx].compare(r.0[r_idx]) } } -impl FieldArray for GenericByteArray { - type Values = Self; +pub struct ByteArrayValues { + offsets: OffsetBuffer, + values: Buffer, +} - fn values(&self) -> Self::Values { - // Once https://github.com/apache/arrow-rs/pull/4048 is released - // Could potentially destructure array into buffers to reduce codegen, - // in a similar vein to what is done for PrimitiveArray - self.clone() +impl ByteArrayValues { + fn value(&self, idx: usize) -> &[u8] { + assert!(idx < self.len()); + // Safety: offsets are valid and checked bounds above + unsafe { + let start = self.offsets.get_unchecked(idx).as_usize(); + let end = self.offsets.get_unchecked(idx + 1).as_usize(); + self.values.get_unchecked(start..end) + } } } -impl FieldValues for GenericByteArray { - type Value = T::Native; - +impl CursorValues for ByteArrayValues { fn len(&self) -> usize { - Array::len(self) + self.offsets.len() - 1 } - #[inline] - fn compare(a: &Self::Value, b: &Self::Value) -> Ordering { - let a: &[u8] = a.as_ref(); - let b: &[u8] = b.as_ref(); - a.cmp(b) + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + l.value(l_idx) == r.value(r_idx) } - #[inline] - fn value(&self, idx: usize) -> &Self::Value { - self.value(idx) + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + l.value(l_idx).cmp(r.value(r_idx)) + } +} + +impl CursorArray for GenericByteArray { + type Values = ByteArrayValues; + + fn values(&self) -> Self::Values { + ByteArrayValues { + offsets: self.offsets().clone(), + values: self.values().clone(), + } } } -/// A cursor over sorted, nullable [`FieldValues`] +/// A collection of sorted, nullable [`CursorValues`] /// /// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering #[derive(Debug)] -pub struct FieldCursor { +pub struct ArrayValues { values: T, - offset: usize, // If nulls first, the first non-null index // Otherwise, the first null index null_threshold: usize, options: SortOptions, } -impl FieldCursor { - /// Create a new [`FieldCursor`] from the provided `values` sorted according +impl ArrayValues { + /// Create a new [`ArrayValues`] from the provided `values` sorted according /// to `options`. /// /// Panics if the array is empty - pub fn new>(options: SortOptions, array: &A) -> Self { + pub fn new>(options: SortOptions, array: &A) -> Self { assert!(array.len() > 0, "Empty array passed to FieldCursor"); let null_threshold = match options.nulls_first { true => array.null_count(), @@ -225,67 +261,48 @@ impl FieldCursor { Self { values: array.values(), - offset: 0, null_threshold, options, } } - fn is_null(&self) -> bool { - (self.offset < self.null_threshold) == self.options.nulls_first + fn is_null(&self, idx: usize) -> bool { + (idx < self.null_threshold) == self.options.nulls_first } } -impl PartialEq for FieldCursor { - fn eq(&self, other: &Self) -> bool { - self.cmp(other).is_eq() +impl CursorValues for ArrayValues { + fn len(&self) -> usize { + self.values.len() } -} -impl Eq for FieldCursor {} -impl PartialOrd for FieldCursor { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + match (l.is_null(l_idx), r.is_null(r_idx)) { + (true, true) => true, + (false, false) => T::eq(&l.values, l_idx, &r.values, r_idx), + _ => false, + } } -} -impl Ord for FieldCursor { - fn cmp(&self, other: &Self) -> Ordering { - match (self.is_null(), other.is_null()) { + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + match (l.is_null(l_idx), r.is_null(r_idx)) { (true, true) => Ordering::Equal, - (true, false) => match self.options.nulls_first { + (true, false) => match l.options.nulls_first { true => Ordering::Less, false => Ordering::Greater, }, - (false, true) => match self.options.nulls_first { + (false, true) => match l.options.nulls_first { true => Ordering::Greater, false => Ordering::Less, }, - (false, false) => { - let s_v = self.values.value(self.offset); - let o_v = other.values.value(other.offset); - - match self.options.descending { - true => T::compare(o_v, s_v), - false => T::compare(s_v, o_v), - } - } + (false, false) => match l.options.descending { + true => T::compare(&r.values, r_idx, &l.values, l_idx), + false => T::compare(&l.values, l_idx, &r.values, r_idx), + }, } } } -impl Cursor for FieldCursor { - fn is_finished(&self) -> bool { - self.offset == self.values.len() - } - - fn advance(&mut self) -> usize { - let t = self.offset; - self.offset += 1; - t - } -} - #[cfg(test)] mod tests { use super::*; @@ -294,18 +311,19 @@ mod tests { options: SortOptions, values: ScalarBuffer, null_count: usize, - ) -> FieldCursor> { + ) -> Cursor>> { let null_threshold = match options.nulls_first { true => null_count, false => values.len() - null_count, }; - FieldCursor { - offset: 0, + let values = ArrayValues { values: PrimitiveValues(values), null_threshold, options, - } + }; + + Cursor::new(values) } #[test] diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index e60baf2cd806..422ff3aebdb3 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -20,7 +20,7 @@ use crate::metrics::BaselineMetrics; use crate::sorts::builder::BatchBuilder; -use crate::sorts::cursor::Cursor; +use crate::sorts::cursor::{Cursor, CursorValues}; use crate::sorts::stream::PartitionedStream; use crate::RecordBatchStream; use arrow::datatypes::SchemaRef; @@ -35,7 +35,7 @@ use std::task::{ready, Context, Poll}; type CursorStream = Box>>; #[derive(Debug)] -pub(crate) struct SortPreservingMergeStream { +pub(crate) struct SortPreservingMergeStream { in_progress: BatchBuilder, /// The sorted input streams to merge together @@ -88,8 +88,8 @@ pub(crate) struct SortPreservingMergeStream { /// target batch size batch_size: usize, - /// Vector that holds cursors for each non-exhausted input partition - cursors: Vec>, + /// Cursors for each input partition. `None` means the input is exhausted + cursors: Vec>>, /// Optional number of rows to fetch fetch: Option, @@ -98,7 +98,7 @@ pub(crate) struct SortPreservingMergeStream { produced: usize, } -impl SortPreservingMergeStream { +impl SortPreservingMergeStream { pub(crate) fn new( streams: CursorStream, schema: SchemaRef, @@ -140,7 +140,7 @@ impl SortPreservingMergeStream { None => Poll::Ready(Ok(())), Some(Err(e)) => Poll::Ready(Err(e)), Some(Ok((cursor, batch))) => { - self.cursors[idx] = Some(cursor); + self.cursors[idx] = Some(Cursor::new(cursor)); Poll::Ready(self.in_progress.push_batch(idx, batch)) } } @@ -310,7 +310,7 @@ impl SortPreservingMergeStream { } } -impl Stream for SortPreservingMergeStream { +impl Stream for SortPreservingMergeStream { type Item = Result; fn poll_next( @@ -322,7 +322,7 @@ impl Stream for SortPreservingMergeStream { } } -impl RecordBatchStream for SortPreservingMergeStream { +impl RecordBatchStream for SortPreservingMergeStream { fn schema(&self) -> SchemaRef { self.in_progress.schema().clone() } diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index a7f9e7380c47..4cabdc6e178c 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::sorts::cursor::{FieldArray, FieldCursor, RowCursor}; +use crate::sorts::cursor::{ArrayValues, CursorArray, RowValues}; use crate::SendableRecordBatchStream; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::Array; @@ -76,7 +76,7 @@ impl FusedStreams { } /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] -/// and computes [`RowCursor`] based on the provided [`PhysicalSortExpr`] +/// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`] #[derive(Debug)] pub struct RowCursorStream { /// Converter to convert output of physical expressions @@ -114,7 +114,7 @@ impl RowCursorStream { }) } - fn convert_batch(&mut self, batch: &RecordBatch) -> Result { + fn convert_batch(&mut self, batch: &RecordBatch) -> Result { let cols = self .column_expressions .iter() @@ -127,12 +127,12 @@ impl RowCursorStream { // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); rows_reservation.try_grow(rows.size())?; - Ok(RowCursor::new(rows, rows_reservation)) + Ok(RowValues::new(rows, rows_reservation)) } } impl PartitionedStream for RowCursorStream { - type Output = Result<(RowCursor, RecordBatch)>; + type Output = Result<(RowValues, RecordBatch)>; fn partitions(&self) -> usize { self.streams.0.len() @@ -153,7 +153,7 @@ impl PartitionedStream for RowCursorStream { } /// Specialized stream for sorts on single primitive columns -pub struct FieldCursorStream { +pub struct FieldCursorStream { /// The physical expressions to sort by sort: PhysicalSortExpr, /// Input streams @@ -161,7 +161,7 @@ pub struct FieldCursorStream { phantom: PhantomData T>, } -impl std::fmt::Debug for FieldCursorStream { +impl std::fmt::Debug for FieldCursorStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrimitiveCursorStream") .field("num_streams", &self.streams) @@ -169,7 +169,7 @@ impl std::fmt::Debug for FieldCursorStream { } } -impl FieldCursorStream { +impl FieldCursorStream { pub fn new(sort: PhysicalSortExpr, streams: Vec) -> Self { let streams = streams.into_iter().map(|s| s.fuse()).collect(); Self { @@ -179,16 +179,16 @@ impl FieldCursorStream { } } - fn convert_batch(&mut self, batch: &RecordBatch) -> Result> { + fn convert_batch(&mut self, batch: &RecordBatch) -> Result> { let value = self.sort.expr.evaluate(batch)?; let array = value.into_array(batch.num_rows()); let array = array.as_any().downcast_ref::().expect("field values"); - Ok(FieldCursor::new(self.sort.options, array)) + Ok(ArrayValues::new(self.sort.options, array)) } } -impl PartitionedStream for FieldCursorStream { - type Output = Result<(FieldCursor, RecordBatch)>; +impl PartitionedStream for FieldCursorStream { + type Output = Result<(ArrayValues, RecordBatch)>; fn partitions(&self) -> usize { self.streams.0.len() diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 28eaf241fa6f..616a2fc74932 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,7 +17,9 @@ //! SQL Utility Functions -use arrow_schema::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE}; +use arrow_schema::{ + DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, +}; use datafusion_common::tree_node::{Transformed, TreeNode}; use sqlparser::ast::Ident; @@ -221,14 +223,17 @@ pub(crate) fn make_decimal_type( (None, None) => (DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE), }; - // Arrow decimal is i128 meaning 38 maximum decimal digits if precision == 0 - || precision > DECIMAL128_MAX_PRECISION + || precision > DECIMAL256_MAX_PRECISION || scale.unsigned_abs() > precision { plan_err!( - "Decimal(precision = {precision}, scale = {scale}) should satisfy `0 < precision <= 38`, and `scale <= precision`." + "Decimal(precision = {precision}, scale = {scale}) should satisfy `0 < precision <= 76`, and `scale <= precision`." ) + } else if precision > DECIMAL128_MAX_PRECISION + && precision <= DECIMAL256_MAX_PRECISION + { + Ok(DataType::Decimal256(precision, scale)) } else { Ok(DataType::Decimal128(precision, scale)) } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 653d2ec52d92..2446ee0a5841 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -201,7 +201,7 @@ fn cast_to_invalid_decimal_type_precision_0() { let sql = "SELECT CAST(10 AS DECIMAL(0))"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "Error during planning: Decimal(precision = 0, scale = 0) should satisfy `0 < precision <= 38`, and `scale <= precision`.", + "Error during planning: Decimal(precision = 0, scale = 0) should satisfy `0 < precision <= 76`, and `scale <= precision`.", err.strip_backtrace() ); } @@ -212,9 +212,19 @@ fn cast_to_invalid_decimal_type_precision_gt_38() { // precision > 38 { let sql = "SELECT CAST(10 AS DECIMAL(39))"; + let plan = "Projection: CAST(Int64(10) AS Decimal256(39, 0))\n EmptyRelation"; + quick_test(sql, plan); + } +} + +#[test] +fn cast_to_invalid_decimal_type_precision_gt_76() { + // precision > 76 + { + let sql = "SELECT CAST(10 AS DECIMAL(79))"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "Error during planning: Decimal(precision = 39, scale = 0) should satisfy `0 < precision <= 38`, and `scale <= precision`.", + "Error during planning: Decimal(precision = 79, scale = 0) should satisfy `0 < precision <= 76`, and `scale <= precision`.", err.strip_backtrace() ); } @@ -227,7 +237,7 @@ fn cast_to_invalid_decimal_type_precision_lt_scale() { let sql = "SELECT CAST(10 AS DECIMAL(5, 10))"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "Error during planning: Decimal(precision = 5, scale = 10) should satisfy `0 < precision <= 38`, and `scale <= precision`.", + "Error during planning: Decimal(precision = 5, scale = 10) should satisfy `0 < precision <= 76`, and `scale <= precision`.", err.strip_backtrace() ); } diff --git a/datafusion/sqllogictest/test_files/decimal.slt b/datafusion/sqllogictest/test_files/decimal.slt index 570116b7a2a2..87a846c07727 100644 --- a/datafusion/sqllogictest/test_files/decimal.slt +++ b/datafusion/sqllogictest/test_files/decimal.slt @@ -629,3 +629,77 @@ select AVG(column1) from t; statement ok drop table t; + +statement ok +CREATE EXTERNAL TABLE decimal256_simple ( +c1 DECIMAL(50,6) NOT NULL, +c2 DOUBLE NOT NULL, +c3 BIGINT NOT NULL, +c4 BOOLEAN NOT NULL, +c5 DECIMAL(52,7) NOT NULL +) +STORED AS CSV +WITH HEADER ROW +LOCATION '../core/tests/data/decimal_data.csv'; + +query TT +select arrow_typeof(c1), arrow_typeof(c5) from decimal256_simple limit 1; +---- +Decimal256(50, 6) Decimal256(52, 7) + +query R rowsort +SELECT c1 from decimal256_simple; +---- +0.00001 +0.00002 +0.00002 +0.00003 +0.00003 +0.00003 +0.00004 +0.00004 +0.00004 +0.00004 +0.00005 +0.00005 +0.00005 +0.00005 +0.00005 + +query R rowsort +select c1 from decimal256_simple where c1 > 0.000030; +---- +0.00004 +0.00004 +0.00004 +0.00004 +0.00005 +0.00005 +0.00005 +0.00005 +0.00005 + +query RRIBR rowsort +select * from decimal256_simple where c1 > c5; +---- +0.00002 0.000000000002 3 false 0.000019 +0.00003 0.000000000003 5 true 0.000011 +0.00005 0.000000000005 8 false 0.000033 + +query TR +select arrow_typeof(avg(c1)), avg(c1) from decimal256_simple; +---- +Decimal256(54, 10) 0.0000366666 + +query TR +select arrow_typeof(min(c1)), min(c1) from decimal256_simple where c4=false; +---- +Decimal256(50, 6) 0.00002 + +query TR +select arrow_typeof(max(c1)), max(c1) from decimal256_simple where c4=false; +---- +Decimal256(50, 6) 0.00005 + +statement ok +drop table decimal256_simple;