
Add a Reader/Writer Interface for Streaming #25

Merged
marmbrus merged 7 commits into streaming-df from streaming-readwrite on Jan 10, 2016

Conversation

@marmbrus (Owner) commented Jan 7, 2016

In this PR I add a new interface for opening new streams (as DataFrames) and starting a new streaming query. These are modeled after the DataFrame reader/writer interface.

val df =
  sqlContext
    .streamFrom
    .format("text")
    .open("/michael/logs")

val filtered = df.filter($"value" contains "ERROR")

val runningQuery =
  filtered
    .streamTo
    .format("text")
    .start("/michael/errors")

runningQuery.stop()

Sources and Sinks are created by a StreamSourceProvider or StreamSinkProvider, which are similar to a RelationProvider in the Data Source API (and in fact a single class can be all of the above if desired).
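For illustration, here is a minimal sketch of a single class playing all three roles. The trait names come from the PR, but the streaming traits' packages, method names, and signatures below are assumptions made for this sketch; only RelationProvider's signature is the standard Data Source API one.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

// Hypothetical sketch: StreamSourceProvider / StreamSinkProvider method
// signatures are assumptions for illustration, not the PR's actual API.
class TextStreamProvider
    extends RelationProvider
    with StreamSourceProvider
    with StreamSinkProvider {

  // Batch path (Data Source API): return a relation for DataFrame reads.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = ???

  // Streaming input (assumed signature): return a Source producing batches.
  def createSource(
      sqlContext: SQLContext,
      parameters: Map[String, String]): Source = ???

  // Streaming output (assumed signature): return a Sink consuming each batch.
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String]): Sink = ???
}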

I include a throwaway implementation of a text file source/sink for demonstration / testing.

TODO:

  • Improve tests
  • Improve comments

@AmplabJenkins
Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/21/

@AmplabJenkins
Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/22/

@AmplabJenkins
Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/23/

@AmplabJenkins
Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/24/

@AmplabJenkins
Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/25/

@AmplabJenkins
Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/26/

   */
  override def awaitNextBatch(): Unit = {
    while (!batchRun) {
      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }

why use wait(100) instead of wait()?

@marmbrus (Owner, Author)

That's a really good question. When I had it as wait(), it was hanging non-deterministically. I think it's okay to spin occasionally?

Collaborator

That sounds like a sign of corner cases we are missing.

@marmbrus (Owner, Author)

From the docs, interrupts and spurious wakeups are possible, and this method should always be used in a loop. So my guess is that we sometimes wake up spuriously. There is then a race to check the done condition / finish the batch (which is why it would hang with very low probability (3/1000)). So this actually does seem like the best solution: we don't spin a ton wastefully, in most cases we wake immediately, and in very rare cases we sleep 100ms too long.
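For context, a minimal sketch of the missed-notification race being described. The notifying side below is hypothetical, reusing the names from the snippet above; it is not code from the PR.

  // Hypothetical notifying side, for illustration only. If batchRun were
  // set and notifyAll() called without holding awaitBatchLock, a waiter
  // could check !batchRun, lose the race to this code, and then block in
  // a plain wait() forever; the wait(100) timeout bounds that hang to
  // 100ms. Setting the flag and notifying inside the same synchronized
  // block would remove the race entirely.
  def markBatchComplete(): Unit = awaitBatchLock.synchronized {
    batchRun = true
    awaitBatchLock.notifyAll()
  }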

   * to guarantee that a new batch has been processed.
   */
  @DeveloperApi
  def awaitNextBatch(): Unit
Collaborator

awaitBatchCompletion? awaitNextBatch doesn't signify whether to wait for the next batch to start or to end.

@marmbrus (Owner, Author)

changed

@AmplabJenkins
Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/29/

@AmplabJenkins
Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/30/

   *
   * @since 2.0.0
   */
  def format(source: String): DataStreamReader = {
Collaborator

Writing reader.format("kafka") is quite weird, and will be weird for most non-filesystem streaming sources. Instead, I propose having an alias called source, which works nicely for both batch and streaming: source("text"), source("parquet"), and source("kafka") all make sense.
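A minimal sketch of the proposed alias (hypothetical; not code from the PR):

  // Hypothetical alias: delegate to format() so both spellings work.
  def source(name: String): DataStreamReader = format(name)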

@marmbrus (Owner, Author)

/cc @rxin


I think this depends on what other methods are available on the reader/writer interfaces.


Ah, never mind -- I misunderstood it. Your proposal makes sense.

@AmplabJenkins
Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/35/

@AmplabJenkins
Merged build finished. Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/36/

@AmplabJenkins
Merged build finished. Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/spark-streaming-df-test/37/

marmbrus added a commit that referenced this pull request Jan 10, 2016
Add a Reader/Writer Interface for Streaming
@marmbrus merged commit c1139ec into streaming-df on Jan 10, 2016
@marmbrus deleted the streaming-readwrite branch on March 8, 2016