Skip to content

Commit

Permalink
Cleans up redundant code in proxy/internal RAPIDS Shuffle Managers (#…
Browse files Browse the repository at this point in the history
…6030)

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
  • Loading branch information
abellina authored Jul 20, 2022
1 parent fe0236b commit 392dba0
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 422 deletions.
2 changes: 1 addition & 1 deletion dist/unshimmed-common-from-spark311.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ com/nvidia/spark/rapids/SparkShimVersion*
com/nvidia/spark/rapids/SparkShims*
com/nvidia/spark/udf/Plugin*
org/apache/spark/sql/rapids/ProxyRapidsShuffleInternalManagerBase*
org/apache/spark/sql/rapids/VisibleShuffleManager*
org/apache/spark/sql/rapids/RapidsShuffleManagerLike*
rapids/*.py
rapids4spark-version-info.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark311

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,34 +26,8 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends RapidsShuffleInternalManagerBase(conf, isDriver)

class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark312

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,35 +26,8 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}

extends RapidsShuffleInternalManagerBase(conf, isDriver)

class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark312db

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,34 +26,8 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends RapidsShuffleInternalManagerBase(conf, isDriver)

class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark313

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,34 +26,8 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {
def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}

extends RapidsShuffleInternalManagerBase(conf, isDriver)

class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark314

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,34 +26,9 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {
def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends RapidsShuffleInternalManagerBase(conf, isDriver)


class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark320

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,35 +26,8 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}

extends RapidsShuffleInternalManagerBase(conf, isDriver)

class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.shims.spark321

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

Expand All @@ -26,35 +26,8 @@ import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, Rapid
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}

extends RapidsShuffleInternalManagerBase(conf, isDriver)

class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver)
with ShuffleManager
Loading

0 comments on commit 392dba0

Please sign in to comment.