From a66e63d49a12a1370f004e99cad1dca66e9738aa Mon Sep 17 00:00:00 2001
From: haojinIntel
Date: Mon, 10 Jan 2022 13:25:14 +0800
Subject: [PATCH 1/3] Fix the bug for positive decimal data

---
 .../cpp/src/operators/row_to_columnar_converter.cc | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
index 18928a481..601e52aaa 100644
--- a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
+++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
@@ -393,6 +393,11 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
       for (int k = length - 1; k >= 0; k--) {
         bytesValue2[length - 1 - k] = bytesValue[k];
       }
+      if (int8_t(bytesValue[0]) < 0){
+        for (int k = length; k < 16; k++) {
+          bytesValue2[k] = 255;
+        }
+      }
       arrow::Decimal128 value =
           arrow::Decimal128(arrow::BasicDecimal128(bytesValue2));
       array_data[position] = value;
@@ -950,6 +955,11 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
       for (int k = elementLength - 1; k >= 0; k--) {
         bytesValue2[elementLength - 1 - k] = bytesValue[k];
       }
+      if (int8_t(bytesValue[0]) < 0){
+        for (int k = elementLength; k < 16; k++) {
+          bytesValue2[k] = 255;
+        }
+      }
       arrow::Decimal128 value =
           arrow::Decimal128(arrow::BasicDecimal128(bytesValue2));
       RETURN_NOT_OK(child_builder.Append(value));

From 02c1ba724b5c82c39f11c1d10f203d1bcef5fc67 Mon Sep 17 00:00:00 2001
From: haojinIntel
Date: Mon, 10 Jan 2022 13:31:19 +0800
Subject: [PATCH 2/3] Fix format

---
 .../cpp/src/operators/row_to_columnar_converter.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
index 601e52aaa..2cbcd618a 100644
--- a/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
+++ b/native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
@@ -393,7 +393,7 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
       for (int k = length - 1; k >= 0; k--) {
         bytesValue2[length - 1 - k] = bytesValue[k];
       }
-      if (int8_t(bytesValue[0]) < 0){
+      if (int8_t(bytesValue[0]) < 0) {
         for (int k = length; k < 16; k++) {
           bytesValue2[k] = 255;
         }
@@ -955,7 +955,7 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
       for (int k = elementLength - 1; k >= 0; k--) {
         bytesValue2[elementLength - 1 - k] = bytesValue[k];
       }
-      if (int8_t(bytesValue[0]) < 0){
+      if (int8_t(bytesValue[0]) < 0) {
         for (int k = elementLength; k < 16; k++) {
           bytesValue2[k] = 255;
         }

From 0386e0292789361e27fe95cd40482d38e40fb0e4 Mon Sep 17 00:00:00 2001
From: haojinIntel
Date: Mon, 10 Jan 2022 14:02:52 +0800
Subject: [PATCH 3/3] Fix the unit test for 'SPARK-22348: table cache should
 do partition batch pruning (whole-stage-codegen on)'

---
 .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 4d753e94c..2ae01caf2 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.columnar
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import com.intel.oap.execution.ColumnarConditionProjectExec
-import com.intel.oap.sql.execution.RowToArrowColumnarExec
+import com.intel.oap.execution.{ArrowRowToColumnarExec, ColumnarConditionProjectExec}
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -506,7 +506,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
       val df2 = df1.where("y = 3")
 
       val planBeforeFilter = df2.queryExecution.executedPlan.collect {
-        case ColumnarConditionProjectExec(_, _, c: RowToArrowColumnarExec) => c.child
+        case ColumnarConditionProjectExec(_, _, c: ArrowRowToColumnarExec) => c.child
         case FilterExec(_, c: ColumnarToRowExec) => c.child
         case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child
       }
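
Note on the fix (reviewer context, not part of the patches): Spark's row format
stores a high-precision decimal as the unscaled value's two's-complement bytes
in big-endian order, trimmed to the minimal length, while arrow::BasicDecimal128
is constructed from a full 16-byte buffer in native (little-endian) byte order.
Reversing the bytes alone leaves the unused high bytes zeroed, which corrupts
negative values: they must be sign-extended with 0xFF. Below is a minimal
standalone C++ sketch of the logic these patches add; the helper name and
signature are illustrative, not taken from the repository.

#include <cstdint>
#include <cstring>

// Convert a minimal-length big-endian two's-complement unscaled decimal
// (as stored in the row format) into the 16-byte little-endian buffer
// expected by arrow::BasicDecimal128. Illustrative helper, not repo code.
void BigEndianToDecimal128Bytes(const uint8_t* bytesValue, int length,
                                uint8_t bytesValue2[16]) {
  std::memset(bytesValue2, 0, 16);
  // Reverse the byte order: big-endian input -> little-endian output.
  for (int k = length - 1; k >= 0; k--) {
    bytesValue2[length - 1 - k] = bytesValue[k];
  }
  // Sign extension: if the most significant input byte has its sign bit
  // set, the value is negative, so the remaining high bytes must be 0xFF
  // rather than 0x00. Without this, a negative decimal shorter than
  // 16 bytes decodes as a large positive number.
  if (static_cast<int8_t>(bytesValue[0]) < 0) {
    for (int k = length; k < 16; k++) {
      bytesValue2[k] = 255;
    }
  }
}

For example, the one-byte input 0x85 (the value -123) becomes 0x85 followed by
fifteen 0xFF bytes, which Decimal128 reads back as -123; without the
sign-extension step the buffer would decode as 133.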