Add a Reader/Writer Interface for Streaming #25

Merged: 7 commits merged on Jan 10, 2016
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1685,6 +1685,13 @@ class DataFrame private[sql](
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

/**
* :: Experimental ::
* Interface for starting a streaming query that will continually save data to the specified
* external sink as new data arrives.
*/
def streamTo: DataStreamWriter = new DataStreamWriter(this)

/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
124 changes: 124 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.execution.streaming.StreamingRelation

import scala.collection.JavaConverters._

import org.apache.hadoop.util.StringUtils

import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types.StructType

/**
* :: Experimental ::
* An interface for reading streaming data. Use `sqlContext.streamFrom` to access these methods.
*/
@Experimental
class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {

/**
* Specifies the input data source format.
*
* @since 2.0.0
*/
def format(source: String): DataStreamReader = {
Collaborator: Writing reader.format("kafka") is quite weird, and will be weird for most non-fs streaming sources. Rather, I propose having an alias called source, which works nicely for both batch and streaming: source("text"), source("parquet"), and source("kafka") all make sense.

Owner Author: /cc @rxin

Comment: I think this depends on what other methods are available on the reader/writer interfaces.

Comment: Ah, never mind -- I misunderstood it. Your proposal makes sense.

this.source = source
this
}

/**
* Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data stream can
* skip the schema inference step, and thus speed up data reading.
*
* @since 2.0.0
*/
def schema(schema: StructType): DataStreamReader = {
this.userSpecifiedSchema = Option(schema)
this
}

/**
* Adds an input option for the underlying data stream.
*
* @since 2.0.0
*/
def option(key: String, value: String): DataStreamReader = {
this.extraOptions += (key -> value)
this
}

/**
* (Scala-specific) Adds input options for the underlying data stream.
*
* @since 2.0.0
*/
def options(options: scala.collection.Map[String, String]): DataStreamReader = {
this.extraOptions ++= options
this
}

/**
* Adds input options for the underlying data stream.
*
* @since 2.0.0
*/
def options(options: java.util.Map[String, String]): DataStreamReader = {
this.options(options.asScala)
this
}

/**
* Loads streaming input in as a [[DataFrame]], for data streams that don't require a path (e.g.
* external key-value stores).
*
* @since 2.0.0
*/
def open(): DataFrame = {
val resolved = ResolvedDataSource.createSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
providerName = source,
options = extraOptions.toMap)
DataFrame(sqlContext, StreamingRelation(resolved))
}

/**
* Loads input in as a [[DataFrame]], for data streams that read from some path.
*
* @since 2.0.0
*/
def open(path: String): DataFrame = {
option("path", path).open()
}

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////

private var source: String = sqlContext.conf.defaultDataSourceName

private var userSpecifiedSchema: Option[StructType] = None

private var extraOptions = new scala.collection.mutable.HashMap[String, String]

}
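
For orientation, here is a minimal usage sketch of the reader API added above. The JSON format, the schema fields, and the input path are illustrative placeholders, not values taken from this diff.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical example: continuously read newline-delimited JSON files from a directory.
// Providing the schema up front lets the stream skip the schema-inference step.
def startReading(sqlContext: SQLContext): DataFrame = {
  val schema = new StructType().add("value", StringType)

  sqlContext.streamFrom
    .format("json")        // input data source format
    .schema(schema)        // user-specified schema, avoids inference on the stream
    .open("/data/input")   // path-based sources use open(path); others call open()
}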
144 changes: 144 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.Properties

import org.apache.spark.sql.execution.streaming.{Offset, Sink, Batch, StreamExecution}

import scala.collection.JavaConverters._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.{Project, InsertIntoTable}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.sources.HadoopFsRelation


/**
* :: Experimental ::
* Interface used to start a streaming query execution.
*
* @since 2.0.0
*/
@Experimental
final class DataStreamWriter private[sql](df: DataFrame) {

/**
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
*
* @since 2.0.0
*/
def format(source: String): DataStreamWriter = {
Collaborator: Similar to my suggestion of source instead of format for DataStreamReader, how about sink for DataStreamWriter? writer.format("kafka") is weird; writer.sink("kafka") is better.

this.source = source
this
}

/**
* Adds an output option for the underlying data source.
*
* @since 2.0.0
*/
def option(key: String, value: String): DataStreamWriter = {
this.extraOptions += (key -> value)
this
}

/**
* (Scala-specific) Adds output options for the underlying data source.
*
* @since 2.0.0
*/
def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
this.extraOptions ++= options
this
}

/**
* Adds output options for the underlying data source.
*
* @since 2.0.0
*/
def options(options: java.util.Map[String, String]): DataStreamWriter = {
this.options(options.asScala)
this
}

/**
* Partitions the output by the given columns on the file system. If specified, the output is
* laid out on the file system similar to Hive's partitioning scheme.
*
* @since 2.0.0
*/
@scala.annotation.varargs
def partitionBy(colNames: String*): DataStreamWriter = {
this.partitioningColumns = Option(colNames)
this
}

/**
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StandingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
Collaborator: @since 2.0.0 is missing.

Owner Author: Added.

def start(path: String): StandingQuery = {
this.extraOptions += ("path" -> path)
start()
}

/**
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StandingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
def start(): StandingQuery = {
val sink = ResolvedDataSource.createSink(
df.sqlContext,
source,
extraOptions.toMap)

new StreamExecution(df.sqlContext, df.logicalPlan, sink)
}

private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
parCols.map { col =>
df.logicalPlan.output
.map(_.name)
.find(df.sqlContext.analyzer.resolver(_, col))
.getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
}
}

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////

private var source: String = df.sqlContext.conf.defaultDataSourceName
Collaborator: I don't expect anyone to use this from multiple threads, but nonetheless, wouldn't it be better to make these volatile?


private var mode: SaveMode = SaveMode.ErrorIfExists
Collaborator: This mode does not seem to be used anywhere.


private var extraOptions = new scala.collection.mutable.HashMap[String, String]

private var partitioningColumns: Option[Seq[String]] = None

}
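
A matching sketch for the writer side. It assumes the streamingDf produced by a reader like the one sketched earlier and a parquet sink capable of streamed writing; the output path is a placeholder.

import org.apache.spark.sql.{DataFrame, StandingQuery}

// Hypothetical example: continuously write a streaming DataFrame out as Parquet.
// start() returns a StandingQuery handle for interacting with the running stream.
def startWriting(streamingDf: DataFrame): StandingQuery = {
  streamingDf.streamTo
    .format("parquet")      // output sink format
    .start("/data/output")  // begin continuous execution, writing results to this path
}

// ... later, on shutdown:
// query.stop()  // blocks until the execution thread has stopped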
9 changes: 9 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -622,6 +622,15 @@ class SQLContext private[sql](
@Experimental
def read: DataFrameReader = new DataFrameReader(this)


/**
* :: Experimental ::
* Returns a [[DataStreamReader]] that can be used to create queries that execute continuously
* as new data arrives.
*/
@Experimental
def streamFrom: DataStreamReader = new DataStreamReader(this)

/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
40 changes: 40 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/StandingQuery.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.DeveloperApi

trait StandingQuery {

/**
* Stops the execution of the streaming query if it is running. This method blocks until the
* thread performing execution has stopped.
*/
def stop(): Unit

/** Clears the indicator that a batch has completed. Used for testing. */
@DeveloperApi
def clearBatchMarker(): Unit
Collaborator: nit: with "marker" in the name, it is slightly non-intuitive what this combination of two functions does. If it is only for testing in Scala, another idea could be something like

def awaitBatchAfter()(preBatchBody: => Unit): Unit

This would internally reset the marker, run the preBatchBody function, and then wait for the batch. The DataStreamReaderSuite would then look like:

query.awaitBatchAfter {
    // Add some data
    stringToFile(new File(src, "1"), "drop1\nkeep2\nkeep3")
}

Owner Author: That's a good idea, though I think we need a Java version too. I'd probably defer this, but we should keep this suggestion in mind when we finalize the testing API.

Collaborator: For future note, we can extend it to

def awaitBatchAfter()(preBatchBody: => Unit)(postBatchBody: Batch => Unit)

Then we can internally guarantee that the preBatchBody is executed before a batch starts, that exactly one batch gets executed, and that the postBatchBody is run afterwards.

query.awaitBatchAfter {
    // Add some data
} {
    // Test whether data has been received.
}

Comment: Maybe a listener is better?


/**
* Awaits the completion of at least one streaming batch. Must be called after `clearBatchMarker`
* to guarantee that a new batch has been processed.
*/
@DeveloperApi
def awaitBatchCompletion(): Unit
}
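
To illustrate how the two testing hooks above are meant to be combined, a hedged sketch follows. The query handle, the watched directory src, and the stringToFile helper mentioned in the review thread are placeholders assumed for this example rather than taken from the diff.

import java.io.{File, PrintWriter}

import org.apache.spark.sql.StandingQuery

// Small stand-in for the stringToFile test helper referenced in the review discussion.
def stringToFile(file: File, contents: String): Unit = {
  val out = new PrintWriter(file)
  try out.write(contents) finally out.close()
}

def exerciseOneBatch(query: StandingQuery, src: File): Unit = {
  // Clear the completion indicator *before* adding data, so the subsequent await
  // is guaranteed to observe a batch that has seen the new input.
  query.clearBatchMarker()
  stringToFile(new File(src, "1"), "drop1\nkeep2\nkeep3") // add input for the stream to pick up
  query.awaitBatchCompletion()                            // block until at least one batch has completed
  // ... assertions against the sink's contents would go here ...
  query.stop()                                            // blocks until the execution thread has stopped
}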
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.execution.datasources

import java.util.ServiceLoader

import com.sun.jersey.core.impl.provider.entity.DataSourceProvider
import org.apache.spark.sql.execution.streaming.{Sink, Source}

import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Success, Failure, Try}
@@ -92,6 +95,36 @@ object ResolvedDataSource extends Logging {
}
}

def createSource(
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
providerName: String,
options: Map[String, String]): Source = {
val provider = lookupDataSource(providerName).newInstance() match {
case s: StreamSourceProvider => s
case _ =>
throw new UnsupportedOperationException(
s"Data source $providerName does not support streamed reading")
}

provider.createSource(sqlContext, options, userSpecifiedSchema)
}

def createSink(
sqlContext: SQLContext,
providerName: String,
options: Map[String, String]): Sink = {
val provider = lookupDataSource(providerName).newInstance() match {
case s: StreamSinkProvider => s
case _ =>
throw new UnsupportedOperationException(
s"Data source $providerName does not support streamed writing")
}

provider.createSink(sqlContext, options)
}


/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
sqlContext: SQLContext,
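
For context on what createSource and createSink dispatch against, here is a rough sketch of a custom streaming data source under that contract. The trait method signatures are inferred from the call sites above, the import locations are assumptions, and MyStreamingProvider itself is purely hypothetical.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical provider registering both a streamed Source and a streamed Sink.
// ResolvedDataSource.createSource/createSink pattern-match on these traits and throw
// UnsupportedOperationException when the looked-up class does not implement them.
class MyStreamingProvider extends StreamSourceProvider with StreamSinkProvider {

  override def createSource(
      sqlContext: SQLContext,
      options: Map[String, String],
      schema: Option[StructType]): Source = {
    ??? // construct a Source that tracks offsets in the external system
  }

  override def createSink(
      sqlContext: SQLContext,
      options: Map[String, String]): Sink = {
    ??? // construct a Sink that appends each streamed batch to the external system
  }
}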