From 4a4d31269bff4597ac4646c891756be738d76252 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Wed, 18 Nov 2020 16:12:00 -0600 Subject: [PATCH] Change ColumnViewAccess usage to work with ColumnView (#1105) Signed-off-by: Kuhu Shukla --- .../nvidia/spark/rapids/GpuColumnVector.java | 22 +++++++++---------- .../rapids/RapidsHostColumnVectorCore.java | 14 +++++------- .../spark/rapids/GpuColumnarToRowExec.scala | 2 +- .../spark/rapids/GpuWindowExpression.scala | 4 ++-- .../spark/sql/rapids/stringFunctions.scala | 4 ++-- 5 files changed, 22 insertions(+), 24 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 683a5229905..99a573bfb1b 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids; -import ai.rapids.cudf.ColumnViewAccess; +import ai.rapids.cudf.ColumnView; import ai.rapids.cudf.DType; import ai.rapids.cudf.HostColumnVector; import ai.rapids.cudf.Scalar; @@ -305,8 +305,8 @@ public static ColumnarBatch from(Table table, DataType[] colTypes) { /** * This should only ever be called from an assertion. */ - private static boolean typeConversionAllowed(ColumnViewAccess cv, DataType colType) { - DType dt = cv.getDataType(); + private static boolean typeConversionAllowed(ColumnView cv, DataType colType) { + DType dt = cv.getType(); if (!dt.isNestedType()) { return getRapidsType(colType).equals(dt); } @@ -316,19 +316,19 @@ private static boolean typeConversionAllowed(ColumnViewAccess cv, DataTyp if (!(dt.equals(DType.LIST))) { return false; } - try (ColumnViewAccess structCv = cv.getChildColumnViewAccess(0)) { - if (!(structCv.getDataType().equals(DType.STRUCT))) { + try (ColumnView structCv = cv.getChildColumnView(0)) { + if (!(structCv.getType().equals(DType.STRUCT))) { return false; } if (structCv.getNumChildren() != 2) { return false; } - try (ColumnViewAccess keyCv = structCv.getChildColumnViewAccess(0)) { + try (ColumnView keyCv = structCv.getChildColumnView(0)) { if (!typeConversionAllowed(keyCv, mType.keyType())) { return false; } } - try (ColumnViewAccess valCv = structCv.getChildColumnViewAccess(1)) { + try (ColumnView valCv = structCv.getChildColumnView(1)) { return typeConversionAllowed(valCv, mType.valueType()); } } @@ -336,7 +336,7 @@ private static boolean typeConversionAllowed(ColumnViewAccess cv, DataTyp if (!(dt.equals(DType.LIST))) { return false; } - try (ColumnViewAccess tmp = cv.getChildColumnViewAccess(0)) { + try (ColumnView tmp = cv.getChildColumnView(0)) { return typeConversionAllowed(tmp, ((ArrayType) colType).elementType()); } } else if (colType instanceof StructType) { @@ -349,7 +349,7 @@ private static boolean typeConversionAllowed(ColumnViewAccess cv, DataTyp return false; } for (int childIndex = 0; childIndex < numChildren; childIndex++) { - try (ColumnViewAccess tmp = cv.getChildColumnViewAccess(childIndex)) { + try (ColumnView tmp = cv.getChildColumnView(childIndex)) { StructField entry = ((StructType) colType).apply(childIndex); if (!typeConversionAllowed(tmp, entry.dataType())) { return false; @@ -361,8 +361,8 @@ private static boolean typeConversionAllowed(ColumnViewAccess cv, DataTyp if (!(dt.equals(DType.LIST))) { return false; } - try (ColumnViewAccess tmp = cv.getChildColumnViewAccess(0)) { - DType tmpType = tmp.getDataType(); + try (ColumnView tmp = cv.getChildColumnView(0)) { + DType tmpType = tmp.getType(); return tmpType.equals(DType.INT8) || tmpType.equals(DType.UINT8); } } else { diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java index f99ac209387..2471ea2ae20 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java @@ -17,9 +17,8 @@ package com.nvidia.spark.rapids; -import ai.rapids.cudf.ColumnViewAccess; import ai.rapids.cudf.HostColumnVectorCore; -import ai.rapids.cudf.HostMemoryBuffer; + import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; @@ -127,7 +126,7 @@ public final ColumnarArray getArray(int rowId) { if (cachedChildren[0] == null) { // cache the child data ArrayType at = (ArrayType) dataType(); - HostColumnVectorCore data = (HostColumnVectorCore) cudfCv.getChildColumnViewAccess(0); + HostColumnVectorCore data = cudfCv.getChildColumnView(0); cachedChildren[0] = new RapidsHostColumnVectorCore(at.elementType(), data); } RapidsHostColumnVectorCore data = cachedChildren[0]; @@ -141,12 +140,11 @@ public final ColumnarMap getMap(int ordinal) { if (cachedChildren[0] == null) { // Cache the key/value MapType mt = (MapType) dataType(); - ColumnViewAccess structHcv = cudfCv.getChildColumnViewAccess(0); + HostColumnVectorCore structHcv = cudfCv.getChildColumnView(0); // keys - HostColumnVectorCore firstHcvCore = (HostColumnVectorCore) structHcv.getChildColumnViewAccess(0); - + HostColumnVectorCore firstHcvCore = structHcv.getChildColumnView(0); // values - HostColumnVectorCore secondHcvCore = (HostColumnVectorCore) structHcv.getChildColumnViewAccess(1); + HostColumnVectorCore secondHcvCore = structHcv.getChildColumnView(1); cachedChildren[0] = new RapidsHostColumnVectorCore(mt.keyType(), firstHcvCore); cachedChildren[1] = new RapidsHostColumnVectorCore(mt.valueType(), secondHcvCore); @@ -180,7 +178,7 @@ public final ColumnVector getChild(int ordinal) { StructType st = (StructType) dataType(); StructField[] fields = st.fields(); for (int i = 0; i < fields.length; i++) { - HostColumnVectorCore tmp = (HostColumnVectorCore) cudfCv.getChildColumnViewAccess(i); + HostColumnVectorCore tmp = cudfCv.getChildColumnView(i); cachedChildren[i] = new RapidsHostColumnVectorCore(fields[i].dataType(), tmp); } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index 1c6be8677bc..57455ce92a1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -73,7 +73,7 @@ class AcceleratedColumnarToRowIterator( currentCv = Some(wip) at = 0 total = wip.getRowCount().toInt - val byteBuffer = currentCv.get.getChildColumnViewAccess(0).getDataBuffer + val byteBuffer = currentCv.get.getChildColumnView(0).getData baseDataAddress = byteBuffer.getAddress } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 7aa64bf2009..2b6564902f9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -241,7 +241,7 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } val expectedType = GpuColumnVector.getRapidsType(windowFunc.dataType) - if (expectedType != aggColumn.getDataType) { + if (expectedType != aggColumn.getType) { withResource(aggColumn) { aggColumn => GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) } @@ -272,7 +272,7 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } } val expectedType = GpuColumnVector.getRapidsType(windowFunc.dataType) - if (expectedType != aggColumn.getDataType) { + if (expectedType != aggColumn.getType) { withResource(aggColumn) { aggColumn => GpuColumnVector.from(aggColumn.castTo(expectedType), windowFunc.dataType) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala index fffc82cc306..9f1489a3a30 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ColumnVector, DType, PadSide, Scalar, Table} +import ai.rapids.cudf.{ColumnVector, ColumnView, DType, PadSide, Scalar, Table} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -289,7 +289,7 @@ case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExp } emptyStrScalar = GpuScalar.from("", StringType) GpuColumnVector.from(ColumnVector.stringConcatenate(emptyStrScalar, nullStrScalar, - columns.toArray[ColumnVector]), dataType) + columns.toArray[ColumnView]), dataType) } finally { columns.safeClose() if (emptyStrScalar != null) {