[SPARK-38337][CORE][SQL][DSTREAM][MLLIB] Replace toIterator with `iterator` for `IterableLike`/`IterableOnce` to cleanup deprecated api usage

### What changes were proposed in this pull request?
In Scala 2.12, `IterableLike.toIterator` is marked with `deprecatedOverriding`:
```scala
@deprecatedOverriding("toIterator should stay consistent with iterator for all Iterables: override iterator instead.", "2.11.0")
override def toIterator: Iterator[A] = iterator
```

In Scala 2.13, `IterableOnce.toIterator` is marked as `deprecated`:

```scala
  @deprecated("Use .iterator instead of .toIterator", "2.13.0")
  @`inline` final def toIterator: Iterator[A] = iterator
```

This PR replaces `toIterator` with `iterator`, as recommended by the deprecation messages above.
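
Every call site follows the same mechanical pattern; a minimal standalone sketch (the values below are illustrative, not taken from any changed file):

```scala
val xs = Seq(1, 2, 3)

// Before: still compiles, but emits a deprecation warning on Scala 2.13
val deprecatedIter: Iterator[Int] = xs.toIterator

// After: the non-deprecated equivalent recommended by the Scaladoc
val recommendedIter: Iterator[Int] = xs.iterator
```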

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass existing GitHub Actions (GA) checks.

Closes #35665 from LuciferYang/toIterator-is-deprecated.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
LuciferYang authored and HyukjinKwon committed Feb 28, 2022
1 parent 0c74bff commit 309c65a
Showing 39 changed files with 52 additions and 52 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1177,7 +1177,7 @@ private[spark] class MapOutputTrackerMaster(
   override def getMapSizesForMergeResult(
       shuffleId: Int,
       partitionId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    Seq.empty.toIterator
+    Seq.empty.iterator
   }

   // This method is only called in local-mode. Since push based shuffle won't be
@@ -1186,7 +1186,7 @@ private[spark] class MapOutputTrackerMaster(
       shuffleId: Int,
       partitionId: Int,
       chunkTracker: RoaringBitmap): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
-    Seq.empty.toIterator
+    Seq.empty.iterator
   }

   // This method is only called in local-mode.
@@ -245,7 +245,7 @@ private[spark] object PythonRDD extends Logging {
         out.writeInt(1)

         // Write the next object and signal end of data for this iteration
-        writeIteratorToStream(partitionArray.toIterator, out)
+        writeIteratorToStream(partitionArray.iterator, out)
         out.writeInt(SpecialLengths.END_OF_DATA_SECTION)
         out.flush()
       } else {
@@ -76,13 +76,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {

   private def formatMasterResourcesInUse(aliveWorkers: Array[WorkerInfo]): String = {
     val totalInfo = aliveWorkers.map(_.resourcesInfo)
-      .flatMap(_.toIterator)
+      .flatMap(_.iterator)
       .groupBy(_._1) // group by resource name
       .map { case (rName, rInfoArr) =>
         rName -> rInfoArr.map(_._2.addresses.size).sum
       }
     val usedInfo = aliveWorkers.map(_.resourcesInfoUsed)
-      .flatMap(_.toIterator)
+      .flatMap(_.iterator)
       .groupBy(_._1) // group by resource name
       .map { case (rName, rInfoArr) =>
         rName -> rInfoArr.map(_._2.addresses.size).sum
@@ -117,7 +117,7 @@ private[scheduler] class TaskSetExcludelist(
     // over the limit, exclude this task from the entire host.
     val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
     execsWithFailuresOnNode += exec
-    val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
+    val failuresOnHost = execsWithFailuresOnNode.iterator.flatMap { exec =>
       execToFailures.get(exec).map { failures =>
         // We count task attempts here, not the number of unique executors with failures. This is
         // because jobs are aborted based on the number task attempts; if we counted unique
@@ -1872,7 +1872,7 @@ private[spark] class BlockManager(
             serializerManager.dataSerializeStream(
               blockId,
               out,
-              elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
+              elements.iterator)(info.classTag.asInstanceOf[ClassTag[T]])
           }
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes)
@@ -305,7 +305,7 @@ private[spark] class MemoryStore(
       val unrolledIterator = if (valuesHolder.vector != null) {
         valuesHolder.vector.iterator
       } else {
-        valuesHolder.arrayValues.toIterator
+        valuesHolder.arrayValues.iterator
       }

       Left(new PartiallyUnrolledIterator(
@@ -138,5 +138,5 @@ class RDDOperationScopeSuite extends SparkFunSuite with BeforeAndAfter {

 private class MyCoolRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
   override def getPartitions: Array[Partition] = Array.empty
-  override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.toIterator }
+  override def compute(p: Partition, context: TaskContext): Iterator[Int] = { Nil.iterator }
 }
@@ -111,7 +111,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
         val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId)
         (shuffleBlockId, byteOutputStream.size().toLong, mapId)
       }
-      Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).toIterator
+      Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).iterator
     }

     // Create a mocked shuffle handle to pass into HashShuffleReader.
@@ -103,7 +103,7 @@ class SortShuffleWriterSuite
       mapId = 2,
       context,
       shuffleExecutorComponents)
-    writer.write(records.toIterator)
+    writer.write(records.iterator)
     writer.stop(success = true)
     val dataFile = shuffleBlockResolver.getDataFile(shuffleId, 2)
     val writeMetrics = context.taskMetrics().shuffleWriteMetrics
@@ -160,7 +160,7 @@ class SortShuffleWriterSuite
       context,
       new LocalDiskShuffleExecutorComponents(
         conf, shuffleBlockResolver._blockManager, shuffleBlockResolver))
-    writer.write(records.toIterator)
+    writer.write(records.iterator)
     val sorterMethod = PrivateMethod[ExternalSorter[_, _, _]](Symbol("sorter"))
     val sorter = writer.invokePrivate(sorterMethod())
     val expectSpillSize = if (doSpill) records.size else 0
@@ -201,7 +201,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       transfer,
       blockManager.getOrElse(createMockBlockManager()),
       mapOutputTracker,
-      blocksByAddress.toIterator,
+      blocksByAddress.iterator,
       (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
       maxBytesInFlight,
       maxReqsInFlight,
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -464,7 +464,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

   test("get iterator size") {
     val empty = Seq[Int]()
-    assert(Utils.getIteratorSize(empty.toIterator) === 0L)
+    assert(Utils.getIteratorSize(empty.iterator) === 0L)
     val iterator = Iterator.range(0, 5)
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
@@ -82,7 +82,7 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister
       }
       val resultOpt = ImageSchema.decode(origin, bytes)
       val filteredResult = if (imageSourceOptions.dropInvalid) {
-        resultOpt.toIterator
+        resultOpt.iterator
       } else {
         Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
       }
@@ -78,7 +78,7 @@ private[evaluation] object AreaUnderCurve {
    * @param curve an iterator over ordered 2D points stored in pairs representing a curve
    */
   def of(curve: Iterable[(Double, Double)]): Double = {
-    curve.toIterator.sliding(2).withPartial(false).aggregate(0.0)(
+    curve.iterator.sliding(2).withPartial(false).aggregate(0.0)(
       seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points),
       combop = _ + _
     )
@@ -67,7 +67,7 @@ private[fpm] class LocalPrefixSpan(
       count >= minCount
     }.sorted
     // project and recursively call genFreqPatterns
-    freqItems.toIterator.flatMap { case (item, count) =>
+    freqItems.iterator.flatMap { case (item, count) =>
       val newPrefix = prefix :+ item
       Iterator.single((newPrefix, count)) ++ {
         val projected = postfixes.map(_.project(item)).filter(_.nonEmpty)
@@ -220,7 +220,7 @@ object PrefixSpan extends Logging {
     data.flatMap { itemsets =>
       val uniqItems = mutable.Set.empty[Item]
       itemsets.foreach(set => uniqItems ++= set)
-      uniqItems.toIterator.map((_, 1L))
+      uniqItems.iterator.map((_, 1L))
     }.reduceByKey(_ + _).filter { case (_, count) =>
       count >= minCount
     }.sortBy(-_._2).map(_._1).collect()
@@ -478,7 +478,7 @@ object PrefixSpan extends Logging {
         }
         i += 1
       }
-      prefixes.toIterator
+      prefixes.iterator
     }

     /** Tests whether this postfix is non-empty. */
@@ -591,7 +591,7 @@ case class GetViewColumnByNameAndOrdinal(
   override def dataType: DataType = throw new UnresolvedException("dataType")
   override def nullable: Boolean = throw new UnresolvedException("nullable")
   override lazy val resolved = false
-  override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).toIterator
+  override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator
 }

 /**
@@ -828,7 +828,7 @@ case class MapObjects private(

   private def executeFuncOnCollection(inputCollection: Seq[_]): Iterator[_] = {
     val row = new GenericInternalRow(1)
-    inputCollection.toIterator.map { element =>
+    inputCollection.iterator.map { element =>
       row.update(0, element)
       lambdaFunction.eval(row)
     }
@@ -100,7 +100,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
           wrappedCharException.initCause(e)
           handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException)
       }
-    }.reduceOption(typeMerger).toIterator
+    }.reduceOption(typeMerger).iterator
   }

   // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
@@ -186,7 +186,7 @@ object ObjectSerializerPruning extends Rule[LogicalPlan] {
       serializer: NamedExpression,
       prunedDataType: DataType): NamedExpression = {
     val prunedStructTypes = collectStructType(prunedDataType, ArrayBuffer.empty[StructType])
-      .toIterator
+      .iterator

     def transformer: PartialFunction[Expression, Expression] = {
       case m: ExternalMapToCatalyst =>
@@ -57,7 +57,7 @@ class FailureSafeParser[IN](

   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      rawParser.apply(input).iterator.map(row => toResultRow(Some(row), () => null))
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
@@ -43,7 +43,7 @@ class StringKeyHashMap[T](normalizer: (String) => String) {

   def remove(key: String): Option[T] = base.remove(normalizer(key))

-  def iterator: Iterator[(String, T)] = base.toIterator
+  def iterator: Iterator[(String, T)] = base.iterator

   def clear(): Unit = base.clear()
 }
@@ -44,7 +44,7 @@ object StringUtils extends Logging {
    * @return the equivalent Java regular expression of the pattern
    */
   def escapeLikeRegex(pattern: String, escapeChar: Char): String = {
-    val in = pattern.toIterator
+    val in = pattern.iterator
     val out = new StringBuilder()

     def fail(message: String) = throw QueryCompilationErrors.invalidPatternError(pattern, message)
@@ -54,7 +54,7 @@ trait SQLKeywordUtils extends SparkFunSuite with SQLHelper {
     val default = (_: String) => Nil
     var startTagFound = false
     var parseFinished = false
-    val lineIter = sqlSyntaxDefs.toIterator
+    val lineIter = sqlSyntaxDefs.iterator
     while (!parseFinished && lineIter.hasNext) {
       val line = lineIter.next()
       if (line.trim.startsWith(startTag)) {
@@ -206,7 +206,7 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
     if (actualFixedLength !== expectedFixedLength) {
       actualFixedLength.grouped(8)
         .zip(expectedFixedLength.grouped(8))
-        .zip(mergedSchema.fields.toIterator)
+        .zip(mergedSchema.fields.iterator)
         .foreach {
           case ((actual, expected), field) =>
             assert(actual === expected, s"Fixed length sections are not equal for field $field")
@@ -83,7 +83,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {

   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

-  override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.toIterator
+  override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator

   override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

@@ -124,7 +124,7 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)

   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

-  override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.toIterator
+  override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator

   override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

@@ -469,7 +469,7 @@ case class AlterTableAddPartitionCommand(
     // Also the request to metastore times out when adding lot of partitions in one shot.
     // we should split them into smaller batches
     val batchSize = conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
-    parts.toIterator.grouped(batchSize).foreach { batch =>
+    parts.iterator.grouped(batchSize).foreach { batch =>
       catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
     }

@@ -772,7 +772,7 @@ case class RepairTableCommand(
     // we should split them into smaller batches. Since Hive client is not thread safe, we cannot
     // do this in parallel.
     val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE)
-    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+    partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
       val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
       val parts = batch.map { case (spec, location) =>
         val params = partitionStats.get(location.toString).map {
@@ -93,7 +93,7 @@ class FileScanRDD(
       inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
     }

-    private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+    private[this] val files = split.asInstanceOf[FilePartition].files.iterator
     private[this] var currentFile: PartitionedFile = null
     private[this] var currentIterator: Iterator[Object] = null

@@ -147,7 +147,7 @@ object OrcUtils extends Logging {
       : Option[StructType] = {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
-    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+    files.iterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
       case Some(schema) =>
         logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
         toCatalystSchema(schema)
@@ -26,7 +26,7 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     assert(partition.isInstanceOf[FilePartition])
     val filePartition = partition.asInstanceOf[FilePartition]
-    val iter = filePartition.files.toIterator.map { file =>
+    val iter = filePartition.files.iterator.map { file =>
       PartitionedFileReader(file, buildReader(file))
     }
     new FilePartitionReader[InternalRow](iter)
@@ -35,7 +35,7 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
   override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
     assert(partition.isInstanceOf[FilePartition])
     val filePartition = partition.asInstanceOf[FilePartition]
-    val iter = filePartition.files.toIterator.map { file =>
+    val iter = filePartition.files.iterator.map { file =>
       PartitionedFileReader(file, buildColumnarReader(file))
     }
     new FilePartitionReader[ColumnarBatch](iter)
@@ -48,7 +48,7 @@ abstract class V2CommandExec extends SparkPlan {
    */
   override def executeCollect(): Array[InternalRow] = result.toArray

-  override def executeToIterator(): Iterator[InternalRow] = result.toIterator
+  override def executeToIterator(): Iterator[InternalRow] = result.iterator

   override def executeTake(limit: Int): Array[InternalRow] = result.take(limit).toArray

@@ -356,7 +356,7 @@ case class BroadcastNestedLoopJoinExec(
           i += 1
         }
       }
-      Seq(matched).toIterator
+      Seq(matched).iterator
     }

     matchedBuildRows.fold(new BitSet(relation.value.length))(_ | _)
@@ -1377,7 +1377,7 @@ class ArrowConvertersSuite extends SharedSparkSession {
     val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))

     val ctx = TaskContext.empty()
-    val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+    val batchIter = ArrowConverters.toBatchIterator(inputRows.iterator, schema, 5, null, ctx)
     val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, schema, null, ctx)

     var count = 0
@@ -1398,7 +1398,7 @@

     val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
     val ctx = TaskContext.empty()
-    val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+    val batchIter = ArrowConverters.toBatchIterator(inputRows.iterator, schema, 5, null, ctx)

     // Write batches to Arrow stream format as a byte array
     val out = new ByteArrayOutputStream()
@@ -488,7 +488,7 @@ class FileIndexSuite extends SharedSparkSession {
           new Path("file")), Array(new BlockLocation()))
       )
       when(dfs.listLocatedStatus(path)).thenReturn(new RemoteIterator[LocatedFileStatus] {
-        val iter = statuses.toIterator
+        val iter = statuses.iterator
         override def hasNext: Boolean = iter.hasNext
         override def next(): LocatedFileStatus = iter.next
       })
@@ -629,7 +629,7 @@ class HashedRelationSuite extends SharedSparkSession {

   test("EmptyHashedRelation override methods behavior test") {
     val buildKey = Seq(BoundReference(0, LongType, false))
-    val hashed = HashedRelation(Seq.empty[InternalRow].toIterator, buildKey, 1, mm)
+    val hashed = HashedRelation(Seq.empty[InternalRow].iterator, buildKey, 1, mm)
     assert(hashed == EmptyHashedRelation)

     val key = InternalRow(1L)
@@ -93,7 +93,7 @@ private[hive] object OrcFileOperator extends Logging {
       : Option[StructType] = {
     // Take the first file where we can open a valid reader if we can find one. Otherwise just
     // return None to indicate we can't infer the schema.
-    paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+    paths.iterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
       case Some(reader) =>
         val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
         val schema = readerInspector.getTypeName