[SPARK-39839][SQL] Handle special case of null variable-length Decimal with non-zero offsetAndSize in UnsafeRow structural integrity check

### What changes were proposed in this pull request?

Update the `UnsafeRow` structural integrity check in `UnsafeRowUtils.validateStructuralIntegrity` to handle the special case of a null variable-length `DecimalType` value.

### Why are the changes needed?

The check should follow the format that `UnsafeRowWriter` produces. In general, `UnsafeRowWriter` zeroes out a field when the field is set to null; cf. `UnsafeRowWriter.setNullAt(ordinal)` and `UnsafeRow.setNullAt(ordinal)`.

But there's a special case for `DecimalType` values: this is the only type that both:
- can be fixed-length or variable-length, depending on the precision, and
- is mutable in `UnsafeRow`.

To allow a variable-length `DecimalType` to be mutable in `UnsafeRow`, `UnsafeRowWriter` always reserves 16 bytes in the variable-length section of the `UnsafeRow` (the tail end of the row), regardless of whether the `Decimal` value being written is null. The fixed-length slot of the field holds an "OffsetAndSize" word: the `offset` part always points to the start of the field's variable-length part, while the `size` part is `0` for a null value, or between `1` and `16` for a non-null value.
When `setNullAt(ordinal)` is called instead of passing a null value to `write(int, Decimal, int, int)`, however, the `offset` part is zeroed out as well and the field stops being mutable. A comment on `UnsafeRow.setDecimal` notes that `setNullAt(ordinal)` must not be called if the field is to remain updatable, but nothing in the code enforces that.
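
The "OffsetAndSize" word can be sketched in plain Scala. This is a minimal illustration, assuming the layout decoded by the patch's `getOffsetAndSize` helper (offset in the upper 32 bits, size in the lower 32 bits). `OffsetAndSizeSketch`, `pack`, and `unpack` are hypothetical names for this sketch, not Spark APIs, and the offset of 16 assumes a single-field row as in the new test.

```scala
object OffsetAndSizeSketch {
  // Hypothetical helper: offset in the upper 32 bits, size in the lower 32 bits.
  def pack(offset: Int, size: Int): Long =
    (offset.toLong << 32) | (size.toLong & 0xFFFFFFFFL)

  // Same decoding as the patch's getOffsetAndSize helper.
  def unpack(word: Long): (Int, Int) =
    ((word >> 32).toInt, word.toInt)

  def main(args: Array[String]): Unit = {
    // Null variable-length Decimal as left by the writer: offset kept, size 0.
    val nullButMutable = pack(offset = 16, size = 0)
    // After setNullAt(ordinal): the whole word is zeroed.
    val nullAfterSetNullAt = 0L

    println(nullButMutable != 0L)        // true: the old "must be zero" check rejects this
    println(unpack(nullButMutable))      // (16,0)
    println(unpack(nullAfterSetNullAt))  // (0,0)
  }
}
```

The first case is the one the old check rejected: the null bit is set, yet the 8-byte slot is non-zero because the offset is preserved.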

So the structural integrity check needs to recognize this case and allow a null variable-length `DecimalType` field to hold a non-zero value in its fixed-length slot.
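
The rule for a null variable-length `DecimalType` field can be sketched as a standalone predicate. This is an illustrative restatement of the condition in the diff, not the Spark code itself: `NullDecimalCheckSketch` and `isValidNullVarLenDecimal` are hypothetical names, `fixedPartEnd` stands for `bitSetWidthInBytes + 8 * row.numFields`, and the row size of 32 in the usage example is an assumed value.

```scala
object NullDecimalCheckSketch {
  // Hypothetical stand-in for the null variable-length Decimal branch of the check.
  // fixedPartEnd: end of the bitset plus fixed-length slots; rowSize: total row size in bytes.
  def isValidNullVarLenDecimal(word: Long, fixedPartEnd: Int, rowSize: Int): Boolean = {
    val offset = (word >> 32).toInt
    val size = word.toInt
    // A non-zero offset must point into the variable-length section of the row.
    val offsetInRange = offset >= fixedPartEnd && offset <= rowSize
    // Size must be zero; offset is either zero (no longer mutable) or in range.
    size == 0 && (offset == 0 || offsetInRange)
  }

  def main(args: Array[String]): Unit = {
    val fixedPartEnd = 16 // one field: 8-byte bitset + one 8-byte slot
    val rowSize = 32      // assumed total row size for this example
    println(isValidNullVarLenDecimal(16L << 32, fixedPartEnd, rowSize))       // true
    println(isValidNullVarLenDecimal(0L, fixedPartEnd, rowSize))              // true
    println(isValidNullVarLenDecimal((16L << 32) | 8L, fixedPartEnd, rowSize)) // false: size != 0
    println(isValidNullVarLenDecimal(8L << 32, fixedPartEnd, rowSize))        // false: offset in fixed part
  }
}
```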

Note that for non-null values, the existing check does conform to the format produced by `UnsafeRowWriter`. Only a null value of a variable-length `DecimalType` triggers the bug, which can affect reading Structured Streaming checkpoint files, where this check is applied.

### Does this PR introduce _any_ user-facing change?

Yes. Previously, the `UnsafeRow` structural integrity validation would report a false positive for correct data when there was a null value in a variable-length `DecimalType` field. With the fix, it no longer does.
Because the Structured Streaming checkpoint file validation uses this check, a good checkpoint file could previously be rejected, and the only workaround was to disable the check; with the fix, a correct checkpoint file is allowed to load.

### How was this patch tested?

Added a new test case in `UnsafeRowUtilsSuite`.

Closes #37252 from rednaxelafx/fix-unsaferow-validation.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit c608ae2)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
rednaxelafx authored and HeartSaVioR committed Jul 27, 2022
1 parent 9fdd097 commit ee8cafb
Showing 2 changed files with 67 additions and 8 deletions.
@@ -27,8 +27,15 @@ object UnsafeRowUtils {
    * - schema.fields.length == row.numFields should always be true
    * - UnsafeRow.calculateBitSetWidthInBytes(row.numFields) < row.getSizeInBytes should always be
    *   true if the expectedSchema contains at least one field.
-   * - For variable-length fields: if null bit says it's null then don't do anything, else extract
-   *   offset and size:
+   * - For variable-length fields:
+   *   - if null bit says it's null, then
+   *     - in general the offset-and-size should be zero
+   *     - special case: variable-length DecimalType is considered mutable in UnsafeRow, and to
+   *       support that, the offset is set to point to the variable-length part like a non-null
+   *       value, while the size is set to zero to signal that it's a null value. The offset
+   *       may also be set to zero, in which case this variable-length Decimal no longer supports
+   *       being mutable in the UnsafeRow.
+   *   - otherwise the field is not null, then extract offset and size:
    *   1) 0 <= size < row.getSizeInBytes should always be true. We can be even more precise than
    *      this, where the upper bound of size can only be as big as the variable length part of
    *      the row.
@@ -52,9 +59,7 @@ object UnsafeRowUtils {
     var varLenFieldsSizeInBytes = 0
     expectedSchema.fields.zipWithIndex.foreach {
       case (field, index) if !UnsafeRow.isFixedLength(field.dataType) && !row.isNullAt(index) =>
-        val offsetAndSize = row.getLong(index)
-        val offset = (offsetAndSize >> 32).toInt
-        val size = offsetAndSize.toInt
+        val (offset, size) = getOffsetAndSize(row, index)
         if (size < 0 ||
             offset < bitSetWidthInBytes + 8 * row.numFields || offset + size > rowSizeInBytes) {
           return false
@@ -74,13 +79,38 @@ object UnsafeRowUtils {
             if ((row.getLong(index) >> 32) != 0L) return false
           case _ =>
         }
-      case (_, index) if row.isNullAt(index) =>
-        if (row.getLong(index) != 0L) return false
+      case (field, index) if row.isNullAt(index) =>
+        field.dataType match {
+          case dt: DecimalType if !UnsafeRow.isFixedLength(dt) =>
+            // See special case in UnsafeRowWriter.write(int, Decimal, int, int) and
+            // UnsafeRow.setDecimal(int, Decimal, int).
+            // A variable-length Decimal may be marked as null while having non-zero offset and
+            // zero length. This allows the field to be updated (i.e. mutable variable-length data)
+
+            // Check the integrity of null value of variable-length DecimalType in UnsafeRow:
+            // 1. size must be zero
+            // 2. offset may be zero, in which case this variable-length field is no longer mutable
+            // 3. otherwise offset is non-zero, range check it the same way as a non-null value
+            val (offset, size) = getOffsetAndSize(row, index)
+            if (size != 0 || offset != 0 &&
+                (offset < bitSetWidthInBytes + 8 * row.numFields || offset > rowSizeInBytes)) {
+              return false
+            }
+          case _ =>
+            if (row.getLong(index) != 0L) return false
+        }
       case _ =>
     }
     if (bitSetWidthInBytes + 8 * row.numFields + varLenFieldsSizeInBytes > rowSizeInBytes) {
       return false
     }
     true
   }
+
+  def getOffsetAndSize(row: UnsafeRow, index: Int): (Int, Int) = {
+    val offsetAndSize = row.getLong(index)
+    val offset = (offsetAndSize >> 32).toInt
+    val size = offsetAndSize.toInt
+    (offset, size)
+  }
 }
@@ -17,9 +17,11 @@

 package org.apache.spark.sql.catalyst.util

+import java.math.{BigDecimal => JavaBigDecimal}
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, StringType, StructField, StructType}

 class UnsafeRowUtilsSuite extends SparkFunSuite {

@@ -52,4 +54,31 @@ class UnsafeRowUtilsSuite extends SparkFunSuite {
       StructField("value2", IntegerType, false)))
     assert(!UnsafeRowUtils.validateStructuralIntegrity(testRow, invalidSchema))
   }
+
+  test("Handle special case for null variable-length Decimal") {
+    val schema = StructType(StructField("d", DecimalType(19, 0), nullable = true) :: Nil)
+    val unsafeRowProjection = UnsafeProjection.create(schema)
+    val row = unsafeRowProjection(new SpecificInternalRow(schema))
+
+    // row is empty at this point
+    assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+    // set Decimal field to precision-overflowed value
+    val bigDecimalVal = Decimal(new JavaBigDecimal("12345678901234567890")) // precision=20, scale=0
+    row.setDecimal(0, bigDecimalVal, 19) // should overflow and become null
+    assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 0))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+    // set Decimal field to valid non-null value
+    val bigDecimalVal2 = Decimal(new JavaBigDecimal("1234567890123456789")) // precision=19, scale=0
+    row.setDecimal(0, bigDecimalVal2, 19) // should succeed
+    assert(!row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (16, 8))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+
+    // set Decimal field to null explicitly, after which this field no longer supports updating
+    row.setNullAt(0)
+    assert(row.isNullAt(0) && UnsafeRowUtils.getOffsetAndSize(row, 0) == (0, 0))
+    assert(UnsafeRowUtils.validateStructuralIntegrity(row, schema))
+  }
 }
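
The `(16, 0)` and `(16, 8)` pairs asserted in the test can be reproduced with plain arithmetic. The sketch below rests on two assumptions: that the null-tracking bitset takes one 8-byte word per 64 fields, and that `setDecimal` records the byte length of the unscaled value's two's-complement representation as the size (which is consistent with the asserted values). `TestExpectationSketch` and its methods are hypothetical names, not Spark APIs.

```scala
import java.math.{BigDecimal => JavaBigDecimal}

object TestExpectationSketch {
  // Assumed bitset layout: one 8-byte word per 64 fields.
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  // Start of the variable-length section: bitset plus one 8-byte slot per field.
  def fixedPartEnd(numFields: Int): Int = bitSetWidthInBytes(numFields) + 8 * numFields

  // Bytes needed for the unscaled value in two's-complement form.
  def unscaledByteLength(d: JavaBigDecimal): Int = d.unscaledValue().toByteArray.length

  def main(args: Array[String]): Unit = {
    // One-field schema: the Decimal's reserved space starts right after the fixed part.
    println(fixedPartEnd(1)) // 16

    // 19-digit value fits DecimalType(19, 0) and needs 8 bytes.
    println(unscaledByteLength(new JavaBigDecimal("1234567890123456789"))) // 8

    // 20-digit value exceeds precision 19, so the test expects it to become null.
    println(new JavaBigDecimal("12345678901234567890").precision()) // 20
  }
}
```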
