Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Initial work on supporting DecimalType #1063

Closed
wants to merge 13 commits into from

Conversation

sperlingxx
Copy link
Collaborator

@sperlingxx sperlingxx commented Nov 4, 2020

This PR is about to enable DecimalType in spark GPU runtime.

Due to a lot of unready dependencies in cuDF, the primary intention of this PR is to import DecimalType as a supported type under GPU runtime. Along with the support on supported type system, we enabled the creation of GpuColumnVector and GpuScalar with DecimalType.

In terms of tests, we test creating decimal as GpuScalar and GpuColumnVector, with some expressions on decimal-type column vectors (such as: GpuIsNull).

Here is the summary of progress:

  • Enable decimal type for valid RapidsMeta in GpuOverrides (according to latest supportedType mechanism).
  • support decimal for GpuColumnVector (with test cases in DecimalUnitTest.scala)
  • support decimal for GpuScalar (with test cases in DecimalUnitTest.scala)
  • decimal converters for GpuRowToColumnarExec (with test cases in GpuBatchUtilsSuite.scala)
  • decimal columnar copy for HostColumnarToGpu (with test cases in DecimalUnitTest.scala)
  • test cases for decimal in GpuCoalesceBatchesSuite (Some cases fails because contiguous split of decimal column has yet supported by cuDF. I've created a PR to solve this problem.)
  • Creating decimal column vector from scalar will be supported after this PR merged (which is close to).
  • Test GpuProject with decimal columns.
  • Add unit tests for binary operators with decimal data.

@@ -158,6 +158,15 @@ private static final DType toRapidsOrNull(DataType type) {
return DType.TIMESTAMP_MICROSECONDS;
} else if (type instanceof StringType) {
return DType.STRING;
} else if (type instanceof DecimalType) {
DecimalType decType = (DecimalType) type;
if (decType.precision() <= DType.DECIMAL32_MAX_PRECISION) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark is kind of split on how decimals are stored internally. For UnsafeRow only supports a long value or a binary representation for BigDecimal.

https://github.com/apache/spark/blob/0b557b329046c66ee67a8c94c5bb95ffbe50e135/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L382-L395

For the build in ColumnVector implementation they distinguish between int and long backed storage.

https://github.com/apache/spark/blob/0b557b329046c66ee67a8c94c5bb95ffbe50e135/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java#L361-L375

If we only support DECIMAL64 then it would potentially reduce complexity by a lot. But if we also support DECIMAL32 we could save on GPU memory.

The main reason I ask is because I just merged in some code that speeds up the transfer of fixed width data types. I have not tested it with decimal in any way, but there is code in there that makes some assumptions about what the size of the data types are and I think it is wrong right now. Should be simple to fix it, but I wanted to call this out and be sure we are all on the same page.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can start from only supporting DECIMAL64 for reducing complexity?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine either way, and it is not a big deal. I was just thinking out loud.

@@ -450,6 +450,7 @@ object GpuOverrides {
case DateType => true
case TimestampType => ZoneId.systemDefault().normalized() == GpuOverrides.UTC_TIMEZONE_ID
case StringType => true
case dt: DecimalType if dt.precision <= ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a list of all of the operators we currently support that also support Decimal in Spark? We have to be sure that if we are allowing decimal for all operators, like this is doing, that we have disallowed it everywhere that we do not have automated tests for.

The only tests I see in this patch are unit tests for Scalars.

Copy link
Collaborator Author

@sperlingxx sperlingxx Nov 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've rewritten isSupportedType part. In latest version, general method isSupportedType remain unchanged. And a decimal specialized method isSupportedTypeWithDecimal was added for GpuExpressions (operators) who support Deimcal. For now, only very limited ones are decimal-enabled (with test cases), such as: GpuLiteral, GpuAlias. But this collection will be expanded only if any decimal-related dependencies are ready.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found it is not easy to add DecimalType into supported type while keeping compatible with existed RapidsMetas. I tried to solve this problem with two actions:

  1. Add method isSupportedTypeWithDecimal for RapidMeta who overrides the areAllSupportedTypes interface, such as: Literal, Cast.
  2. Refactor GpuOverrides.areAllSupportedTypes, which receives RapidsMeta itself to determine whether it is decimal supportable. This is for RapidsMeta without interface overriding of areAllSupportedTypes.

@@ -201,6 +210,10 @@ static final DataType getSparkType(DType type) {
return DataTypes.TimestampType;
case STRING:
return DataTypes.StringType;
case DECIMAL32:
return new DecimalType(DType.DECIMAL32_MAX_PRECISION, -type.getScale());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to track the effective precision of the value as it progresses through operations in order to be compatible with how Spark operates on Decimal? For example, take this simple Spark shell session:

scala> val x = Decimal(5.43)
x: org.apache.spark.sql.types.Decimal = 5.43

scala> (x.precision, x.scale)
res2: (Int, Int) = (3,2)

scala> val y = x * x
y: org.apache.spark.sql.types.Decimal = 29.4849

scala> (y.precision, y.scale)
res3: (Int, Int) = (6,4)

If we perform the same operation with DECIMAL32 we will lose track of the precision, and when we cast back to a SparkType instead of getting DecimalType(6,4) as we should, we'll get DecimalType(9,4) instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark should already be doing that for us, which is another reason why I don't think we want to rely on this API much longer. We are in the process of moving away from it for nested types and I think it would be best to just not support it for decimal at all.

Copy link
Collaborator Author

@sperlingxx sperlingxx Nov 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@revans2 @jlowe The problem is that GpuColumnVector.from(cudf.ColumnVector cudfCv) relies on the method getSparkType. If getSparkType doesn't support DecimalTypes, we won't be able to create decimal-typed GpuColumnVector from cudf.ColumnVector through the method GpuColumnVector.from(cudf.ColumnVector cudfCv). The method GpuColumnVector.from(cudf.ColumnVector cudfCv) is referred in so many places, despite deprecated.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am in the process of updating the plugin so GpuColumnVector.from(cudf.ColumnVector cudfCv) goes away and is replaced with GpuColumnVector.from(cudf.ColumnVector cudfCv, DataType), similar to what I have been doing for Table to ColumnarBatch conversion. I have a patch working, just need to finish testing it and a bit more cleanup. Because no one has taken a look at #1078 yet, I'll likely just update it instead of making a separate PR.

The goal eventually is to have getSparkType go away entirely because we cannot support it when there is not a 1:1 mapping between cudf types and spark types.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've refined this PR on the basis of the patch #1078. I appended decimal type check into typeConversionAllowed.

@sameerz sameerz added the feature request New feature or request label Nov 6, 2020
@sperlingxx sperlingxx changed the title initial work on supporting DecimalType [REVIEW] Initial work on supporting DecimalType Nov 6, 2020
@jlowe jlowe changed the title [REVIEW] Initial work on supporting DecimalType Initial work on supporting DecimalType Nov 6, 2020
withResource(GpuColumnVector.from(ColumnVector.decimalFromInts(0, 1),
DecimalType(DType.DECIMAL32_MAX_PRECISION + 1, 0))) { _ => }
}
// TODO: support fromScalar(cudf.ColumnVector cv, int rows) for fixed-point decimal in cuDF
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be a big problem. We need this for a project and other operations to work.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will support this feature ASAP. And I really want to know what other missing essentials for this PR. Thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we really have to have/test is

  1. Shuffle decimal types including Contiguous Split, UCX Headers Updated, JCudfSerialzation. I think all of these have been updated, but not sure if they have been tested yet.
  2. CoalesceBatch for decimal types. This includes Contiguous Split and Table.concat. Again I think these are in place, but it needs to be tested.
  3. ColumnarToRow and RowToColumnar. I think that is covered here for the most part, but it is not tested. I would like to have a follow on issue to try and "fix" some of the issues with the fast fixed width transfers, which I am happy to handle.
  4. fromScalar support. This is something that we are missing but I didn't realize how important it is until recently. We have a lot of Expressions that require this so we can take a constant value and turn it into a column, which happens regularly.

Copy link
Collaborator Author

@sperlingxx sperlingxx Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @revans2 , thanks for outlining all essential work for basic decimal supporting. I am trying to address them these days, but I found it is not easy to accomplish all these targets at one time.

Here is the summary of current progress:

  • Enable decimal type for valid RapidsMeta in GpuOverrides (according to latest supportedType mechanism).
  • support decimal for GpuColumnVector (with test cases in DecimalUnitTest.scala)
  • support decimal for GpuScalar (with test cases in DecimalUnitTest.scala)
  • decimal converters for GpuRowToColumnarExec (with test cases in GpuBatchUtilsSuite.scala)
  • decimal columnar copy for HostColumnarToGpu (with test cases in DecimalUnitTest.scala)
  • test cases for decimal in GpuCoalesceBatchesSuite (Some cases fails because contiguous split of decimal column has yet supported by cuDF. I've created a PR to solve this problem.)
  • Creating decimal column vector from scalar will be supported after this PR merged (which is close to).

Could we get this PR merged at first, in order not to block other work relying on this (such as: binary operation support) ? Then, we address unsolved issues in other PRs. Perhaps we need to create an issue to summarize the progress of all these items?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with breaking things up, but I really want to see something that is self-contained and tested.

If we drop support for decimal in literal values then we don't need fromScalar to work yet. But any code that we do put in I want to see at least minimal automated tests that it is working. I trust that you have run tests manually, but there is a lot that is changing right now with nested types etc, and I want to be sure that we have tests to verify that someone else didn't break something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am afraid that I don't get what the "minimal automated tests" should be, besides existed unit tests for decimal. Does it refer to tests with mixed data types, such as GpuSinglePartitioningSuite? If it does, I am working on them. I've just put a PR to enable creating decimal column with Table.TestBuilder .

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just added tests for binaryOps and project.

@sperlingxx sperlingxx changed the title Initial work on supporting DecimalType [WIP] Initial work on supporting DecimalType Nov 10, 2020
@sperlingxx sperlingxx changed the title [WIP] Initial work on supporting DecimalType [REVIEW] Initial work on supporting DecimalType Nov 13, 2020
row: SpecializedGetters,
column: Int,
builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = {
builder.append(row.getDecimal(column, precision, scale).toJavaBigDecimal)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to see us use toUnscaledLong to avoid any extra object creation, it is also what we are going to ultimately store the data as. This might mean that we need different classes for DECIMAL64 and DECIMAL32 too.

if (row.isNullAt(column)) {
builder.appendNull()
} else {
new NotNullDecimalConverter(precision, scale).append(row, column, builder)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is on the data path. I would like us to avoid object creation if at all possible to speed up the data path. Please make a static method instead, or use inheritance, which I think is less ideal.

builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = {
builder.append(row.getDecimal(column, precision, scale).toJavaBigDecimal)
// Infer the storage type via precision, because we can't access DType of builder.
if (precision > ai.rapids.cudf.DType.DECIMAL32_MAX_PRECISION) 8 else 4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too it would be great to avoid conditionals in the data path. I would prefer it if we either passed in the size or had separate implementations for DECIMAL32 and DECIMAL64

// We assume that all common plans are decimal supportable by default, considering
// whether decimal allowable is mainly determined in expression-level.
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowDecimal = true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have tests that verify that we can support decimal for all top level spark operations? Have we tested join, expand, generate, filter, project, union, window, sort, or hash agregate? What about all of the arrow python UDF code where we go to/from arrow?

I think it would be much better if we split this big PR up into smaller pieces and put each piece in separately with corresponding tests to show that it works, and we only add decimal to the allow list for those things that we know it works for because we have tested it. If you want me to help with this I am happy to do it. I am already in the middle of doing it for Lists I am going to add in structs, maps, binary, null type and finally calendar interval based off of how much time I have and priorities. Some of these we will only be able to do very basic things with, but that should be enough to unblock others for using them for more complicated processing.

@@ -144,6 +144,8 @@ object GpuDivModLike {
case DType.INT64 => Scalar.fromLong(0L)
case DType.FLOAT32 => Scalar.fromFloat(0f)
case DType.FLOAT64 => Scalar.fromDouble(0)
case dt if dt.isDecimalType && dt.isBackedByInt => Scalar.fromDecimal(0, 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is going to work in all cases. I think this might be another place were we have some tech debt to pay off and need to pass in a DataType instead of a DType.

@@ -144,6 +144,8 @@ object GpuDivModLike {
case DType.INT64 => Scalar.fromLong(0L)
case DType.FLOAT32 => Scalar.fromFloat(0f)
case DType.FLOAT64 => Scalar.fromDouble(0)
case dt if dt.isDecimalType && dt.isBackedByInt => Scalar.fromDecimal(0, 0)
case dt if dt.isDecimalType && dt.isBackedByLong => Scalar.fromDecimal(0, 0L)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the div/mod to work properly we also need to update isScalarZero, or we are going to miss a divide by zero case in decimal.

- GpuEqualNullSafe
- GpuDivModLike (GpuDivide/GpuIntegralDivide/GpuPmod/GpuRemainder)
*/
class DecimalBinaryOpSuite extends GpuExpressionTestSuite {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally would prefer to see us update the python tests. They tend to be more complete. These tests are not covering the case where you need to take a literal and turn it into a column because rapidsai/cudf#6723 has not been merged in yet.

This is another reason to break down this into smaller pieces. I cannot let us put in supporting for binary ops without the above PR being merged in and tested. If we break this apart we can get some of the functionality in, and then we can look at what we need to do to get the rest of it in.

@sperlingxx sperlingxx changed the title [REVIEW] Initial work on supporting DecimalType [WIP] Initial work on supporting DecimalType Nov 17, 2020
@sperlingxx sperlingxx closed this Nov 17, 2020
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this pull request Nov 30, 2023
…p ci] [bot] (NVIDIA#1063)

* Update submodule cudf to 50718e673ff53b18706cf66c6e02cda8e30681fe

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>

* Update submodule cudf to ecadda5558ce29c28d487ce67fcd2fa7882d5db1

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>

---------

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants