From c659089bc22a032d41dcd4e741adf56a5a82ea63 Mon Sep 17 00:00:00 2001
From: Cheng Pan <379377944@qq.com>
Date: Thu, 25 Feb 2021 18:19:02 +0800
Subject: [PATCH] [KYUUBI #360] Correctly handle getNextRowSet with FETCH_PRIOR
 and FETCH_FIRST

![pan3793](https://badgen.net/badge/Hello/pan3793/green) [![Closes #370](https://badgen.net/badge/Preview/Closes%20%23370/blue)](https://github.com/yaooqinn/kyuubi/pull/370) ![332](https://badgen.net/badge/%2B/332/red) ![24](https://badgen.net/badge/-/24/green) ![8](https://badgen.net/badge/commits/8/yellow) ![Feature](https://badgen.net/badge/Label/Feature/) ![Bug](https://badgen.net/badge/Label/Bug/)

### _Why are the changes needed?_

Close #360

Ref: https://github.com/apache/spark/pull/30600

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #370 from pan3793/KYUUBI-360.

e79b8cb [Cheng Pan] [KYUUBI #360] comments
0fae3db [Cheng Pan] fix import
3d1b2a6 [Cheng Pan] [KYUUBI #360] fix ut
eda3e59 [Cheng Pan] [KYUUBI #360] fix import
16178d6 [Cheng Pan] [KYUUBI #360] ut
179404d [Cheng Pan] [KYUUBI #360] nit
455af6b [Cheng Pan] [KYUUBI #360] correct getNextRowSet with FETCH_PRIOR FETCH_FIRST
2307f1f [Cheng Pan] [KYUUBI #360] move ThriftUtils to kyuubi-common

Authored-by: Cheng Pan <379377944@qq.com>
Signed-off-by: Kent Yao
---
 .../kyuubi/engine/spark/FetchIterator.scala | 110 ++++++++++++++
 .../spark/operation/ExecuteStatement.scala | 4 +-
 .../engine/spark/operation/GetCatalogs.scala | 3 +-
 .../engine/spark/operation/GetColumns.scala | 6 +-
 .../engine/spark/operation/GetFunctions.scala | 3 +-
 .../engine/spark/operation/GetSchemas.scala | 3 +-
 .../spark/operation/GetTableTypes.scala | 3 +-
 .../engine/spark/operation/GetTables.scala | 3 +-
 .../engine/spark/operation/GetTypeInfo.scala | 5 +-
 .../spark/operation/SparkOperation.scala | 14 +-
 .../engine/spark/FetchIteratorSuite.scala | 134 ++++++++++++++++++
 .../spark/operation/SparkOperationSuite.scala | 41 ++++++
 .../main/scala/org/apache/kyuubi/Utils.scala | 2 +-
 .../org/apache/kyuubi/util}/ThriftUtils.scala | 8 +-
 .../kyuubi/operation/NoopOperation.scala | 5 +-
 .../operation/NoopOperationManager.scala | 3 +-
 .../kyuubi/operation/KyuubiOperation.scala | 3 +-
 .../operation/KyuubiOperationManager.scala | 3 +-
 .../kyuubi/session/KyuubiSessionImpl.scala | 3 +-
 19 files changed, 332 insertions(+), 24 deletions(-)
 create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/FetchIterator.scala
 create mode 100644 externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/FetchIteratorSuite.scala
 rename {kyuubi-main/src/main/scala/org/apache/kyuubi => kyuubi-common/src/main/scala/org/apache/kyuubi/util}/ThriftUtils.scala (84%)
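The heart of the change: each operation now keeps a cursor that remembers where the last fetch block started, so `FETCH_PRIOR` and `FETCH_FIRST` can be served from the already-collected results instead of only moving forward. The following is a minimal, self-contained sketch of those semantics (the names `FetchOrientationSketch` and `ArrayCursor` are illustrative only, not part of this patch); it mirrors `ArrayFetchIterator` and the orientation dispatch added to `SparkOperation.getNextRowSet`:

```scala
// Illustrative sketch only: a simplified stand-in for ArrayFetchIterator showing
// how the three Thrift fetch orientations are expected to drive the cursor.
object FetchOrientationSketch extends App {

  class ArrayCursor[A](src: Array[A]) extends Iterator[A] {
    private var fetchStart: Long = 0 // where the current fetch block begins
    private var position: Long = 0   // index of the next row to return

    // FETCH_NEXT: start the next block at the current position.
    def fetchNext(): Unit = fetchStart = position

    // FETCH_PRIOR: step back `offset` rows from the start of the previous block.
    def fetchPrior(offset: Long): Unit = fetchAbsolute(fetchStart - offset)

    // FETCH_FIRST is fetchAbsolute(0); the target is clamped to [0, src.length].
    def fetchAbsolute(pos: Long): Unit = {
      position = (pos max 0L) min src.length.toLong
      fetchStart = position
    }

    def getPosition: Long = position
    override def hasNext: Boolean = position < src.length
    override def next(): A = { position += 1; src(position.toInt - 1) }
  }

  val cursor = new ArrayCursor((0 until 10).toArray)

  cursor.fetchNext()
  println(cursor.take(3).toList) // List(0, 1, 2)

  cursor.fetchNext()
  println(cursor.take(3).toList) // List(3, 4, 5)

  cursor.fetchPrior(3)           // FETCH_PRIOR with maxRows = 3
  println(cursor.take(3).toList) // List(0, 1, 2) again

  cursor.fetchAbsolute(0)        // FETCH_FIRST
  println(cursor.take(2).toList) // List(0, 1)
}
```

In `SparkOperation.getNextRowSet`, the `order` match performs exactly this dispatch before `iter.take(rowSetSize)` materializes the block into a `TRowSet`, whose start row offset is then set from the iterator.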
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/FetchIterator.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/FetchIterator.scala
new file mode 100644
index 00000000000..371766dbd26
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/FetchIterator.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark
+
+/**
+ * Borrowed from Apache Spark, see SPARK-33655
+ */
+private[engine] sealed trait FetchIterator[A] extends Iterator[A] {
+  /**
+   * Begin a fetch block, forward from the current position.
+   * Resets the fetch start offset.
+   */
+  def fetchNext(): Unit
+
+  /**
+   * Begin a fetch block, moving the iterator back by offset from the start of the previous fetch
+   * block start.
+   * Resets the fetch start offset.
+   *
+   * @param offset the amount to move a fetch start position toward the prior direction.
+   */
+  def fetchPrior(offset: Long): Unit = fetchAbsolute(getFetchStart - offset)
+
+  /**
+   * Begin a fetch block, moving the iterator to the given position.
+   * Resets the fetch start offset.
+   *
+   * @param pos index to move a position of iterator.
+   */
+  def fetchAbsolute(pos: Long): Unit
+
+  def getFetchStart: Long
+
+  def getPosition: Long
+}
+
+private[engine] class ArrayFetchIterator[A](src: Array[A]) extends FetchIterator[A] {
+  private var fetchStart: Long = 0
+
+  private var position: Long = 0
+
+  override def fetchNext(): Unit = fetchStart = position
+
+  override def fetchAbsolute(pos: Long): Unit = {
+    position = (pos max 0) min src.length
+    fetchStart = position
+  }
+
+  override def getFetchStart: Long = fetchStart
+
+  override def getPosition: Long = position
+
+  override def hasNext: Boolean = position < src.length
+
+  override def next(): A = {
+    position += 1
+    src(position.toInt - 1)
+  }
+}
+
+private[engine] class IterableFetchIterator[A](iterable: Iterable[A]) extends FetchIterator[A] {
+  private var iter: Iterator[A] = iterable.iterator
+
+  private var fetchStart: Long = 0
+
+  private var position: Long = 0
+
+  override def fetchNext(): Unit = fetchStart = position
+
+  override def fetchAbsolute(pos: Long): Unit = {
+    val newPos = pos max 0
+    if (newPos < position) resetPosition()
+    while (position < newPos && hasNext) next()
+    fetchStart = position
+  }
+
+  override def getFetchStart: Long = fetchStart
+
+  override def getPosition: Long = position
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): A = {
+    position += 1
+    iter.next()
+  }
+
+  private def resetPosition(): Unit = {
+    if (position != 0) {
+      iter = iterable.iterator
+      position = 0
+      fetchStart = 0
+    }
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 08630e46d8e..4a076a425cb 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ 
-24,7 +24,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil +import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil} import org.apache.kyuubi.operation.{OperationState, OperationType} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session @@ -74,7 +74,7 @@ class ExecuteStatement( debug(s"original result queryExecution: ${result.queryExecution}") val castedResult = result.select(castCols: _*) debug(s"casted result queryExecution: ${castedResult.queryExecution}") - iter = castedResult.collect().toList.iterator + iter = new ArrayFetchIterator(castedResult.collect()) setState(OperationState.FINISHED) } catch { onError(cancel = true) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala index 9d1414715f9..e5e2a66b67b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT @@ -35,7 +36,7 @@ class GetCatalogs(spark: SparkSession, session: Session) override protected def runInternal(): Unit = { try { - iter = SparkCatalogShim().getCatalogs(spark).toIterator + iter = new IterableFetchIterator(SparkCatalogShim().getCatalogs(spark).toList) } catch onError() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala index 37cc2cae35d..ad33b27d86c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -88,9 +89,8 @@ class GetColumns( val schemaPattern = toJavaRegex(schemaName) val tablePattern = toJavaRegex(tableName) val columnPattern = toJavaRegex(columnName) - iter = SparkCatalogShim() - .getColumns(spark, catalogName, schemaPattern, tablePattern, columnPattern) - .toList.iterator + iter = new IterableFetchIterator(SparkCatalogShim() + .getColumns(spark, catalogName, schemaPattern, tablePattern, columnPattern).toList) } catch { onError() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala index c821f41fca1..defb9cd47de 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala @@ -22,6 +22,7 @@ import java.sql.DatabaseMetaData import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session @@ -70,7 +71,7 @@ class GetFunctions( info.getClassName) } } - iter = a.toList.iterator + iter = new IterableFetchIterator(a.toList) } catch { onError() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala index 7b246796990..ce312f9492d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -42,7 +43,7 @@ class GetSchemas(spark: SparkSession, session: Session, catalogName: String, sch try { val schemaPattern = toJavaRegex(schema) val rows = SparkCatalogShim().getSchemas(spark, catalogName, schemaPattern) - iter = rows.toList.toIterator + iter = new IterableFetchIterator(rows) } catch onError() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala index 9170234aa64..50e8edf485b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -33,6 +34,6 @@ class GetTableTypes(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - iter = SparkCatalogShim.sparkTableTypes.map(Row(_)).toList.iterator + iter = new IterableFetchIterator(SparkCatalogShim.sparkTableTypes.map(Row(_)).toList) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala index 4d252f38a2c..80117d2f3ac 100644 --- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -73,7 +74,7 @@ class GetTables( } else { catalogTablesAndViews } - iter = allTableAndViews.toList.iterator + iter = new IterableFetchIterator(allTableAndViews) } catch { onError() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala index 6cc8c7cd457..3b801895aca 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala @@ -22,6 +22,7 @@ import java.sql.Types._ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.IterableFetchIterator import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session @@ -83,7 +84,7 @@ class GetTypeInfo(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - iter = Seq( + iter = new IterableFetchIterator(Seq( toRow("VOID", NULL), toRow("BOOLEAN", BOOLEAN), toRow("TINYINT", TINYINT, 3), @@ -101,6 +102,6 @@ class GetTypeInfo(spark: SparkSession, session: Session) toRow("MAP", JAVA_OBJECT), toRow("STRUCT", STRUCT), toRow("INTERVAL", OTHER) - ).toList.iterator + )) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index dade5f8bf9b..ae9b0d09749 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -25,8 +25,9 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.engine.spark.FetchIterator import org.apache.kyuubi.operation.{AbstractOperation, OperationState} -import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation +import org.apache.kyuubi.operation.FetchOrientation._ import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.OperationType.OperationType import org.apache.kyuubi.operation.log.OperationLog @@ -36,7 +37,7 @@ import org.apache.kyuubi.session.Session abstract class SparkOperation(spark: SparkSession, opType: OperationType, session: Session) extends AbstractOperation(opType, session) { - protected var iter: Iterator[Row] = _ + protected var iter: FetchIterator[Row] = _ protected final val operationLog: OperationLog = OperationLog.createOperationLog(session.handle, 
getHandle) @@ -130,8 +131,15 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) + order match { + case FETCH_NEXT => iter.fetchNext() + case FETCH_PRIOR => iter.fetchPrior(rowSetSize); + case FETCH_FIRST => iter.fetchAbsolute(0); + } val taken = iter.take(rowSetSize) - RowSet.toTRowSet(taken.toList, resultSchema, getProtocolVersion) + val resultRowSet = RowSet.toTRowSet(taken.toList, resultSchema, getProtocolVersion) + resultRowSet.setStartRowOffset(iter.getPosition) + resultRowSet } override def shouldRunAsync: Boolean = false diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/FetchIteratorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/FetchIteratorSuite.scala new file mode 100644 index 00000000000..e615898e815 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/FetchIteratorSuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine.spark + +import org.apache.kyuubi.KyuubiFunSuite + +class FetchIteratorSuite extends KyuubiFunSuite { + + private def getRows(fetchIter: FetchIterator[Int], maxRowCount: Int): Seq[Int] = { + for (_ <- 0 until maxRowCount if fetchIter.hasNext) yield fetchIter.next() + } + + test("test fetchNext and fetchPrior") { + val testData = 0 until 10 + + def iteratorTest(fetchIter: FetchIterator[Int]): Unit = { + fetchIter.fetchNext() + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 0) + assertResult(0 until 2)(getRows(fetchIter, 2)) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 2) + + fetchIter.fetchNext() + assert(fetchIter.getFetchStart == 2) + assert(fetchIter.getPosition == 2) + assertResult(2 until 3)(getRows(fetchIter, 1)) + assert(fetchIter.getFetchStart == 2) + assert(fetchIter.getPosition == 3) + + fetchIter.fetchPrior(2) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 0) + assertResult(0 until 3)(getRows(fetchIter, 3)) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 3) + + fetchIter.fetchNext() + assert(fetchIter.getFetchStart == 3) + assert(fetchIter.getPosition == 3) + assertResult(3 until 8)(getRows(fetchIter, 5)) + assert(fetchIter.getFetchStart == 3) + assert(fetchIter.getPosition == 8) + + fetchIter.fetchPrior(2) + assert(fetchIter.getFetchStart == 1) + assert(fetchIter.getPosition == 1) + assertResult(1 until 4)(getRows(fetchIter, 3)) + assert(fetchIter.getFetchStart == 1) + assert(fetchIter.getPosition == 4) + + fetchIter.fetchNext() + assert(fetchIter.getFetchStart == 4) + assert(fetchIter.getPosition == 4) + assertResult(4 until 10)(getRows(fetchIter, 10)) + assert(fetchIter.getFetchStart == 4) + assert(fetchIter.getPosition == 10) + + fetchIter.fetchNext() + assert(fetchIter.getFetchStart == 10) + assert(fetchIter.getPosition == 10) + assertResult(Seq.empty[Int])(getRows(fetchIter, 10)) + assert(fetchIter.getFetchStart == 10) + assert(fetchIter.getPosition == 10) + + fetchIter.fetchPrior(20) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 0) + assertResult(0 until 3)(getRows(fetchIter, 3)) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 3) + } + iteratorTest(new ArrayFetchIterator[Int](testData.toArray)) + iteratorTest(new IterableFetchIterator[Int](testData)) + } + + test("test fetchAbsolute") { + val testData = 0 until 10 + + def iteratorTest(fetchIter: FetchIterator[Int]): Unit = { + fetchIter.fetchNext() + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 0) + assertResult(0 until 5)(getRows(fetchIter, 5)) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 5) + + fetchIter.fetchAbsolute(2) + assert(fetchIter.getFetchStart == 2) + assert(fetchIter.getPosition == 2) + assertResult(2 until 5)(getRows(fetchIter, 3)) + assert(fetchIter.getFetchStart == 2) + assert(fetchIter.getPosition == 5) + + fetchIter.fetchAbsolute(7) + assert(fetchIter.getFetchStart == 7) + assert(fetchIter.getPosition == 7) + assertResult(7 until 8)(getRows(fetchIter, 1)) + assert(fetchIter.getFetchStart == 7) + assert(fetchIter.getPosition == 8) + + fetchIter.fetchAbsolute(20) + assert(fetchIter.getFetchStart == 10) + assert(fetchIter.getPosition == 10) + assertResult(Seq.empty[Int])(getRows(fetchIter, 1)) + assert(fetchIter.getFetchStart == 10) + assert(fetchIter.getPosition == 10) + + fetchIter.fetchAbsolute(0) + assert(fetchIter.getFetchStart == 0) + 
assert(fetchIter.getPosition == 0) + assertResult(0 until 3)(getRows(fetchIter, 3)) + assert(fetchIter.getFetchStart == 0) + assert(fetchIter.getPosition == 3) + } + iteratorTest(new ArrayFetchIterator[Int](testData.toArray)) + iteratorTest(new IterableFetchIterator[Int](testData)) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index 47b904dcc98..d3120054459 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -236,6 +236,47 @@ class SparkOperationSuite extends WithSparkSQLEngine with JDBCTests { } } + test("test fetch orientation") { + val sql = "SELECT id FROM range(2)" + + withSessionHandle { (client, handle) => + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val tExecuteStatementResp = client.ExecuteStatement(req) + val opHandle = tExecuteStatementResp.getOperationHandle + waitForOperationToComplete(client, opHandle) + + // fetch next from before first row + val tFetchResultsReq1 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1) + val tFetchResultsResp1 = client.FetchResults(tFetchResultsReq1) + assert(tFetchResultsResp1.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val idSeq1 = tFetchResultsResp1.getResults.getColumns.get(0).getI64Val.getValues.asScala.toSeq + assertResult(Seq(0L))(idSeq1) + + // fetch next from first row + val tFetchResultsReq2 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1) + val tFetchResultsResp2 = client.FetchResults(tFetchResultsReq2) + assert(tFetchResultsResp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val idSeq2 = tFetchResultsResp2.getResults.getColumns.get(0).getI64Val.getValues.asScala.toSeq + assertResult(Seq(1L))(idSeq2) + + // fetch prior from second row, expected got first row + val tFetchResultsReq3 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_PRIOR, 1) + val tFetchResultsResp3 = client.FetchResults(tFetchResultsReq3) + assert(tFetchResultsResp3.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val idSeq3 = tFetchResultsResp3.getResults.getColumns.get(0).getI64Val.getValues.asScala.toSeq + assertResult(Seq(0L))(idSeq3) + + // fetch first + val tFetchResultsReq4 = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_FIRST, 3) + val tFetchResultsResp4 = client.FetchResults(tFetchResultsReq4) + assert(tFetchResultsResp4.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + val idSeq4 = tFetchResultsResp4.getResults.getColumns.get(0).getI64Val.getValues.asScala.toSeq + assertResult(Seq(0L, 1L))(idSeq4) + } + } + test("Hive JDBC Database MetaData API Auditing") { withJdbcStatement() { statement => val metaData = statement.getConnection.getMetaData diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index f2606c033a7..b93253d1a0b 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -155,5 +155,5 @@ private[kyuubi] object Utils extends Logging { /** * Whether the underlying operating system is Windows. 
*/ - val isWindows = SystemUtils.IS_OS_WINDOWS + val isWindows: Boolean = SystemUtils.IS_OS_WINDOWS } diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/ThriftUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThriftUtils.scala similarity index 84% rename from kyuubi-main/src/main/scala/org/apache/kyuubi/ThriftUtils.scala rename to kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThriftUtils.scala index 738c0cc00da..bfe0bd64bc2 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/ThriftUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThriftUtils.scala @@ -15,10 +15,12 @@ * limitations under the License. */ -package org.apache.kyuubi +package org.apache.kyuubi.util import org.apache.hive.service.rpc.thrift.{TRow, TRowSet, TStatus, TStatusCode} +import org.apache.kyuubi.KyuubiSQLException + object ThriftUtils { def verifyTStatus(tStatus: TStatus): Unit = { @@ -27,6 +29,8 @@ object ThriftUtils { } } - val EMPTY_ROW_SET = new TRowSet(0, new java.util.ArrayList[TRow](0)) + def newEmptyRowSet: TRowSet = new TRowSet(0, new java.util.ArrayList[TRow](0)) + + val EMPTY_ROW_SET: TRowSet = newEmptyRowSet } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala index 9e249a56c97..a307712ba3f 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala @@ -21,13 +21,14 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ -import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId} +import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.OperationType.OperationType import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.ThriftUtils class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean = false) extends AbstractOperation(typ, session) { @@ -75,7 +76,7 @@ class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean = override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = { val col = TColumn.stringVal(new TStringColumn(Seq(typ.toString).asJava, ByteBuffer.allocate(0))) - val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](0)) + val tRowSet = ThriftUtils.newEmptyRowSet tRowSet.addToColumns(col) tRowSet } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala index 26f92d2b2ce..33a35050f66 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala @@ -21,6 +21,7 @@ import org.apache.hive.service.rpc.thrift.{TRow, TRowSet} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.ThriftUtils class NoopOperationManager extends OperationManager("noop") { private val invalid = "invalid" @@ -90,5 +91,5 @@ class 
NoopOperationManager extends OperationManager("noop") { override def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = new TRowSet(0, new java.util.ArrayList[TRow](0)) + maxRows: Int): TRowSet = ThriftUtils.EMPTY_ROW_SET } diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index 831f95b8105..475eef746ef 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -19,11 +19,12 @@ package org.apache.kyuubi.operation import org.apache.hive.service.rpc.thrift._ -import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils} +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.OperationType.OperationType import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.ThriftUtils abstract class KyuubiOperation( opType: OperationType, diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala index 181e30f54d6..729afbb7d81 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -21,9 +21,10 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRow, TRowSet, TSessionHandle} -import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils} +import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.session.{Session, SessionHandle} +import org.apache.kyuubi.util.ThriftUtils class KyuubiOperationManager private (name: String) extends OperationManager(name) { diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index ab3a6e99260..2d8855fbdf3 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -27,7 +27,7 @@ import org.apache.thrift.TException import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.{TSocket, TTransport} -import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils, Utils} +import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.{ShareLevel, SQLEngineAppName} @@ -36,6 +36,7 @@ import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.ServiceDiscovery._ import org.apache.kyuubi.service.authentication.PlainSASLHelper +import org.apache.kyuubi.util.ThriftUtils class KyuubiSessionImpl( protocol: TProtocolVersion,
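One more note on the `ThriftUtils` move: `NoopOperation` now obtains its row set from `ThriftUtils.newEmptyRowSet` instead of constructing `new TRowSet(0, ...)` inline. A plausible reason for adding a factory next to the existing `EMPTY_ROW_SET` constant is that `TRowSet` is mutable, so a caller that adds columns to a shared instance would leak that state into every later "empty" result. The sketch below is a standalone illustration of that difference (it assumes only `hive-service-rpc` on the classpath; the object name is made up):

```scala
import java.nio.ByteBuffer
import java.util.Collections

import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}

object EmptyRowSetSketch extends App {
  // Same shape as ThriftUtils.newEmptyRowSet: a fresh, independent TRowSet per call.
  def newEmptyRowSet: TRowSet = new TRowSet(0, new java.util.ArrayList[TRow](0))

  // Pretend this were a shared constant handed to callers that mutate it.
  val shared: TRowSet = newEmptyRowSet
  shared.addToColumns(TColumn.stringVal(
    new TStringColumn(Collections.singletonList("oops"), ByteBuffer.allocate(0))))
  println(shared.getColumnsSize) // 1 -- the "shared empty" row set now carries a column

  // A factory avoids that: every caller gets its own instance to mutate.
  println(newEmptyRowSet.getColumnsSize) // 0
}
```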