Commit 3ce3a28

[SPARK-17359][SQL][MLLIB] Use ArrayBuffer.+=(A) instead of ArrayBuffer.append(A) in performance critical paths

## What changes were proposed in this pull request?

We should generally use `ArrayBuffer.+=(A)` rather than `ArrayBuffer.append(A)` in performance-critical paths, because `append` is declared with a repeated parameter (`def append(elems: A*)`) and therefore wraps its argument in an extra `Seq` on every call (with the associated boxing / unboxing), whereas `+=(A)` adds the single element directly.
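
To make the difference concrete, here is a small illustrative sketch (not from the patch itself); it assumes the Scala 2.11 collection API that Spark used at the time, where `ArrayBuffer.append` takes a repeated parameter:

```scala
import scala.collection.mutable.ArrayBuffer

object AppendVsPlusEq {
  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer[Int]()

    // `+=` takes exactly one element and appends it directly.
    buf += 1

    // `append` is declared as `def append(elems: A*)`: the argument is first
    // wrapped in a Seq, which means an extra allocation (and boxing for
    // primitives) on every call in a hot loop.
    buf.append(2)

    println(buf) // ArrayBuffer(1, 2)
  }
}
```

In a tight loop that per-call wrapper is pure overhead, which is why the hot paths below switch to `+=`.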

## How was this patch tested?

N/A
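
The change is mechanical, so no dedicated tests accompany it. For readers who want to gauge the overhead themselves, below is a rough, hypothetical micro-benchmark sketch; the `timeMs` helper, the object name, and the loop size are illustrative assumptions rather than part of the patch, and absolute numbers will vary by JVM and Scala version:

```scala
import scala.collection.mutable.ArrayBuffer

object AppendBenchmark {
  // Illustrative helper: time a block and return elapsed milliseconds.
  def timeMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    val n = 10000000

    val appendMs = timeMs {
      val buf = new ArrayBuffer[Int](n)
      var i = 0
      while (i < n) { buf.append(i); i += 1 }
    }

    val plusEqMs = timeMs {
      val buf = new ArrayBuffer[Int](n)
      var i = 0
      while (i < n) { buf += i; i += 1 }
    }

    println(s"append: $appendMs ms, +=: $plusEqMs ms")
  }
}
```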

Author: Liwei Lin <lwlin7@gmail.com>

Closes apache#14914 from lw-lin/append_to_plus_eq_v2.
lw-lin authored and srowen committed Sep 7, 2016
1 parent 9fccde4 commit 3ce3a28
Showing 25 changed files with 60 additions and 61 deletions.
@@ -467,7 +467,7 @@ private[spark] object PythonRDD extends Logging {
val length = file.readInt()
val obj = new Array[Byte](length)
file.readFully(obj)
- objs.append(obj)
+ objs += obj
}
} catch {
case eof: EOFException => // No-op
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -83,8 +83,8 @@ private[spark] abstract class WebUI(
(request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
attachHandler(renderHandler)
attachHandler(renderJsonHandler)
- pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
-   .append(renderHandler)
+ val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+ handlers += renderHandler
}

/** Attach a handler to this UI. */
@@ -184,7 +184,7 @@ class ExternalAppendOnlyMap[K, V, C](
override protected[this] def spill(collection: SizeTracker): Unit = {
val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
- spilledMaps.append(diskMapIterator)
+ spilledMaps += diskMapIterator
}

/**
@@ -215,7 +215,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Flush the disk writer's contents to disk, and update relevant variables
def flush(): Unit = {
val segment = writer.commitAndGet()
- batchSizes.append(segment.length)
+ batchSizes += segment.length
_diskBytesSpilled += segment.length
objectsWritten = 0
}
@@ -238,7 +238,7 @@ private[spark] class ExternalSorter[K, V, C](
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
- spills.append(spillFile)
+ spills += spillFile
}

/**
@@ -285,7 +285,7 @@ private[spark] class ExternalSorter[K, V, C](
// The writer is committed at the end of this process.
def flush(): Unit = {
val segment = writer.commitAndGet()
- batchSizes.append(segment.length)
+ batchSizes += segment.length
_diskBytesSpilled += segment.length
objectsWritten = 0
}
@@ -796,7 +796,7 @@ private[spark] class ExternalSorter[K, V, C](
logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
s" it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
- forceSpillFiles.append(spillFile)
+ forceSpillFiles += spillFile
val spillReader = new SpillReader(spillFile)
nextUpstream = (0 until numPartitions).iterator.flatMap { p =>
val iterator = spillReader.readNextPartition()
@@ -305,7 +305,7 @@ private[deploy] object IvyTestUtils {
val allFiles = ArrayBuffer[(String, File)](javaFile)
if (withPython) {
val pythonFile = createPythonFile(root)
- allFiles.append((pythonFile.getName, pythonFile))
+ allFiles += Tuple2(pythonFile.getName, pythonFile)
}
if (withR) {
val rFiles = createRFiles(root, className, artifact.groupId)
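
A side note on hunks like the one above: when the element being added is itself a tuple, the patch spells it out as `Tuple2(...)` / `Tuple3(...)`, because `buf += (a, b)` is parsed as a call to the multi-element overload `+=(elem1, elem2, elems*)` inherited from `Growable`, not as adding a single pair. A minimal sketch of the distinction (illustrative only, names are made up):

```scala
import scala.collection.mutable.ArrayBuffer

object TupleElements {
  def main(args: Array[String]): Unit = {
    val pairs = ArrayBuffer[(String, Int)]()

    // Unambiguous: build the tuple explicitly, then add it as one element.
    pairs += Tuple2("a", 1)

    // Also unambiguous: the inner parentheses build the tuple, the outer
    // ones delimit the single argument to `+=`.
    pairs += (("b", 2))

    // Avoided in the patch: `pairs += ("c", 3)` would be treated as the
    // multi-element overload `+=(elem1, elem2, elems*)` rather than as
    // adding one tuple, hence the explicit Tuple2/Tuple3 calls.

    println(pairs) // ArrayBuffer((a,1), (b,2))
  }
}
```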
@@ -118,8 +118,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
if (numBytesToFree <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
- evictedBlocks.append(
-   (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
+ evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
numBytesToFree
} else {
// No blocks were evicted because eviction would not free enough space.
@@ -107,7 +107,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
val blockId = new TempShuffleBlockId(UUID.randomUUID)
val file = new File(tempDir, blockId.name)
blockIdToFileMap.put(blockId, file)
- temporaryFilesCreated.append(file)
+ temporaryFilesCreated += file
(blockId, file)
}
})
@@ -150,12 +150,12 @@ class SizeEstimatorSuite

val buf = new ArrayBuffer[DummyString]()
for (i <- 0 until 5000) {
- buf.append(new DummyString(new Array[Char](10)))
+ buf += new DummyString(new Array[Char](10))
}
assertResult(340016)(SizeEstimator.estimate(buf.toArray))

for (i <- 0 until 5000) {
- buf.append(new DummyString(arr))
+ buf += new DummyString(arr)
}
assertResult(683912)(SizeEstimator.estimate(buf.toArray))

@@ -108,7 +108,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
} else {
val missing = topicAndPartitions.diff(leaderMap.keySet)
val err = new Err
- err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
+ err += new SparkException(s"Couldn't find leaders for ${missing}")
Left(err)
}
}
@@ -139,7 +139,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
respErrs.foreach { m =>
val cause = ErrorMapping.exceptionFor(m.errorCode)
val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
- errs.append(new SparkException(msg, cause))
+ errs += new SparkException(msg, cause)
}
}
}
@@ -205,11 +205,11 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
LeaderOffset(consumer.host, consumer.port, off)
}
} else {
- errs.append(new SparkException(
-   s"Empty offsets for ${tp}, is ${before} before log beginning?"))
+ errs += new SparkException(
+   s"Empty offsets for ${tp}, is ${before} before log beginning?")
}
} else {
- errs.append(ErrorMapping.exceptionFor(por.error))
+ errs += ErrorMapping.exceptionFor(por.error)
}
}
}
@@ -218,7 +218,7 @@
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
errs += new SparkException(s"Couldn't find leader offsets for ${missing}")
Left(errs)
}
}
@@ -274,7 +274,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
if (ome.error == ErrorMapping.NoError) {
result += tp -> ome
} else {
- errs.append(ErrorMapping.exceptionFor(ome.error))
+ errs += ErrorMapping.exceptionFor(ome.error)
}
}
}
@@ -283,7 +283,7 @@
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
errs += new SparkException(s"Couldn't find consumer offsets for ${missing}")
Left(errs)
}

@@ -330,7 +330,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
if (err == ErrorMapping.NoError) {
result += tp -> err
} else {
- errs.append(ErrorMapping.exceptionFor(err))
+ errs += ErrorMapping.exceptionFor(err)
}
}
}
@@ -339,7 +339,7 @@
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
errs += new SparkException(s"Couldn't set offsets for ${missing}")
Left(errs)
}

@@ -353,7 +353,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
fn(consumer)
} catch {
case NonFatal(e) =>
- errs.append(e)
+ errs += e
} finally {
if (consumer != null) {
consumer.close()
@@ -236,16 +236,16 @@ class MesosFineGrainedSchedulerBackendSuite
mesosOffers.add(createOffer(3, minMem, minCpu))

val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
- expectedWorkerOffers.append(new WorkerOffer(
+ expectedWorkerOffers += new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
(minCpu - backend.mesosExecutorCores).toInt
- ))
- expectedWorkerOffers.append(new WorkerOffer(
+ )
+ expectedWorkerOffers += new WorkerOffer(
mesosOffers.get(2).getSlaveId.getValue,
mesosOffers.get(2).getHostname,
(minCpu - backend.mesosExecutorCores).toInt
- ))
+ )
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
@@ -339,11 +339,11 @@ class MesosFineGrainedSchedulerBackendSuite
val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")

val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
- expectedWorkerOffers.append(new WorkerOffer(
+ expectedWorkerOffers += new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
2 // Deducting 1 for executor
- ))
+ )

val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
@@ -991,7 +991,7 @@ object Matrices {
val data = new ArrayBuffer[(Int, Int, Double)]()
dnMat.foreachActive { (i, j, v) =>
if (v != 0.0) {
- data.append((i, j + startCol, v))
+ data += Tuple3(i, j + startCol, v)
}
}
startCol += nCols
@@ -1061,7 +1061,7 @@ object Matrices {
val data = new ArrayBuffer[(Int, Int, Double)]()
dnMat.foreachActive { (i, j, v) =>
if (v != 0.0) {
- data.append((i + startRow, j, v))
+ data += Tuple3(i + startRow, j, v)
}
}
startRow += nRows
@@ -1128,7 +1128,7 @@ object Matrices {
val data = new ArrayBuffer[(Int, Int, Double)]()
dnMat.foreachActive { (i, j, v) =>
if (v != 0.0) {
- data.append((i, j + startCol, v))
+ data += Tuple3(i, j + startCol, v)
}
}
startCol += nCols
@@ -1198,7 +1198,7 @@ object Matrices {
val data = new ArrayBuffer[(Int, Int, Double)]()
dnMat.foreachActive { (i, j, v) =>
if (v != 0.0) {
- data.append((i + startRow, j, v))
+ data += Tuple3(i + startRow, j, v)
}
}
startRow += nRows
@@ -257,7 +257,7 @@ class BlockMatrix @Since("1.3.0") (
val colStart = blockColIndex.toLong * colsPerBlock
val entryValues = new ArrayBuffer[MatrixEntry]()
mat.foreachActive { (i, j, v) =>
- if (v != 0.0) entryValues.append(new MatrixEntry(rowStart + i, colStart + j, v))
+ if (v != 0.0) entryValues += new MatrixEntry(rowStart + i, colStart + j, v)
}
entryValues
}
@@ -252,7 +252,7 @@ object GradientDescent extends Logging {
* lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
- stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
+ stochasticLossHistory += lossSum / miniBatchSize + regVal
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
@@ -95,7 +95,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
// (we add a count to ensure the result is a DStream)
ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
- inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - B)))
+ inputDStream.foreachRDD(x => history += math.abs(model.latestModel().weights(0) - B))
inputDStream.count()
})
runStreams(ssc, numBatches, numBatches)
@@ -109,7 +109,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
// (we add a count to ensure the result is a DStream)
ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
- inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - 10.0)))
+ inputDStream.foreachRDD(x => history += math.abs(model.latestModel().weights(0) - 10.0))
inputDStream.count()
})
runStreams(ssc, numBatches, numBatches)
@@ -55,7 +55,7 @@ class ExpressionSet protected(
protected def add(e: Expression): Unit = {
if (!baseSet.contains(e.canonicalized)) {
baseSet.add(e.canonicalized)
- originals.append(e)
+ originals += e
}
}

@@ -596,12 +596,12 @@ class CodegenContext {
// also not be too small, or it will have many function calls (for wide table), see the
// results in BenchmarkWideTable.
if (blockBuilder.length > 1024) {
- blocks.append(blockBuilder.toString())
+ blocks += blockBuilder.toString()
blockBuilder.clear()
}
blockBuilder.append(code)
}
- blocks.append(blockBuilder.toString())
+ blocks += blockBuilder.toString()

if (blocks.length == 1) {
// inline execution if only one block
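
Note that `blockBuilder.append(code)` is left untouched in the hunk above: its receiver is a `StringBuilder`, whose `append` takes a single argument, so the varargs concern does not apply; only the `ArrayBuffer[String]` of generated blocks switches to `+=`. A minimal sketch of that split (simplified, not the real `CodegenContext` logic):

```scala
import scala.collection.mutable.ArrayBuffer

object MixedBuilders {
  def main(args: Array[String]): Unit = {
    val blocks = ArrayBuffer[String]()    // finished code blocks
    val blockBuilder = new StringBuilder  // accumulates the current block

    // StringBuilder.append(String) takes a single argument, so no varargs
    // Seq is allocated; keeping append here is fine.
    blockBuilder.append("int x = 1;")

    // ArrayBuffer: prefer += to avoid append's per-call Seq wrapper.
    blocks += blockBuilder.toString()
    blockBuilder.clear()

    println(blocks) // ArrayBuffer(int x = 1;)
  }
}
```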
@@ -59,7 +59,7 @@ class QuantileSummaries(
* @param x the new observation to insert into the summary
*/
def insert(x: Double): QuantileSummaries = {
- headSampled.append(x)
+ headSampled += x
if (headSampled.size >= defaultHeadSize) {
this.withHeadBufferInserted
} else {
@@ -90,7 +90,7 @@ class QuantileSummaries(
val currentSample = sorted(opsIdx)
// Add all the samples before the next observation.
while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
- newSamples.append(sampled(sampleIdx))
+ newSamples += sampled(sampleIdx)
sampleIdx += 1
}

@@ -104,13 +104,13 @@
}

val tuple = Stats(currentSample, 1, delta)
- newSamples.append(tuple)
+ newSamples += tuple
opsIdx += 1
}

// Add all the remaining existing samples
while(sampleIdx < sampled.size) {
- newSamples.append(sampled(sampleIdx))
+ newSamples += sampled(sampleIdx)
sampleIdx += 1
}
new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
@@ -82,8 +82,8 @@ class TreeNodeSuite extends SparkFunSuite {
val expected = Seq("+", "1", "*", "2", "-", "3", "4")
val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
expression transformDown {
- case b: BinaryOperator => actual.append(b.symbol); b
- case l: Literal => actual.append(l.toString); l
+ case b: BinaryOperator => actual += b.symbol; b
+ case l: Literal => actual += l.toString; l
}

assert(expected === actual)
@@ -94,8 +94,8 @@ class TreeNodeSuite extends SparkFunSuite {
val expected = Seq("1", "2", "3", "4", "-", "*", "+")
val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
expression transformUp {
- case b: BinaryOperator => actual.append(b.symbol); b
- case l: Literal => actual.append(l.toString); l
+ case b: BinaryOperator => actual += b.symbol; b
+ case l: Literal => actual += l.toString; l
}

assert(expected === actual)
@@ -134,8 +134,8 @@ class TreeNodeSuite extends SparkFunSuite {
val expected = Seq("1", "2", "3", "4", "-", "*", "+")
val expression = Add(Literal(1), Multiply(Literal(2), Subtract(Literal(3), Literal(4))))
expression foreachUp {
- case b: BinaryOperator => actual.append(b.symbol);
- case l: Literal => actual.append(l.toString);
+ case b: BinaryOperator => actual += b.symbol;
+ case l: Literal => actual += l.toString;
}

assert(expected === actual)