Merge pull request #25 from marmbrus/streaming-readwrite
Add a Reader/Writer Interface for Streaming
marmbrus committed Jan 10, 2016
2 parents addb3ab + a52200b commit c1139ec
Showing 15 changed files with 724 additions and 33 deletions.
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1685,6 +1685,13 @@ class DataFrame private[sql](
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

/**
* :: Experimental ::
* Interface for starting a streaming query that will continually save data to the specified
* external sink as new data arrives.
*/
@Experimental
def streamTo: DataStreamWriter = new DataStreamWriter(this)

/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
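For reference, a hedged end-to-end sketch of how the new entry points are meant to compose. The format names, schema, and paths are illustrative only, and whether a built-in format such as "json" actually supports streamed reading depends on it implementing the streaming provider interfaces added further down in this commit.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext, StandingQuery}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StreamingReadWriteExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("streaming-readwrite-example").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Illustrative schema; supplying it lets the source skip schema inference.
    val inputSchema = StructType(StructField("word", StringType) :: Nil)

    // Continuously read JSON records from an input directory (path is illustrative;
    // the chosen format must support streamed reading for this to work).
    val streamingInput: DataFrame = sqlContext.streamFrom
      .format("json")
      .schema(inputSchema)
      .open("/tmp/streaming-input")

    // Continuously write the results to an output directory; start() returns a handle.
    val query: StandingQuery = streamingInput.streamTo
      .format("parquet")
      .start("/tmp/streaming-output")

    // ... later, stop the continuous execution. stop() blocks until the
    // execution thread has terminated.
    query.stop()
  }
}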
124 changes: 124 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import scala.collection.JavaConverters._

import org.apache.hadoop.util.StringUtils

import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType

/**
* :: Experimental ::
* An interface for reading streaming data. Use `sqlContext.streamFrom` to access these methods.
*/
@Experimental
class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {

/**
* Specifies the input data source format.
*
* @since 2.0.0
*/
def format(source: String): DataStreamReader = {
this.source = source
this
}

/**
* Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data stream can
* skip the schema inference step, and thus speed up data reading.
*
* @since 2.0.0
*/
def schema(schema: StructType): DataStreamReader = {
this.userSpecifiedSchema = Option(schema)
this
}

/**
* Adds an input option for the underlying data stream.
*
* @since 2.0.0
*/
def option(key: String, value: String): DataStreamReader = {
this.extraOptions += (key -> value)
this
}

/**
* (Scala-specific) Adds input options for the underlying data stream.
*
* @since 2.0.0
*/
def options(options: scala.collection.Map[String, String]): DataStreamReader = {
this.extraOptions ++= options
this
}

/**
* Adds input options for the underlying data stream.
*
* @since 2.0.0
*/
def options(options: java.util.Map[String, String]): DataStreamReader = {
this.options(options.asScala)
this
}

/**
* Loads streaming input in as a [[DataFrame]], for data streams that don't require a path (e.g.
* external key-value stores).
*
* @since 2.0.0
*/
def open(): DataFrame = {
val resolved = ResolvedDataSource.createSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
providerName = source,
options = extraOptions.toMap)
DataFrame(sqlContext, StreamingRelation(resolved))
}

/**
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
def open(path: String): DataFrame = {
option("path", path).open()
}

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////

private var source: String = sqlContext.conf.defaultDataSourceName

private var userSpecifiedSchema: Option[StructType] = None

private var extraOptions = new scala.collection.mutable.HashMap[String, String]

}
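A short usage sketch of the reader builder above, assuming a SQLContext named `sqlContext` is in scope. The source name "my-kv-source" and its option keys are hypothetical stand-ins for a source that implements the streaming interfaces.

// Path-less source (e.g. an external key-value store): configure it via options
// and call open() with no arguments. All names and option keys are illustrative.
val events = sqlContext.streamFrom
  .format("my-kv-source")
  .option("hosts", "kv1:9000,kv2:9000")
  .options(Map("namespace" -> "events", "batchSize" -> "1000"))
  .open()

// Path-based source: open(path) is shorthand for option("path", path).open().
val fileEvents = sqlContext.streamFrom
  .format("json")
  .open("/data/incoming")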
142 changes: 142 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{Project, InsertIntoTable}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.execution.streaming.{Offset, Sink, Batch, StreamExecution}
import org.apache.spark.sql.sources.HadoopFsRelation


/**
* :: Experimental ::
* Interface used to start the execution of a streaming query.
*
* @since 2.0.0
*/
@Experimental
final class DataStreamWriter private[sql](df: DataFrame) {

/**
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
*
* @since 2.0.0
*/
def format(source: String): DataStreamWriter = {
this.source = source
this
}

/**
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
def option(key: String, value: String): DataStreamWriter = {
this.extraOptions += (key -> value)
this
}

/**
* (Scala-specific) Adds output options for the underlying data source.
*
* @since 2.0.0
*/
def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
this.extraOptions ++= options
this
}

/**
* Adds output options for the underlying data source.
*
* @since 2.0.0
*/
def options(options: java.util.Map[String, String]): DataStreamWriter = {
this.options(options.asScala)
this
}

/**
* Partitions the output by the given columns on the file system. If specified, the output is
* laid out on the file system similar to Hive's partitioning scheme.
*
* @since 2.0.0
*/
@scala.annotation.varargs
def partitionBy(colNames: String*): DataStreamWriter = {
this.partitioningColumns = Option(colNames)
this
}

/**
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StandingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
def start(path: String): StandingQuery = {
this.extraOptions += ("path" -> path)
start()
}

/**
* Starts the execution of the streaming query, which will continually output results to the
* configured sink as new data arrives. The returned [[StandingQuery]] object can be used to
* interact with the stream.
*
* @since 2.0.0
*/
def start(): StandingQuery = {
val sink = ResolvedDataSource.createSink(
df.sqlContext,
source,
extraOptions.toMap)

new StreamExecution(df.sqlContext, df.logicalPlan, sink)
}

private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
parCols.map { col =>
df.logicalPlan.output
.map(_.name)
.find(df.sqlContext.analyzer.resolver(_, col))
.getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
}
}

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////

private var source: String = df.sqlContext.conf.defaultDataSourceName

private var extraOptions = new scala.collection.mutable.HashMap[String, String]

private var partitioningColumns: Option[Seq[String]] = None

}
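A matching sketch for the writer builder, assuming `df` is a streaming DataFrame obtained via `sqlContext.streamFrom`. The format, option keys, partition column, and paths are illustrative, not part of this commit.

// Start a continuous query that writes results out, partitioned by "date".
val query: StandingQuery = df.streamTo
  .format("parquet")
  .option("compression", "snappy")
  .partitionBy("date")
  .start("/warehouse/events")

// For sinks that take their target from options rather than a path:
//   df.streamTo.format("my-sink").option("table", "events").start()

// The returned handle is the way to interact with the running stream.
query.stop()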
9 changes: 9 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -622,6 +622,15 @@ class SQLContext private[sql](
@Experimental
def read: DataFrameReader = new DataFrameReader(this)


/**
* :: Experimental ::
* Returns a [[DataStreamReader]] that can be used to create queries that execute continuously
* as new data arrives.
*/
@Experimental
def streamFrom: DataStreamReader = new DataStreamReader(this)

/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
40 changes: 40 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/StandingQuery.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.DeveloperApi

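/**
 * :: Experimental ::
 * A handle to a streaming query that is executing continuously in the background as new data
 * arrives. Use [[DataStreamWriter.start]] to obtain an instance.
 */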
trait StandingQuery {

/**
* Stops the execution of the streaming query if it is running. This method blocks until the
* thread performing execution has stopped.
*/
def stop(): Unit

/** Clears the indicator that a batch has completed. Used for testing. */
@DeveloperApi
def clearBatchMarker(): Unit

/**
* Awaits the completion of at least one streaming batch. Must be called after `clearBatchMarker`
* to gurantee that a new batch has been processed.
*/
@DeveloperApi
def awaitBatchCompletion(): Unit
}
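For illustration, a sketch of how the testing hooks above might be used. Here `df` is a streaming DataFrame from the earlier sketches and `addDataToSource()` is a hypothetical helper that feeds the stream's source new input.

// Ensure that at least one fresh batch is processed after new data arrives.
val query: StandingQuery = df.streamTo.format("parquet").start("/tmp/out")

query.clearBatchMarker()     // forget any batch that has already completed
addDataToSource()            // hypothetical helper: push new records into the source
query.awaitBatchCompletion() // block until a new batch has been processed

// ... assertions about the sink's contents would go here ...
query.stop()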
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.execution.datasources

import java.util.ServiceLoader

import org.apache.spark.sql.execution.streaming.{Sink, Source}

import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Success, Failure, Try}
@@ -92,6 +95,36 @@ object ResolvedDataSource extends Logging {
}
}

def createSource(
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
providerName: String,
options: Map[String, String]): Source = {
val provider = lookupDataSource(providerName).newInstance() match {
case s: StreamSourceProvider => s
case _ =>
throw new UnsupportedOperationException(
s"Data source $providerName does not support streamed reading")
}

provider.createSource(sqlContext, options, userSpecifiedSchema)
}

def createSink(
sqlContext: SQLContext,
providerName: String,
options: Map[String, String]): Sink = {
val provider = lookupDataSource(providerName).newInstance() match {
case s: StreamSinkProvider => s
case _ =>
throw new UnsupportedOperationException(
s"Data source $providerName does not support streamed writing")
}

provider.createSink(sqlContext, options)
}


/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
sqlContext: SQLContext,
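To make the provider dispatch above concrete, a hedged sketch of a data source that opts into streaming. The trait names, package locations in the imports, and the createSource/createSink signatures are inferred from the calls above and from the streaming Source/Sink imports, so treat the exact shapes as assumptions.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}  // assumed location
import org.apache.spark.sql.types.StructType

// Hypothetical provider; it would be looked up by name through the existing
// ServiceLoader / lookupDataSource mechanism, like any other data source.
class MyStreamingProvider extends StreamSourceProvider with StreamSinkProvider {

  override def createSource(
      sqlContext: SQLContext,
      options: Map[String, String],
      schema: Option[StructType]): Source = {
    // Build a Source that can return batches of newly arrived data.
    ???  // e.g. tail files under options("path"); omitted in this sketch
  }

  override def createSink(
      sqlContext: SQLContext,
      options: Map[String, String]): Sink = {
    // Build a Sink that persists each batch of results.
    ???  // e.g. append to a directory taken from options("path")
  }
}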