Tuesday, 4 February 2025

Common Spark Applications



In this blog I am going to discuss some of the common Spark program patterns, written in Scala.


1. Read from an RDBMS (MySQL) and append to an existing HIVE Warehouse Table

I will read a table from a remote MySQL database into a Spark DataFrame and append the DataFrame to an existing HIVE table.

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "your_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Read data from MySQL

val df = spark.read

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .load()

// Append data to an existing Hive table

df.write

    .mode("append")

    .saveAsTable("your_hive_database.your_hive_table")
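If the target Hive table already exists with a fixed schema, insertInto can be used instead of saveAsTable; it resolves columns by position against the existing table definition, whereas saveAsTable resolves them by name. A minimal sketch, reusing the df and the same placeholder table name:

// Append by position into the existing Hive table
df.write
    .mode("append")
    .insertInto("your_hive_database.your_hive_table")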

2. Read from a HIVE table and append to a MySQL table

Here I will do the reverse: read the data from a HIVE table into a Spark DataFrame and write it to a remote RDBMS (MySQL) table.

// Hive table parameters

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "your_mysql_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Read data from Hive

val df = spark.read

    .table(s"$hiveDatabase.$hiveTable")

// Append data to MySQL table

df.write

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .mode("append")

    .save()
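Equivalently, the connection settings can be grouped into a java.util.Properties object and passed to the jdbc writer; a minimal sketch reusing the jdbcUrl, dbTable, dbUser and dbPassword values defined above:

import java.util.Properties

// Group the connection settings so they can be reused across reads and writes
val connProps = new Properties()
connProps.setProperty("user", dbUser)
connProps.setProperty("password", dbPassword)
connProps.setProperty("driver", "com.mysql.cj.jdbc.Driver")

// Append the DataFrame to the MySQL table
df.write
    .mode("append")
    .jdbc(jdbcUrl, dbTable, connProps)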

3. Read a CSV from the local (Linux) file system and store it as Parquet in HDFS

Here we will read a CSV from the local (Linux) file system into a Spark DataFrame and store the data in HDFS as Parquet.

// Define input and output paths

val inputCsvPath = "file:///path/to/input.csv" // Local Linux file path

val outputHdfsPath = "hdfs:///path/to/output_parquet" // HDFS path

// Read the CSV file with header and infer schema

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write as Parquet to HDFS

df.write

    .mode("overwrite")

    .parquet(outputHdfsPath)
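Schema inference makes an extra pass over the file; for large or well-known inputs you may prefer to declare the schema up front. A minimal sketch, assuming a hypothetical three-column layout (id, name, city):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema -- replace with the actual columns of your CSV
val csvSchema = StructType(Seq(
    StructField("id", IntegerType, true),
    StructField("name", StringType, true),
    StructField("city", StringType, true)
))

val typedDf = spark.read
    .option("header", "true")
    .schema(csvSchema) // no inference pass over the data is needed
    .csv(inputCsvPath)

typedDf.write
    .mode("overwrite")
    .parquet(outputHdfsPath)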

4. Read a JSON file from HDFS and store the data in HIVE

Here we will read a JSON file from HDFS into a Spark DataFrame and insert the data into a HIVE table.

// Define input JSON path in HDFS

val inputJsonPath = "hdfs:///path/to/input.json"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the JSON file from HDFS

val df = spark.read

    .option("inferSchema", "true")

    .json(inputJsonPath)

// Write data to Hive table

df.write

    .mode("overwrite")

    .saveAsTable(s"$hiveDatabase.$hiveTable")
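Note that spark.read.json expects JSON Lines (one object per line) by default. If the HDFS file is a single JSON array or pretty-printed, the multiLine option is needed; a minimal sketch:

// Handle a single JSON array or pretty-printed objects spanning multiple lines
val multiLineDf = spark.read
    .option("multiLine", "true")
    .json(inputJsonPath)

multiLineDf.write
    .mode("overwrite")
    .saveAsTable(s"$hiveDatabase.$hiveTable")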

5. Read an XML file from HDFS and store the data in a HIVE table

In this code snippet I am reading an XML file from HDFS into a Spark DataFrame and storing the data in a HIVE table. Note that the XML data source is not bundled with Spark; the spark-xml package (com.databricks:spark-xml) must be on the classpath, for example by launching spark-shell with --packages.

// Define input XML path in HDFS

val inputXmlPath = "hdfs:///path/to/input.xml"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the XML file from HDFS

val df = spark.read

    .format("com.databricks.spark.xml")

    .option("rowTag", "rowElement") // Replace 'rowElement' with the XML tag that wraps a single record (not the document root)

    .option("inferSchema", "true")

    .load(inputXmlPath)

// Write data to Hive table

df.write

    .mode("overwrite")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

6. Read a CSV from HDFS and store it as a HIVE ORC table

Here I will read a CSV from HDFS into a Spark DataFrame and store the data in a HIVE ORC table.

// Define input CSV path in HDFS

val inputCsvPath = "hdfs:///path/to/input.csv"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the CSV file from HDFS

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write data to Hive table in ORC format

df.write

    .mode("overwrite")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")
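If the table will usually be filtered on one column, the ORC output can also be partitioned on it so Hive can prune partitions at query time; a minimal sketch, assuming the CSV has a hypothetical city column:

// Partition the ORC table by the hypothetical "city" column
df.write
    .mode("overwrite")
    .format("orc")
    .partitionBy("city")
    .saveAsTable(s"$hiveDatabase.$hiveTable")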

7. Insert records into a HIVE ORC table from Spark

In this code snippet I will read a CSV from HDFS and insert the data into a HIVE ORC table; then I will create some records manually and append the manually created rows to the same HIVE table.

// Import the required libraries

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Define input CSV path in HDFS

val inputCsvPath = "hdfs:///path/to/input.csv"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the CSV file from HDFS

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write data to Hive table in ORC format

df.write

    .mode("overwrite")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

// Manually create a new record

val newDataSchema = StructType(Seq(

    StructField("id", IntegerType, false),

    StructField("name", StringType, true),

    StructField("age", IntegerType, true)

))

val newData = Seq(

    Row(101, "John Doe", 30),

    Row(102, "Jane Smith", 28)

)

val newDF = spark.createDataFrame(spark.sparkContext.parallelize(newData), newDataSchema)

// Append new records to Hive table in ORC format

newDF.write

    .mode("append")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")
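When the columns are simple, the manually created rows can also be built with toDF on a local Seq of tuples, avoiding the explicit Row and StructType plumbing; a minimal sketch appending the same two records:

import spark.implicits._

// The same two records, built from tuples instead of Row objects
val newDF2 = Seq(
    (101, "John Doe", 30),
    (102, "Jane Smith", 28)
).toDF("id", "name", "age")

newDF2.write
    .mode("append")
    .format("orc")
    .saveAsTable(s"$hiveDatabase.$hiveTable")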

8. Word Count in Spark

Here is a word count program written in Spark; the result is stored in an RDBMS (MySQL) table.

//Import the required libraries

import spark.implicits._ // provides the toDF conversion used on the word-count RDD

// Define input text file path in HDFS

val inputTextPath = "hdfs:///path/to/input.txt"

// Read the text file from HDFS and perform word count

val textFile = spark.read.textFile(inputTextPath)

val wordCounts = textFile.rdd

    .flatMap(line => line.split(" "))

    .map(word => (word, 1))

    .reduceByKey(_ + _)

    .toDF("word", "count")

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "word_count_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Write word count results to MySQL

wordCounts.write

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .mode("overwrite")

    .save()
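The same word count can also be expressed purely with the DataFrame API, avoiding the drop to the RDD layer; a minimal sketch that reuses inputTextPath:

import org.apache.spark.sql.functions.{col, explode, split}

// Split each line into words, explode to one word per row, then count per word
val wordCountsDF = spark.read.text(inputTextPath)
    .select(explode(split(col("value"), " ")).as("word"))
    .groupBy("word")
    .count()

wordCountsDF.show()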

9. Common transformations on DataFrames

Here is a Spark program covering the most common transformations on DataFrames.

//Import required libraries

import org.apache.spark.sql.{SparkSession, DataFrame}

import org.apache.spark.sql.functions._

// Step 1: Load a sample DataFrame (you can replace it with your own dataset)

val data = Seq(
  (1, "Alice", 29, "New York"),
  (2, "Bob", 31, "Los Angeles"),
  (3, "Catherine", 24, "Chicago"),
  (4, "David", 35, "New York"),
  (5, "Eva", 29, "Chicago")
)
val columns = Seq("id", "name", "age", "city")

// Create a DataFrame
val df: DataFrame = spark.createDataFrame(data).toDF(columns: _*)

// Step 2: Apply common transformations

// 2.1 Select specific columns
val selectedColumnsDF = df.select("name", "age")
selectedColumnsDF.show()

// 2.2 Filter data (e.g., select people older than 30)
val filteredDF = df.filter(col("age") > 30)
filteredDF.show()

// 2.3 Add a new column (e.g., increase age by 1)
val updatedDF = df.withColumn("age_plus_one", col("age") + 1)
updatedDF.show()

// 2.4 Group by and aggregate (e.g., count people by city)
val groupByCityDF = df.groupBy("city").count()
groupByCityDF.show()

// 2.5 Join with another DataFrame
val data2 = Seq(
  (1, "Engineer"),
  (2, "Doctor"),
  (3, "Artist"),
  (4, "Chef"),
  (5, "Teacher")
)
val columns2 = Seq("id", "profession")
val df2 = spark.createDataFrame(data2).toDF(columns2: _*)
val joinedDF = df.join(df2, "id")
joinedDF.show()

// 2.6 Drop a column
val droppedDF = df.drop("age")
droppedDF.show()

// 2.7 Rename a column
val renamedDF = df.withColumnRenamed("name", "full_name")
renamedDF.show()

// 2.8 Order by a column (e.g., order by age)
val orderedDF = df.orderBy(col("age").desc)
orderedDF.show()

// 2.9 Distinct values (remove duplicates)
val distinctDF = df.distinct()
distinctDF.show()

// 2.10 Create a new column using a conditional expression
val conditionalDF = df.withColumn(
  "age_group",
  when(col("age") < 30, "Young")
    .when(col("age") >= 30 && col("age") < 40, "Middle-aged")
    .otherwise("Older")
)
conditionalDF.show()
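One common pattern not covered above is computing several aggregates in a single groupBy; a minimal sketch on the same df (the alias names are illustrative):

// 2.11 Multiple aggregations per group (average and maximum age per city)
val cityStatsDF = df.groupBy("city")
    .agg(avg("age").as("avg_age"), max("age").as("max_age"))
cityStatsDF.show()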

10. Common transformations on DataSets

//Import Required libraries

import org.apache.spark.sql.{SparkSession}

import org.apache.spark.sql.functions._

import org.apache.spark.sql.Dataset

// Step 1: Define case class for a strongly-typed Dataset

case class Person(id: Int, name: String, age: Int, city: String, profession: String)

// Step 2: Load data into Dataset (you can replace it with your own dataset)

import spark.implicits._

val data = Seq(

  Person(1, "Alice", 29, "New York", "Engineer"),

  Person(2, "Bob", 31, "Los Angeles", "Doctor"),

  Person(3, "Catherine", 24, "Chicago", "Artist"),

  Person(4,"David", 35, "New York", "Chef"),

  Person(5,"Eva", 29, "Chicago", "Teacher")

)

// Convert the sequence to a Dataset

val ds: Dataset[Person] = spark.createDataset(data)

// Step 3: Apply common transformations

// 3.1 Select specific columns (using Dataset API)

val selectedColumnsDS = ds.select($"name", $"age")

selectedColumnsDS.show()

// 3.2 Filter data (e.g., select people older than 30)

val filteredDS = ds.filter($"age" > 30)

filteredDS.show()

// 3.3 Add a new column (e.g., increase age by 1)

val updatedDS = ds.withColumn("age_plus_one", $"age" + 1)

updatedDS.show()

// 3.4 Group by and aggregate (e.g., count people by city)

val groupByCityDS = ds.groupBy($"city").count()

groupByCityDS.show()

// 3.5 Join with another Dataset (using Dataset API)

val data2 = Seq(

  (1,"Engineer"),

  (2,"Doctor"),

  (3,"Artist"),

  (4,"Chef"),

  (5,"Teacher"))

val columns2 = Seq("id", "profession")

val ds2 = data2.toDF(columns2: _*).as[(Int, String)]

val joinedDS = ds.join(ds2, "id")

joinedDS.show()

// 3.6 Drop a column

val droppedDS = ds.drop("age")

droppedDS.show()

// 3.7 Rename a column

val renamedDS = ds.withColumnRenamed("name","full_name")

renamedDS.show()

// 3.8 Order by a column (e.g., order by age)

val orderedDS = ds.orderBy($"age".desc)

orderedDS.show()

// 3.9 Distinct values (remove duplicates)

val distinctDS = ds.distinct()

distinctDS.show()

// 3.10 Create a new column using a conditional expression

val conditionalDS = ds.withColumn("age_group", 

when($"age" < 30, "Young")  

.when($"age" >= 30 && $"age" < 40,"Middle-aged")

.otherwise("Older"))

conditionalDS.show()
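A key benefit of the Dataset API is that transformations can also be written directly against the Person case class, with compile-time type checking; a minimal sketch:

// 3.11 Typed transformations: the lambdas operate on Person objects directly
val olderThan30DS = ds.filter(p => p.age > 30)
val olderByOneDS  = ds.map(p => p.copy(age = p.age + 1))
olderThan30DS.show()
olderByOneDS.show()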

11. Common transformations using Spark SQL

In this code snippet I will be rewriting the above program using Spark SQL.
 

    // Step 1: Import the required libraries

    import org.apache.spark.sql.SparkSession

    import org.apache.spark.sql.functions._

    // Step 2: Load data into a DataFrame (you can replace it with your own dataset)

    import spark.implicits._

    val data = Seq(

      (1, "Alice", 29, "New York", "Engineer"),

      (2, "Bob", 31, "Los Angeles", "Doctor"),

      (3, "Catherine", 24, "Chicago", "Artist"),

      (4, "David", 35, "New York", "Chef"),

      (5, "Eva", 29, "Chicago", "Teacher")

    )

    // Convert the sequence to a DataFrame

    val df = data.toDF("id", "name", "age", "city", "profession")

    // Step 3: Register the DataFrame as a temporary SQL table

    df.createOrReplaceTempView("people")

    // Step 4: Apply common transformations using Spark SQL

    // 4.1 Select specific columns

    println("Select specific columns:")

    spark.sql("SELECT name, age FROM people").show()

    // 4.2 Filter data (e.g., select people older than 30)

    println("Filter people older than 30:")

    spark.sql("SELECT * FROM people WHERE age > 30").show()

    // 4.3 Add a new column (e.g., increase age by 1)

    println("Add a new column (age + 1):")

    spark.sql("SELECT *, age + 1 AS age_plus_one FROM people").show()

    // 4.4 Group by and aggregate (e.g., count people by city)

    println("Group by and count people by city:")

    spark.sql("SELECT city, COUNT(*) AS count FROM people GROUP BY city").show()

    // 4.5 Join with another DataFrame (using Spark SQL)

    val data2 = Seq(

      (1, "Engineer"),

      (2, "Doctor"),

      (3, "Artist"),

      (4, "Chef"),

      (5, "Teacher")

    )

    // Convert the second dataset to a DataFrame

    val df2 = data2.toDF("id", "profession")

    // Register the second DataFrame as a temporary table

    df2.createOrReplaceTempView("professions")

    spark.sql("SELECT p.id, p.name, p.age, p.city, pr.profession FROM people p JOIN professions pr ON p.id = pr.id").show()

    // 4.6 Drop a column (Spark SQL doesn't support dropping columns directly, but we can select without it)

    spark.sql("SELECT id, name, city, profession FROM people").show()

    // 4.7 Rename a column (again, done by selecting with the new name)

    spark.sql("SELECT id, name AS full_name, age, city, profession FROM people").show()

    // 4.8 Order by a column (e.g., order by age)

    spark.sql("SELECT * FROM people ORDER BY age DESC").show()

    // 4.9 Distinct values (remove duplicates)

    spark.sql("SELECT DISTINCT * FROM people").show()

    // 4.10 Create a new column using a conditional expression

    spark.sql("""

      SELECT id, name, age, city, profession,

             CASE

               WHEN age < 30 THEN 'Young'

               WHEN age >= 30 AND age < 40 THEN 'Middle-aged'

               ELSE 'Older'

             END AS age_group

      FROM people

    """).show()

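The result of a query can itself be registered as another temporary view and queried again, which is a common way to build multi-step SQL pipelines; a minimal sketch on the same people view (the view name age_30_plus is illustrative):

    // 4.11 Register a query result as a view and build on it
    spark.sql("SELECT * FROM people WHERE age >= 30").createOrReplaceTempView("age_30_plus")
    spark.sql("SELECT city, COUNT(*) AS cnt FROM age_30_plus GROUP BY city").show()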
12. Common transformations using RDD

In this code snippet I will be showing some common transformations on RDDs.

    // Step 1: Import libraries  

    import org.apache.spark.sql.SparkSession

// Step 2: Create an RDD from a collection
val data = Seq(
  (1, "Alice", 29),
  (2, "Bob", 31),
  (3, "Catherine", 24),
  (4, "David", 35),
  (5, "Eva", 29)
)

// Create an RDD from a sequence of tuples
val rdd = spark.sparkContext.parallelize(data)

// Step 3: Apply common RDD transformations

// 3.1 Map operation (transforming the data)
val namesRDD = rdd.map(record => record._2)
println("Names RDD:")
namesRDD.collect().foreach(println)

// 3.2 Filter operation (keep records where age is greater than 30)
val filteredRDD = rdd.filter(record => record._3 > 30)
println("Filtered RDD (age > 30):")
filteredRDD.collect().foreach(println)

// 3.3 FlatMap operation (used to flatten results, e.g., splitting a string into words)
val wordsRDD = rdd.flatMap(record => record._2.split(" "))
println("FlatMap RDD (splitting names into words):")
wordsRDD.collect().foreach(println)

// 3.4 GroupBy operation (group by age)
val groupedByAgeRDD = rdd.groupBy(record => record._3)
println("Grouped by Age RDD:")
groupedByAgeRDD.collect().foreach(println)

// 3.5 Reduce operation (sum of all ages)
val sumOfAges = rdd.map(record => record._3).reduce((a, b) => a + b)
println(s"Sum of ages: $sumOfAges")

// 3.6 Aggregate operation (calculate sum and count of ages)
val aggregatedRDD = rdd.map(record => record._3).aggregate((0, 0))(
  (acc, age) => (acc._1 + age, acc._2 + 1),              // seqOp: fold ages within a partition
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp: merge results across partitions
)
println(s"Sum of ages: ${aggregatedRDD._1}, Count of records: ${aggregatedRDD._2}")

// 3.7 Union operation (combine two RDDs)
val moreData = Seq(
  (6, "George", 28),
  (7, "Hannah", 32)
)
val moreDataRDD = spark.sparkContext.parallelize(moreData)
val unionRDD = rdd.union(moreDataRDD)
println("Union of RDDs:")
unionRDD.collect().foreach(println)

// 3.8 Intersection operation (find common elements between two RDDs)
val intersectionRDD = rdd.intersection(moreDataRDD)
println("Intersection of RDDs:")
intersectionRDD.collect().foreach(println)

// 3.9 Distinct operation (remove duplicates)
val distinctRDD = rdd.distinct()
println("Distinct RDD:")
distinctRDD.collect().foreach(println)

// 3.10 SortBy operation (sort by age)
val sortedRDD = rdd.sortBy(record => record._3)
println("Sorted by age RDD:")
sortedRDD.collect().foreach(println)

// Step 4: Apply common RDD actions

// 4.1 Collect operation (bring all elements of the RDD to the driver)
val collectedData = rdd.collect()
println("Collected RDD data:")
collectedData.foreach(println)

// 4.2 Count operation (count the number of records)
val count = rdd.count()
println(s"Number of records in RDD: $count")

// 4.3 First operation (get the first element of the RDD)
val firstElement = rdd.first()
println(s"First element in RDD: $firstElement")

// 4.4 Take operation (take the first n elements)
val takeTwo = rdd.take(2)
println("First 2 elements in RDD:")
takeTwo.foreach(println)

// 4.5 Save operation (saveAsTextFile writes a directory of part files)
rdd.saveAsTextFile("output_rdd.txt")
println("RDD data has been saved to the 'output_rdd.txt' directory")

Sunday, 2 February 2025

Installing and Configuring DbVisualizer in Ubuntu to Connect to a Remote Hive Warehouse

 



In this blog post we will discuss how database developers and data analysts can connect to a remote HIVE warehouse to perform querying and analysis of data in HDFS.

Step 1: Install Java 21 on the data analyst's system (a different user account)

Command:

sudo apt update 

sudo apt install openjdk-21-jdk

Step 2: Find the Java Installation Path

Command:

update-alternatives --list java

Step 3: Set Java 21 for Only One User

Command:

nano .bashrc

Add the below environment variables

#JAVA Related Options

export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64

export PATH=$PATH:$JAVA_HOME/bin

Refresh the profile

source ~/.bashrc

Step 4: Verify Java version

Command

java -version

Step 5: Download the DbVisualizer

Command

Go to https://www.dbvis.com/download/ and copy the link to the latest Linux distribution.


wget https://www.dbvis.com/product_download/dbvis-24.3.3/media/dbvis_linux_24_3_3.tar.gz

Extract the binaries:

tar xvfz dbvis_linux_24_3_3.tar.gz

Step 6: Configure the class path

Command:

nano .bashrc

Add the following lines to the end of .bashrc file

#DbVisualizer
export INSTALL4J_JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
export DB_VIS=/home/aksahoo/applications/DbVisualizer
export PATH=$PATH:$DB_VIS/

Refresh the profile:

source ~/.bashrc

Step 7: Start the DbVisualizer


Command:

dbvis

Step 8: Create a connection to Remote HIVE Warehouse








Provide the HiveServer2 details (these correspond to the JDBC URL jdbc:hive2://localhost:10000/default):

Database Server: localhost
Database Port: 10000
Database: default
Database Userid: hdoop
Database Password: 

Step 9: Allow User Impersonation in Hadoop Core Site (Hadoop user account)


Edit core-site.xml: nano hadoop/etc/hadoop/core-site.xml

Add the following configurations

<property>
  <name>hadoop.proxyuser.hdoop.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hdoop.groups</name>
  <value>*</value>
</property>

Step 10: Allow Impersonation in Hive Configuration (Hadoop user account)


Edit hive-site.xml: nano hive/conf/hive-site.xml

Add the following configurations

<property>
  <name>hive.server2.enable.doAs</name>
  <value>true</value>
</property>

Step 11: Restart Hadoop and ensure the Hadoop services are up and running (Hadoop user account, in this case hdoop)



Command:

cd /home/hdoop/hadoop/sbin
./stop-all.sh
./start-all.sh
jps

Step 12: Refresh the User-to-Group Mappings


Command:
hdfs dfsadmin -refreshUserToGroupsMappings


Step 13: Start the HIVE Metastore Service and HiveServer2

Command:
hive --service metastore &
hive --service hiveserver2 &


Step 14: Go to DbVisualizer and test connection






Step 15: Query HIVE tables from DbVisualizer

HQL: select * from employees;






Saturday, 1 February 2025

Spark and Hive Metastore Integration


Spark can be integrated with the Hive metastore to provide a common metastore layer between Hive and Spark. In this blog I will detail the steps to reuse the Hive metastore for the Spark engine.

Prerequisite:

1. Existing Hadoop installation

2. Existing Hive Installation

3. Existing Spark Installation: Steps to install Spark can be found here

Step 1: Copy the Hive Metastore RDBMS Driver from hive/lib to spark/jars folder

Command: cp hive/lib/mysql-connector-java-8.0.28.jar spark/jars/

Note: This assumes the Hive metastore is a MySQL database.




Step 2: Ensure MySQL and Hive Metastore Services are running

command:

sudo systemctl start mysql

hive --service metastore &

Step 3: Edit $SPARK_HOME/conf/spark-defaults.conf (create it if missing):


Add the following line.

spark.sql.catalogImplementation=hive


Step 4: Verify Spark-Hive Metastore Integration


Start the Spark shell: spark-shell

And execute the below line at the Scala prompt: spark.sql("SHOW DATABASES").show()

If it shows all the Hive databases, then the integration is successful.


Step 5: Make sure the Hadoop services are up and running; if not, start them.




Command:

To verify: jps

To start:
cd /home/hdoop/hadoop/sbin
./start-all.sh

Step 6: Run an HQL to read the data (which is in HDFS) from a table
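For example, an HQL statement can be issued from the Scala prompt of spark-shell; a minimal sketch, where the database and table names are placeholders:

spark.sql("SELECT * FROM your_hive_database.your_hive_table LIMIT 10").show()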


 Step 7: Accessing HIVE Databases and Tables from spark-sql

If the above configuration is working fine, HIVE databases and tables can be accessed directly from spark-sql.

Command: spark-sql









Tuesday, 28 January 2025

Using Spark as the Hive Execution Engine

 


In this blog I will be discussing how to configure Spark as an execution engine for HIVE.

Step 0: Prerequisite


  • Existing Java, Hadoop and HIVE installation
  • Find the compatible Spark version that can be an execution engine for your HIVE. Here is the HIVE and Spark compatibility matrix.


I have HIVE 3.1.2 in my VM, so I will download and configure Spark 2.4.8 as the execution engine, since HIVE 3.1.2 is compatible with Spark 2.4.8.

Step 1: Configure Environment Variables

Please make sure following environment variables are configured in your .bashrc file:

#JAVA Related Options

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

export PATH=$PATH:$JAVA_HOME/bin

#Hadoop Related Options

export HADOOP_HOME=/home/hdoop/hadoop

export HADOOP_INSTALL=$HADOOP_HOME

export HADOOP_MAPRED_HOME=$HADOOP_HOME

export HADOOP_COMMON_HOME=$HADOOP_HOME

export HADOOP_HDFS_HOME=$HADOOP_HOME

export YARN_HOME=$HADOOP_HOME

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native

export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"

export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

#HIVE Related Options

export HIVE_HOME=/home/hdoop/hive

export PATH=$PATH:$HIVE_HOME/bin

Refresh the profile with the command: source ~/.bashrc

Step 2: Download Spark 2.4.8 version


Download the Spark 2.4.8 "without Hadoop" tar from the Spark archive. Copy the link address as shown in the picture, download the .tgz file to your current directory using the wget command, and then extract it.

wget https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-without-hadoop.tgz

tar xvf spark-*.tgz




Step 3: Add the Spark Dependency to Hive

Create links to the following jars in $HIVE_HOME/lib (pointing to the respective jars in spark-2.4.8-bin-without-hadoop/jars). Execute the below commands to create the links.

cd $HIVE_HOME/lib

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/scala-library*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/spark-core*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/spark-network-common*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/spark-network-shuffle*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/jersey-server*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/jersey-container-servlet-core*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/jackson-module*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/chill*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/json4s-ast*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/kryo-shaded*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/minlog*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/scala-xml*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/spark-launcher*.jar


ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/spark-unsafe*.jar

ln -s /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/xbean-asm5-shaded.jar

Step 4: Configure Spark to Access Hadoop Class path

Edit spark-env.sh (create it from spark-env.sh.template if it does not exist) and then add the following configurations.

nano /home/hdoop/spark-2.4.8-bin-without-hadoop/conf/spark-env.sh

#Add the below lines:

export SPARK_DIST_CLASSPATH=$(/home/hdoop/hadoop/bin/hadoop classpath)

#Spark related options

export SPARK_HOME=/home/hdoop/spark-2.4.8-bin-without-hadoop

export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

export PYSPARK_PYTHON=/usr/bin/python3

Step 5: Configure Hive to Access Spark Jars

Edit the hive-env.sh file and make sure the following entries exist.

nano /home/hdoop/hive/conf/hive-env.sh

#Add the below lines

export HADOOP_HOME=/home/hdoop/hadoop

# Hive Configuration Directory can be controlled by:

export HIVE_CONF_DIR=$HIVE_HOME/conf

export SPARK_HOME=/home/hdoop/spark-2.4.8-bin-without-hadoop

export SPARK_JARS="" 

for jar in `ls $SPARK_HOME/jars`; do 

        export SPARK_JARS=$SPARK_JARS:$SPARK_HOME/jars/$jar 

done 

export HIVE_AUX_JARS_PATH=$SPARK_JARS

Step 6: Configure Hive to use Spark Engine in YARN mode

Add the following entry to hive-site.xml

nano /home/hdoop/hive/conf/hive-site.xml

# Add the following lines

<property>

    <name>hive.execution.engine</name>

    <value>spark</value>

</property>

<property>

    <name>spark.master</name>

    <value>yarn</value>

</property>

<property>

    <name>spark.eventLog.enabled</name>

    <value>true</value>

</property>

<property>

    <name>spark.eventLog.dir</name>

    <value>/tmp</value>

</property>

<property>

    <name>spark.driver.memory</name>

    <value>2g</value>

</property>

<property>

    <name>spark.executor.memory</name>

    <value>2g</value>

</property>

<property>

    <name>spark.serializer</name>

    <value>org.apache.spark.serializer.KryoSerializer</value>

</property>

<property>

    <name>spark.yarn.jars</name>

    <value>hdfs://127.0.0.1:9000/spark/jars/*</value>

    <!-- <value>hdfs:///spark/jars/*.jar</value> -->

</property>

<property>

    <name>spark.submit.deployMode</name>

    <value>client</value>

    <!-- <value>cluster</value> -->

</property>

<!--

<property>

    <name>spark.yarn.queue</name>

    <value>default</value>

</property>

-->

<property>

  <name>hive.spark.job.monitor.timeout</name>

  <value>600</value>

</property>

<property>

  <name>hive.server2.enable.doAs</name>

  <value>true</value>

</property>

Step 7: Copy Spark jar's in Spark/Jar folder to hdfs:///spark/jars/

Copy all the jars in the /home/hdoop/spark-2.4.8-bin-without-hadoop/jars path to hdfs:///spark/jars/ (an HDFS path). As set in the previous step, "spark.yarn.jars" points to hdfs:///spark/jars/ in hive-site.xml, so YARN will look for the Spark jars in this HDFS path.

hdfs dfs -mkdir -p /spark/jars/

hdfs dfs -put /home/hdoop/spark-2.4.8-bin-without-hadoop/jars/* /spark/jars/

Step 8: Restart Hadoop and Hive Services


After the above changes, restart all the Hadoop and Hive services with the below commands.

cd /home/hdoop/hadoop/sbin

./stop-all.sh

./start-all.sh

jps

hive

Step 9: Run an HQL that triggers the execution engine (Spark)


Run an HQL that will trigger the execution engine, for example: select count(*) from employees;
If all the configurations are correct, you should see output similar to the below snapshot.



You should also notice a Hive on Spark application in the YARN UI.






Monday, 27 January 2025

Spark 3.4.4 Installation in Ubuntu

Step 1: Install Java

Install Java 8, as we will use the same VM for the Spark installation where Hadoop 3.x and Hive 3.x are installed. Though we could use Java 11, Java 8 is a safe bet for building the Hadoop 3.x ecosystem.

Update the system with the below command

sudo apt update

Install OpenJDK 8, with below command

sudo apt install openjdk-8-jdk -y

Check the installation with the following command:

java -version; javac -version

 Step 2: Install Scala

Install scala with the below command:

sudo apt install scala -y

Check the installation with the following command:

scala -version


Step 3: Download Spark

Navigate to the Spark download page and copy the download link as shown in the below picture.


Use the copied link with wget command to download the spark binary, as shown below:

wget https://www.apache.org/dyn/closer.lua/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz

The URL next to the wget is the one copied in the previous step. If the download is successful you should see the spark binary file downloaded to your current folder as shown in the picture.


Step 4: Extract the Spark package

Extract the archive Spark binary using the following tar command.

tar xvf spark-*.tgz

After extraction you should see a folder as shown below in the picture:


Step 5: Create a symlink

Create a symlink to the spark-3.4.4-bin-hadoop3 for easier configuration using the below command.

ln -s /home/hdoop/spark-3.4.4-bin-hadoop3/ /home/hdoop/spark

Step 6: Set the environment variables

Add the following lines to the .bashrc file.

export SPARK_HOME=/home/hdoop/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3

(Check whether you have python3 on your system and modify the path if necessary.)
Open the .bashrc using the following command and add the above lines:

nano .bashrc

(make sure you are at the user folder level, in this case /home/hdoop)

Save the .bashrc file using CTRL+X followed by Enter.

Then load the updated profile using the below command.

source ~/.bashrc

Step 7: Start the Spark Master Server (standalone cluster, no YARN)

Start the Spark master server using the below command:

start-master.sh

Post this you can view the Spark Web UI at:

http://127.0.0.1:8080/

If all goes well you should see a web page similar to the one below:



Make a note of the host and port of the master server (the one in the yellow box). Also notice there are no workers running yet (refer to the green box).

Step 8: Start a worker process (standalone cluster, no YARN)

Use the following command format to start a worker server in a single-server setup:

start-worker.sh spark://master_server:port
start-worker.sh -c 1 -m 512M spark://master_server:port (use this one if you want a specific CPU count and memory size for the worker)

Note: Replace master_server and port with the values captured in the previous step.

Refresh the Spark Master's Web UI to see the new worker on the list.


Step 9: Test Spark Shell

To run the Spark shell (integrated with Scala by default), use the below command:

spark-shell

Upon successful execution you should see a screen like this, with spark version and Scala prompt:


Now you can use the Scala prompt to write spark programs interactively.

Step 10: Run a simple spark application

// Import implicits for easier syntax
import org.apache.spark.sql.DataFrame
import spark.implicits._

// Create a sequence of data (rows) as case class or tuples
val data = Seq(
  (1, "Alice", 28),
  (2, "Bob", 25),
  (3, "Catherine", 30)
)

// Create a DataFrame from the sequence with column names
val df: DataFrame = data.toDF("ID", "Name", "Age")

// Show the contents of the DataFrame
df.show()

Note: You can use the :paste and ctrl+d combination to input multiline code in the spark-shell

Step 11: Exit the spark-shell

Supply the below command to exit from the spark (scala) shell:

:q

Step 12: Test PySpark

Enter the following command to start PySpark

pyspark


Step 13: Run a sample PySpark Code


df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)
df.show()
Use quit() to exit the pyspark shell

Step 14: Stopping Spark

Stop Master Server: stop-master.sh

Stop Worker Process: stop-worker.sh

Step 15: Configure Spark to use YARN

Note: Please make sure you are out of the standalone cluster mode by executing Step 14.

Start the Hadoop services if they are not running already, using the below commands:

cd /home/hdoop/hadoop/sbin
./start-all.sh
jps

Open the .bashrc file and add the environment variable HADOOP_CONF_DIR

nano .bashrc (at /home/hdoop/)
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Save and Exit (CTRL+X followed by enter)



Refresh the profile
source ~/.bashrc

Start the Spark Shell in YARN Mode now:

spark-shell --master yarn (scala)
pyspark --master yarn (python)

Go to the YARN UI (http://localhost:8088) and verify.



Step 16: Configure Spark to start in YARN mode by default

Stop all the Spark applications (spark-shell, spark-sql, pyspark, and spark-submit) if running.

Open (or create if not exist) spark-defaults.conf file: nano spark/conf/spark-defaults.conf

And add the below configuration to the files:

spark.master yarn
spark.submit.deployMode client (can be cluster as well)

Start the spark-shell by running the command: spark-shell (without --master yarn). Navigate to the YARN UI (http://localhost:8088) and check whether spark-shell is listed as shown in the below picture.





