Skip to content

Commit

Permalink
[CELEBORN] Optimize the performance of Celeborn's pushPartitionData (f…
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk authored Oct 26, 2023
1 parent 5038204 commit 87372db
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](

protected val mapId: Int = context.partitionId()

protected val clientPushBufferMaxSize: Int = celebornConf.clientPushBufferMaxSize

protected val celebornPartitionPusher = new CelebornPartitionPusher(
shuffleId,
numMappers,
numPartitions,
context,
mapId,
client,
celebornConf)
clientPushBufferMaxSize)

protected val blockManager: BlockManager = SparkEnv.get.blockManager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import org.apache.spark._
import org.apache.spark.internal.Logging

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf

import java.io.IOException

Expand All @@ -31,13 +30,13 @@ class CelebornPartitionPusher(
val context: TaskContext,
val mapId: Int,
val client: ShuffleClient,
val celebornConf: CelebornConf)
val clientPushBufferMaxSize: Int)
extends Logging {

@throws[IOException]
def pushPartitionData(partitionId: Int, buffer: Array[Byte], length: Int): Int = {
logDebug(s"Push record, size ${buffer.length}.")
if (buffer.length > celebornConf.clientPushBufferMaxSize) {
if (buffer.length > clientPushBufferMaxSize) {
client.pushData(
shuffleId,
mapId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
customizedCompressionCodec,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
celebornConf.clientPushBufferMaxSize,
clientPushBufferMaxSize,
celebornPartitionPusher,
NativeMemoryManagers
.create(
Expand Down

0 comments on commit 87372db

Please sign in to comment.