Skip to content

Commit

Permalink
Rename join -> joins package.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Oct 8, 2014
1 parent a070d44 commit cbc664c
Show file tree
Hide file tree
Showing 14 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
val semiJoin = join.LeftSemiJoinHash(leftKeys, rightKeys, planLater(left), planLater(right))
val semiJoin = joins.LeftSemiJoinHash(leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
// no predicate can be evaluated by matching hash keys
case logical.Join(left, right, LeftSemi, condition) =>
join.LeftSemiJoinBNL(planLater(left), planLater(right), condition) :: Nil
joins.LeftSemiJoinBNL(planLater(left), planLater(right), condition) :: Nil
case _ => Nil
}
}
Expand All @@ -49,13 +49,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* evaluated by matching hash keys.
*
* This strategy applies a simple optimization based on the estimates of the physical sizes of
* the two join sides. When planning a [[join.BroadcastHashJoin]], if one side has an
* the two join sides. When planning a [[joins.BroadcastHashJoin]], if one side has an
* estimated physical size smaller than the user-settable threshold
* [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
* ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
* ''broadcasted'' to all of the executors involved in the join, as a
* [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
* will instead be used to decide the build side in a [[join.ShuffledHashJoin]].
* will instead be used to decide the build side in a [[joins.ShuffledHashJoin]].
*/
object HashJoin extends Strategy with PredicateHelper {

Expand All @@ -65,8 +65,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
left: LogicalPlan,
right: LogicalPlan,
condition: Option[Expression],
side: join.BuildSide) = {
val broadcastHashJoin = execution.join.BroadcastHashJoin(
side: joins.BuildSide) = {
val broadcastHashJoin = execution.joins.BroadcastHashJoin(
leftKeys, rightKeys, side, planLater(left), planLater(right))
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}
Expand All @@ -75,26 +75,26 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if sqlContext.autoBroadcastJoinThreshold > 0 &&
right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, join.BuildRight)
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if sqlContext.autoBroadcastJoinThreshold > 0 &&
left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, join.BuildLeft)
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val buildSide =
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
join.BuildRight
joins.BuildRight
} else {
join.BuildLeft
joins.BuildLeft
}
val hashJoin = join.ShuffledHashJoin(
val hashJoin = joins.ShuffledHashJoin(
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
join.HashOuterJoin(
joins.HashOuterJoin(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

case _ => Nil
Expand Down Expand Up @@ -163,11 +163,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Join(left, right, joinType, condition) =>
val buildSide =
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
join.BuildRight
joins.BuildRight
} else {
join.BuildLeft
joins.BuildLeft
}
join.BroadcastNestedLoopJoin(
joins.BroadcastNestedLoopJoin(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case _ => Nil
}
Expand All @@ -176,10 +176,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object CartesianProduct extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, _, None) =>
execution.join.CartesianProduct(planLater(left), planLater(right)) :: Nil
execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
case logical.Join(left, right, Inner, Some(condition)) =>
execution.Filter(condition,
execution.join.CartesianProduct(planLater(left), planLater(right))) :: Nil
execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import scala.concurrent._
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.JoinedRow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import org.apache.spark.sql.catalyst.expressions.{Expression, JoinedRow2, Row}
import org.apache.spark.sql.execution.SparkPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import java.util.{HashMap => JavaHashMap}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Expression, Row}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.join
package org.apache.spark.sql.execution.joins

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.Expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
* :: DeveloperApi ::
* Physical execution operators for join operations.
*/
package object join {
package object joins {

@DeveloperApi
sealed abstract class BuildSide
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner, LeftSemi}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.join._
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.join.BroadcastHashJoin
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.test._
import org.scalatest.BeforeAndAfterAll
import java.util.TimeZone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.{SQLConf, execution}
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.join.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.planner._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.reflect.ClassTag

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.execution.join.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

Expand Down

0 comments on commit cbc664c

Please sign in to comment.