Tuesday, 4 February 2025

Common Spark Applications



In this blog I am going to discuss some of the common Spark program patterns, written in Scala.
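All of the snippets below assume a SparkSession named spark with Hive support enabled, which is what you get out of the box in spark-shell. If you run them as a standalone application, here is a minimal sketch of the setup (the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

// Minimal SparkSession with Hive support; appName is a placeholder
val spark = SparkSession.builder()
    .appName("common-spark-patterns")
    .enableHiveSupport()
    .getOrCreate()

import spark.implicits._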


1. Read from an RDBMS (MySQL) and append to an existing Hive warehouse table

I will read a table from a remote MySQL database using Spark into a DataFrame and append the DataFrame to an existing Hive table.

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "your_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Read data from MySQL

val df = spark.read

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .load()

// Append data to an existing Hive table

df.write

    .mode("append")

    .saveAsTable("your_hive_database.your_hive_table")

2. Read from a Hive table and append to a MySQL table

I will do the reverse here: read the data from a Hive table into a Spark DataFrame and append it to a remote RDBMS (MySQL) table.

// Hive table parameters

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "your_mysql_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Read data from Hive

val df = spark.read

    .table(s"$hiveDatabase.$hiveTable")

// Append data to MySQL table

df.write

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .mode("append")

    .save()
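On the write side, the JDBC sink sends rows in batches of 1000 by default; raising batchsize can speed up large appends, and repartitioning first caps the number of concurrent connections to MySQL. A sketch reusing the variables above (the batch size and partition count are illustrative choices, not requirements):

// Tuned JDBC append
df.repartition(4) // at most 4 concurrent JDBC connections
    .write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", dbTable)
    .option("user", dbUser)
    .option("password", dbPassword)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("batchsize", "10000") // rows per INSERT batch (default 1000)
    .mode("append")
    .save()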

3. Read a CSV from the local (Linux) file system and store it as Parquet in HDFS

Here we will read a CSV from the local (Linux) file system into a Spark DataFrame and store the data in HDFS as Parquet.

// Define input and output paths

val inputCsvPath = "file:///path/to/input.csv" // Local Linux file path

val outputHdfsPath = "hdfs:///path/to/output_parquet" // HDFS path

// Read the CSV file with header and infer schema

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write as Parquet to HDFS

df.write

    .mode("overwrite")

    .parquet(outputHdfsPath)
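Note that inferSchema makes Spark scan the file an extra time to guess the types. For repeatable jobs it is usually better to declare the schema up front; a sketch, where the columns id, name, and age are assumptions about the CSV:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema; column names and types are assumed for illustration
val csvSchema = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
))

val typedDF = spark.read
    .option("header", "true")
    .schema(csvSchema) // no second pass over the data
    .csv(inputCsvPath)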

4. Read a JSON file from HDFS and store the data in Hive

Here we will read a JSON file from HDFS into a Spark DataFrame and insert the data into a Hive table.

// Define input JSON path in HDFS

val inputJsonPath = "hdfs:///path/to/input.json"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the JSON file from HDFS

val df = spark.read
    .json(inputJsonPath) // the JSON reader infers the schema automatically

// Write data to Hive table

df.write

    .mode("overwrite")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

5. Read an XML file from HDFS and store the data in a Hive table

In this code snippet I am reading an XML file from HDFS into a Spark DataFrame and storing the data in a Hive table.
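Note that XML is not a built-in Spark data source: the Databricks spark-xml package has to be on the classpath, e.g. by launching with spark-shell --packages com.databricks:spark-xml_2.12:0.17.0 (the version shown is an assumption; match it to your Scala and Spark build).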

// Define input XML path in HDFS

val inputXmlPath = "hdfs:///path/to/input.xml"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the XML file from HDFS

val df = spark.read

    .format("com.databricks.spark.xml")

    .option("rowTag", "rootElement") // Replace 'rootElement' with the actual XML tag

    .option("inferSchema", "true")

    .load(inputXmlPath)

// Write data to Hive table

df.write

    .mode("overwrite")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

6. Read a CSV from HDFS and store it as a Hive ORC table

Here I will read a CSV from HDFS into a Spark DataFrame and store the data in a Hive ORC table.

// Define input CSV path in HDFS

val inputCsvPath = "hdfs:///path/to/input.csv"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the CSV file from HDFS

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write data to Hive table in ORC format

df.write

    .mode("overwrite")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

7. Insert records into a Hive ORC table from Spark

In this code snippet I will read a CSV from HDFS and insert the data into a Hive ORC table; then I will create some records manually and append the manually created rows to the same Hive table.

// Import the required libraries

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Define input CSV path in HDFS

val inputCsvPath = "hdfs:///path/to/input.csv"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the CSV file from HDFS

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write data to Hive table in ORC format

df.write

    .mode("overwrite")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

// Manually create a new record

val newDataSchema = StructType(Seq(

    StructField("id", IntegerType, false),

    StructField("name", StringType, true),

    StructField("age", IntegerType, true)

))

val newData = Seq(

    Row(101, "John Doe", 30),

    Row(102, "Jane Smith", 28)

)

val newDF = spark.createDataFrame(spark.sparkContext.parallelize(newData), newDataSchema)

// Append new records to Hive table in ORC format

newDF.write

    .mode("append")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

8. Word Count in Spark

Here is a word count program written in Spark; the result is stored in an RDBMS (MySQL) table.

// Import the required libraries

import spark.implicits._ // needed for toDF on the word-count RDD

// Define input text file path in HDFS

val inputTextPath = "hdfs:///path/to/input.txt"

// Read the text file from HDFS and perform word count

val textFile = spark.read.textFile(inputTextPath)

val wordCounts = textFile.rdd

    .flatMap(line => line.split(" "))

    .map(word => (word, 1))

    .reduceByKey(_ + _)

    .toDF("word", "count")

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "word_count_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Write word count results to MySQL

wordCounts.write

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .mode("overwrite")

    .save()
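The same counts can also be computed without dropping to the RDD API, using split and explode on the Dataset that textFile returns (it exposes a single value column); a sketch:

import org.apache.spark.sql.functions.{explode, split}

// DataFrame-only word count over the same input
val wordCountsDF = textFile
    .select(explode(split($"value", " ")).as("word"))
    .groupBy("word")
    .count()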

9. Common transformations on DataFrames

Here is a Spark program covering the most common transformations on DataFrames.

// Import the required libraries

import org.apache.spark.sql.{SparkSession, DataFrame}

import org.apache.spark.sql.functions._

// Step 1: Load a sample DataFrame (you can replace it with your own dataset)

val data = Seq( (1, "Alice", 29, "New York"), (2, "Bob", 31, "Los Angeles"), (3, "Catherine", 24, "Chicago"), (4, "David", 35, "New York"), (5, "Eva", 29, "Chicago") ) val columns = Seq("id", "name", "age", "city") // Create a DataFrame val df: DataFrame = spark.createDataFrame(data).toDF(columns: _*) // Step 2: Apply common transformations // 2.1 Select specific columns val selectedColumnsDF = df.select("name", "age") selectedColumnsDF.show() // 2.2 Filter data (e.g., select people older than 30) val filteredDF = df.filter(col("age") > 30) filteredDF.show() // 2.3 Add a new column (e.g., increase age by 1) val updatedDF = df.withColumn("age_plus_one", col("age") + 1) updatedDF.show() // 2.4 Group by and aggregate (e.g., count people by city) val groupByCityDF = df.groupBy("city").count() groupByCityDF.show() // 2.5 Join with another DataFrame val data2 = Seq( (1, "Engineer"), (2, "Doctor"), (3, "Artist"), (4, "Chef"), (5, "Teacher") ) val columns2 = Seq("id", "profession") val df2 = spark.createDataFrame(data2).toDF(columns2: _*) val joinedDF = df.join(df2, "id") joinedDF.show() // 2.6 Drop a column val droppedDF = df.drop("age") droppedDF.show() // 2.7 Rename a column val renamedDF = df.withColumnRenamed("name", "full_name") renamedDF.show() // 2.8 Order by a column (e.g., order by age) val orderedDF = df.orderBy(col("age").desc) orderedDF.show() // 2.9 Distinct values (remove duplicates) val distinctDF = df.distinct() distinctDF.show() // 2.10 Create a new column using a conditional expression val conditionalDF = df.withColumn( "age_group", when(col("age") < 30, "Young") .when(col("age") >= 30 && col("age") < 40, "Middle-aged") .otherwise("Older") ) conditionalDF.show()

10. Common transformations on Datasets

Here is the same set of transformations rewritten using the strongly-typed Dataset API.

// Import the required libraries

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._

import org.apache.spark.sql.Dataset

// Step 1: Define case class for a strongly-typed Dataset

case class Person(id: Int, name: String, age: Int, city: String, profession: String)

// Step 2: Load data into Dataset (you can replace it with your own dataset)

import spark.implicits._

val data = Seq(

  Person(1, "Alice", 29, "New York", "Engineer"),

  Person(2, "Bob", 31, "Los Angeles", "Doctor"),

  Person(3, "Catherine", 24, "Chicago", "Artist"),

  Person(4,"David", 35, "New York", "Chef"),

  Person(5,"Eva", 29, "Chicago", "Teacher")

)

// Convert the sequence to a Dataset

val ds: Dataset[Person] = spark.createDataset(data)

// Step 3: Apply common transformations

// 3.1 Select specific columns (using Dataset API)

val selectedColumnsDS = ds.select($"name", $"age")

selectedColumnsDS.show()

// 3.2 Filter data (e.g., select people older than 30)

val filteredDS = ds.filter($"age" > 30)

filteredDS.show()

// 3.3 Add a new column (e.g., increase age by 1)

val updatedDS = ds.withColumn("age_plus_one", $"age" + 1)

updatedDS.show()

// 3.4 Group by and aggregate (e.g., count people by city)

val groupByCityDS = ds.groupBy($"city").count()

groupByCityDS.show()

// 3.5 Join with another Dataset (using Dataset API)

val data2 = Seq(
  (1, "Engineer"),
  (2, "Doctor"),
  (3, "Artist"),
  (4, "Chef"),
  (5, "Teacher")
)

val ds2 = data2.toDF("id", "profession").as[(Int, String)]

val joinedDS = ds.join(ds2, "id")

joinedDS.show()

// 3.6 Drop a column

val droppedDS = ds.drop("age")

droppedDS.show()

// 3.7 Rename a column

val renamedDS = ds.withColumnRenamed("name","full_name")

renamedDS.show()

// 3.8 Order by a column (e.g., order by age)

val orderedDS = ds.orderBy($"age".desc)

orderedDS.show()

// 3.9 Distinct values (remove duplicates)

val distinctDS = ds.distinct()

distinctDS.show()

// 3.10 Create a new column using a conditional expression

val conditionalDS = ds.withColumn("age_group",
  when($"age" < 30, "Young")
    .when($"age" >= 30 && $"age" < 40, "Middle-aged")
    .otherwise("Older"))

conditionalDS.show()
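Most of the calls above are untyped Column operations that exist on DataFrames too. The distinctive benefit of a Dataset is transformations written against the case class itself, checked at compile time; a small sketch:

// Typed transformations: the lambdas operate on Person objects
val adultsDS: Dataset[Person] = ds.filter(p => p.age > 30)
val descriptionsDS: Dataset[String] = ds.map(p => s"${p.name} works as a ${p.profession}")
descriptionsDS.show()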

11. Common transformations using Spark SQL

In this code snippet I will rewrite the above program using Spark SQL.

    // Step 1: Import the required libraries

    import org.apache.spark.sql.SparkSession

    import org.apache.spark.sql.functions._

    // Step 2: Load data into a DataFrame (you can replace it with your own dataset)

    import spark.implicits._

    val data = Seq(

      (1, "Alice", 29, "New York", "Engineer"),

      (2, "Bob", 31, "Los Angeles", "Doctor"),

      (3, "Catherine", 24, "Chicago", "Artist"),

      (4, "David", 35, "New York", "Chef"),

      (5, "Eva", 29, "Chicago", "Teacher")

    )

    // Convert the sequence to a DataFrame

    val df = data.toDF("id", "name", "age", "city", "profession")

    // Step 3: Register the DataFrame as a temporary SQL table

    df.createOrReplaceTempView("people")

    // Step 4: Apply common transformations using Spark SQL

    // 4.1 Select specific columns

    println("Select specific columns:")

    spark.sql("SELECT name, age FROM people").show()

    // 4.2 Filter data (e.g., select people older than 30)

    println("Filter people older than 30:")

    spark.sql("SELECT * FROM people WHERE age > 30").show()

    // 4.3 Add a new column (e.g., increase age by 1)

    println("Add a new column (age + 1):")

    spark.sql("SELECT *, age + 1 AS age_plus_one FROM people").show()

    // 4.4 Group by and aggregate (e.g., count people by city)

    println("Group by and count people by city:")

    spark.sql("SELECT city, COUNT(*) AS count FROM people GROUP BY city").show()

    // 4.5 Join with another DataFrame (using Spark SQL)

    val data2 = Seq(

      (1, "Engineer"),

      (2, "Doctor"),

      (3, "Artist"),

      (4, "Chef"),

      (5, "Teacher")

    )

    // Convert the second dataset to a DataFrame

    val df2 = data2.toDF("id", "profession")

    // Register the second DataFrame as a temporary table

    df2.createOrReplaceTempView("professions")

    spark.sql("SELECT p.id, p.name, p.age, p.city, pr.profession FROM people p JOIN professions pr ON p.id = pr.id").show()

    // 4.6 Drop a column (Spark SQL doesn't support dropping columns directly, but we can select without it)

    spark.sql("SELECT id, name, city, profession FROM people").show()

    // 4.7 Rename a column (again, done by selecting with the new name)

    spark.sql("SELECT id, name AS full_name, age, city, profession FROM people").show()

    // 4.8 Order by a column (e.g., order by age)

    spark.sql("SELECT * FROM people ORDER BY age DESC").show()

    // 4.9 Distinct values (remove duplicates)

    spark.sql("SELECT DISTINCT * FROM people").show()

    // 4.10 Create a new column using a conditional expression

    spark.sql("""

      SELECT id, name, age, city, profession,

             CASE

               WHEN age < 30 THEN 'Young'

               WHEN age >= 30 AND age < 40 THEN 'Middle-aged'

               ELSE 'Older'

             END AS age_group

      FROM people

    """).show()

12. Common transformations using RDD

In this code snippet I will show some common transformations and actions on RDDs.

// Step 1: Import the required libraries

import org.apache.spark.sql.SparkSession

// Step 2: Create an RDD from a collection
val data = Seq(
    (1, "Alice", 29),
    (2, "Bob", 31),
    (3, "Catherine", 24),
    (4, "David", 35),
    (5, "Eva", 29)
)

// Create an RDD from a sequence of tuples
val rdd = spark.sparkContext.parallelize(data)

// Step 3: Apply common RDD transformations

// 3.1 Map operation (transform each record)
val namesRDD = rdd.map(record => record._2)
println("Names RDD:")
namesRDD.collect().foreach(println)

// 3.2 Filter operation (keep records where age is greater than 30)
val filteredRDD = rdd.filter(record => record._3 > 30)
println("Filtered RDD (age > 30):")
filteredRDD.collect().foreach(println)

// 3.3 FlatMap operation (flatten results, e.g., splitting a string into words)
val wordsRDD = rdd.flatMap(record => record._2.split(" "))
println("FlatMap RDD (splitting names into words):")
wordsRDD.collect().foreach(println)

// 3.4 GroupBy operation (group by age)
val groupedByAgeRDD = rdd.groupBy(record => record._3)
println("Grouped by Age RDD:")
groupedByAgeRDD.collect().foreach(println)

// 3.5 Reduce operation (sum of all ages)
val sumOfAges = rdd.map(record => record._3).reduce((a, b) => a + b)
println(s"Sum of ages: $sumOfAges")

// 3.6 Aggregate operation (calculate sum and count of ages in one pass)
val aggregatedRDD = rdd.map(record => record._3).aggregate((0, 0))(
    (acc, age) => (acc._1 + age, acc._2 + 1),              // fold a value into a partition's accumulator
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // merge accumulators across partitions
)
println(s"Sum of ages: ${aggregatedRDD._1}, Count of records: ${aggregatedRDD._2}")

// 3.7 Union operation (combine two RDDs)
val moreData = Seq(
    (6, "George", 28),
    (7, "Hannah", 32)
)
val moreDataRDD = spark.sparkContext.parallelize(moreData)
val unionRDD = rdd.union(moreDataRDD)
println("Union of RDDs:")
unionRDD.collect().foreach(println)

// 3.8 Intersection operation (find common elements between two RDDs)
val intersectionRDD = rdd.intersection(moreDataRDD)
println("Intersection of RDDs:")
intersectionRDD.collect().foreach(println)

// 3.9 Distinct operation (remove duplicates)
val distinctRDD = rdd.distinct()
println("Distinct RDD:")
distinctRDD.collect().foreach(println)

// 3.10 SortBy operation (sort by age)
val sortedRDD = rdd.sortBy(record => record._3)
println("Sorted by age RDD:")
sortedRDD.collect().foreach(println)

// Step 4: Apply common RDD actions

// 4.1 Collect operation (bring all elements of the RDD to the driver)
val collectedData = rdd.collect()
println("Collected RDD data:")
collectedData.foreach(println)

// 4.2 Count operation (count the number of records)
val count = rdd.count()
println(s"Number of records in RDD: $count")

// 4.3 First operation (get the first element of the RDD)
val firstElement = rdd.first()
println(s"First element in RDD: $firstElement")

// 4.4 Take operation (take the first n elements)
val takeTwo = rdd.take(2)
println("First 2 elements in RDD:")
takeTwo.foreach(println)

// 4.5 Save operation (writes a directory of part files, not a single file)
rdd.saveAsTextFile("output_rdd.txt")
println("RDD data has been saved under 'output_rdd.txt'")
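If a structured sink is needed, an RDD of tuples like rdd can be promoted back to a DataFrame at any point with toDF; a sketch:

import spark.implicits._

// Bridge back to the DataFrame API; the column names are ours to choose
val peopleDF = rdd.toDF("id", "name", "age")
peopleDF.show()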
