Skip to content

Commit

Permalink
[SPARK-39196][CORE][SQL][K8S] replace getOrElse(null) with orNull
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to replace `getOrElse(null)` with `orNull`.

### Why are the changes needed?

Code simplification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the GA.

Closes apache#36567 from dcoliversun/SPARK-39196.

Authored-by: Qian.Sun <qian.sun2020@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
dcoliversun authored and srowen committed May 17, 2022
1 parent 98fad57 commit b4c0196
Show file tree
Hide file tree
Showing 14 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private[kafka010] class KafkaSource(
}

override def reportLatestOffset(): streaming.Offset = {
latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null)
latestPartitionOffsets.map(KafkaSourceOffset(_)).orNull
}

override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val obj = new Array[Byte](exLength)
stream.readFully(obj)
new PythonException(new String(obj, StandardCharsets.UTF_8),
writerThread.exception.getOrElse(null))
writerThread.exception.orNull)
}

protected def handleEndOfDataSection(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1816,7 +1816,7 @@ abstract class RDD[T: ClassTag](
*/
@Experimental
@Since("3.1.0")
def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)
def getResourceProfile(): ResourceProfile = resourceProfile.orNull

// =======================================================================
// Other internal methods and fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ private[spark] object JsonProtocol {
val properties = new Properties
mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
properties
}.getOrElse(null)
}.orNull
}

def UUIDFromJson(json: JValue): UUID = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ object TriangleCount {

// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
(vid, _, optSet) => optSet.orNull
}

// Edge function computes intersection of smaller vertex with larger vertex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
// Start from an auto-configured config with the desired context
// Fabric 8 uses null to indicate that the users current context should be used so if no
// explicit setting pass null
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
val config = new ConfigBuilder(autoConfigure(kubeContext.orNull))
.withApiVersion("v1")
.withMasterUrl(master)
.withRequestTimeout(clientType.requestTimeout(sparkConf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ class KubernetesSuite extends SparkFunSuite
// Try the spark test home
sys.props("spark.test.home")
)
val sparkDirProp = possible_spark_dirs.filter(x =>
new File(Paths.get(x).toFile, "bin/spark-submit").exists).headOption.getOrElse(null)
val sparkDirProp = possible_spark_dirs.find(x =>
new File(Paths.get(x).toFile, "bin/spark-submit").exists).orNull
require(sparkDirProp != null,
s"Spark home directory must be provided in system properties tested $possible_spark_dirs")
sparkHomeDir = Paths.get(sparkDirProp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ private[spark] class KubeConfigBackend(var context: String)

// If an explicit master URL was specified then override that detected from the
// K8S config if it is different
var masterUrl = Option(System.getProperty(TestConstants.CONFIG_KEY_KUBE_MASTER_URL))
.getOrElse(null)
var masterUrl = Option(System.getProperty(TestConstants.CONFIG_KEY_KUBE_MASTER_URL)).orNull
if (StringUtils.isNotBlank(masterUrl)) {
// Clean up master URL which would have been specified in Spark format into a normal
// K8S master URL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,8 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression)
val valueData2 = mapData2.valueArray()
var i = 0
for ((key, Array(index1, index2)) <- keysWithIndexes) {
val v1 = index1.map(valueData1.get(_, leftValueType)).getOrElse(null)
val v2 = index2.map(valueData2.get(_, rightValueType)).getOrElse(null)
val v1 = index1.map(valueData1.get(_, leftValueType)).orNull
val v2 = index2.map(valueData2.get(_, rightValueType)).orNull
keyVar.value.set(key)
value1Var.value.set(v1)
value2Var.value.set(v2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ object QueryExecutionErrors extends QueryErrorsBase {
|Could not execute broadcast in $timeout secs. You can increase the timeout
|for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or disable broadcast join
|by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1
""".stripMargin.replaceAll("\n", " "), ex.getOrElse(null))
""".stripMargin.replaceAll("\n", " "), ex.orNull)
}

def cannotCompareCostWithTargetCostError(cost: String): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class AnsiTypeCoercionSuite extends TypeCoercionSuiteBase {
val input = Literal("123")
val castResult = AnsiTypeCoercion.implicitCast(input, to)
assert(DataType.equalsIgnoreCaseAndNullability(
castResult.map(_.dataType).getOrElse(null), expected),
castResult.map(_.dataType).orNull, expected),
s"Failed to cast String literal to $to")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ abstract class TypeCoercionSuiteBase extends AnalysisTest {
// Check default value
val castDefault = implicitCast(default(from), to)
assert(DataType.equalsIgnoreCompatibleNullability(
castDefault.map(_.dataType).getOrElse(null), expected),
castDefault.map(_.dataType).orNull, expected),
s"Failed to cast $from to $to")

// Check null value
val castNull = implicitCast(createNull(from), to)
assert(DataType.equalsIgnoreCaseAndNullability(
castNull.map(_.dataType).getOrElse(null), expected),
castNull.map(_.dataType).orNull, expected),
s"Failed to cast $from to $to")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")

val content = store.synchronized { // make sure all parts in this page are consistent
val sessionStat = store.getSession(parameterId).getOrElse(null)
val sessionStat = store.getSession(parameterId).orNull
require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")

generateBasicStats() ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private[streaming] abstract class ReceiverSupervisor(
// This is a blocking action so we should use "futureExecutionContext" which is a cached
// thread pool.
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
error.orNull)
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
logDebug("Sleeping for " + delay)
Thread.sleep(delay)
Expand Down

0 comments on commit b4c0196

Please sign in to comment.