[SPARK-38042][SQL] Ensure that ScalaReflection.dataTypeFor works on aliased array types

### What changes were proposed in this pull request?

An aliased array type in a product used in a Dataset or DataFrame causes an exception:

```
type Data = Array[Long]
val xs: List[(Data, Int)] = List((Array(1), 1), (Array(2), 2))
sc.parallelize(xs).toDF("a", "b")
```

This fails with:

```
scala.MatchError: Data (of class scala.reflect.internal.Types$AliasNoArgsTypeRef)
 at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$dataTypeFor$1(ScalaReflection.scala:104)
 at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
 at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
 at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
 at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
 at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:88)
 at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$6(ScalaReflection.scala:573)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at scala.collection.TraversableLike.map(TraversableLike.scala:238)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
 at scala.collection.immutable.List.map(List.scala:298)
 at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:562)
 at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
 at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
 at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
 at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
 at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:432)
 at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:421)
 at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)
 at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
 at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
 at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
 at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:413)
 at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:55)
 at org.apache.spark.sql.Encoders$.product(Encoders.scala:285)
 at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:251)
 at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:251)
 at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
 ... 48 elided
```

This is fixed by changing the following line in ScalaReflection.dataTypeFor:

```
val TypeRef(_, _, Seq(elementType)) = tpe
```

to

```
val TypeRef(_, _, Seq(elementType)) = tpe.dealias
```
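
For context, the failure mode is easy to reproduce with plain scala-reflect: a type alias is represented as a TypeRef carrying no type arguments until it is dealiased, so the pattern above cannot extract the array's element type. A minimal sketch, assuming only scala-reflect on the classpath (DealiasDemo and Data are illustrative names, not part of this commit):

```
import scala.reflect.runtime.universe._

// An alias such as `type Data = Array[Long]` is an AliasNoArgsTypeRef with no
// type arguments, so `TypeRef(_, _, Seq(elementType))` cannot match it directly.
object DealiasDemo extends App {
  type Data = Array[Long]

  val tpe = typeOf[Data]
  println(tpe.typeArgs)          // List() -- the alias itself has no type args
  println(tpe.dealias)           // Array[Long]
  println(tpe.dealias.typeArgs)  // List(Long)

  // The pattern from ScalaReflection.dataTypeFor only matches after dealiasing:
  val TypeRef(_, _, Seq(elementType)) = tpe.dealias
  println(elementType)           // Long
}
```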

### Why are the changes needed?

Without this change, any attempt to create a Dataset or DataFrame over a product containing such an aliased array type throws the exception above.
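
As an end-to-end illustration, the repro from the description succeeds once the encoder dealiases the type. A minimal sketch, assuming a Spark build that includes this change (the object name and local session setup are illustrative):

```
import org.apache.spark.sql.SparkSession

// Self-contained version of the repro above; previously this threw a
// scala.MatchError while deriving the product encoder.
object AliasedArrayRepro {
  type Data = Array[Long]

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-38042")
      .getOrCreate()
    import spark.implicits._

    val xs: List[(Data, Int)] = List((Array(1L), 1), (Array(2L), 2))
    xs.toDF("a", "b").show()

    spark.stop()
  }
}
```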

### Does this PR introduce _any_ user-facing change?

No, except for preventing this exception from being thrown.

### How was this patch tested?

Added a test to DatasetSuite.

Closes #35370 from jtnystrom/spark-38042.

Lead-authored-by: Johan Nystrom <johan@monomorphic.org>
Co-authored-by: Johan Nystrom-Persson <johan@jnpersson.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and cloud-fan committed Feb 28, 2022
1 parent c7e363f commit 89799b8
Showing 2 changed files with 9 additions and 1 deletion.
ScalaReflection.scala:

```
@@ -102,7 +102,7 @@ object ScalaReflection extends ScalaReflection {
         val className = getClassNameFromType(tpe)
         className match {
           case "scala.Array" =>
-            val TypeRef(_, _, Seq(elementType)) = tpe
+            val TypeRef(_, _, Seq(elementType)) = tpe.dealias
             arrayClassFor(elementType)
           case other =>
             val clazz = getClassFromType(tpe)
```
DatasetSuite.scala:

```
@@ -48,10 +48,12 @@ object TestForTypeAlias {
   type TwoInt = (Int, Int)
   type ThreeInt = (TwoInt, Int)
   type SeqOfTwoInt = Seq[TwoInt]
+  type IntArray = Array[Int]

   def tupleTypeAlias: TwoInt = (1, 1)
   def nestedTupleTypeAlias: ThreeInt = ((1, 1), 2)
   def seqOfTupleTypeAlias: SeqOfTwoInt = Seq((1, 1), (2, 2))
+  def aliasedArrayInTuple: (Int, IntArray) = (1, Array(1))
 }

 class DatasetSuite extends QueryTest
@@ -1647,6 +1649,12 @@ class DatasetSuite extends QueryTest
       ("", Seq((1, 1), (2, 2))))
   }

+  test("SPARK-38042: Dataset should work with a product containing an aliased array type") {
+    checkDataset(
+      Seq(1).toDS().map(_ => ("", TestForTypeAlias.aliasedArrayInTuple)),
+      ("", (1, Array(1))))
+  }
+
   test("Check RelationalGroupedDataset toString: Single data") {
     val kvDataset = (1 to 3).toDF("id").groupBy("id")
     val expected = "RelationalGroupedDataset: [" +
```
