Skip to content

Commit

Permalink
Move simple classes to API (#9541)
Browse files Browse the repository at this point in the history
contributes to #6565

Signed-off-by: Gera Shegalov <gera@apache.org>
  • Loading branch information
gerashegalov authored Oct 26, 2023
1 parent 1b8f9fd commit 01a2d20
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
5 changes: 0 additions & 5 deletions dist/unshimmed-common-from-spark311.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ com/nvidia/spark/rapids/GpuColumnVectorUtils*
com/nvidia/spark/rapids/ExplainPlan.class
com/nvidia/spark/rapids/ExplainPlan$.class
com/nvidia/spark/rapids/ExplainPlanBase.class
com/nvidia/spark/rapids/GpuKryoRegistrator*
com/nvidia/spark/rapids/PlanUtils*
com/nvidia/spark/rapids/RapidsExecutorHeartbeatMsg*
com/nvidia/spark/rapids/RapidsExecutorStartupMsg*
com/nvidia/spark/rapids/RapidsExecutorUpdateMsg*
com/nvidia/spark/rapids/RapidsShuffleHeartbeatHandler*
com/nvidia/spark/rapids/SQLExecPlugin*
com/nvidia/spark/rapids/ShimLoaderTemp*
com/nvidia/spark/rapids/SparkShims*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import org.apache.spark.storage.BlockManagerId


/**
* This is the first message sent from the executor to the driver.
* @param id `BlockManagerId` for the executor
*/
case class RapidsExecutorStartupMsg(id: BlockManagerId)

/**
* Executor heartbeat message.
* This gives the driver an opportunity to respond with `RapidsExecutorUpdateMsg`
*/
case class RapidsExecutorHeartbeatMsg(id: BlockManagerId)

/**
* Driver response to an startup or heartbeat message, with new (to the peer) executors
* from the last heartbeat.
*/
case class RapidsExecutorUpdateMsg(ids: Array[BlockManagerId])

trait RapidsShuffleHeartbeatHandler {
/** Called when a new peer is seen via heartbeats */
def addPeer(peer: BlockManagerId): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.RapidsShuffleInternalManagerBase
import org.apache.spark.storage.BlockManagerId

/**
* This is the first message sent from the executor to the driver.
* @param id `BlockManagerId` for the executor
*/
case class RapidsExecutorStartupMsg(id: BlockManagerId)

/**
* Executor heartbeat message.
* This gives the driver an opportunity to respond with `RapidsExecutorUpdateMsg`
*/
case class RapidsExecutorHeartbeatMsg(id: BlockManagerId)

/**
* Driver response to an startup or heartbeat message, with new (to the peer) executors
* from the last heartbeat.
*/
case class RapidsExecutorUpdateMsg(ids: Array[BlockManagerId])

class RapidsShuffleHeartbeatManager(heartbeatIntervalMillis: Long,
heartbeatTimeoutMillis: Long) extends Logging {
require(heartbeatIntervalMillis > 0,
Expand Down Expand Up @@ -179,11 +161,6 @@ class RapidsShuffleHeartbeatManager(heartbeatIntervalMillis: Long,
}
}

trait RapidsShuffleHeartbeatHandler {
/** Called when a new peer is seen via heartbeats */
def addPeer(peer: BlockManagerId): Unit
}

class RapidsShuffleHeartbeatEndpoint(pluginContext: PluginContext, conf: RapidsConf)
extends Logging with AutoCloseable {
// Number of milliseconds between heartbeats to driver
Expand Down

0 comments on commit 01a2d20

Please sign in to comment.