
[BUG] More detailed logs to show which parquet file and which data type has mismatch. #5200

Closed
viadea opened this issue Apr 11, 2022 · 2 comments · Fixed by #5434
Labels
bug Something isn't working P0 Must have for release

Comments

@viadea
Collaborator

viadea commented Apr 11, 2022

If we query a Hive partitioned table whose DDL declares a column as bigint while the underlying parquet data stores it as int, the query fails with a column type mismatch.

GPU Spark errors:

ai.rapids.cudf.CudfException: cuDF failure at: ../src/join/hash_join.cu:391: Mismatch in joining column data types

CPU Spark errors:

Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///home/xxx/data/hive/testbigint/2022/part-00010-3d360f88-2fd7-4a22-b72e-ac055bb2c955-c000.snappy.parquet. Column: [b], Expected: bigint, Found: INT32

How to repro:

  1. Hive CLI:
drop table testbigint;
create external table testbigint(a string, b bigint) PARTITIONED BY (k string)
STORED AS PARQUET LOCATION '/home/xxx/data/hive/testbigint';

create external table testbigint_dim(b string)
STORED AS PARQUET LOCATION '/home/xxx/data/hive/testbigint_dim';
  2. Spark shell:
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val data = Seq(
    Row("Adam",1000),
    Row("Bob",2000),
    Row("Cathy",9999)
)

val schema = StructType( Array(
                 StructField("a", StringType,true),
                 StructField("b", IntegerType,true)
             ))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)

df.write.format("parquet").mode("overwrite").save("/home/xxx/data/hive/testbigint/2022")


val data2 = Seq(
    Row("Adam",1000L),
    Row("Bob",2000L),
    Row("Cathy",9999L)
)

val schema2 = StructType( Array(
                 StructField("a", StringType,true),
                 StructField("b", LongType,true)
             ))
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(data2),schema2)
df2.write.format("parquet").mode("overwrite").save("/home/xxx/data/hive/testbigint/2021")


val data3 = Seq(Row("1000"),Row("2000"))
val schema3 = StructType( Array(StructField("b", StringType,true)))
val df3 = spark.createDataFrame(spark.sparkContext.parallelize(data3),schema3)
df3.write.format("parquet").mode("overwrite").save("/home/xxx/data/hive/testbigint_dim/")
  3. Hive CLI:
ALTER TABLE testbigint ADD PARTITION (k='2022') LOCATION '/home/xxx/data/hive/testbigint/2022';
ALTER TABLE testbigint ADD PARTITION (k='2021') LOCATION '/home/xxx/data/hive/testbigint/2021';
  4. Spark shell:
spark.conf.set("spark.rapids.sql.enabled",true)
spark.sql("""select count(*) from testbigint f, testbigint_dim d where  f.b=d.b  """).show

spark.conf.set("spark.rapids.sql.enabled",false)
spark.sql("""select count(*) from testbigint f, testbigint_dim d where  f.b=d.b  """).show
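The CPU error above is useful precisely because it names the offending file, column, expected type, and found type, while the GPU error does not. As a hedged, self-contained sketch (hypothetical names, not RAPIDS plugin code), a per-file check that produces a CPU-style message could map each declared Spark type to the parquet physical types it can be read from:

```scala
// Hypothetical sketch of a per-file schema check. The type mapping here is a
// simplified illustration, not Spark's full conversion table.
object SchemaMismatchCheck {
  // Parquet physical types are modeled as plain strings for brevity.
  private val compatible: Map[String, Set[String]] = Map(
    "bigint" -> Set("INT64"),
    "int"    -> Set("INT32"),
    "string" -> Set("BINARY"))

  // Returns an error message naming the file, column, and both types on a
  // mismatch, or None when the file type is readable as the declared type.
  def checkColumn(file: String, column: String,
                  expected: String, found: String): Option[String] =
    if (compatible.getOrElse(expected, Set.empty).contains(found)) None
    else Some(s"Parquet column cannot be converted in file $file. " +
              s"Column: [$column], Expected: $expected, Found: $found")
}
```

With this sketch, `checkColumn("part-00010.snappy.parquet", "b", "bigint", "INT32")` would report a mismatch, matching the shape of the CPU error in the repro.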
@viadea viadea added feature request New feature or request ? - Needs Triage Need team to review and classify labels Apr 11, 2022
@sameerz sameerz added bug Something isn't working P0 Must have for release and removed feature request New feature or request ? - Needs Triage Need team to review and classify labels Apr 12, 2022
@jlowe
Member

jlowe commented Apr 12, 2022

Marking this as a P1 because the RAPIDS Accelerator should have generated an error when trying to convert the types loaded from the Parquet file into the types expected by the specified Spark read schema in the query plan. We need to figure out why the type checks that occur when converting a cudf Table into a Spark ColumnarBatch did not catch this.
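A hedged sketch of the missing check described here (hypothetical types, not the plugin's actual API): when the decoded columns are turned into a Spark batch, each actual type could be compared against the read schema, failing with a message that names the column rather than letting the join surface a raw cuDF type-mismatch error.

```scala
// Illustrative only: a batch-conversion type check with descriptive errors.
final case class Field(name: String, dataType: String)

def validateBatchTypes(expected: Seq[Field], actual: Seq[Field]): Unit =
  expected.zip(actual).foreach { case (e, a) =>
    require(e.dataType == a.dataType,
      s"Column [${e.name}]: expected ${e.dataType} but decoded ${a.dataType}")
  }
```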

@wbo4958 wbo4958 assigned wbo4958 and unassigned wbo4958 Apr 24, 2022
@sperlingxx sperlingxx self-assigned this Apr 29, 2022
@sperlingxx
Collaborator

sperlingxx commented Apr 29, 2022

I'm picking up this issue since it interests me, but we are on holiday until May 5th, so please feel free to take it over if it is urgent or if someone else is also interested in it.

@sameerz sameerz added this to the May 2 - May 20 milestone Apr 29, 2022
@sameerz sameerz changed the title [FEA] More detailed logs to show which parquet file and which data type has mismatch. [BUG] More detailed logs to show which parquet file and which data type has mismatch. May 10, 2022
sperlingxx added a commit that referenced this issue May 12, 2022
Fixes #5200 #5445

This PR adds the schema check, following Spark's own checking process. Converters for downcasting INT32 to Byte/Short/Date are added in this PR as well. Note: this PR uses some deprecated parquet-mr APIs in order to accommodate Spark 3.1.
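As a rough illustration of the conversion rules this description implies (names are illustrative, not parquet-mr or plugin API): a parquet INT32 column may be read as byte, short, or date via the downcasting converters, or as int directly, but never as bigint, which requires INT64 in the file.

```scala
// Hypothetical sketch of which Spark read types a parquet INT32 column
// supports under the fix; "bigint" is deliberately excluded.
def canReadInt32As(sparkType: String): Boolean = sparkType match {
  case "byte" | "short" | "int" | "date" => true
  case _                                 => false
}
```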

Signed-off-by: sperlingxx <lovedreamf@gmail.com>