
Azure event hub - JSON data handling in Scala notebook - Data bricks #557

Closed
ganesh-gawande opened this issue Nov 9, 2020 · 2 comments

Comments

@ganesh-gawande

I have configured an Event Hub in a notebook to receive data. The data in the Event Hub is in JSON format, like below:

```json
{
    "header": {
        "p1": "abcd",
        "p2": 5170
    },
    "data": [{
        "name": "propertyName",
        "value": 10
    }]
}
```

Below is the Scala code I have written:

```scala
import org.apache.spark.sql.types.StringType

val incomingStream = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

val messages = incomingStream.withColumn("Body", $"body".cast(StringType)).select("Body")

val query2 = messages.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "_")
  .start("/delta/data/")

query2.awaitTermination()
```

Using this code, the string is saved to the Delta lake.

When I query it using SQL syntax, I can see the string, so the following query works well:

```sql
SELECT * FROM delta.`/delta/data/`
```

But I also want to query on the internal parameters, which does not work. How can I store the JSON in the Delta lake as-is, so that I can query on a parameter like below? I want to write the following query, which gives me an error:

```sql
SELECT Body.header.P1 FROM delta.`/delta/data/`
```

In the documentation and the few Twitter-streaming examples, only string examples are shown. I did not find JSON examples and the associated manipulations.

@ganesh-gawande ganesh-gawande changed the title Azure event hub - JSON data handling Azure event hub - JSON data handling in Scala notebook - Data bricks Nov 9, 2020
@ganeshchand

ganeshchand commented Nov 30, 2020

What you have is a Delta table with a column containing a JSON string, stored as Parquet. If you want to read this column containing the JSON string value, you can use the from_json() function. This function requires a column and the schema of the JSON.
For more information, see https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/functions$.html#from_json(e:org.apache.spark.sql.Column,schema:org.apache.spark.sql.Column,options:java.util.Map[String,String]):org.apache.spark.sql.Column
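A minimal batch sketch of what this looks like, assuming the sample payload from this issue (the schema's field names and types are my guesses from that sample):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder.master("local[1]").appName("from_json-demo").getOrCreate()
import spark.implicits._

// Schema matching the sample payload in this issue (types are assumptions).
val bodySchema = StructType(Seq(
  StructField("header", StructType(Seq(
    StructField("p1", StringType),
    StructField("p2", LongType)
  ))),
  StructField("data", ArrayType(StructType(Seq(
    StructField("name", StringType),
    StructField("value", LongType)
  ))))
))

// Stand-in for the "Body" column read back from the Delta table.
val df = Seq(
  """{"header":{"p1":"abcd","p2":5170},"data":[{"name":"propertyName","value":10}]}"""
).toDF("Body")

// Parse the JSON string into a nested struct column.
val parsed = df.withColumn("Body", from_json($"Body", bodySchema))

// Nested fields can now be selected directly.
parsed.select($"Body.header.p1", $"Body.data".getItem(0).getField("value")).show()
```

You can apply the same `withColumn` transformation to the DataFrame you get from reading the existing Delta table.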

@nyaghma
Contributor

nyaghma commented Jun 23, 2021

The library delivers the event body as binary, as discussed here. As Ganesh mentioned, you should be able to get the data in JSON format using the from_json() function after casting the body to a string.
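Putting the two steps together in the original streaming pipeline, a sketch might look like the following. It assumes `bodySchema` is a `StructType` matching the sample payload and `eventHubsConf` is the connection configured earlier in the notebook:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StringType

// Sketch: parse the body before writing, so the Delta table stores
// nested columns rather than a raw JSON string.
val incomingStream = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

val parsedMessages = incomingStream
  .withColumn("Body", from_json($"body".cast(StringType), bodySchema))
  .select("Body")

val query = parsedMessages.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "_")
  .start("/delta/data/")
```

With the table written this way, a query such as `SELECT Body.header.p1 FROM delta.`/delta/data/`` should resolve the nested field.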

@nyaghma nyaghma closed this as completed Jun 23, 2021