Initial support for using ParquetTableScan to read HiveMetaStore tables.
marmbrus committed Aug 6, 2014
1 parent 4878911 commit 212d5cd
Showing 4 changed files with 250 additions and 2 deletions.
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}

/**
* :: DeveloperApi ::
* A plan node that does nothing but lie about the output of its child. Used to splice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children = child :: Nil
def execute() = child.execute()
}
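
To make the splicing described above concrete, a minimal usage sketch; the spliceIn helper is illustrative and not part of this commit:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{OutputFaker, SparkPlan}

// Wrap a physical plan that was produced by a separate planning pass so that it reports
// the attribute ids an already-resolved parent tree expects.
def spliceIn(replanned: SparkPlan, expectedOutput: Seq[Attribute]): SparkPlan =
  OutputFaker(expectedOutput, replanned)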
@@ -78,6 +78,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
* SerDe.
*/
private[spark] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

@@ -328,6 +336,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
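For context, a minimal sketch of opting in to the experimental conversion from application code; the SparkContext setup is illustrative, while the configuration key and its "false" default come from the change above:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local", "ParquetConversionExample")
val hiveContext = new HiveContext(sc)

// The conversion is off by default; enable it explicitly for this session.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
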
105 changes: 103 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,21 +17,122 @@

package org.apache.spark.sql.hive

import org.apache.spark.sql.SQLContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.parquet.ParquetTableScan

import scala.collection.JavaConversions._

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SQLContext#SparkPlanner =>

val hiveContext: HiveContext

/**
* :: Experimental ::
* Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
* table scan operator.
*
* TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring,
* but since this is after the code freeze for 1.1, all logic is here to minimize disruption.
*/
@Experimental
object ParquetConversion extends Strategy {
implicit class LogicalPlanHacks(s: SchemaRDD) {
def lowerCase =
new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
}

implicit class PhysicalPlanHacks(s: SparkPlan) {
def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
hiveContext.convertMetastoreParquet =>

// Filter out all predicates that only deal with partition keys
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}

// We are going to throw the predicates and projection back at the whole optimization
// sequence, so let's unresolve all the attributes, allowing them to be rebound to the
// matching parquet attributes.
val unresolvedOtherPredicates = otherPredicates.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
}).reduceOption(And).getOrElse(Literal(true))

val unresolvedProjection = projectList.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
})

if (relation.hiveQlTable.isPartitioned) {
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Translate the predicate so that it automatically casts the input values to the correct
// data types during evaluation
val castedPredicate = rawPredicate transform {
case a: AttributeReference =>
val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
val key = relation.partitionKeys(idx)
Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
}

val inputData = new GenericMutableRow(relation.partitionKeys.size)
val pruningCondition =
if (codegenEnabled) {
GeneratePredicate(castedPredicate)
} else {
InterpretedPredicate(castedPredicate)
}

val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
inputData(i) = partitionValues(i)
i += 1
}
pruningCondition(inputData)
}

org.apache.spark.sql.execution.Union(
partitions.par.map(p =>
hiveContext
.parquetFile(p.getLocation)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection: _*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute))).seq) :: Nil

} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.getPath)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection: _*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
}
case _ => Nil
}
}

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>
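To observe the strategy taking effect, one can compare physical plans with the flag on and off. A hedged sketch, assuming the hiveContext above and a metastore table that uses the Parquet SerDe, such as the partitioned_parquet table created in the test suite below:

// With spark.sql.hive.convertMetastoreParquet set to "true" (as above), the executed plan
// for a query over a Parquet-backed metastore table should contain a ParquetTableScan
// rather than a HiveTableScan.
val converted = hiveContext.sql("SELECT stringField FROM partitioned_parquet WHERE p = 1")
println(converted.queryExecution.executedPlan)
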
@@ -0,0 +1,126 @@

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parquet

import java.io.File

import org.scalatest.BeforeAndAfterAll

import scala.reflect.ClassTag

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

case class ParquetData(intField: Int, stringField: String)

/**
* Tests for our SerDe -> Native parquet scan conversion.
*/
class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {

override def beforeAll(): Unit = {
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

override def afterAll(): Unit = {
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}

val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
partitionedTableDir.delete()
partitionedTableDir.mkdir()

(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDir, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-$p"))
.saveAsParquetFile(partDir.getCanonicalPath)
}

sql(s"""
create external table partitioned_parquet
(
intField INT,
stringField STRING
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${partitionedTableDir.getCanonicalPath}'
""")

sql(s"""
create external table normal_parquet
(
intField INT,
stringField STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
""")

(1 to 10).foreach { p =>
sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
}

test("simple count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet"),
100)
}

test("pruned count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
10)
}

test("multi-partition pruned count") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
30)
}

test("non-partition predicates") {
checkAnswer(
sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
30)
}

test("sum") {
checkAnswer(
sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
1 + 2 + 3
)
}

test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),
10
)
}
}
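
A natural extension of this suite, not included in the commit, would be to assert that the conversion actually yields a native scan. A sketch of such a test, which assumes it is added inside ParquetMetastoreSuite where TestHive's sql is in scope and ParquetTableScan is visible from this package:

// Hypothetical additional test: verify that the physical plan uses the native Parquet scan.
test("queries are converted to ParquetTableScan") {
  val executedPlan = sql("SELECT * FROM partitioned_parquet").queryExecution.executedPlan
  assert(executedPlan.collect { case scan: ParquetTableScan => scan }.nonEmpty)
}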
