Draft of data sources API
marmbrus committed Oct 30, 2014
1 parent 3535467 commit 0fd3a07
Showing 19 changed files with 844 additions and 28 deletions.
@@ -17,6 +17,10 @@

package org.apache.spark.sql

/**
* Catalyst is a library for manipulating relational query plans. All classes in catalyst are
* considered an internal API to Spark SQL and are subject to change between minor releases.
*/
package object catalyst {
/**
* A JVM-global lock that should be used to prevent thread safety issues when using things in
@@ -389,7 +389,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
* @param dataType The data type of this field.
* @param nullable Indicates if values of this field can be `null` values.
*/
case class StructField(name: String, dataType: DataType, nullable: Boolean) {
case class StructField(name: String, dataType: DataType, nullable: Boolean = true) {

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
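
With nullable now defaulting to true, schemas can be declared more concisely; a minimal sketch using the catalyst types (the field names are hypothetical):

import org.apache.spark.sql.catalyst.types._

// nullable is omitted where the default of true is acceptable.
val personSchema = StructType(
  StructField("name", StringType) ::
  StructField("age", IntegerType, nullable = false) :: Nil)
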
14 changes: 13 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.types.DataType
import org.apache.spark.sql.execution.{SparkStrategies, _}
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.{BaseRelation, DDLParser, LogicalRelation}

/**
* :: AlphaComponent ::
@@ -69,13 +70,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
@transient
protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer

@transient
protected[sql] val ddlParser = new DDLParser

@transient
protected[sql] val sqlParser = {
val fallback = new catalyst.SqlParser
new catalyst.SparkSQLParser(fallback(_))
}

protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser(sql)
protected[sql] def parseSql(sql: String): LogicalPlan = {
ddlParser(sql).getOrElse(sqlParser(sql))
}

protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
@@ -105,6 +112,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
LogicalRDD(ScalaReflection.attributesFor[A], RDDConversions.productToRowRdd(rdd))(self))
}

implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
logicalPlanToSparkQuery(LogicalRelation(baseRelation))
}
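
The implicit above means a BaseRelation can be used anywhere a SchemaRDD is expected; a hedged sketch (the helper and table name are hypothetical):

import org.apache.spark.sql.{SchemaRDD, SQLContext}
import org.apache.spark.sql.sources.BaseRelation

// Importing the context's members brings baseRelationToSchemaRDD into implicit scope,
// so the relation converts to a SchemaRDD without an explicit call.
def registerRelation(sqlContext: SQLContext, relation: BaseRelation, tableName: String): Unit = {
  import sqlContext._
  val rdd: SchemaRDD = relation
  rdd.registerTempTable(tableName)
}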

/**
* :: DeveloperApi ::
* Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
@@ -295,6 +306,7 @@ class SQLContext(@transient val sparkContext: SparkContext)

val strategies: Seq[Strategy] =
CommandStrategy(self) ::
DataSources ::
TakeOrdered ::
HashAggregation ::
LeftSemiJoin ::
@@ -50,12 +50,6 @@ object RDDConversions {
}
}
}

/*
def toLogicalPlan[A <: Product : TypeTag](productRdd: RDD[A]): LogicalPlan = {
LogicalRDD(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
}
*/
}

case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.{sources, SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -252,6 +252,31 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

object DataSources extends Strategy {
import sources._

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FilteredScan)) =>
pruneFilterProject(
projectList,
filters,
identity[Seq[Expression]], // All filters still need to be evaluated
a => PhysicalRDD(a, t.buildScan(a.map(l.attributeMap), filters))) :: Nil

case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedScan)) =>
pruneFilterProject(
projectList,
filters,
identity[Seq[Expression]], // All filters still need to be evaluated.
a => PhysicalRDD(a, t.buildScan(a.map(l.attributeMap)))) :: Nil

case l @ LogicalRelation(t: TableScan) =>
PhysicalRDD(l.output, t.buildScan()) :: Nil

case _ => Nil
}
}
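
For orientation, a relation that the PrunedScan branch above could plan might look like the sketch below. The sources traits are defined in a file not shown in this view, so the buildScan signature (a Seq of catalyst Attributes) is inferred from the call sites above; the whole class is a hypothetical example, not part of this commit.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, PrunedScan}

// Ten rows of (i, i * i); only the requested columns are materialized.
case class SquaresRelation(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedScan {

  override val schema = StructType(
    StructField("i", IntegerType, nullable = false) ::
    StructField("i_squared", IntegerType, nullable = false) :: Nil)

  override def buildScan(requiredColumns: Seq[Attribute]): RDD[Row] =
    sqlContext.sparkContext.parallelize(1 to 10).map { i =>
      val values = Map("i" -> i, "i_squared" -> i * i)
      // Emit the requested columns in the requested order.
      Row(requiredColumns.map(a => values(a.name)): _*)
    }
}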

// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def numPartitions = self.numPartitions
@@ -304,6 +329,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
case logical.SetCommand(kv) =>
Seq(execution.SetCommand(kv, plan.output)(context))
case logical.ExplainCommand(logicalPlan, extended) =>
@@ -21,10 +21,12 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.sql.{SQLConf, SQLContext}

// TODO: DELETE ME...
trait Command {
this: SparkPlan =>

@@ -44,6 +46,35 @@
override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}

// TODO: Replace command with runnable command.
trait RunnableCommand extends logical.Command {
self: Product =>

def output: Seq[Attribute]
def run(sqlContext: SQLContext): Seq[Row]
}

case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
* can be used as the contents of the corresponding RDD generated from the physical plan of this
* command.
*
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)

override def output = cmd.output

def children = Nil

override def executeCollect(): Array[Row] = sideEffectResult.toArray

override def execute(): RDD[Row] = sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
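
A hedged sketch of a command written against the new trait (hypothetical, not part of this commit). The case added to CommandStrategy earlier in this diff wraps any such RunnableCommand in an ExecutedCommand, and run is invoked the first time sideEffectResult is evaluated:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.types.LongType

// Returns the current time as a single row with one non-nullable column.
case class CurrentTimeCommand() extends RunnableCommand {
  override def output: Seq[Attribute] =
    AttributeReference("current_time_ms", LongType, nullable = false)() :: Nil

  override def run(sqlContext: SQLContext): Seq[Row] =
    Row(System.currentTimeMillis()) :: Nil
}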

/**
* :: DeveloperApi ::
*/
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.json

import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.sql.sources._

private[sql] class DefaultSource extends RelationProvider {
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val fileName =
parameters.getOrElse("fileName", sys.error(s"Option 'fileName' not specified"))

val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(fileName, samplingRatio)(sqlContext)
}
}

private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan {

private def baseRDD = sqlContext.sparkContext.textFile(fileName)

override val schema =
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)

override def buildScan() =
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
}
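
A hedged end-to-end sketch of the JSON source. DefaultSource and JSONRelation are private[sql], so this is illustrative rather than user-facing code; the file path is hypothetical, and the DDL front end (ddl.scala, not fully shown in this view) is bypassed by calling createRelation directly:

import org.apache.spark.sql.{Row, SchemaRDD, SQLContext}
import org.apache.spark.sql.json.DefaultSource

def loadPeople(sqlContext: SQLContext): Array[Row] = {
  // samplingRatio is optional and defaults to 1.0 when the option is absent.
  val relation = new DefaultSource().createRelation(
    sqlContext,
    Map("fileName" -> "examples/src/main/resources/people.json", "samplingRatio" -> "0.5"))

  import sqlContext._
  val people: SchemaRDD = relation   // converted via baseRelationToSchemaRDD
  people.registerTempTable("people")
  sqlContext.sql("SELECT * FROM people").collect()
}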
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources

import org.apache.spark.sql.SQLContext


@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Expression, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{Statistics, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.LeafNode

/**
* Used to link a [[BaseRelation]] into a logical query plan.
*/
private[sql] case class LogicalRelation(relation: BaseRelation)
extends LogicalPlan
with LeafNode[LogicalPlan]
with MultiInstanceRelation {

val output = relation.schema.toAttributes

// For the sake of transformations, logical relations are considered distinct if they have different output.
override def equals(other: Any) = other match {
case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output
case _ => false
}

override def sameResult(otherPlan: LogicalPlan) = otherPlan match {
case LogicalRelation(otherRelation) => relation == otherRelation
case _ => false
}

@transient override lazy val statistics = Statistics(
// TODO: Allow datasources to provide statistics as well.
sizeInBytes = BigInt(relation.sqlContext.defaultSizeInBytes)
)

/** Used to look up the original attribute capitalization. */
val attributeMap = AttributeMap(output.map(o => (o, o)))

def newInstance() = LogicalRelation(relation).asInstanceOf[this.type]

override def simpleString = s"Relation[${output.mkString(",")}] $relation"
}
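
A hedged illustration of the equality semantics above, assuming rel is some already-constructed BaseRelation: wrapping it twice yields plans that report sameResult (same underlying data) but are not equal, because each wrapping derives fresh output attributes from schema.toAttributes. This is what keeps the two sides of a self-join distinct during transformations.

// Hypothetical: rel is any BaseRelation instance (for example a JSONRelation).
val left  = LogicalRelation(rel)
val right = left.newInstance()

assert(left.sameResult(right))   // same relation, so the same data
assert(left != right)            // but different output attribute ids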