In this blog, I am going to discuss some common Spark program patterns, written in Scala.
1. Read from an RDBMS (MySQL) and append to an existing Hive warehouse table
// MySQL connection parameters
val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"
val dbTable = "your_table"
val dbUser = "your_username"
val dbPassword = "your_password"
// Read data from MySQL
val df = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", dbTable)
.option("user", dbUser)
.option("password", dbPassword)
.option("driver", "com.mysql.cj.jdbc.Driver")
.load()
// Append data to an existing Hive table
df.write
.mode("append")
.saveAsTable("your_hive_database.your_hive_table")
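For large source tables, the read above runs as a single task. If the table has a numeric key column, the standard JDBC partitioning options let Spark issue one query per partition. A minimal sketch, assuming a numeric id column and placeholder bounds and partition count that you would tune to your data:
// Parallel JDBC read: one query per partition over the id range
val parallelDF = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", dbTable)
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("partitionColumn", "id") // assumed numeric key column
  .option("lowerBound", "1")       // assumed minimum id
  .option("upperBound", "1000000") // assumed maximum id
  .option("numPartitions", "10")   // assumed degree of parallelism
  .load()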
2. Read from a Hive table and append to a MySQL table
// Hive table parameters
val hiveDatabase = "your_hive_database"
val hiveTable = "your_hive_table"
// MySQL connection parameters
val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"
val dbTable = "your_mysql_table"
val dbUser = "your_username"
val dbPassword = "your_password"
// Read data from Hive
val df = spark.read
.table(s"$hiveDatabase.$hiveTable")
// Append data to MySQL table
df.write
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", dbTable)
.option("user", dbUser)
.option("password", dbPassword)
.option("driver", "com.mysql.cj.jdbc.Driver")
.mode("append")
.save()
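By default the JDBC writer sends rows to MySQL in batches of 1,000 (the batchsize option). For large appends, a bigger batch often helps; the value below is only an assumed starting point to tune against your MySQL server:
// JDBC write with a larger insert batch (assumed value, tune for your server)
df.write
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", dbTable)
  .option("user", dbUser)
  .option("password", dbPassword)
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("batchsize", "10000")
  .mode("append")
  .save()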
3. Read a CSV from the local (Linux) file system and store it as Parquet in HDFS
// Define input and output paths
val inputCsvPath = "file:///path/to/input.csv" // Local Linux file path
val outputHdfsPath = "hdfs:///path/to/output_parquet" // HDFS path
// Read the CSV file with header and infer schema
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inputCsvPath)
// Write as Parquet to HDFS
df.write
.mode("overwrite")
.parquet(outputHdfsPath)
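inferSchema makes Spark scan the file an extra time and can guess types incorrectly (for example reading zero-padded codes as integers). For recurring jobs it is usually better to declare the schema explicitly; a minimal sketch, where the id, name and amount columns are purely hypothetical:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DoubleType}

// Explicit schema: no inference pass, predictable column types
val csvSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("amount", DoubleType, nullable = true)
))

val typedDF = spark.read
  .option("header", "true")
  .schema(csvSchema)
  .csv(inputCsvPath)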
4. Read a JSON file from HDFS and store the data in Hive
// Define input JSON path in HDFS
val inputJsonPath = "hdfs:///path/to/input.json"
// Hive table details
val hiveDatabase = "your_hive_database"
val hiveTable = "your_hive_table"
// Read the JSON file from HDFS
val df = spark.read
.option("inferSchema", "true")
.json(inputJsonPath)
// Write data to Hive table
df.write
.mode("overwrite")
.saveAsTable(s"$hiveDatabase.$hiveTable")
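One caveat: spark.read.json expects JSON Lines by default, i.e. one JSON object per line. If the HDFS file is a single pretty-printed document or a top-level JSON array, enable the multiLine option:
// For a file containing one large JSON document or a JSON array
val multiLineDF = spark.read
  .option("multiLine", "true")
  .json(inputJsonPath)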
5. Read XML from HDFS and store the data in a Hive table
// Define input XML path in HDFS
val inputXmlPath = "hdfs:///path/to/input.xml"
// Hive table details
val hiveDatabase = "your_hive_database"
val hiveTable = "your_hive_table"
// Read the XML file from HDFS
val df = spark.read
.format("com.databricks.spark.xml") // requires the spark-xml package on the classpath (e.g., added via --packages)
.option("rowTag", "row") // Replace 'row' with the XML element that wraps each record (not the document root)
.option("inferSchema", "true")
.load(inputXmlPath)
// Write data to Hive table
df.write
.mode("overwrite")
.saveAsTable(s"$hiveDatabase.$hiveTable")
6. Read a CSV from HDFS and store it as a Hive ORC table
// Define input CSV path in HDFS
val inputCsvPath = "hdfs:///path/to/input.csv"
// Hive table details
val hiveDatabase = "your_hive_database"
val hiveTable = "your_hive_table"
// Read the CSV file from HDFS
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inputCsvPath)
// Write data to Hive table in ORC format
df.write
.mode("overwrite")
.format("orc")
.saveAsTable(s"$hiveDatabase.$hiveTable")
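If queries against this table will usually filter on one column (a date or region, say), it is worth partitioning the ORC table on that column so partitions can be pruned at read time. A sketch, assuming the CSV has a low-cardinality region column (hypothetical name):
// Partitioned ORC table: one directory per distinct value of the partition column
df.write
  .mode("overwrite")
  .format("orc")
  .partitionBy("region") // assumed column; pick a low-cardinality one
  .saveAsTable(s"$hiveDatabase.$hiveTable")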
7. Insert records into a Hive ORC table from Spark
// Import the required libraries
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
// Define input CSV path in HDFS
val inputCsvPath = "hdfs:///path/to/input.csv"
// Hive table details
val hiveDatabase = "your_hive_database"
val hiveTable = "your_hive_table"
// Read the CSV file from HDFS
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(inputCsvPath)
// Write data to Hive table in ORC format
df.write
.mode("overwrite")
.format("orc")
.saveAsTable(s"$hiveDatabase.$hiveTable")
// Manually create a new record
val newDataSchema = StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
val newData = Seq(
Row(101, "John Doe", 30),
Row(102, "Jane Smith", 28)
)
val newDF = spark.createDataFrame(spark.sparkContext.parallelize(newData), newDataSchema)
// Append new records to Hive table in ORC format
newDF.write
.mode("append")
.format("orc")
.saveAsTable(s"$hiveDatabase.$hiveTable")
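An alternative for the append step is insertInto, which writes into the existing Hive table definition and resolves columns by position rather than by name, so the DataFrame's column order must match the table's. A minimal sketch:
// insertInto appends into the existing table; columns are matched by position
newDF.write
  .insertInto(s"$hiveDatabase.$hiveTable")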
8. Word Count in Spark
//Import the required libraries
import org.apache.spark.sql.SparkSession
import spark.implicits._ // needed for the toDF call on the word-count RDD below
// Define input text file path in HDFS
val inputTextPath = "hdfs:///path/to/input.txt"
// Read the text file from HDFS and perform word count
val textFile = spark.read.textFile(inputTextPath)
val wordCounts = textFile.rdd
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.toDF("word", "count")
// MySQL connection parameters
val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"
val dbTable = "word_count_table"
val dbUser = "your_username"
val dbPassword = "your_password"
// Write word count results to MySQL
wordCounts.write
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", dbTable)
.option("user", dbUser)
.option("password", dbPassword)
.option("driver", "com.mysql.cj.jdbc.Driver")
.mode("overwrite")
.save()
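The same word count can also be expressed without dropping to the RDD API, using split and explode from org.apache.spark.sql.functions; a short sketch:
import org.apache.spark.sql.functions.{split, explode, lower, col}

// DataFrame-only word count: split each line into words, explode, then group and count
val wordCountsDF = spark.read.textFile(inputTextPath)
  .select(explode(split(lower(col("value")), "\\s+")).as("word"))
  .filter(col("word") =!= "")
  .groupBy("word")
  .count()
wordCountsDF.show()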
9. Common transformations on DataFrames
//Import required libraries
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
// Step 1: Load a sample DataFrame (you can replace it with your own dataset)
val data = Seq(
  (1, "Alice", 29, "New York"),
  (2, "Bob", 31, "Los Angeles"),
  (3, "Catherine", 24, "Chicago"),
  (4, "David", 35, "New York"),
  (5, "Eva", 29, "Chicago")
)
val columns = Seq("id", "name", "age", "city")
// Create a DataFrame
val df: DataFrame = spark.createDataFrame(data).toDF(columns: _*)
// Step 2: Apply common transformations
// 2.1 Select specific columns
val selectedColumnsDF = df.select("name", "age")
selectedColumnsDF.show()
// 2.2 Filter data (e.g., select people older than 30)
val filteredDF = df.filter(col("age") > 30)
filteredDF.show()
// 2.3 Add a new column (e.g., increase age by 1)
val updatedDF = df.withColumn("age_plus_one", col("age") + 1)
updatedDF.show()
// 2.4 Group by and aggregate (e.g., count people by city)
val groupByCityDF = df.groupBy("city").count()
groupByCityDF.show()
// 2.5 Join with another DataFrame
val data2 = Seq(
  (1, "Engineer"),
  (2, "Doctor"),
  (3, "Artist"),
  (4, "Chef"),
  (5, "Teacher")
)
val columns2 = Seq("id", "profession")
val df2 = spark.createDataFrame(data2).toDF(columns2: _*)
val joinedDF = df.join(df2, "id")
joinedDF.show()
// 2.6 Drop a column
val droppedDF = df.drop("age")
droppedDF.show()
// 2.7 Rename a column
val renamedDF = df.withColumnRenamed("name", "full_name")
renamedDF.show()
// 2.8 Order by a column (e.g., order by age)
val orderedDF = df.orderBy(col("age").desc)
orderedDF.show()
// 2.9 Distinct values (remove duplicates)
val distinctDF = df.distinct()
distinctDF.show()
// 2.10 Create a new column using a conditional expression
val conditionalDF = df.withColumn(
  "age_group",
  when(col("age") < 30, "Young")
    .when(col("age") >= 30 && col("age") < 40, "Middle-aged")
    .otherwise("Older")
)
conditionalDF.show()
10. Common transformations on DataSets
//Import Required libraries
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset
// Step 1: Define case class for a strongly-typed Dataset
case class Person(id: Int, name: String, age: Int, city: String, profession: String)
// Step 2: Load data into Dataset (you can replace it with your own dataset)
import spark.implicits._
val data = Seq(
Person(1, "Alice", 29, "New York", "Engineer"),
Person(2, "Bob", 31, "Los Angeles", "Doctor"),
Person(3, "Catherine", 24, "Chicago", "Artist"),
Person(4,"David", 35, "New York", "Chef"),
Person(5,"Eva", 29, "Chicago", "Teacher")
)
// Convert the sequence to a Dataset
val ds: Dataset[Person] = spark.createDataset(data)
// Step 3: Apply common transformations
// 3.1 Select specific columns (using Dataset API)
val selectedColumnsDS = ds.select($"name", $"age")
selectedColumnsDS.show()
// 3.2 Filter data (e.g., select people older than 30)
val filteredDS = ds.filter($"age" > 30)
filteredDS.show()
// 3.3 Add a new column (e.g., increase age by 1)
val updatedDS = ds.withColumn("age_plus_one", $"age" + 1)
updatedDS.show()
// 3.4 Group by and aggregate (e.g., count people by city)
val groupByCityDS = ds.groupBy($"city").count()
groupByCityDS.show()
// 3.5 Join with another Dataset (using Dataset API)
val data2 = Seq(
(1,"Engineer"),
(2,"Doctor"),
(3,"Artist"),
(4,"Chef"),
(5,"Teacher"))
val columns2 = Seq("id", "profession")
val ds2 = data2.toDF(columns2: _*).as[(Int, String)]
val joinedDS = ds.join(ds2, "id")
joinedDS.show()
// 3.6 Drop a column
val droppedDS = ds.drop("age")
droppedDS.show()
// 3.7 Rename a column
val renamedDS = ds.withColumnRenamed("name", "full_name")
renamedDS.show()
// 3.8 Order by a column (e.g., order by age)
val orderedDS = ds.orderBy($"age".desc)
orderedDS.show()
// 3.9 Distinct values (remove duplicates)
val distinctDS = ds.distinct()
distinctDS.show()
// 3.10 Create a new column using a conditional expression
val conditionalDS = ds.withColumn("age_group",
when($"age" < 30, "Young")
.when($"age" >= 30 && $"age" < 40,"Middle-aged")
.otherwise("Older"))
conditionalDS.show()
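Most of the transformations above use the untyped Column API and would look identical on a DataFrame. What Datasets add is the ability to work with the Person case class directly in lambdas, with field names checked at compile time; a brief sketch:
// Typed operations: the compiler checks the Person fields used in the lambdas
val adultsDS: Dataset[Person] = ds.filter(p => p.age > 30)
val namesDS: Dataset[String] = ds.map(p => p.name)
val totalAge: Long = ds.map(p => p.age.toLong).reduce(_ + _)
adultsDS.show()
namesDS.show()
println(s"Total age: $totalAge")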
11. Common transformations using Spark SQL
// Step 1: Import the required libraries
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.functions._
// Step 2: Load data into a DataFrame (you can replace it with your own dataset)
import spark.implicits._
val data = Seq(
(1, "Alice", 29, "New York", "Engineer"),
(2, "Bob", 31, "Los Angeles", "Doctor"),
(3, "Catherine", 24, "Chicago", "Artist"),
(4, "David", 35, "New York", "Chef"),
(5, "Eva", 29, "Chicago", "Teacher")
)
// Convert the sequence to a DataFrame
val df = data.toDF("id", "name", "age", "city", "profession")
// Step 3: Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("people")
// Step 4: Apply common transformations using Spark SQL
// 4.1 Select specific columns
println("Select specific columns:")
spark.sql("SELECT name, age FROM people").show()
// 4.2 Filter data (e.g., select people older than 30)
println("Filter people older than 30:")
spark.sql("SELECT * FROM people WHERE age > 30").show()
// 4.3 Add a new column (e.g., increase age by 1)
println("Add a new column (age + 1):")
spark.sql("SELECT *, age + 1 AS age_plus_one FROM people").show()
// 4.4 Group by and aggregate (e.g., count people by city)
println("Group by and count people by city:")
spark.sql("SELECT city, COUNT(*) AS count FROM people GROUP BY city").show()
// 4.5 Join with another DataFrame (using Spark SQL)
val data2 = Seq(
(1, "Engineer"),
(2, "Doctor"),
(3, "Artist"),
(4, "Chef"),
(5, "Teacher")
)
// Convert the second dataset to a DataFrame
val df2 = data2.toDF("id", "profession")
// Register the second DataFrame as a temporary table
df2.createOrReplaceTempView("professions")
spark.sql("SELECT p.id, p.name, p.age, p.city, pr.profession FROM people p JOIN professions pr ON p.id = pr.id").show()
// 4.6 Drop a column (Spark SQL doesn't support dropping columns directly, but we can select without it)
spark.sql("SELECT id, name, city, profession FROM people").show()
// 4.7 Rename a column (again, done by selecting with the new name)
spark.sql("SELECT id, name AS full_name, age, city, profession FROM people").show()
// 4.8 Order by a column (e.g., order by age)
spark.sql("SELECT * FROM people ORDER BY age DESC").show()
// 4.9 Distinct values (remove duplicates)
spark.sql("SELECT DISTINCT * FROM people").show()
// 4.10 Create a new column using a conditional expression
spark.sql("""
SELECT id, name, age, city, profession,
CASE
WHEN age < 30 THEN 'Young'
WHEN age >= 30 AND age < 40 THEN 'Middle-aged'
ELSE 'Older'
END AS age_group
FROM people
""").show()
12. Common transformations using RDD
// Step 1: Import libraries
import org.apache.spark.sql.SparkSession
// Step 2: Create an RDD from a collection
val data = Seq(
  (1, "Alice", 29),
  (2, "Bob", 31),
  (3, "Catherine", 24),
  (4, "David", 35),
  (5, "Eva", 29)
)
// Create an RDD from a sequence of tuples
val rdd = spark.sparkContext.parallelize(data)
// Step 3: Apply common RDD transformations
// 3.1 Map operation (Transforming the data)
val namesRDD = rdd.map(record => record._2)
println("Names RDD:")
namesRDD.collect().foreach(println)
// 3.2 Filter operation (Keep records where age is greater than 30)
val filteredRDD = rdd.filter(record => record._3 > 30)
println("Filtered RDD (age > 30):")
filteredRDD.collect().foreach(println)
// 3.3 FlatMap operation (Used to flatten results, e.g., splitting a string into words)
val wordsRDD = rdd.flatMap(record => record._2.split(" "))
println("FlatMap RDD (splitting names into words):")
wordsRDD.collect().foreach(println)
// 3.4 GroupBy operation (Group by age)
val groupedByAgeRDD = rdd.groupBy(record => record._3)
println("Grouped by Age RDD:")
groupedByAgeRDD.collect().foreach(println)
// 3.5 Reduce operation (Sum of all ages)
val sumOfAges = rdd.map(record => record._3).reduce((a, b) => a + b)
println(s"Sum of ages: $sumOfAges")
// 3.6 Aggregate operation (Calculate sum and count of ages)
val aggregatedRDD = rdd.map(record => record._3).aggregate((0, 0))(
  (acc, age) => (acc._1 + age, acc._2 + 1),                // sequence logic within a partition
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)   // combiner logic across partitions
)
println(s"Sum of ages: ${aggregatedRDD._1}, Count of records: ${aggregatedRDD._2}")
// 3.7 Union operation (Combine two RDDs)
val moreData = Seq(
  (6, "George", 28),
  (7, "Hannah", 32)
)
val moreDataRDD = spark.sparkContext.parallelize(moreData)
val unionRDD = rdd.union(moreDataRDD)
println("Union of RDDs:")
unionRDD.collect().foreach(println)
// 3.8 Intersection operation (Find common elements between two RDDs)
val intersectionRDD = rdd.intersection(moreDataRDD)
println("Intersection of RDDs:")
intersectionRDD.collect().foreach(println)
// 3.9 Distinct operation (Remove duplicates)
val distinctRDD = rdd.distinct()
println("Distinct RDD:")
distinctRDD.collect().foreach(println)
// 3.10 SortBy operation (Sort by age)
val sortedRDD = rdd.sortBy(record => record._3)
println("Sorted by age RDD:")
sortedRDD.collect().foreach(println)
// Step 4: Apply common RDD actions
// 4.1 Collect operation (Collect all elements of the RDD to the driver)
val collectedData = rdd.collect()
println("Collected RDD data:")
collectedData.foreach(println)
// 4.2 Count operation (Count the number of records)
val count = rdd.count()
println(s"Number of records in RDD: $count")
// 4.3 First operation (Get the first element of the RDD)
val firstElement = rdd.first()
println(s"First element in RDD: $firstElement")
// 4.4 Take operation (Take the first n elements)
val takeTwo = rdd.take(2)
println("First 2 elements in RDD:")
takeTwo.foreach(println)
// 4.5 Save operation (Save the RDD as text files under an output directory)
rdd.saveAsTextFile("output_rdd")
println("RDD data has been saved to the 'output_rdd' directory")
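One final point on RDDs: every action above re-evaluates the RDD from scratch. When an RDD feeds several actions, as in this example, caching it avoids recomputation; a minimal sketch:
import org.apache.spark.storage.StorageLevel

// Keep the RDD in memory across the actions below, then release it
rdd.persist(StorageLevel.MEMORY_ONLY)
println(s"Count: ${rdd.count()}")
println(s"Max age: ${rdd.map(_._3).max()}")
rdd.unpersist()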