From 576f41b5f22beb282214b717bb8eb09ac4968423 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 16 Nov 2019 18:28:27 -0800
Subject: [PATCH] [SPARK-29378][R] Upgrade SparkR to use Arrow 0.15 API

### What changes were proposed in this pull request?

[[SPARK-29376] Upgrade Apache Arrow to version 0.15.1](https://github.com/apache/spark/pull/26133) upgraded Scala/Java/Python to Arrow 0.15. This PR aims to upgrade `SparkR` to use the Arrow 0.15 API. Currently, it's broken.

### Why are the changes needed?

First of all, it turns out that our Jenkins jobs (including the PR builder) ignore the Arrow tests. Arrow 0.15 has breaking R API changes from [ARROW-5505](https://issues.apache.org/jira/browse/ARROW-5505) and we missed that. AppVeyor was the only environment running the SparkR Arrow tests, but it is broken now.

**Jenkins**
```
Skipped ------------------------------------------------------------------------
1. createDataFrame/collect Arrow optimization (test_sparkSQL_arrow.R#25) - arrow not installed
```

Second, Arrow throws an OOM on the AppVeyor environment (Windows, JDK8) like the following, because it still has Arrow 0.14.
```
Warnings -----------------------------------------------------------------------
1. createDataFrame/collect Arrow optimization (test_sparkSQL_arrow.R#39) - createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.sparkr.enabled' is set to true; however, failed, attempting non-optimization. Reason: Error in handleErrors(returnStatus, conn): java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
  at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
  at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
```

It is due to the version mismatch: the snippet below is the Arrow 0.15 read path (`MessageSerializer.readMessage`, see the stack trace above), which expects the ARROW-6313 message framing that Arrow 0.14 does not produce.
```java
int messageLength = MessageSerializer.bytesToInt(buffer.array());
if (messageLength == IPC_CONTINUATION_TOKEN) {
  buffer.clear();
  // ARROW-6313, if the first 4 bytes are continuation message, read the next 4 for the length
  if (in.readFully(buffer) == 4) {
    messageLength = MessageSerializer.bytesToInt(buffer.array());
  }
}

// Length of 0 indicates end of stream
if (messageLength != 0) {
  // Read the message into the buffer.
  ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength);
```

After upgrading this to 0.15, we are hitting ARROW-5505. This PR upgrades the Arrow version in AppVeyor and fixes the issue.
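For context, the ARROW-5505 change replaced the arrow R package's plain constructor functions with `$create()` factory methods, which is exactly what breaks the SparkR call sites. Below is a minimal sketch of the new-style write path against Arrow 0.15; the `write_rdf_stream` helper name and the `mtcars` input are illustrative only, not SparkR code:

```r
library(arrow)

# Arrow <= 0.14 exposed constructors as plain functions:
#   stream <- arrow::FileOutputStream(fileName)
#   writer <- arrow::RecordBatchStreamWriter(stream, schema)
# Arrow 0.15 (ARROW-5505) creates the same objects via $create() factories:
write_rdf_stream <- function(fileName, rdf) {  # hypothetical helper, for illustration
  batch <- arrow::record_batch(rdf)
  stream <- arrow::FileOutputStream$create(fileName)                     # was FileOutputStream(fileName)
  writer <- arrow::RecordBatchStreamWriter$create(stream, batch$schema)  # was RecordBatchStreamWriter(stream, schema)
  writer$write_batch(batch)
  writer$close()
  stream$close()
}

write_rdf_stream(tempfile(), mtcars)
```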
### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the AppVeyor tests. This PR passed here:
- https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/builds/28909044

```
SparkSQL Arrow optimization: Spark package found in SPARK_HOME: C:\projects\spark\bin\..
................
```

Closes #26555 from dongjoon-hyun/SPARK-R-TEST.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 R/pkg/R/SQLContext.R  | 4 ++--
 R/pkg/R/deserialize.R | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index b2d0d15d6a372..e349da84dab3c 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -166,9 +166,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
   for (rdf_slice in rdf_slices) {
     batch <- arrow::record_batch(rdf_slice)
     if (is.null(stream_writer)) {
-      stream <- arrow::FileOutputStream(fileName)
+      stream <- arrow::FileOutputStream$create(fileName)
       schema <- batch$schema
-      stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
+      stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
     }
 
     stream_writer$write_batch(batch)
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index a6febb1cbd132..ca4a6e342d772 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -242,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
     # for now.
     dataLen <- readInt(inputCon)
     arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
-    batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
+    batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()
 
     if (useAsTibble) {
       as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
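As a usage sketch of the patched read path above (assuming the arrow 0.15 R package is installed; the round trip below is illustrative only, mirroring how `readDeserializeInArrow` receives the stream bytes as a raw vector from `readBin`):

```r
library(arrow)

# Write one batch with the 0.15 factory API ...
path <- tempfile()
batch <- arrow::record_batch(data.frame(x = 1:3))
stream <- arrow::FileOutputStream$create(path)
writer <- arrow::RecordBatchStreamWriter$create(stream, batch$schema)
writer$write_batch(batch)
writer$close()
stream$close()

# ... then read it back from a raw vector, as readDeserializeInArrow() does
# with the bytes it gets from readBin(inputCon, raw(), dataLen).
arrowData <- readBin(path, raw(), file.size(path))
batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()
stopifnot(batches[[1]]$num_rows == 3)
```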