Apache Spark Examples
sparkcodegeeks committed Mar 14, 2021
1 parent 6e583d9 commit fe89391
Showing 12 changed files with 308 additions and 11 deletions.
7 changes: 7 additions & 0 deletions src/main/resources/address-multiline.csv
@@ -0,0 +1,7 @@
Id,Address Line1,City,State,Zipcode
1,9182 Clear Water Rd,Fayetteville,AR,72704
2,"9920 State
Highway 89",Ringling,OK,73456
3,9724 E Landon Ln,Kennewick,WA,99338


2 changes: 1 addition & 1 deletion src/main/resources/free-zipcode-database.csv
@@ -47392,7 +47392,7 @@
47391,"95476","STANDARD","SONOMA","CA","PRIMARY",38.24,-122.46,-0.42,-0.66,0.61,"NA","US","Sonoma, CA","NA-US-CA-SONOMA","false",14900,24893,627942566,
47392,"95476","STANDARD","AGUA CALIENTE","CA","NOT ACCEPTABLE",38.24,-122.46,-0.42,-0.66,0.61,"NA","US","Agua Caliente, CA","NA-US-CA-AGUA CALIENTE","false",14900,24893,627942566,
47393,"95476","STANDARD","SCHELLVILLE","CA","NOT ACCEPTABLE",38.24,-122.46,-0.42,-0.66,0.61,"NA","US","Schellville, CA","NA-US-CA-SCHELLVILLE","false",14900,24893,627942566,
47394,"94080","STANDARD","SOUTH SAN FRANCISCO","CA","PRIMARY",37.65,-122.42,-0.42,-0.66,0.61,"NA","US","South San Francisco, CA","NA-US-CA-SOUTH SAN FRANCISCO","false",30695,53242,1494580412,
47394,"94080","STANDARD","SOUTH SAN FRANCISCO","HZFB MNHFFGHN ","PRIMARY",37.65,-122.42,-0.42,-0.66,0.61,"NA","US","South San Francisco, CA","NA-US-CA-SOUTH SAN FRANCISCO","false",30695,53242,1494580412,
47395,"94080","STANDARD","S SAN FRAN","CA","ACCEPTABLE",37.65,-122.42,-0.42,-0.66,0.61,"NA","US","S San Fran, CA","NA-US-CA-S SAN FRAN","false",30695,53242,1494580412,
47396,"94080","STANDARD","S SAN FRANCISCO","CA","NOT ACCEPTABLE",37.65,-122.42,-0.42,-0.66,0.61,"NA","US","S San Francisco, CA","NA-US-CA-S SAN FRANCISCO","false",30695,53242,1494580412,
47397,"94080","STANDARD","SSF","CA","NOT ACCEPTABLE",37.65,-122.42,-0.42,-0.66,0.61,"NA","US","Ssf, CA","NA-US-CA-SSF","false",30695,53242,1494580412,
30 changes: 30 additions & 0 deletions src/main/scala/com/sparkbyexamples/spark/dataframe/FromCSVMultiline.scala
@@ -0,0 +1,30 @@
package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.SparkSession

object FromCSVMultiline extends App {

val spark:SparkSession = SparkSession.builder()
.master("local[3]")
.appName("SparkByExamples.com")
.getOrCreate()


val df = spark.read
.option("header",true)
.option("delimiter",",")
.option("multiLine",true)
.option("quotes","\"")
.csv("src/main/resources/address-multiline.csv")

df.show(false)
}
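A quick sanity check (a sketch, reusing the df defined above): with multiLine enabled, the quoted newline in the second record stays inside a single row.

//Expect 3 rows, not 4; the embedded newline in "9920 State\nHighway 89"
//is kept within one record when multiLine is enabled
println(df.count())
df.select("Address Line1").show(false)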
42 changes: 42 additions & 0 deletions src/main/scala/com/sparkbyexamples/spark/dataframe/examples/CastStringToInt.scala
@@ -0,0 +1,42 @@
package com.sparkbyexamples.spark.dataframe.examples

import org.apache.spark.sql.SparkSession

object CastStringToInt extends App {

val spark = SparkSession.builder
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

val simpleData = Seq(("James",34,"true","M","3000.6089"),
("Michael",33,"true","F","3300.8067"),
("Robert",37,"false","M","5000.5034")
)

import spark.implicits._
val df = simpleData.toDF("firstname","age","isGraduated","gender","salary")
df.printSchema()

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
// Convert String to Integer Type
val df2= df.withColumn("salary",col("salary").cast(IntegerType))
df2.printSchema()
df2.show()

df.withColumn("salary",col("salary").cast("int")).printSchema()
df.withColumn("salary",col("salary").cast("integer")).printSchema()

// Using select
df.select(col("salary").cast("int").as("salary")).printSchema()

//Using selectExpr()
df.selectExpr("cast(salary as int) salary","isGraduated").printSchema()
df.selectExpr("INT(salary)","isGraduated").printSchema()

//Using spark.sql()
df.createOrReplaceTempView("CastExample")
spark.sql("SELECT INT(salary),BOOLEAN(isGraduated),gender from CastExample").printSchema()
spark.sql("SELECT cast(salary as int) salary, BOOLEAN(isGraduated),gender from CastExample").printSchema()
}
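One behavior worth knowing: cast() does not fail on malformed values, it yields null instead. A minimal sketch with a hypothetical row, reusing the imports above:

//A non-numeric salary casts to null rather than throwing an error
val badDF = Seq(("Jen","not-a-number")).toDF("firstname","salary")
badDF.withColumn("salary",col("salary").cast(IntegerType)).show() //salary prints as null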
62 changes: 62 additions & 0 deletions src/main/scala/com/sparkbyexamples/spark/dataframe/examples/MapFlatMap.scala
@@ -0,0 +1,62 @@
package com.sparkbyexamples.spark.dataframe.examples

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

object MapFlatMap extends App{

val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

val data = Seq("Project Gutenberg’s",
"Alice’s Adventures in Wonderland",
"Project Gutenberg’s",
"Adventures in Wonderland",
"Project Gutenberg’s")

import spark.sqlContext.implicits._
val df = data.toDF("data")
df.show(false)

//map() transformation - returns one Array[String] row per input row
val mapDF=df.map(fun=> {
fun.getString(0).split(" ")
})
mapDF.show(false)

//flatMap() transformation - flattens each Array[String] into multiple rows
val flatMapDF=df.flatMap(fun=>
{
fun.getString(0).split(" ")
})
flatMapDF.show()

val arrayStructureData = Seq(
Row("James,,Smith",List("Java","Scala","C++"),"CA"),
Row("Michael,Rose,",List("Spark","Java","C++"),"NJ"),
Row("Robert,,Williams",List("CSharp","VB","R"),"NV")
)

val arrayStructureSchema = new StructType()
.add("name",StringType)
.add("languagesAtSchool", ArrayType(StringType))
.add("currentState", StringType)

val df1 = spark.createDataFrame(
spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)


//flatMap() Usage
val df2=df1.flatMap(f => {
val lang=f.getSeq[String](1)
lang.map((f.getString(0),_,f.getString(2)))
})

val df3=df2.toDF("Name","language","State")
df3.show(false)


}
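The same flattening can be done without a typed flatMap by using the built-in explode() function, which operates on the array column directly and avoids the encoder requirement. A sketch reusing df1 from above:

import org.apache.spark.sql.functions.{col, explode}
//explode() emits one row per array element, mirroring the flatMap above
df1.select(col("name"),explode(col("languagesAtSchool")).as("language"),col("currentState")).show(false)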
27 changes: 27 additions & 0 deletions src/main/scala/com/sparkbyexamples/spark/dataframe/examples/RangePartition.scala
@@ -0,0 +1,27 @@
package com.sparkbyexamples.spark.dataframe.examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object RangePartition extends App{

val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

/**
* repartitionByRange using a column list
*/
val data = Seq((1,10),(2,20),(3,10),(4,20),(5,10),
(6,30),(7,50),(8,50),(9,50),(10,30),
(11,10),(12,10),(13,40),(14,40),(15,40),
(16,40),(17,50),(18,10),(19,40),(20,40)
)

import spark.sqlContext.implicits._
val dfRange = data.toDF("id","count")
.repartitionByRange(5,col("count"))

dfRange.write.option("header",true).csv("c:/tmp/range-partition")
//partitionBy() needs column name(s) and an output action; this path is illustrative
dfRange.write.option("header",true)
.partitionBy("count")
.csv("c:/tmp/partition-by-count")

}
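To verify how repartitionByRange distributed the rows, the rows per partition can be counted with spark_partition_id(). A minimal sketch:

import org.apache.spark.sql.functions.spark_partition_id
//Count rows in each of the 5 range partitions of "count"
dfRange.groupBy(spark_partition_id().as("partition")).count().orderBy("partition").show()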
43 changes: 43 additions & 0 deletions src/main/scala/com/sparkbyexamples/spark/dataframe/examples/RenameDeleteFile.scala
@@ -0,0 +1,43 @@
package com.sparkbyexamples.spark.dataframe.examples

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession

object RenameDeleteFile extends App{

val spark:SparkSession = SparkSession.builder()
.master("local[3]")
.appName("SparkByExamples.com")
.getOrCreate()

//Create Hadoop FileSystem object from Spark's Hadoop configuration
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

val srcPath=new Path("/tmp/address_rename_merged.csv")
val destPath= new Path("/tmp/address_merged.csv")

//Rename a File
if(fs.exists(srcPath) && fs.isFile(srcPath))
fs.rename(srcPath,destPath)

//Alternatively, create the Hadoop configuration directly
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
if(hdfs.isFile(srcPath))
hdfs.rename(srcPath,destPath)


//Delete a file - remove the hidden CRC checksum file if it exists
val crcPath = new Path("/tmp/.address_merged2.csv.crc")
if(hdfs.exists(crcPath))
  hdfs.delete(crcPath,true)

import scala.sys.process._
//Delete a File
s"hdfs dfs -rm /tmp/.address_merged2.csv.crc" !

//Delete recursively; -r is required when the target is a directory
s"hdfs dfs -rm -r /tmp/.address_merged2.csv.crc" !


}
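For completeness, deleting a whole directory goes through the same FileSystem API with the recursive flag set to true. A sketch with a hypothetical path:

val dirPath = new Path("/tmp/spark-output") //hypothetical directory
if(fs.exists(dirPath) && fs.isDirectory(dirPath))
  fs.delete(dirPath,true) //true enables recursive delete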
@@ -13,16 +13,19 @@ object SaveSingleFile extends App{
.appName("SparkByExamples.com")
.getOrCreate()

-val df = spark.read.option("header",true).csv("src/main/resources/address.csv")
-df.repartition(1).write.mode(SaveMode.Overwrite).csv("/tmp/address")
+val df = spark.read.option("header",true)
+  .csv("src/main/resources/address.csv")
+df.repartition(1)
+  .write.mode(SaveMode.Overwrite).csv("/tmp/address")


val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

val srcPath=new Path("/tmp/address")
val destPath= new Path("/tmp/address_merged.csv")
-val srcFile=FileUtil.listFiles(new File("c:/tmp/address")).filterNot(f=>f.getPath.endsWith(".csv"))(0)
+val srcFile=FileUtil.listFiles(new File("c:/tmp/address"))
+  .filter(f=>f.getPath.endsWith(".csv"))(0) //keep the part CSV file; filterNot would grab .crc/_SUCCESS instead
//Copy the CSV file outside of Directory and rename
FileUtil.copy(srcFile,hdfs,destPath,true,hadoopConfig)
//Remove Directory created by df.write()
@@ -31,10 +34,12 @@ object SaveSingleFile extends App{
hdfs.delete(new Path("/tmp/.address_merged.csv.crc"),true)

// Merge using Hadoop API
-df.repartition(1).write.mode(SaveMode.Overwrite).csv("/tmp/address-tmp")
+df.repartition(1).write.mode(SaveMode.Overwrite)
+  .csv("/tmp/address-tmp")
val srcFilePath=new Path("/tmp/address-tmp")
val destFilePath= new Path("/tmp/address_merged2.csv")
FileUtil.copyMerge(hdfs, srcFilePath, hdfs, destFilePath, true, hadoopConfig, null)
//Remove hidden CRC file if not needed.
hdfs.delete(new Path("/tmp/.address_merged2.csv.crc"),true)

}
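Note that FileUtil.copyMerge was removed in Hadoop 3.x. On newer clusters, one workaround is to concatenate the part files manually instead of calling copyMerge; a sketch (target path hypothetical), reusing hdfs, srcFilePath, and hadoopConfig from above:

import org.apache.hadoop.io.IOUtils
val mergedOut = hdfs.create(new Path("/tmp/address_merged3.csv")) //hypothetical target
hdfs.listStatus(srcFilePath)
  .filter(_.getPath.getName.startsWith("part-"))
  .foreach { part =>
    val in = hdfs.open(part.getPath)
    IOUtils.copyBytes(in,mergedOut,hadoopConfig,false) //false keeps mergedOut open
    in.close()
  }
mergedOut.close()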
82 changes: 82 additions & 0 deletions src/main/scala/com/sparkbyexamples/spark/dataframe/examples/SelectExamples.scala
@@ -0,0 +1,82 @@
package com.sparkbyexamples.spark.dataframe.examples

import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SelectExamples extends App{

val spark = SparkSession.builder
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()

val data = Seq(("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
)

val columns = Seq("firstname","lastname","country","state")

import spark.implicits._
val df = data.toDF(columns:_*)
df.show(false)


df.select("firstname","lastname").show()
//Using Dataframe object name
df.select(df("firstname"),df("lastname")).show()
//Using col function
import org.apache.spark.sql.functions.col
df.select(col("firstname"),col("lastname")).show()

//Show all columns
df.select("*").show()
val columnsAll=df.columns.map(m=>col(m))
df.select(columnsAll:_*).show()
df.select(columns.map(m=>col(m)):_*).show()

//Show columns from list
val listCols= List("lastname","country")
df.select(listCols.map(m=>col(m)):_*).show()

//Show the first three columns
df.select(df.columns.slice(0,3).map(m=>col(m)):_*).show(1)

//Show a column by index (columns(3) is the 4th column)
df.select(df.columns(3)).show(3)

//Show columns from index 2 (inclusive) to 4 (exclusive)
df.select(df.columns.slice(2,4).map(m=>col(m)):_*).show(3)

//Show columns by regular expression
df.select(df.colRegex("`^.*name*`")).show()

df.select(df.columns.filter(f=>f.startsWith("first")).map(m=>col(m)):_*).show(3)
df.select(df.columns.filter(f=>f.endsWith("name")).map(m=>col(m)):_*).show(3)

//Show Nested columns
val data2 = Seq(Row(Row("James","","Smith"),"OH","M"),
Row(Row("Anna","Rose",""),"NY","F"),
Row(Row("Julia","","Williams"),"OH","F"),
Row(Row("Maria","Anne","Jones"),"NY","M"),
Row(Row("Jen","Mary","Brown"),"NY","M"),
Row(Row("Mike","Mary","Williams"),"OH","M")
)

val schema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("state",StringType)
.add("gender",StringType)

val df2 = spark.createDataFrame(
spark.sparkContext.parallelize(data2),schema)
df2.printSchema()
df2.show(false)
df2.select("name").show(false)
df2.select("name.firstname","name.lastname").show(false)
df2.select("name.*").show(false)
}
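One detail to note: selecting a nested field returns only the leaf name, so "name.firstname" comes back as a column called firstname. A sketch reusing df2, aliasing it back when the qualified name matters downstream:

//The struct prefix is dropped; the alias restores a qualified name
df2.select(col("name.firstname").as("name_firstname")).show(false)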
@@ -6,14 +6,15 @@ object SelectSelectExpr extends App {

val spark:SparkSession = SparkSession.builder()
.master("local[1]")
.appName("SparkByExample")
.appName("SparkByExamples.com")
.getOrCreate()

val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val df = spark.createDataFrame(data).toDF("language","users_count")
df.select("language","users_count as count").show() //Example 1
df.select(df("language"),df("users_count").as("count")).show() //Example 2
df.select(col("language"),col("users_count")).show() ////Example 3
//df.select("language",col("users_count")).show() ////Example 3

df.selectExpr("language","users_count as count").show() //Example 1
//df.selectExpr(df("language"),df("users_count").as("count")).show() //Example 2
@@ -1,7 +1,7 @@
package com.sparkbyexamples.spark.dataframe.functions.datetime

import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.{expr,col}
+import org.apache.spark.sql.functions._
object AddTime extends App {

val spark:SparkSession = SparkSession.builder()
@@ -66,12 +66,10 @@ object WindowFunctions extends App {
//Aggregate Functions
val windowSpecAgg = Window.partitionBy("department")
val aggDF = df.withColumn("row",row_number.over(windowSpec))
.withColumn("avg", avg(col("salary")).over(windowSpecAgg))
.withColumn("avg", avg(col("salary")).over(windowSpecAgg))
.withColumn("sum", sum(col("salary")).over(windowSpecAgg))
.withColumn("min", min(col("salary")).over(windowSpecAgg))
.withColumn("max", max(col("salary")).over(windowSpecAgg))
// .where(col("row")===1).select("department","avg","sum","min","max")
.where(col("row")===1).select("department","avg","sum","min","max")
.show()


}
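Since the window aggregates are constant within each department, the same department-level result can be produced with a plain groupBy, no window needed. A sketch assuming the same df and function imports as above:

df.groupBy("department")
  .agg(avg(col("salary")).as("avg"),sum(col("salary")).as("sum"),
    min(col("salary")).as("min"),max(col("salary")).as("max"))
  .show()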
