SPARK SQL will fail on AWS EMR with incompatible data types issues #730

Closed
gourav-sg opened this issue Sep 10, 2020 · 22 comments
Labels
bug Something isn't working

Comments

@gourav-sg

Describe the bug
java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector

Steps/Code to reproduce bug
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf().setAppName("MortgageETL")
conf.set("spark.jars", "s3://gourav-bucket/gourav/gpu/cudf-0.9.2.jar,s3://gourav-bucket/gourav/gpu/rapids-4-spark_2.12-0.1.0.jar,s3://gourav-bucket/gourav/gpu/cudf-0.14-cuda10-1.jar")
conf.set('spark.rapids.sql.explain', 'ALL')
conf.set("spark.executor.extraJavaOptions", "-Dai.rapids.cudf.prefer-pinned=true")
conf.set("spark.rapids.sql.concurrentGpuTasks", "1")
conf.set("spark.plugins", "com.nvidia.spark.SQLPlugin")
conf.set("spark.sql.adaptive.enabled", False)
conf.set("spark.executor.resource.gpu.discoveryScript","/usr/lib/spark/examples/src/main/scripts/getGpusResources.sh")
conf.set("spark.executor.resource.gpu.amount", "1")
conf.set("spark.task.resource.gpu.amount", "0.25")
conf.set("spark.executor.cores", "2")
conf.set("spark.task.cpus", "1")
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).master("yarn").getOrCreate()
spark.conf.set('spark.rapids.sql.incompatibleOps.enabled', False)
spark.sql("SELECT 'a' FLD1, id FROM range(100)").show()

THIS WORKS

spark.sql("SELECT 'a' FLD1, id FROM range(100)").write.parquet("s3://gourav-bucket/gourav/testdata3/")
spark.read.parquet("s3://gourav-bucket/gourav/testdata3/").createOrReplaceTempView("test")
spark.sql("SELECT * FROM test").show()

FAILED HERE

spark.conf.set('spark.rapids.sql.incompatibleOps.enabled', True)
spark.sql("SELECT * FROM test").show()

FAILED AGAIN

Expected behavior
The query should show the table contents.

Environment details (please complete the following information)
Environment location: YARN, Cloud (AWS EMR 6.1.0)
Thu Sep 10 22:14:18 2020
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.33.01 Driver Version: 440.33.01 CUDA Version: 10.2 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla T4 Off | 00000000:00:1E.0 Off | 0 |
| N/A 43C P0 27W / 70W | 14108MiB / 15109MiB | 0% Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 16812 C /etc/alternatives/jre/bin/java 14097MiB |


@gourav-sg added the "? - Needs Triage" and "bug" labels Sep 10, 2020
@jlowe (Member) commented Sep 10, 2020

@gourav-sg thanks for the bug report! I believe this is already fixed in the upcoming 0.2 release, as I was unable to reproduce it with rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar and cudf-0.15-cuda10-1.jar.

If you don't mind, it would be good if you could also verify this issue does not reproduce in 0.2.0-SNAPSHOT. You can build your own version of 0.2.0-SNAPSHOT by checking out the latest on branch-0.2 and building. It's a straightforward build with Maven, e.g. mvn clean package (add -DskipTests if you don't want to wait for the tests to run). The resulting jar can be found in dist/target/rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar after the build. The cudf jar can be found in your local Maven repository after the build (usually ~/.m2/repository/ai/rapids/cudf/0.15/cudf-0.15-cuda10-1.jar), or you can download it at https://repo.maven.apache.org/maven2/ai/rapids/cudf/0.15/cudf-0.15-cuda10-1.jar.

If you are able to reproduce it on 0.2.0, it would be good to get the full stacktrace of the error.

@sameerz removed the "? - Needs Triage" label Sep 11, 2020
@gourav-sg (Author)

Hi @jlowe,
I built the new jar file and followed the exact steps above, and now I am receiving an error while creating the Spark session itself, as shown below:

Py4JJavaError: An error occurred while calling None.org.apache.spark.sql.SparkSession. : java.lang.ExceptionInInitializerError at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:55) at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:50) at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1156) at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1151) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1151) at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:99) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:238) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalArgumentException: Could not find Spark Shim Loader for 3.0.0-amzn-0 at com.nvidia.spark.rapids.ShimLoader$.<init>(ShimLoader.scala:42) at com.nvidia.spark.rapids.ShimLoader$.<clinit>(ShimLoader.scala) ... 20 more

@jlowe (Member) commented Sep 11, 2020

java.lang.ExceptionInInitializerError at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:55)

This shows there was an error initializing the plugin. Do you see anything earlier in the driver or executor logs indicating what may have triggered the error (e.g. an unsatisfied link error or something similar)?

@gourav-sg (Author)

Hi,

This is the first error visible; thanks a ton for coming back :)

java.lang.NoSuchMethodError: ai.rapids.cudf.Rmm.initialize(ILai/rapids/cudf/Rmm$LogConf;JJ)V at com.nvidia.spark.rapids.GpuDeviceManager$.initializeRmm(GpuDeviceManager.scala:235) at com.nvidia.spark.rapids.GpuDeviceManager$.initializeMemory(GpuDeviceManager.scala:265) at com.nvidia.spark.rapids.GpuDeviceManager$.initializeGpuAndMemory(GpuDeviceManager.scala:126) at com.nvidia.spark.rapids.RapidsExecutorPlugin.init(Plugin.scala:128) at org.apache.spark.internal.plugin.ExecutorPluginContainer.$anonfun$executorPlugins$1(PluginContainer.scala:111) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245) at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108) at org.apache.spark.internal.plugin.ExecutorPluginContainer.<init>(PluginContainer.scala:99) at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:164) at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:152) at org.apache.spark.executor.Executor.$anonfun$plugins$1(Executor.scala:158) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:221) at org.apache.spark.executor.Executor.<init>(Executor.scala:158) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:168) at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

@jlowe (Member) commented Sep 11, 2020

The NoSuchMethodError makes me wonder if we're getting into a situation similar to what happened initially in #628. Are there multiple versions of cudf in the classpath somehow?

@gourav-sg (Author) commented Sep 11, 2020 via email

@gourav-sg (Author)

Hi,
@jlowe I have tested this and there is only one CUDA installation, under /usr/local/cuda, which is a link to /usr/local/cuda-10.2/. I can run counts on GPUs without any issues using the jar file rapids-4-spark_2.12-0.1.0.jar, but with that jar file the show command fails.
With the jar file you suggested, rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar, the session does not start successfully at all.

This is the error that I see in the Python console; the error in the YARN logs is already mentioned above:

Py4JJavaError: An error occurred while calling None.org.apache.spark.sql.SparkSession. : java.lang.ExceptionInInitializerError at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:55) at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:50) at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1156) at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1151) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1151) at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:99) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:238) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalArgumentException: Could not find Spark Shim Loader for 3.0.0-amzn-0 at com.nvidia.spark.rapids.ShimLoader$.<init>(ShimLoader.scala:42) at com.nvidia.spark.rapids.ShimLoader$.<clinit>(ShimLoader.scala)

Regards,
Gourav

@gourav-sg (Author)

Hi,
@jlowe
I am further seeing that for decimal fields Spark reports that they are not supported on the GPU and things run fine, whereas for integer data types things fail with the error java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector. This is when I am using rapids-4-spark_2.12-0.1.0.jar.

I remember there is a configuration in Spark 3.x to fall back from the GPU to the normal CPU in case of errors; can you please let me know the configuration?

Also, if there is any workaround for this issue, it would be a lot of help.

Regards,
Gourav

@jlowe (Member) commented Sep 11, 2020

there is only one CUDA installation, under /usr/local/cuda, which is a link to /usr/local/cuda-10.2/

OK, so this node has a CUDA 10.2 runtime environment. That's useful to know. However I was asking about the cudf jar. Given this is a CUDA 10.2 runtime environment, are you using the cudf-0.15-cuda10-2.jar or cudf-0.15-cuda10-1.jar? The cudf jar being used needs to have a classifier (the "cuda10-1" or "cuda10-2" part of the jar name) that matches the CUDA runtime environment available on the system. You're getting a NoSuchMethodError when cudf initializes, which indicates there may be a mismatch between the cudf Java code and the cudf JNI code. A similar NoSuchMethodError occurred in the other issue, hence I'm wondering if there are multiple cudf jars being placed in the application classpath when it tries to run.

Looking closer at your startup configs, I see this:

conf.set("spark.jars", "s3://gourav-bucket/gourav/gpu/cudf-0.9.2.jar,s3://gourav-bucket/gourav/gpu/rapids-4-spark_2.12-0.1.0.jar,s3://gourav-bucket/gourav/gpu/cudf-0.14-cuda10-1.jar")

That looks like you're placing two different cudf jars in the classpath: cudf-0.9.2 and cudf-0.14-cuda10-1. There should only be one cudf jar in the classpath, and it needs to match the version expected by the RAPIDS plugin. If the cudf-0.9.2 jar is also in the classpath when trying to run with plugin 0.2.0-SNAPSHOT and cudf-0.15, then that could explain the problem there.

Also cudf-0.14-cuda10-1.jar is built for CUDA runtime 10.1, but you mentioned a CUDA 10.2 runtime environment above which is a bit odd. Maybe the CUDA 10.1 environment is also available under /usr/local/cuda-10.1? (Multiple CUDA runtime environments can be installed and coexist peacefully.)
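For illustration, a minimal sketch of what a single-cudf-jar configuration could look like here; the exact jar file names and S3 paths are assumptions based on the versions discussed above, not a verified setup:

from pyspark import SparkConf

conf = SparkConf().setAppName("MortgageETL")
# Exactly one cudf jar, with a classifier matching the CUDA 10.2 runtime on the node,
# plus the matching plugin jar; no second cudf jar anywhere in the classpath.
conf.set("spark.jars", "s3://gourav-bucket/gourav/gpu/rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar,s3://gourav-bucket/gourav/gpu/cudf-0.15-cuda10-2.jar")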

This is the error that I see in the Python console; the error in the YARN logs is already mentioned above:

The Python error is caused by cudf failing to initialize. Solving the cudf NoSuchMethodError should enable the plugin to initialize successfully and solve that Python error as well.

I remember there is a configuration in Spark 3.x to fall back from the GPU to the normal CPU in case of errors; can you please let me know the configuration?

The plugin should never throw an exception like ClassCastException. The plugin does not support dynamically switching back to the CPU when exceptions occur during runtime. It only falls back to the CPU during the query planning process when a query operation is known to not have a GPU-equivalent (or that equivalent has been configured to be disallowed).
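For completeness, the SQLExecPlugin startup warning that appears later in this thread notes that GPU SQL support can be disabled entirely by setting spark.rapids.sql.enabled to false. A minimal sketch of that blanket fallback, using the spark session from the repro above:

# Turn off all GPU SQL acceleration for this session; queries then run on the CPU as usual.
spark.conf.set("spark.rapids.sql.enabled", "false")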

As I mentioned above, I believe the error you are getting with the plugin 0.1.0 jars is fixed in the 0.2.0 release because I followed your problem reproduction steps and did not see the error.

Also, if there is any workaround for this issue, it would be a lot of help.

You could try disabling Parquet reads in the plugin by setting spark.rapids.sql.input.ParquetScan=false. See the configuration documentation for more details on how to disable specific operation types from being run on the GPU. This isn't ideal since it will avoid using the plugin for Parquet reads, but it might get you past this particular issue. I haven't been able to reproduce it, so I don't know for sure which operation in the query is failing with the class cast exception. I'll see if I can reproduce it on the 0.1.0 release to get more information. In the meantime, please examine your 0.2.0-SNAPSHOT setup per the instructions above to see if we can get that working for you.
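A minimal sketch of that workaround, applied to the session from the repro above:

# Keep Parquet scans on the CPU to work around the ClassCastException;
# other supported operations can still be planned onto the GPU.
spark.conf.set("spark.rapids.sql.input.ParquetScan", "false")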

@gourav-sg (Author)

Hi,

@jlowe the details of the code execution are mentioned below; you can see that I am using the correct cudf jar file.

Can you please share your code?

[hadoop@ip-172-29-69-4 mnt]$ /mnt/anaconda3/bin/ipython 
Python 3.7.8 | packaged by conda-forge | (default, Jul 31 2020, 02:25:08) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.17.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import os, subprocess, sys, socket
   ...: os.environ["SPARK_HOME"] = "/usr/lib/spark/"
   ...: os.environ["PYSPARK_PYTHON"] = "/mnt/anaconda3/bin/python3.7"
   ...: spark_home = os.environ.get('SPARK_HOME', None)
   ...: sys.path.insert(0, spark_home + "/python")
   ...: sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-src.zip'))
   ...: 
   ...: from pyspark.sql import SparkSession
   ...: from pyspark import SparkConf

In [2]: conf = SparkConf().setAppName("MortgageETL")
   ...: 
   ...: conf.set("spark.jars", "s3://gourav-bucket/gourav/gpu/rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar,s3://gourav-bucket/go
   ...: urav/gpu/cudf-0.14-cuda10-2.jar")
Out[2]: <pyspark.conf.SparkConf at 0x7f441b589f50>

In [3]: conf.set('spark.rapids.sql.explain', 'ALL')
   ...: conf.set("spark.executor.extraJavaOptions", "-Dai.rapids.cudf.prefer-pinned=true")
   ...: conf.set("spark.rapids.sql.concurrentGpuTasks", "1")
   ...: conf.set("spark.plugins", "com.nvidia.spark.SQLPlugin")
   ...: conf.set("spark.sql.adaptive.enabled", True)
   ...: conf.set('spark.rapids.sql.incompatibleOps.enabled', True)
   ...: conf.set("spark.executor.resource.gpu.discoveryScript","/usr/lib/spark/examples/src/main/scripts/getGpusResources.sh")
   ...: conf.set("spark.executor.resource.gpu.amount", "1")
   ...: conf.set("spark.task.resource.gpu.amount", "0.25")
   ...: conf.set("spark.executor.cores", "2")
   ...: conf.set("spark.task.cpus", "1")
   ...: conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
   ...: conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
   ...: 
Out[3]: <pyspark.conf.SparkConf at 0x7f441b589f50>

In [4]: spark = SparkSession.builder.enableHiveSupport().config(conf=conf).master("yarn").getOrCreate()
   ...: 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/11 22:24:08 WARN SparkContext: The configuration of resource: gpu (exec = 1, task = 1/4, runnable tasks = 4) will result in wasted resources due to resource CPU limiting the number of runnable tasks per executor to: 2. Please adjust your configuration.
20/09/11 22:24:08 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
20/09/11 22:24:17 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
20/09/11 22:24:17 WARN SQLExecPlugin: Installing extensions to enable rapids GPU SQL support. To disable GPU support set `spark.rapids.sql.enabled` to false
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-5bac58d4dd40> in <module>
----> 1 spark = SparkSession.builder.enableHiveSupport().config(conf=conf).master("yarn").getOrCreate()

/usr/lib/spark//python/pyspark/sql/session.py in getOrCreate(self)
    187                     # Do not update `SparkConf` for existing `SparkContext`, as it's shared
    188                     # by all sessions.
--> 189                     session = SparkSession(sc)
    190                 for key, value in self._options.items():
    191                     session._jsparkSession.sessionState().conf().setConfString(key, value)

/usr/lib/spark//python/pyspark/sql/session.py in __init__(self, sparkContext, jsparkSession)
    226                 jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
    227             else:
--> 228                 jsparkSession = self._jvm.SparkSession(self._jsc.sc())
    229         self._jsparkSession = jsparkSession
    230         self._jwrapped = self._jsparkSession.sqlContext()

/mnt/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1567         answer = self._gateway_client.send_command(command)
   1568         return_value = get_return_value(
-> 1569             answer, self._gateway_client, None, self._fqn)
   1570 
   1571         for temp_arg in temp_args:

/mnt/anaconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.sql.SparkSession.
: java.lang.ExceptionInInitializerError
	at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:55)
	at com.nvidia.spark.rapids.SQLExecPlugin.apply(Plugin.scala:50)
	at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1156)
	at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1151)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1151)
	at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:99)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Could not find Spark Shim Loader for 3.0.0-amzn-0
	at com.nvidia.spark.rapids.ShimLoader$.<init>(ShimLoader.scala:42)
	at com.nvidia.spark.rapids.ShimLoader$.<clinit>(ShimLoader.scala)
	... 20 more


In [5]: exit()

Regards,
Gourav

@jlowe (Member) commented Sep 11, 2020

Caused by: java.lang.IllegalArgumentException: Could not find Spark Shim Loader for 3.0.0-amzn-0

This is a different error, so I assume you were able to get past the NoSuchMethodError exception. And this error is starting to shed a lot more light on the nature of the problem. I wasn't testing against AWS EMR, mine was Spark standalone. This shows AWS EMR is using a modified version of Apache Spark, and I'm guessing that is probably incompatible with the plugin in some fashion.

We're going to have to dig deeper into this on our end. In the meantime, it would be helpful if you could replace .show() with .explain() in your query command and send that output, along with the full stacktrace that is associated with the java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector error.

@gourav-sg (Author)

Hi,

The entire code is mentioned below and the entire YARN log is attached.
yarn.log.zip

[hadoop@ip-172-29-69-4 ~]$ /mnt/anaconda3/bin/ipython
Python 3.7.8 | packaged by conda-forge | (default, Jul 31 2020, 02:25:08) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.17.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import os, subprocess, sys, socket
   ...: os.environ["SPARK_HOME"] = "/usr/lib/spark/"
   ...: os.environ["PYSPARK_PYTHON"] = "/mnt/anaconda3/bin/python3.7"
   ...: spark_home = os.environ.get('SPARK_HOME', None)
   ...: sys.path.insert(0, spark_home + "/python")
   ...: sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-src.zip'))
   ...: 
   ...: from pyspark.sql import SparkSession
   ...: from pyspark import SparkConf

In [2]: conf = SparkConf().setAppName("MortgageETL")
   ...: 

In [3]: conf.set("spark.jars", "s3://gourav-bucket/gourav/gpu/rapids-4-spark_2.12-0.1.0.jar,s3://gourav-bucket/gourav/gpu/
   ...: cudf-0.14-cuda10-2.jar")
   ...: 
   ...: conf.set('spark.rapids.sql.explain', 'ALL')
   ...: conf.set("spark.executor.extraJavaOptions", "-Dai.rapids.cudf.prefer-pinned=true")
   ...: conf.set("spark.rapids.sql.concurrentGpuTasks", "1")
   ...: conf.set("spark.plugins", "com.nvidia.spark.SQLPlugin")
   ...: conf.set("spark.sql.adaptive.enabled", True)
   ...: conf.set('spark.rapids.sql.incompatibleOps.enabled', True)
   ...: conf.set("spark.executor.resource.gpu.discoveryScript","/usr/lib/spark/examples/src/main/scripts/getGpusResources.sh")
   ...: conf.set("spark.executor.resource.gpu.amount", "1")
   ...: conf.set("spark.task.resource.gpu.amount", "0.25")
   ...: conf.set("spark.executor.cores", "2")
   ...: conf.set("spark.task.cpus", "1")
   ...: 
Out[3]: <pyspark.conf.SparkConf at 0x7fac3d8a5f10>

In [4]: spark = SparkSession.builder.enableHiveSupport().config(conf=conf).master("yarn").getOrCreate()
   ...: 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/09/12 07:54:12 WARN SparkContext: The configuration of resource: gpu (exec = 1, task = 1/4, runnable tasks = 4) will result in wasted resources due to resource CPU limiting the number of runnable tasks per executor to: 2. Please adjust your configuration.
20/09/12 07:54:13 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
20/09/12 07:54:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
20/09/12 07:54:22 WARN SQLExecPlugin: Installing extensions to enable rapids GPU SQL support. To disable GPU support set `spark.rapids.sql.enabled` to false

In [5]: spark.range(100).write.parquet("s3://gourav-bucket/gourav-data/testdata9")
20/09/12 07:55:36 WARN GpuOverrides: 
*Exec <DataWritingCommandExec> could run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> could run on GPU
  !NOT_FOUND <RangeExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RangeExec could be found

                                                                                
In [6]: spark.read.parquet("s3://gourav-bucket/gourav-data/testdata9").explain()
20/09/12 07:56:36 WARN GpuOverrides: 
*Exec <FileSourceScanExec> could run on GPU

== Physical Plan ==
*(1) GpuColumnarToRow false
+- GpuFileGpuScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://gourav-bucket/gourav-data/testdata9], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>



In [7]: spark.read.parquet("s3://gourav-bucket/gourav-data/testdata9").show()
20/09/12 07:56:56 WARN GpuOverrides: 
*Exec <CollectLimitExec> could run on GPU
  *Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> could run on GPU
    *Expression <Alias> cast(id#5L as string) AS id#8 could run on GPU
      *Expression <Cast> cast(id#5L as string) could run on GPU
    *Exec <FileSourceScanExec> could run on GPU

20/09/12 07:56:57 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 18, ip-172-29-69-4.eu-west-1.compute.internal, executor 2): java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector
	at com.nvidia.spark.rapids.GpuBoundReference.columnarEval(GpuBoundAttribute.scala:71)
	at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:121)
	at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:90)
	at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:49)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:153)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:150)
	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:150)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:185)
	at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:48)
	at com.nvidia.spark.rapids.GpuProjectExec$.projectAndClose(basicPhysicalOperators.scala:39)
	at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$doExecuteColumnar$1(basicPhysicalOperators.scala:88)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at com.nvidia.spark.rapids.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:172)
	at com.nvidia.spark.rapids.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:188)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

20/09/12 07:56:57 ERROR TaskSetManager: Task 3 in stage 3.0 failed 4 times; aborting job
20/09/12 07:56:57 WARN TaskSetManager: Lost task 1.3 in stage 3.0 (TID 29, ip-172-29-69-4.eu-west-1.compute.internal, executor 2): TaskKilled (Stage cancelled)
20/09/12 07:56:57 WARN TaskSetManager: Lost task 2.3 in stage 3.0 (TID 30, ip-172-29-69-4.eu-west-1.compute.internal, executor 2): TaskKilled (Stage cancelled)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-0f578d3ae881> in <module>
----> 1 spark.read.parquet("s3://gourav-bucket/gourav-data/testdata9").show()

/usr/lib/spark//python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    439         """
    440         if isinstance(truncate, bool) and truncate:
--> 441             print(self._jdf.showString(n, 20, vertical))
    442         else:
    443             print(self._jdf.showString(n, int(truncate), vertical))

/mnt/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/lib/spark//python/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

/mnt/anaconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o131.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 3.0 failed 4 times, most recent failure: Lost task 3.3 in stage 3.0 (TID 28, ip-172-29-69-4.eu-west-1.compute.internal, executor 2): java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector
	at com.nvidia.spark.rapids.GpuBoundReference.columnarEval(GpuBoundAttribute.scala:71)
	at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:121)
	at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:90)
	at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:49)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:153)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:150)
	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:150)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:185)
	at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:48)
	at com.nvidia.spark.rapids.GpuProjectExec$.projectAndClose(basicPhysicalOperators.scala:39)
	at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$doExecuteColumnar$1(basicPhysicalOperators.scala:88)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at com.nvidia.spark.rapids.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:172)
	at com.nvidia.spark.rapids.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:188)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2123)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:990)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:990)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3664)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2737)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3655)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:106)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:207)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:88)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3653)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2737)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2944)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.vectorized.OnHeapColumnVector cannot be cast to com.nvidia.spark.rapids.GpuColumnVector
	at com.nvidia.spark.rapids.GpuBoundReference.columnarEval(GpuBoundAttribute.scala:71)
	at com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:121)
	at com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:90)
	at com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:49)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:153)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:150)
	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:150)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:185)
	at com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:48)
	at com.nvidia.spark.rapids.GpuProjectExec$.projectAndClose(basicPhysicalOperators.scala:39)
	at com.nvidia.spark.rapids.GpuProjectExec.$anonfun$doExecuteColumnar$1(basicPhysicalOperators.scala:88)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at com.nvidia.spark.rapids.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:172)
	at com.nvidia.spark.rapids.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:188)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [8]: 

Regards,
Gourav Sengupta

@jlowe changed the title from "SPARK SQL will fail with incompatible data types issues" to "SPARK SQL will fail on AWS EMR with incompatible data types issues" Sep 14, 2020
@gourav-sg (Author)

Hi @jlowe,
please let me know in case there is anything that I can do here. AWS EMR has a very large share of Spark users, and your resolution will be useful for all of them.
I am currently testing the solution in trial mode and, except for the issue above, things seem to be working fine for now.

Regards,
Gourav

@jlowe (Member) commented Sep 15, 2020

Sorry for the delayed reply. We did a bit of investigation on our end, and I believe we found a workaround, at least for the one test query that is failing for you. Credit goes to @tgravescs for finding some settings that should help.

We recommend setting the following two configs when running with the 0.1.0 plugin in your AWS EMR setup:

spark.sql.adaptive.enabled           false
spark.sql.sources.useV1SourceList    ""

The first config setting disables Adaptive Query Execution (AQE), which is not supported by the 0.1.0 version of the plugin. The second config setting forces Spark to load the data via the DataSourceV2 interfaces, which allows the test query to work. We believe something specific to the AWS EMR version of Spark is interfering with the plugin's processing of DataSourceV1 loads from S3, but we don't yet know the root cause of the issue.

I hope this allows you to proceed with your testing. Note that we will not be able to officially support an AWS EMR setup until the 0.3.0 release at the earliest. It appears there will need to be plugin changes to support whatever AWS EMR's Spark is doing with V1 data sources, and the 0.2.0 release is in its final stages.
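A minimal sketch of how these two settings could be added to the SparkConf from the repro above (only the keys and values come from the recommendation; the surrounding conf object is assumed from the earlier snippets):

# Disable AQE (not supported by the 0.1.0 plugin) and force all reads through DataSourceV2.
conf.set("spark.sql.adaptive.enabled", "false")
conf.set("spark.sql.sources.useV1SourceList", "")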

@gourav-sg (Author)

Hi @tgravescs and @jlowe ,
I am actively working on this. I messed up a bit of my scripts, which delayed me for a couple of hours today, but I will surely come back by tomorrow.
Thanks a ton for your kind help with this.
Regards,
Gourav

@gourav-sg (Author)

Hi @jlowe and @tgravescs ,

I have finally been able to test with the changes, but now nothing works via the GPU anymore. Do you have any sample data and queries that I can use to test whether DataSourceV1 can use GPUs?

Regards,
Gourav Sengupta

@jlowe (Member) commented Sep 18, 2020

Can you elaborate on what you mean by "now nothing works"? Are operations simply not running on the GPU at all, is it crashing, or something else?

Do you have any sample data and queries that I can use to test whether DataSourceV1 can use GPUs?

A perfect example is the one you provided in the description of this ticket:

spark.sql("SELECT 'a' FLD1, id FROM range(100)").write.parquet("s3://gourav-bucket/gourav/testdata3/")
spark.read.parquet("s3://gourav-bucket/gourav/testdata3/").createOrReplaceTempView("test")
spark.sql("SELECT * FROM test").show()

That's what I used to replicate the issue in AWS EMR. Setting spark.sql.sources.useV1SourceList to an empty string, forcing all data sources to use V2 instead of V1, and re-executing that query setup allowed it to read Parquet data from S3 using the GPU as seen in the query explanation (e.g.: replace .show() with .explain()).
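A small sketch of that verification step, reusing the session and S3 path from earlier in the thread; the exact GPU operator names printed by explain() may vary between plugin versions:

# With an empty useV1SourceList the Parquet read goes through DataSourceV2;
# the physical plan should then show GPU scan/project nodes instead of a plain CPU file scan.
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.read.parquet("s3://gourav-bucket/gourav/testdata3/").explain()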

@gourav-sg (Author) commented Sep 19, 2020 via email

@sameerz (Collaborator) commented Sep 21, 2020

Do you have any sample data and queries that I can use to test whether DataSourceV1 can use GPUs?

You can take a look at the straightforward join operation example at https://nvidia.github.io/spark-rapids/docs/get-started/getting-started.html#example-join-operation or at the mortgage ETL notebook demo at https://nvidia.github.io/spark-rapids/docs/examples.html.
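For reference, a small self-contained join along those lines; this is an illustrative sketch, not the exact code from the linked pages:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Join two generated DataFrames on a shared key. With the plugin enabled and
# spark.rapids.sql.explain=ALL set, explain() and the driver log show which
# operators were placed on the GPU.
left = spark.range(0, 1000000).withColumnRenamed("id", "key")
right = spark.range(0, 1000000).withColumnRenamed("id", "key")
joined = left.join(right, "key")
joined.explain()
print(joined.count())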

@tgravescs (Collaborator)

Also, if your operators are not running on the GPU, can you please let us know what the explain output in the driver log shows? It looks like you have the configuration conf.set('spark.rapids.sql.explain', 'ALL') enabled, so the driver log will detail which operations and data types are allowed on the GPU and which ones block something from being run on the GPU.

@sameerz (Collaborator) commented Apr 29, 2021

Hi @gourav-sg, since this was filed we've spent time integrating more closely with AWS EMR. There are instructions on using the 0.2 version of the plugin with EMR at https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-aws-emr.html. Let us know if you have any more questions on this issue, or if we can close it.

@gourav-sg (Author) commented Apr 29, 2021 via email

@sameerz closed this as completed Apr 29, 2021
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
…IDIA#730)

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>