From c2b355840d5628ea4e05b67867d12da03a4bdd6b Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 29 Jan 2021 12:56:57 -0600 Subject: [PATCH] Support faster copy for a custom DataSource V2 which supplies Arrow data (#1622) * Add in data source v2, csv file and test for arrow copy * remove commented out line --- docs/configs.md | 1 + .../src/main/python/datasourcev2_read.py | 104 ++++ .../src/test/resources/people.csv | 66 +++ .../rapids/shims/spark300/Spark300Shims.scala | 15 + .../rapids/shims/spark311/Spark311Shims.scala | 16 + .../nvidia/spark/rapids/GpuColumnVector.java | 130 ++++- .../rapids/AccessibleArrowColumnVector.java | 507 ++++++++++++++++++ .../spark/rapids/GpuCoalesceBatches.scala | 10 +- .../spark/rapids/HostColumnarToGpu.scala | 110 +++- .../com/nvidia/spark/rapids/RapidsConf.scala | 10 + .../com/nvidia/spark/rapids/SparkShims.scala | 7 + .../rapids/GpuCoalesceBatchesSuite.scala | 172 +++++- 12 files changed, 1109 insertions(+), 39 deletions(-) create mode 100644 integration_tests/src/main/python/datasourcev2_read.py create mode 100644 integration_tests/src/test/resources/people.csv create mode 100644 sql-plugin/src/main/java/org/apache/spark/sql/vectorized/rapids/AccessibleArrowColumnVector.java diff --git a/docs/configs.md b/docs/configs.md index 6ff58712230..501e5075ff0 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -29,6 +29,7 @@ scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true) Name | Description | Default Value -----|-------------|-------------- +spark.rapids.alluxio.pathsToReplace|List of paths to be replaced with corresponding alluxio scheme. Eg, when configureis set to "s3:/foo->alluxio://0.1.2.3:19998/foo,gcs:/bar->alluxio://0.1.2.3:19998/bar", which means: s3:/foo/a.csv will be replaced to alluxio://0.1.2.3:19998/foo/a.csv and gcs:/bar/b.csv will be replaced to alluxio://0.1.2.3:19998/bar/b.csv|None spark.rapids.cloudSchemes|Comma separated list of additional URI schemes that are to be considered cloud based filesystems. Schemes already included: dbfs, s3, s3a, s3n, wasbs, gs. Cloud based stores generally would be total separate from the executors and likely have a higher I/O read cost. Many times the cloud filesystems also get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None spark.rapids.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory. Extra memory will be allocated as needed, but it may result in more fragmentation. This must be less than or equal to the maximum limit configured via spark.rapids.memory.gpu.maxAllocFraction.|0.9 spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the mean time will disable logging.|NONE diff --git a/integration_tests/src/main/python/datasourcev2_read.py b/integration_tests/src/main/python/datasourcev2_read.py new file mode 100644 index 00000000000..4a556369304 --- /dev/null +++ b/integration_tests/src/main/python/datasourcev2_read.py @@ -0,0 +1,104 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect +from marks import * +from pyspark.sql.types import * +from spark_session import with_cpu_session + +# This test requires a datasource v2 jar containing the class +# org.apache.spark.sql.connector.InMemoryTableCatalog +# which returns ArrowColumnVectors be specified in order for it to run. +# If that class is not present it skips the tests. + +catalogName = "columnar" +columnarClass = 'org.apache.spark.sql.connector.InMemoryTableCatalog' + +def createPeopleCSVDf(spark, peopleCSVLocation): + return spark.read.format("csv")\ + .option("header", "false")\ + .option("inferSchema", "true")\ + .load(peopleCSVLocation)\ + .withColumnRenamed("_c0", "name")\ + .withColumnRenamed("_c1", "age")\ + .withColumnRenamed("_c2", "job") + +def setupInMemoryTableWithPartitioning(spark, csv, tname, column_and_table): + peopleCSVDf = createPeopleCSVDf(spark, csv) + peopleCSVDf.createOrReplaceTempView(tname) + spark.table(tname).write.partitionBy("job").saveAsTable(column_and_table) + +def setupInMemoryTableNoPartitioning(spark, csv, tname, column_and_table): + peopleCSVDf = createPeopleCSVDf(spark, csv) + peopleCSVDf.createOrReplaceTempView(tname) + spark.table(tname).write.saveAsTable(column_and_table) + +def readTable(csvPath, tableToRead): + return lambda spark: spark.table(tableToRead)\ + .orderBy("name", "age") + +def createDatabase(spark): + try: + spark.sql("create database IF NOT EXISTS " + catalogName) + spark.sql("use " + catalogName) + except Exception: + pytest.skip("Failed to load catalog for datasource v2 {}, jar is probably missing".format(columnarClass)) + +def cleanupDatabase(spark): + spark.sql("drop database IF EXISTS " + catalogName) + +@pytest.fixture(autouse=True) +def setupAndCleanUp(): + with_cpu_session(lambda spark : createDatabase(spark), + conf={'spark.sql.catalog.columnar': columnarClass}) + yield + with_cpu_session(lambda spark : cleanupDatabase(spark), + conf={'spark.sql.catalog.columnar': columnarClass}) + +@allow_non_gpu('ShowTablesExec', 'DropTableExec') +@pytest.mark.parametrize('csv', ['people.csv']) +def test_read_round_trip_partitioned(std_input_path, csv, spark_tmp_table_factory): + csvPath = std_input_path + "/" + csv + tableName = spark_tmp_table_factory.get() + columnarTableName = catalogName + "." + tableName + with_cpu_session(lambda spark : setupInMemoryTableWithPartitioning(spark, csvPath, tableName, columnarTableName), + conf={'spark.sql.catalog.columnar': columnarClass}) + assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableName), + conf={'spark.sql.catalog.columnar': columnarClass}) + +@allow_non_gpu('ShowTablesExec', 'DropTableExec') +@pytest.mark.parametrize('csv', ['people.csv']) +def test_read_round_trip_no_partitioned(std_input_path, csv, spark_tmp_table_factory): + csvPath = std_input_path + "/" + csv + tableNameNoPart = spark_tmp_table_factory.get() + columnarTableNameNoPart = catalogName + "." + tableNameNoPart + with_cpu_session(lambda spark : setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart), + conf={'spark.sql.catalog.columnar': columnarClass}) + assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart), + conf={'spark.sql.catalog.columnar': columnarClass}) + +@allow_non_gpu('ShowTablesExec', 'DropTableExec') +@pytest.mark.parametrize('csv', ['people.csv']) +def test_read_round_trip_no_partitioned_arrow_off(std_input_path, csv, spark_tmp_table_factory): + csvPath = std_input_path + "/" + csv + tableNameNoPart = spark_tmp_table_factory.get() + columnarTableNameNoPart = catalogName + "." + tableNameNoPart + with_cpu_session(lambda spark : setupInMemoryTableNoPartitioning(spark, csvPath, tableNameNoPart, columnarTableNameNoPart), + conf={'spark.sql.catalog.columnar': columnarClass, + 'spark.rapids.arrowCopyOptmizationEnabled': 'false'}) + assert_gpu_and_cpu_are_equal_collect(readTable(csvPath, columnarTableNameNoPart), + conf={'spark.sql.catalog.columnar': columnarClass, + 'spark.rapids.arrowCopyOptmizationEnabled': 'false'}) diff --git a/integration_tests/src/test/resources/people.csv b/integration_tests/src/test/resources/people.csv new file mode 100644 index 00000000000..d9781c69aef --- /dev/null +++ b/integration_tests/src/test/resources/people.csv @@ -0,0 +1,66 @@ +Jackelyn,23,Engineer +Jeannie,40,Banker +Mariella,20,Professor +Ardith,33,Professor +Elena,41,Banker +Noma,39,Student +Corliss,24,Student +Denae,24,Banker +Ned,44,Professor +Karolyn,,Engineer +Cornelia,45,Sales Manager +Kiyoko,41,Banker +Denisha,45,Engineer +Hilton,21,Sales Manager +Becky,32,Sales Manager +Wendie,29,Banker +Veronica,25,Sales Manager +Carolina,26,Student +Laquita,47,Banker +Stephani,,Professor +Emile,29,Professor +Octavio,35,Banker +Florencio,34,Banker +Elna,38,Banker +Cherri,23,Banker +Raleigh,47,Banker +Hollis,32,Professor +Charlette,35,Professor +Yetta,31,Student +Alfreda,,Engineer +Brigette,45,Banker +Maryann,40,Sales Manager +Miki,37,Banker +Pearle,37,Sales Manager +Damian,21,Sales Manager +Theodore,34,Student +Kristi,46,Engineer +Izetta,45,Professor +Tammi,43,Engineer +Star,,Sales Manager +Kaylee,27,Professor +Lakeshia,48,Professor +Elba,43,Sales Manager +Valencia,20,Engineer +Randa,35,Banker +Lourie,36,Professor +Tracie,31,Banker +Antwan,40,Professor +Gerry,23,Student +Jason,,Banker +Steve,51,CEO +Faker,25,Gamer +가나다라마바사,30,Banker +가나다마바사,20,Banker +가나다나다라마바,21,Professor +가나다나라마바사,22,Engineer +가나나나가나나,23,Sales Manager +가나다나다나나다,24,Banker +나다나다가나다,25,Student +Elon,,CEO +Yohwan,38,Gamer +Kitaek,48,Jobless +Kijung,22,Tutor +KiWoo,23,Tutor +Dongik,43,CEO +Dasong,9,Student \ No newline at end of file diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index 49bfe51a083..1bfe5f645a1 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -16,10 +16,12 @@ package com.nvidia.spark.rapids.shims.spark300 +import java.nio.ByteBuffer import java.time.ZoneId import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.spark300.RapidsShuffleManager +import org.apache.arrow.vector.ValueVector import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv @@ -450,6 +452,19 @@ class Spark300Shims extends SparkShims { InMemoryFileIndex.shouldFilterOut(path) } + // Arrow version changed between Spark versions + override def getArrowDataBuf(vec: ValueVector): ByteBuffer = { + vec.getDataBuffer().nioBuffer() + } + + override def getArrowValidityBuf(vec: ValueVector): ByteBuffer = { + vec.getValidityBuffer().nioBuffer() + } + + override def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer = { + vec.getOffsetBuffer().nioBuffer() + } + override def replaceWithAlluxioPathIfNeeded( conf: RapidsConf, relation: HadoopFsRelation, diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala index d18093f9850..d3b8613574c 100644 --- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala +++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/Spark311Shims.scala @@ -16,9 +16,12 @@ package com.nvidia.spark.rapids.shims.spark311 +import java.nio.ByteBuffer + import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.spark301.Spark301Shims import com.nvidia.spark.rapids.spark311.RapidsShuffleManager +import org.apache.arrow.vector.ValueVector import org.apache.spark.SparkEnv import org.apache.spark.sql.SparkSession @@ -387,4 +390,17 @@ class Spark311Shims extends Spark301Shims { override def shouldIgnorePath(path: String): Boolean = { HadoopFSUtilsShim.shouldIgnorePath(path) } + + // Arrow version changed between Spark versions + override def getArrowDataBuf(vec: ValueVector): ByteBuffer = { + vec.getDataBuffer.nioBuffer() + } + + override def getArrowValidityBuf(vec: ValueVector): ByteBuffer = { + vec.getValidityBuffer.nioBuffer() + } + + override def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer = { + vec.getOffsetBuffer.nioBuffer() + } } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index e843a7b668e..b181c9d6e30 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import ai.rapids.cudf.ColumnView; import ai.rapids.cudf.DType; +import ai.rapids.cudf.ArrowColumnBuilder; import ai.rapids.cudf.HostColumnVector; import ai.rapids.cudf.Scalar; import ai.rapids.cudf.Schema; @@ -118,12 +119,43 @@ private static HostColumnVector.DataType convertFrom(DataType spark, boolean nul } } - public static final class GpuColumnarBatchBuilder implements AutoCloseable { - private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders; - private final StructField[] fields; + public static abstract class GpuColumnarBatchBuilderBase implements AutoCloseable { + protected StructField[] fields; + + public abstract void close(); + public abstract void copyColumnar(ColumnVector cv, int colNum, boolean nullable, int rows); + + protected abstract ColumnVector buildAndPutOnDevice(int builderIndex); + protected abstract int buildersLength(); + + public ColumnarBatch build(int rows) { + int buildersLen = buildersLength(); + ColumnVector[] vectors = new ColumnVector[buildersLen]; + boolean success = false; + try { + for (int i = 0; i < buildersLen; i++) { + vectors[i] = buildAndPutOnDevice(i); + } + ColumnarBatch ret = new ColumnarBatch(vectors, rows); + success = true; + return ret; + } finally { + if (!success) { + for (ColumnVector vec: vectors) { + if (vec != null) { + vec.close(); + } + } + } + } + } + } + + public static final class GpuArrowColumnarBatchBuilder extends GpuColumnarBatchBuilderBase { + private final ai.rapids.cudf.ArrowColumnBuilder[] builders; /** - * A collection of builders for building up columnar data. + * A collection of builders for building up columnar data from Arrow data. * @param schema the schema of the batch. * @param rows the maximum number of rows in this batch. * @param batch if this is going to copy a ColumnarBatch in a non GPU format that batch @@ -131,20 +163,21 @@ public static final class GpuColumnarBatchBuilder implements AutoCloseable { * of how big to allocate buffers that do not necessarily correspond to the * number of rows. */ - public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) { + public GpuArrowColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) { fields = schema.fields(); int len = fields.length; - builders = new ai.rapids.cudf.HostColumnVector.ColumnBuilder[len]; + builders = new ai.rapids.cudf.ArrowColumnBuilder[len]; boolean success = false; + try { for (int i = 0; i < len; i++) { StructField field = fields[i]; - builders[i] = new HostColumnVector.ColumnBuilder(convertFrom(field.dataType(), field.nullable()), rows); + builders[i] = new ArrowColumnBuilder(convertFrom(field.dataType(), field.nullable())); } success = true; } finally { if (!success) { - for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) { + for (ai.rapids.cudf.ArrowColumnBuilder b: builders) { if (b != null) { b.close(); } @@ -153,33 +186,88 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) } } - public ai.rapids.cudf.HostColumnVector.ColumnBuilder builder(int i) { + protected int buildersLength() { + return builders.length; + } + + protected ColumnVector buildAndPutOnDevice(int builderIndex) { + ai.rapids.cudf.ColumnVector cv = builders[builderIndex].buildAndPutOnDevice(); + GpuColumnVector gcv = new GpuColumnVector(fields[builderIndex].dataType(), cv); + builders[builderIndex] = null; + return gcv; + } + + public void copyColumnar(ColumnVector cv, int colNum, boolean nullable, int rows) { + HostColumnarToGpu.arrowColumnarCopy(cv, builder(colNum), nullable, rows); + } + + public ai.rapids.cudf.ArrowColumnBuilder builder(int i) { return builders[i]; } - public ColumnarBatch build(int rows) { - ColumnVector[] vectors = new ColumnVector[builders.length]; + @Override + public void close() { + for (ai.rapids.cudf.ArrowColumnBuilder b: builders) { + if (b != null) { + b.close(); + } + } + } + } + + public static final class GpuColumnarBatchBuilder extends GpuColumnarBatchBuilderBase { + private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders; + + /** + * A collection of builders for building up columnar data. + * @param schema the schema of the batch. + * @param rows the maximum number of rows in this batch. + * @param batch if this is going to copy a ColumnarBatch in a non GPU format that batch + * we are going to copy. If not this may be null. This is used to get an idea + * of how big to allocate buffers that do not necessarily correspond to the + * number of rows. + */ + public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) { + fields = schema.fields(); + int len = fields.length; + builders = new ai.rapids.cudf.HostColumnVector.ColumnBuilder[len]; boolean success = false; try { - for (int i = 0; i < builders.length; i++) { - ai.rapids.cudf.ColumnVector cv = builders[i].buildAndPutOnDevice(); - vectors[i] = new GpuColumnVector(fields[i].dataType(), cv); - builders[i] = null; + for (int i = 0; i < len; i++) { + StructField field = fields[i]; + builders[i] = new HostColumnVector.ColumnBuilder(convertFrom(field.dataType(), field.nullable()), rows); } - ColumnarBatch ret = new ColumnarBatch(vectors, rows); success = true; - return ret; } finally { if (!success) { - for (ColumnVector vec: vectors) { - if (vec != null) { - vec.close(); + for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) { + if (b != null) { + b.close(); } } } } } + public void copyColumnar(ColumnVector cv, int colNum, boolean nullable, int rows) { + HostColumnarToGpu.columnarCopy(cv, builder(colNum), nullable, rows); + } + + public ai.rapids.cudf.HostColumnVector.ColumnBuilder builder(int i) { + return builders[i]; + } + + protected int buildersLength() { + return builders.length; + } + + protected ColumnVector buildAndPutOnDevice(int builderIndex) { + ai.rapids.cudf.ColumnVector cv = builders[builderIndex].buildAndPutOnDevice(); + GpuColumnVector gcv = new GpuColumnVector(fields[builderIndex].dataType(), cv); + builders[builderIndex] = null; + return gcv; + } + public HostColumnVector[] buildHostColumns() { HostColumnVector[] vectors = new HostColumnVector[builders.length]; try { diff --git a/sql-plugin/src/main/java/org/apache/spark/sql/vectorized/rapids/AccessibleArrowColumnVector.java b/sql-plugin/src/main/java/org/apache/spark/sql/vectorized/rapids/AccessibleArrowColumnVector.java new file mode 100644 index 00000000000..514f11316af --- /dev/null +++ b/sql-plugin/src/main/java/org/apache/spark/sql/vectorized/rapids/AccessibleArrowColumnVector.java @@ -0,0 +1,507 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.vectorized.rapids; + +import org.apache.arrow.vector.*; +import org.apache.arrow.vector.complex.*; +import org.apache.arrow.vector.holders.NullableVarCharHolder; + +import org.apache.spark.sql.util.ArrowUtils; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector backed by Apache Arrow that adds API to get to the Arrow ValueVector. + * Currently calendar interval type and map type are not supported. + * Original code copied from Spark ArrowColumnVector. + */ +public final class AccessibleArrowColumnVector extends ColumnVector { + private final AccessibleArrowVectorAccessor accessor; + private AccessibleArrowColumnVector[] childColumns; + + public ValueVector getArrowValueVector() { + return accessor.vector; + } + + @Override + public boolean hasNull() { + return accessor.getNullCount() > 0; + } + + @Override + public int numNulls() { + return accessor.getNullCount(); + } + + @Override + public void close() { + if (childColumns != null) { + for (int i = 0; i < childColumns.length; i++) { + childColumns[i].close(); + childColumns[i] = null; + } + childColumns = null; + } + accessor.close(); + } + + @Override + public boolean isNullAt(int rowId) { + return accessor.isNullAt(rowId); + } + + @Override + public boolean getBoolean(int rowId) { + return accessor.getBoolean(rowId); + } + + @Override + public byte getByte(int rowId) { + return accessor.getByte(rowId); + } + + @Override + public short getShort(int rowId) { + return accessor.getShort(rowId); + } + + @Override + public int getInt(int rowId) { + return accessor.getInt(rowId); + } + + @Override + public long getLong(int rowId) { + return accessor.getLong(rowId); + } + + @Override + public float getFloat(int rowId) { + return accessor.getFloat(rowId); + } + + @Override + public double getDouble(int rowId) { + return accessor.getDouble(rowId); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) return null; + return accessor.getDecimal(rowId, precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) return null; + return accessor.getUTF8String(rowId); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) return null; + return accessor.getBinary(rowId); + } + + @Override + public ColumnarArray getArray(int rowId) { + if (isNullAt(rowId)) return null; + return accessor.getArray(rowId); + } + + @Override + public ColumnarMap getMap(int rowId) { + if (isNullAt(rowId)) return null; + return accessor.getMap(rowId); + } + + @Override + public AccessibleArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; } + + public AccessibleArrowColumnVector(ValueVector vector) { + super(ArrowUtils.fromArrowField(vector.getField())); + + if (vector instanceof BitVector) { + accessor = new AccessibleBooleanAccessor((BitVector) vector); + } else if (vector instanceof TinyIntVector) { + accessor = new AccessibleByteAccessor((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + accessor = new AccessibleShortAccessor((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + accessor = new AccessibleIntAccessor((IntVector) vector); + } else if (vector instanceof BigIntVector) { + accessor = new AccessibleLongAccessor((BigIntVector) vector); + } else if (vector instanceof Float4Vector) { + accessor = new AccessibleFloatAccessor((Float4Vector) vector); + } else if (vector instanceof Float8Vector) { + accessor = new AccessibleDoubleAccessor((Float8Vector) vector); + } else if (vector instanceof DecimalVector) { + accessor = new AccessibleDecimalAccessor((DecimalVector) vector); + } else if (vector instanceof VarCharVector) { + accessor = new AccessibleStringAccessor((VarCharVector) vector); + } else if (vector instanceof VarBinaryVector) { + accessor = new AccessibleBinaryAccessor((VarBinaryVector) vector); + } else if (vector instanceof DateDayVector) { + accessor = new AccessibleDateAccessor((DateDayVector) vector); + } else if (vector instanceof TimeStampMicroTZVector) { + accessor = new AccessibleTimestampAccessor((TimeStampMicroTZVector) vector); + } else if (vector instanceof MapVector) { + MapVector mapVector = (MapVector) vector; + accessor = new AccessibleMapAccessor(mapVector); + } else if (vector instanceof ListVector) { + ListVector listVector = (ListVector) vector; + accessor = new AccessibleArrayAccessor(listVector); + } else if (vector instanceof StructVector) { + StructVector structVector = (StructVector) vector; + accessor = new AccessibleStructAccessor(structVector); + + childColumns = new AccessibleArrowColumnVector[structVector.size()]; + for (int i = 0; i < childColumns.length; ++i) { + childColumns[i] = new AccessibleArrowColumnVector(structVector.getVectorById(i)); + } + } else { + throw new UnsupportedOperationException(); + } + } + + private abstract static class AccessibleArrowVectorAccessor { + private final ValueVector vector; + + AccessibleArrowVectorAccessor(ValueVector vector) { + this.vector = vector; + } + + // TODO: should be final after removing ArrayAccessor workaround + boolean isNullAt(int rowId) { + return vector.isNull(rowId); + } + + final int getNullCount() { + return vector.getNullCount(); + } + + final void close() { + vector.close(); + } + + boolean getBoolean(int rowId) { + throw new UnsupportedOperationException(); + } + + byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + + ColumnarMap getMap(int rowId) { + throw new UnsupportedOperationException(); + } + } + + private static class AccessibleBooleanAccessor extends AccessibleArrowVectorAccessor { + + private final BitVector accessor; + + AccessibleBooleanAccessor(BitVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final boolean getBoolean(int rowId) { + return accessor.get(rowId) == 1; + } + } + + private static class AccessibleByteAccessor extends AccessibleArrowVectorAccessor { + + private final TinyIntVector accessor; + + AccessibleByteAccessor(TinyIntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final byte getByte(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleShortAccessor extends AccessibleArrowVectorAccessor { + + private final SmallIntVector accessor; + + AccessibleShortAccessor(SmallIntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final short getShort(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleIntAccessor extends AccessibleArrowVectorAccessor { + + private final IntVector accessor; + + AccessibleIntAccessor(IntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final int getInt(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleLongAccessor extends AccessibleArrowVectorAccessor { + + private final BigIntVector accessor; + + AccessibleLongAccessor(BigIntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final long getLong(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleFloatAccessor extends AccessibleArrowVectorAccessor { + + private final Float4Vector accessor; + + AccessibleFloatAccessor(Float4Vector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final float getFloat(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleDoubleAccessor extends AccessibleArrowVectorAccessor { + + private final Float8Vector accessor; + + AccessibleDoubleAccessor(Float8Vector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final double getDouble(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleDecimalAccessor extends AccessibleArrowVectorAccessor { + + private final DecimalVector accessor; + + AccessibleDecimalAccessor(DecimalVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) return null; + return Decimal.apply(accessor.getObject(rowId), precision, scale); + } + } + + private static class AccessibleStringAccessor extends AccessibleArrowVectorAccessor { + + private final VarCharVector accessor; + private final NullableVarCharHolder stringResult = new NullableVarCharHolder(); + + AccessibleStringAccessor(VarCharVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final UTF8String getUTF8String(int rowId) { + accessor.get(rowId, stringResult); + if (stringResult.isSet == 0) { + return null; + } else { + return UTF8String.fromAddress(null, + stringResult.buffer.memoryAddress() + stringResult.start, + stringResult.end - stringResult.start); + } + } + } + + private static class AccessibleBinaryAccessor extends AccessibleArrowVectorAccessor { + + private final VarBinaryVector accessor; + + AccessibleBinaryAccessor(VarBinaryVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final byte[] getBinary(int rowId) { + return accessor.getObject(rowId); + } + } + + private static class AccessibleDateAccessor extends AccessibleArrowVectorAccessor { + + private final DateDayVector accessor; + + AccessibleDateAccessor(DateDayVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final int getInt(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleTimestampAccessor extends AccessibleArrowVectorAccessor { + + private final TimeStampMicroTZVector accessor; + + AccessibleTimestampAccessor(TimeStampMicroTZVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final long getLong(int rowId) { + return accessor.get(rowId); + } + } + + private static class AccessibleArrayAccessor extends AccessibleArrowVectorAccessor { + + private final ListVector accessor; + private final AccessibleArrowColumnVector arrayData; + + AccessibleArrayAccessor(ListVector vector) { + super(vector); + this.accessor = vector; + this.arrayData = new AccessibleArrowColumnVector(vector.getDataVector()); + } + + @Override + final boolean isNullAt(int rowId) { + // TODO: Workaround if vector has all non-null values, see ARROW-1948 + if (accessor.getValueCount() > 0 && accessor.getValidityBuffer().capacity() == 0) { + return false; + } else { + return super.isNullAt(rowId); + } + } + + @Override + final ColumnarArray getArray(int rowId) { + int start = accessor.getElementStartIndex(rowId); + int end = accessor.getElementEndIndex(rowId); + return new ColumnarArray(arrayData, start, end - start); + } + } + + /** + * Any call to "get" method will throw UnsupportedOperationException. + * + * Access struct values in a AccessibleArrowColumnVector doesn't use this accessor. Instead, it uses + * getStruct() method defined in the parent class. Any call to "get" method in this class is a + * bug in the code. + * + */ + private static class AccessibleStructAccessor extends AccessibleArrowVectorAccessor { + + AccessibleStructAccessor(StructVector vector) { + super(vector); + } + } + + private static class AccessibleMapAccessor extends AccessibleArrowVectorAccessor { + private final MapVector accessor; + private final AccessibleArrowColumnVector keys; + private final AccessibleArrowColumnVector values; + + AccessibleMapAccessor(MapVector vector) { + super(vector); + this.accessor = vector; + StructVector entries = (StructVector) vector.getDataVector(); + this.keys = new AccessibleArrowColumnVector(entries.getChild(MapVector.KEY_NAME)); + this.values = new AccessibleArrowColumnVector(entries.getChild(MapVector.VALUE_NAME)); + } + + @Override + final ColumnarMap getMap(int rowId) { + int index = rowId * MapVector.OFFSET_WIDTH; + int offset = accessor.getOffsetBuffer().getInt(index); + int length = accessor.getInnerValueCountAt(rowId); + return new ColumnarMap(keys, values, offset, length); + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 2cebd4e6d1d..a242dbb41e6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -199,7 +199,7 @@ abstract class AbstractGpuCoalesceIterator( /** * Called first to initialize any state needed for a new batch to be created. */ - def initNewBatch(): Unit + def initNewBatch(batch: ColumnarBatch): Unit /** * Called to add a new batch to the final output batch. The batch passed in will @@ -331,7 +331,7 @@ abstract class AbstractGpuCoalesceIterator( private def addBatch(batch: ColumnarBatch): Unit = { if (!batchInitialized) { - initNewBatch() + initNewBatch(batch) batchInitialized = true } addBatchToConcat(batch) @@ -372,7 +372,7 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch], private[this] var codec: TableCompressionCodec = _ - override def initNewBatch(): Unit = { + override def initNewBatch(batch: ColumnarBatch): Unit = { batches.clear() compressedBatchIndices.clear() } @@ -485,7 +485,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], private val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty private var maxDeviceMemory: Long = 0 - override def initNewBatch(): Unit = { + override def initNewBatch(batch: ColumnarBatch): Unit = { batches.safeClose() batches.clear() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala index 295655972f6..ba50bf6fe59 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,83 @@ package com.nvidia.spark.rapids +import java.nio.ByteBuffer + +import org.apache.arrow.vector.ValueVector + import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.types._ -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.rapids.AccessibleArrowColumnVector + +object HostColumnarToGpu extends Logging { + + // use reflection to get access to a private field in a class + private def getClassFieldAccessible(className: String, fieldName: String) = { + val classObj = Class.forName(className) + val fields = classObj.getDeclaredFields.toList + val field = fields.filter( x => { + x.getName.contains(fieldName) + }).head + field.setAccessible(true) + field + } + + private lazy val accessorField = { + getClassFieldAccessible("org.apache.spark.sql.vectorized.ArrowColumnVector", "accessor") + } + + private lazy val vecField = { + getClassFieldAccessible("org.apache.spark.sql.vectorized.ArrowColumnVector$ArrowVectorAccessor", + "vector") + } + + // use reflection to get value vector from ArrowColumnVector + private def getArrowValueVector(cv: ColumnVector): ValueVector = { + val arrowCV = cv.asInstanceOf[ArrowColumnVector] + val accessor = accessorField.get(arrowCV) + vecField.get(accessor).asInstanceOf[ValueVector] + } + + def arrowColumnarCopy( + cv: ColumnVector, + ab: ai.rapids.cudf.ArrowColumnBuilder, + nullable: Boolean, + rows: Int): Unit = { + val valVector = cv match { + case v: ArrowColumnVector => + try { + getArrowValueVector(v) + } catch { + case e: Exception => + throw new IllegalStateException("Trying to read from a ArrowColumnVector but can't " + + "access its Arrow ValueVector", e) + } + case av: AccessibleArrowColumnVector => + av.getArrowValueVector() + case _ => + throw new IllegalStateException(s"Illegal column vector type: ${cv.getClass}") + } + val nullCount = valVector.getNullCount() + val dataBuf = ShimLoader.getSparkShims.getArrowDataBuf(valVector) + val validity = ShimLoader.getSparkShims.getArrowValidityBuf(valVector) + // this is a bit ugly, not all Arrow types have the offsets buffer + var offsets: ByteBuffer = null + try { + offsets = ShimLoader.getSparkShims.getArrowOffsetsBuf(valVector) + } catch { + case e: UnsupportedOperationException => + // swallow the exception and assume no offsets buffer + } + ab.addBatch(rows, nullCount, dataBuf, validity, offsets) + } -object HostColumnarToGpu { def columnarCopy(cv: ColumnVector, b: ai.rapids.cudf.HostColumnVector.ColumnBuilder, nullable: Boolean, rows: Int): Unit = { (cv.dataType(), nullable) match { @@ -159,7 +226,8 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch], concatTime: SQLMetric, totalTime: SQLMetric, peakDevMemory: SQLMetric, - opName: String) + opName: String, + useArrowCopyOpt: Boolean) extends AbstractGpuCoalesceIterator(iter, goal, numInputRows, @@ -174,33 +242,53 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch], // RequireSingleBatch goal is intentionally not supported in this iterator assert(goal != RequireSingleBatch) - var batchBuilder: GpuColumnVector.GpuColumnarBatchBuilder = _ + var batchBuilder: GpuColumnVector.GpuColumnarBatchBuilderBase = _ var totalRows = 0 var maxDeviceMemory: Long = 0 + // the arrow cudf converter only supports primitive types and strings + // decimals and nested types aren't supported yet + private def arrowTypesSupported(schema: StructType): Boolean = { + val dataTypes = schema.fields.map(_.dataType) + dataTypes.forall(GpuOverrides.isSupportedType(_)) + } + /** * Initialize the builders using an estimated row count based on the schema and the desired * batch size defined by [[RapidsConf.GPU_BATCH_SIZE_BYTES]]. */ - override def initNewBatch(): Unit = { + override def initNewBatch(batch: ColumnarBatch): Unit = { if (batchBuilder != null) { batchBuilder.close() batchBuilder = null } + // when reading host batches it is essential to read the data immediately and pass to a // builder and we need to determine how many rows to allocate in the builder based on the // schema and desired batch size batchRowLimit = GpuBatchUtils.estimateRowCount(goal.targetSizeBytes, GpuBatchUtils.estimateGpuMemory(schema, 512), 512) - batchBuilder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, batchRowLimit, null) + + // if no columns then probably a count operation so doesn't matter which builder we use + // as we won't actually copy any data and we can't tell what type of data it is without + // having a column + if (useArrowCopyOpt && batch.numCols() > 0 && + arrowTypesSupported(schema) && + (batch.column(0).isInstanceOf[ArrowColumnVector] || + batch.column(0).isInstanceOf[AccessibleArrowColumnVector])) { + logDebug("Using GpuArrowColumnarBatchBuilder") + batchBuilder = new GpuColumnVector.GpuArrowColumnarBatchBuilder(schema, batchRowLimit, batch) + } else { + logDebug("Using GpuColumnarBatchBuilder") + batchBuilder = new GpuColumnVector.GpuColumnarBatchBuilder(schema, batchRowLimit, null) + } totalRows = 0 } override def addBatchToConcat(batch: ColumnarBatch): Unit = { val rows = batch.numRows() for (i <- 0 until batch.numCols()) { - HostColumnarToGpu.columnarCopy(batch.column(i), batchBuilder.builder(i), - schema.fields(i).nullable, rows) + batchBuilder.copyColumnar(batch.column(i), i, schema.fields(i).nullable, rows) } totalRows += rows } @@ -300,10 +388,12 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal) val outputSchema = schema val batches = child.executeColumnar() + + val confUseArrow = new RapidsConf(child.conf).useArrowCopyOptimization batches.mapPartitions { iter => new HostToGpuCoalesceIterator(iter, goal, outputSchema, numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime, concatTime, - totalTime, peakDevMemory, "HostColumnarToGpu") + totalTime, peakDevMemory, "HostColumnarToGpu", confUseArrow) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 33b21060d47..20b8130039c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -832,6 +832,14 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val USE_ARROW_OPT = conf("spark.rapids.arrowCopyOptmizationEnabled") + .doc("Option to turn off using the optimized Arrow copy code when reading from " + + "ArrowColumnVector in HostColumnarToGpu. Left as internal as user shouldn't " + + "have to turn it off, but its convenient for testing.") + .internal() + .booleanConf + .createWithDefault(true) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -1127,6 +1135,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val allowDisableEntirePlan: Boolean = get(ALLOW_DISABLE_ENTIRE_PLAN) + lazy val useArrowCopyOptimization: Boolean = get(USE_ARROW_OPT) + lazy val getCloudSchemes: Option[Seq[String]] = get(CLOUD_SCHEMES) lazy val getAlluxioPathsToReplace: Option[Seq[String]] = get(ALLUXIO_PATHS_REPLACE) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 716a932874f..839bde5903d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -16,6 +16,9 @@ package com.nvidia.spark.rapids +import java.nio.ByteBuffer + +import org.apache.arrow.vector.ValueVector import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -169,6 +172,10 @@ trait SparkShims { def shouldIgnorePath(path: String): Boolean + def getArrowDataBuf(vec: ValueVector): ByteBuffer + def getArrowValidityBuf(vec: ValueVector): ByteBuffer + def getArrowOffsetsBuf(vec: ValueVector): ByteBuffer + def replaceWithAlluxioPathIfNeeded( conf: RapidsConf, relation: HadoopFsRelation, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index 455c37ba8e3..e4d0a37086c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,21 @@ package com.nvidia.spark.rapids import java.io.File import java.nio.file.Files +import scala.collection.JavaConverters._ + import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, Table} import com.nvidia.spark.rapids.format.CodecType +import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.IntVector +import org.apache.arrow.vector.complex.ListVector +import org.apache.arrow.vector.complex.MapVector +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit} +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.metrics.source.MockTaskContext -import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, LongType, StructField, StructType} -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { @@ -94,6 +102,164 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { }, conf) } + // this was copied from Spark ArrowUtils + /** Maps data type from Spark to Arrow. NOTE: timeZoneId required for TimestampTypes */ + private def toArrowType(dt: DataType, timeZoneId: String): ArrowType = dt match { + case BooleanType => ArrowType.Bool.INSTANCE + case ByteType => new ArrowType.Int(8, true) + case ShortType => new ArrowType.Int(8 * 2, true) + case IntegerType => new ArrowType.Int(8 * 4, true) + case LongType => new ArrowType.Int(8 * 8, true) + case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) + case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) + case StringType => ArrowType.Utf8.INSTANCE + case BinaryType => ArrowType.Binary.INSTANCE + // case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale) + case DateType => new ArrowType.Date(DateUnit.DAY) + case TimestampType => + if (timeZoneId == null) { + throw new UnsupportedOperationException( + s"${TimestampType.catalogString} must supply timeZoneId parameter") + } else { + new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) + } + case _ => + throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}") + } + + // this was copied from Spark ArrowUtils + /** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */ + private def toArrowField( + name: String, dt: DataType, nullable: Boolean, timeZoneId: String): Field = { + dt match { + case ArrayType(elementType, containsNull) => + val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null) + new Field(name, fieldType, + Seq(toArrowField("element", elementType, containsNull, timeZoneId)).asJava) + case StructType(fields) => + val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null) + new Field(name, fieldType, + fields.map { field => + toArrowField(field.name, field.dataType, field.nullable, timeZoneId) + }.toSeq.asJava) + case MapType(keyType, valueType, valueContainsNull) => + val mapType = new FieldType(nullable, new ArrowType.Map(false), null) + // Note: Map Type struct can not be null, Struct Type key field can not be null + new Field(name, mapType, + Seq(toArrowField(MapVector.DATA_VECTOR_NAME, + new StructType() + .add(MapVector.KEY_NAME, keyType, nullable = false) + .add(MapVector.VALUE_NAME, valueType, nullable = valueContainsNull), + nullable = false, + timeZoneId)).asJava) + case dataType => + val fieldType = new FieldType(nullable, toArrowType(dataType, timeZoneId), null) + new Field(name, fieldType, Seq.empty[Field].asJava) + } + } + + private def setupArrowBatch(withArrayType:Boolean = false): (ColumnarBatch, StructType) = { + val rootAllocator = new RootAllocator(Long.MaxValue) + val allocator = rootAllocator.newChildAllocator("int", 0, Long.MaxValue) + val vector1 = toArrowField("int1", IntegerType, nullable = true, null) + .createVector(allocator).asInstanceOf[IntVector] + vector1.allocateNew() + val vector2 = toArrowField("int2", IntegerType, nullable = true, null) + .createVector(allocator).asInstanceOf[IntVector] + vector2.allocateNew() + val vector3 = toArrowField("array", ArrayType(IntegerType), nullable = true, null) + .createVector(allocator).asInstanceOf[ListVector] + vector3.allocateNew() + val elementVector = vector3.getDataVector().asInstanceOf[IntVector] + + (0 until 10).foreach { i => + vector1.setSafe(i, i) + vector2.setSafe(i + 1, i) + elementVector.setSafe(i, i) + vector3.startNewValue(i) + elementVector.setSafe(0, 1) + elementVector.setSafe(1, 2) + vector3.endValue(i, 2) + } + elementVector.setValueCount(22) + vector3.setValueCount(11) + + vector1.setNull(10) + vector1.setValueCount(11) + vector2.setNull(0) + vector2.setValueCount(11) + + val baseVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2)) + val columnVectors = if (withArrayType) { + baseVectors :+ new ArrowColumnVector(vector3) + } else { + baseVectors + } + val schemaBase = Seq(StructField("int1", IntegerType), StructField("int2", IntegerType)) + val schemaSeq = if (withArrayType) { + schemaBase :+ StructField("array", ArrayType(IntegerType)) + } else { + schemaBase + } + val schema = StructType(schemaSeq) + (new ColumnarBatch(columnVectors.toArray), schema) + } + + test("test HostToGpuCoalesceIterator with arrow valid") { + val (batch, schema) = setupArrowBatch(false) + val iter = Iterator.single(batch) + + val hostToGpuCoalesceIterator = new HostToGpuCoalesceIterator(iter, + TargetSize(1024), + schema: StructType, + new SQLMetric("t1", 0), new SQLMetric("t2", 0), new SQLMetric("t3", 0), + new SQLMetric("t4", 0), new SQLMetric("t5", 0), new SQLMetric("t6", 0), + new SQLMetric("t7", 0), new SQLMetric("t8", 0), + "testcoalesce", + useArrowCopyOpt = true) + + hostToGpuCoalesceIterator.initNewBatch(batch) + assert(hostToGpuCoalesceIterator.batchBuilder. + isInstanceOf[GpuColumnVector.GpuArrowColumnarBatchBuilder]) + } + + test("test HostToGpuCoalesceIterator with arrow array") { + val (batch, schema) = setupArrowBatch(true) + val iter = Iterator.single(batch) + + val hostToGpuCoalesceIterator = new HostToGpuCoalesceIterator(iter, + TargetSize(1024), + schema: StructType, + new SQLMetric("t1", 0), new SQLMetric("t2", 0), new SQLMetric("t3", 0), + new SQLMetric("t4", 0), new SQLMetric("t5", 0), new SQLMetric("t6", 0), + new SQLMetric("t7", 0), new SQLMetric("t8", 0), + "testcoalesce", + useArrowCopyOpt = true) + + hostToGpuCoalesceIterator.initNewBatch(batch) + // array isn't supported should fall back + assert(hostToGpuCoalesceIterator.batchBuilder. + isInstanceOf[GpuColumnVector.GpuColumnarBatchBuilder]) + } + + test("test HostToGpuCoalesceIterator with arrow config off") { + val (batch, schema) = setupArrowBatch() + val iter = Iterator.single(batch) + + val hostToGpuCoalesceIterator = new HostToGpuCoalesceIterator(iter, + TargetSize(1024), + schema: StructType, + new SQLMetric("t1", 0), new SQLMetric("t2", 0), new SQLMetric("t3", 0), + new SQLMetric("t4", 0), new SQLMetric("t5", 0), new SQLMetric("t6", 0), + new SQLMetric("t7", 0), new SQLMetric("t8", 0), + "testcoalesce", + useArrowCopyOpt = false) + + hostToGpuCoalesceIterator.initNewBatch(batch) + assert(hostToGpuCoalesceIterator.batchBuilder. + isInstanceOf[GpuColumnVector.GpuColumnarBatchBuilder]) + } + test("coalesce HostColumnarToGpu") { val conf = makeBatchedBytes(1)