Sunday, 9 February 2025

Installing Hadoop on Ubuntu

Steps for Java 8 Installation

..............................

Step 1: sudo apt update

Step 2: sudo apt install openjdk-8-jdk -y

Step 3: java -version; javac -version (verify the Java installation)



Steps for Hadoop 3.3.0 Installation

......................................

Step 1: (Install OpenSSH server and client): sudo apt install openssh-server openssh-client -y

Step 2: Create an admin user for hadoop ecosystem: sudo adduser hdoop

Step 3: su - hdoop (always repeat this step when starting hadoop)

Step 4: (Enable Passwordless SSH for Hadoop User) ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa

Step 5: (Store the public key as authorized_keys in the ssh directory) cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Step 6: (Set the file permissions) chmod 0600 ~/.ssh/authorized_keys

Step 7: ssh the new user to local host: ssh localhost (always repeat this step when starting hadoop)

Step 8: (download the tar): wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz

Step 9: (untar): tar xzf hadoop-3.3.0.tar.gz

Step 10: (create symlink): ln -s /home/hdoop/hadoop-3.3.0/ /home/hdoop/hadoop

Step 11: (Set the following environment variables): nano .bashrc


#JAVA Related Options

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64


#Hadoop Related Options

export HADOOP_HOME=/home/hdoop/hadoop

export HADOOP_INSTALL=$HADOOP_HOME

export HADOOP_MAPRED_HOME=$HADOOP_HOME

export HADOOP_COMMON_HOME=$HADOOP_HOME

export HADOOP_HDFS_HOME=$HADOOP_HOME

export YARN_HOME=$HADOOP_HOME

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native

export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"

export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar


#HIVE Related Options

export HIVE_HOME=/home/hdoop/hive

export PATH=$PATH:$HIVE_HOME/bin


Step 12: (Source the .bashrc): source ~/.bashrc

Step 13: (configure hadoop environment): nano $HADOOP_HOME/etc/hadoop/hadoop-env.sh (uncomment JAVA_HOME and configure as follows)


export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64


Step 14: (Configure hadoop core): nano $HADOOP_HOME/etc/hadoop/core-site.xml (add the below properties)


<property>

  <name>hadoop.tmp.dir</name>

  <value>/home/hdoop/tmpdata</value>

</property>

<property>

  <name>fs.default.name</name>

  <value>hdfs://127.0.0.1:9000</value>

</property>


Step 15: (Create the /home/hdoop/tmpdata folder): mkdir /home/hdoop/tmpdata

Step 16: (edit hdfs site file): nano $HADOOP_HOME/etc/hadoop/hdfs-site.xml (add the below entries)


<property>

  <name>dfs.name.dir</name>

  <value>/home/hdoop/dfsdata/namenode</value>

</property>

<property>

  <name>dfs.data.dir</name>

  <value>/home/hdoop/dfsdata/datanode</value>

</property>

<property>

  <name>dfs.replication</name>

  <value>1</value>

</property>


Step 17: (Configure mapred site): nano $HADOOP_HOME/etc/hadoop/mapred-site.xml (add the below property)


<property>

  <name>mapreduce.framework.name</name>

  <value>yarn</value>

</property>


Step 18: (Configure yarn): nano $HADOOP_HOME/etc/hadoop/yarn-site.xml (add the below properties)


<property>

  <name>yarn.nodemanager.aux-services</name>

  <value>mapreduce_shuffle</value>

</property>

<property>

  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

  <value>org.apache.hadoop.mapred.ShuffleHandler</value>

</property>

<property>

  <name>yarn.resourcemanager.hostname</name>

  <value>127.0.0.1</value>

</property>

<property>

  <name>yarn.acl.enable</name>

  <value>0</value>

</property>

<property>

  <name>yarn.nodemanager.env-whitelist</name>   

  <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>

</property>


Step 19: (Format Name Node): hdfs namenode -format

Step 20: Play around with the hdfs dfs command (file already provided); see the examples below
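For example, here is a minimal sketch of some basic hdfs dfs commands (this assumes the HDFS and YARN daemons have already been started with start-dfs.sh and start-yarn.sh, and sample.txt is a hypothetical local text file):


start-dfs.sh

start-yarn.sh

hdfs dfs -mkdir -p /user/hdoop

hdfs dfs -put sample.txt /user/hdoop/

hdfs dfs -ls /user/hdoop

hdfs dfs -cat /user/hdoop/sample.txt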


Steps for running WordCount MapReduce Programs

.................................................

Step 1: create the following path to store your mapreduce application (optional): mkdir -p /home/hdoop/application/mapreduce_app/wordcount_app/

Step 2: create the mapreduce java application file: nano WordCount.java


import java.io.IOException;

import java.util.StringTokenizer;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {


  public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{


    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();


    public void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

  }


  public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();


    public void reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }


  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

}


Step 3: compile the java file: hadoop com.sun.tools.javac.Main WordCount.java

Step 4: create the jar file: jar cf wc.jar WordCount*.class

Step 5: submit the mapreduce program to the cluster: hadoop jar wc.jar WordCount /input/test/file1.txt /output/test/

(first create the input HDFS path and load a file1.txt containing some text into it using the hdfs dfs -put command; see the example below)

(do not create the output path beforehand; the job creates it and fails if it already exists)
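A minimal sketch of the HDFS preparation (file1.txt here stands for any local text file you want to count):


hdfs dfs -mkdir -p /input/test

hdfs dfs -put file1.txt /input/test/


After the job completes, the word counts are typically written to a part file under the output path:


hdfs dfs -cat /output/test/part-r-00000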


Hive 3.1.2 Installation Steps

......................................

Step 1: Download the tar at /home/hdoop/: wget https://apache.root.lu/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz

Step 2: Untar the tar file: tar xzf apache-hive-3.1.2-bin.tar.gz

Step 3: Create a symlink: ln -s /home/hdoop/apache-hive-3.1.2-bin /home/hdoop/hive

Step 4: edit the .bashrc to add the following environment variables:


export HIVE_HOME=/home/hdoop/hive

export PATH=$PATH:$HIVE_HOME/bin/


Step 5: nano $HADOOP_HOME/etc/hadoop/mapred-site.xml: Add the following Configuration


<property>

  <name>mapreduce.map.memory.mb</name>

  <value>4096</value>

</property>

<property>

  <name>mapreduce.reduce.memory.mb</name>

  <value>4096</value>

</property>


Step 6: Restart Hadoop


cd /home/hdoop/hadoop/sbin

./stop-all.sh

./start-all.sh
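Optionally, verify that all the Hadoop daemons came back up:


jps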


Step 7: Create temp and warehouse folder in HDFS and assign permissions


hdfs dfs -mkdir -p /user/hive/warehouse

hdfs dfs -mkdir -p /tmp/hive


hdfs dfs -chmod 777 /tmp

hdfs dfs -chmod 777 /tmp/hive


hdfs dfs -chmod 777 /user/hive/warehouse


Step 8: Delete the obsolete log4j-slf4j jar


ls /home/hdoop/hive/lib/log4j-slf4j-impl-*.jar

rm /home/hdoop/hive/lib/log4j-slf4j-impl-2.10.0.jar


Step 9: Replace the Guava jar


ls /home/hdoop/hadoop/share/hadoop/common/lib/guava*.jar

ls /home/hdoop/hive/lib/guava*.jar

cp /home/hdoop/hadoop/share/hadoop/common/lib/guava*.jar /home/hdoop/hive/lib/

rm /home/hdoop/hive/lib/guava-19.0.jar


Step 10: Initialize the Derby Database

/home/hdoop/hive/bin/schematool -initSchema -dbType derby


A metastore_db directory should be created upon successful execution.
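You can confirm this from the directory in which you ran schematool (the embedded Derby metastore is created in the current working directory):


ls -d metastore_db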


Step 11: Configure yarn-site.xml: nano $HADOOP_HOME/etc/hadoop/yarn-site.xml

(Add the below properties at the end)


<property>

  <name>yarn.nodemanager.vmem-check-enabled</name>

  <value>false</value>

</property>

<property>

  <name>yarn.nodemanager.vmem-pmem-ratio</name>

  <value>4</value>

</property>


Step 12: Start hive: hive


Create some tables and check that HQL is able to trigger map-reduce jobs (a sample smoke test follows).
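For example, here is a minimal HQL smoke test to run at the hive prompt (the employees table and its columns are only an illustrative example); both the INSERT and the GROUP BY query should launch MapReduce jobs:


CREATE TABLE employees (id INT, name STRING, salary DOUBLE);

INSERT INTO employees VALUES (1, 'Alice', 50000.0), (2, 'Bob', 60000.0);

SELECT name, COUNT(*) FROM employees GROUP BY name;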


MySQL as Metastore for Hive 3.1.2

......................................

Step 1: Install MySQL


sudo apt update

sudo apt install mysql-server


Step 2: Ensure the MySQL server is running: sudo systemctl start mysql.service


Step 3: Create user credentials for hive


sudo mysql

CREATE USER 'hiveuser'@'localhost' IDENTIFIED BY 'Hive111!!!';

GRANT ALL PRIVILEGES ON *.* TO 'hiveuser'@'localhost';


Step 4: Download mysql-connector-java-8.0.28.jar to /home/hdoop/hive/lib/


cd /home/hdoop/hive/lib/

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar


Step 5: create hive-site.xml file in /home/hdoop/hive/conf/


nano /home/hdoop/hive/conf/hive-site.xml


(add the below configurations)

<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>

    <name>javax.jdo.option.ConnectionURL</name>

    <value>jdbc:mysql://localhost/metastore_db?createDatabaseIfNotExist=true</value>

    <description>

      JDBC connect string for a JDBC metastore.

      To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.

      For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.

    </description>

</property>

<property>

    <name>javax.jdo.option.ConnectionDriverName</name>

    <value>com.mysql.cj.jdbc.Driver</value>

    <description>Driver class name for a JDBC metastore</description>

</property>

<property>

    <name>javax.jdo.option.ConnectionPassword</name>

    <value>Hive111!!!</value>

    <description>password to use against metastore database</description>

</property>

<property>

    <name>javax.jdo.option.ConnectionUserName</name>

    <value>hiveuser</value>

    <description>Username to use against metastore database</description>

</property>

</configuration>


Step 6: Run the Hive schematool to initialize the schema


/home/hdoop/hive/bin/schematool  -initSchema  -dbType mysql

Log in to MySQL to check whether the metastore_db database has been created (see the example below).
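For example:


sudo mysql

SHOW DATABASES;

USE metastore_db;

SHOW TABLES;


The metastore_db database should be listed, and it should contain the Hive metastore tables (such as DBS and TBLS).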


Step 7: Restart Hadoop and Hive and check that HQL queries run successfully





Zookeeper, its Importance and Installation


Introduction

Apache ZooKeeper is a distributed coordination service for managing configuration, synchronization, and naming services in large-scale distributed systems. It helps maintain consistent and fault-tolerant cluster management.

Key Features of ZooKeeper: 

✅ Leader Election
✅ Configuration Management
✅ Distributed Synchronization
✅ Naming Service
✅ Failure Detection

ZooKeeper in HBase

HBase is a distributed NoSQL database that relies on ZooKeeper for coordination.

🔹 HBase uses ZooKeeper for:
Master Election – Ensures only one HMaster is active.
RegionServer Coordination – Tracks active RegionServers.
Failure Detection – Detects RegionServer crashes and triggers reassignments.
Metadata Storage – Stores HBase root metadata, such as table structure and regions.

👉 Without ZooKeeper: HBase cannot assign or manage regions effectively, leading to inconsistency and failure.

ZooKeeper in Kafka

Kafka is a distributed event streaming platform that requires ZooKeeper to manage brokers.

🔹 Kafka uses ZooKeeper for:
Broker Coordination – Tracks active brokers in the cluster.
Topic Management – Stores metadata about topics, partitions, and replicas.
Leader Election – Selects the leader for each partition.
Consumer Group Management – Keeps track of consumer offsets.

👉 Without ZooKeeper: Kafka brokers cannot coordinate, leading to potential data loss or unavailability.

ZooKeeper in Sqoop

Sqoop is a tool for importing and exporting data between HDFS and RDBMS.

🔹 Sqoop uses ZooKeeper for:
Job Coordination – When used with Sqoop Metastore, ZooKeeper helps track job status.
Fault Tolerance – Ensures jobs resume correctly if interrupted.
Load Balancing – Helps manage parallel data transfer across multiple nodes.

👉 Without ZooKeeper: Distributed Sqoop jobs might fail due to lack of synchronization.

ZooKeeper Installation


Step 1: Install Java

ZooKeeper runs on Java. Ensure that Java (preferably Java 8) is installed.

sudo apt update
sudo apt install default-jdk

java -version

Step 2: Create a Dedicated User for ZooKeeper

Create a dedicated user for security and management. You can use the same user we created for building the Hadoop ecosystem (hdoop) or a new one, following the steps mentioned in the Hadoop installation page.

Step 3: Download and Install Zookeeper

You need to download and install a ZooKeeper version compatible with the Hadoop ecosystem. I have Hadoop 3.3.0 in my VM; the compatible HBase release for this version of Hadoop is HBase 2.3.0, and the ZooKeeper version compatible with HBase 2.3.0 is ZooKeeper 3.5.9, which is also compatible with the Kafka 2.3.0 I have in my system.

Navigate to the ZooKeeper archive page, copy the download link, and download apache-zookeeper-3.5.9-bin.tar.gz using the wget command.




wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz


Extract the downloaded tarball:


sudo tar -xzf apache-zookeeper-*.tar.gz


Create a Symlink:


ln -s /home/hdoop/apache-zookeeper-3.5.9-bin /home/hdoop/zookeeper


Ensure hdoop (or the new user created in step 2) fully owns the zookeeper folder:


sudo chown -R hdoop:hdoop /home/hdoop/zookeeper

sudo chown -R hdoop:hdoop /home/hdoop/apache-zookeeper-3.5.9-bin


Step 4: Setup the Zookeeper Data Directory

Create a directory where ZooKeeper will store its data, and ensure the user (hdoop or the one created in Step 2) fully owns it.
 

sudo mkdir -p /home/hdoop/zookeeper/data

sudo chown hdoop:hdoop /home/hdoop/zookeeper/data

Step 5: Configure ZooKeeper

Create a configuration file from the sample provided in the conf folder.

cp /home/hdoop/zookeeper/conf/zoo_sample.cfg /home/hdoop/zookeeper/conf/zoo.cfg

nano /home/hdoop/zookeeper/conf/zoo.cfg

Add or modify the following basic configuration:

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/home/hdoop/zookeeper/data

clientPort=2181

maxClientCnxns=60

Save and exit the file (ctrl+x, Y, enter)

Step 6: Perform the System Service Setup

Create a systemd service file for ZooKeeper:

sudo nano /etc/systemd/system/zookeeper.service

Copy and paste the below content into this file, then save and exit (Ctrl+X, Y, Enter).

[Unit]

Description=Apache ZooKeeper Service

After=network.target

[Service]

Type=forking

User=hdoop

Group=hdoop

ExecStart=/home/hdoop/zookeeper/bin/zkServer.sh start /home/hdoop/zookeeper/conf/zoo.cfg

ExecStop=/home/hdoop/zookeeper/bin/zkServer.sh stop

Restart=always

WorkingDirectory=/home/hdoop/zookeeper

#PIDFile=/home/hdoop/zookeeper/zookeeper_server.pid

[Install]

WantedBy=multi-user.target



This allows ZooKeeper to be managed by systemd, enabling automatic start at boot.

Reload systemd to recognize the newly created ZooKeeper service.

sudo systemctl daemon-reload


Start and enable ZooKeeper Service:

sudo systemctl start zookeeper

sudo systemctl enable zookeeper


Check the status of the ZooKeeper service:

sudo systemctl status zookeeper
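You can also check the server state with ZooKeeper's own script:


/home/hdoop/zookeeper/bin/zkServer.sh status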




Step 7: Connect to the ZooKeeper Service

Check whether you can connect to the ZooKeeper server using the below command:

zookeeper/bin/zkCli.sh -server 127.0.0.1:2181


On a successful connection, you should land at the zkCli prompt (for example, [zk: 127.0.0.1:2181(CONNECTED) 0]).


Perform an ls command at the ZooKeeper server command prompt:

ls /

You should see at least the default znode entry, [zookeeper].











Tuesday, 4 February 2025

Common Spark Applications



In this blog I am going to discuss some of the common Spark program patterns, written in Scala.


1. Read from an RDBMS (MySQL) and append to an existing HIVE Warehouse Table

I will read a table from a remote MySQL database into a Spark DataFrame and append the DataFrame to an existing HIVE table.

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "your_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Read data from MySQL

val df = spark.read

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .load()

// Append data to an existing Hive table

df.write

    .mode("append")

    .saveAsTable("your_hive_database.your_hive_table")

2. Read from a HIVE table and append to a MySQL table

Here I will do the reverse: read the data from a HIVE table into a Spark DataFrame and store it in a remote RDBMS (MySQL) table.

// Hive table parameters

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "your_mysql_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Read data from Hive

val df = spark.read

    .table(s"$hiveDatabase.$hiveTable")

// Append data to MySQL table

df.write

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .mode("append")

    .save()

3. Read a CSV from the Local (Linux) File System and store it as Parquet in HDFS

Here we will read a CSV from the local (Linux) file system into a Spark DataFrame and store the data in HDFS as Parquet.

// Define input and output paths

val inputCsvPath = "file:///path/to/input.csv" // Local Linux file path

val outputHdfsPath = "hdfs:///path/to/output_parquet" // HDFS path

// Read the CSV file with header and infer schema

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write as Parquet to HDFS

df.write

    .mode("overwrite")

    .parquet(outputHdfsPath)

4. Read a JSON file from HDFS and store the data in HIVE

Here we will read a JSON file from HDFS into a Spark DataFrame and insert the data into a HIVE table.

// Define input JSON path in HDFS

val inputJsonPath = "hdfs:///path/to/input.json"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the JSON file from HDFS

val df = spark.read

    .option("inferSchema", "true")

    .json(inputJsonPath)

// Write data to Hive table

df.write

    .mode("overwrite")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

5. Read an XML file from HDFS and store the data in a HIVE table

In this code snippet I am reading an XML file from HDFS into a Spark DataFrame and storing the data in a HIVE table. Note that the com.databricks.spark.xml format requires the spark-xml package to be available on the classpath (for example, by launching spark-shell with --packages).

// Define input XML path in HDFS

val inputXmlPath = "hdfs:///path/to/input.xml"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the XML file from HDFS

val df = spark.read

    .format("com.databricks.spark.xml")

    .option("rowTag", "rootElement") // Replace 'rootElement' with the actual XML tag

    .option("inferSchema", "true")

    .load(inputXmlPath)

// Write data to Hive table

df.write

    .mode("overwrite")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

6. Read a CSV from HDFS and store it as a HIVE ORC table

Here I will read a CSV in HDFS into a Spark DataFrame and store the data in a HIVE ORC table.

// Define input CSV path in HDFS

val inputCsvPath = "hdfs:///path/to/input.csv"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the CSV file from HDFS

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write data to Hive table in ORC format

df.write

    .mode("overwrite")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

7. Insert records into a HIVE ORC table from Spark

In this code snippet I will read a CSV in HDFS and insert the data into a HIVE ORC table; then I will create some records manually and append the manually created rows to the same HIVE table.

// Import the required libraries

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Define input CSV path in HDFS

val inputCsvPath = "hdfs:///path/to/input.csv"

// Hive table details

val hiveDatabase = "your_hive_database"

val hiveTable = "your_hive_table"

// Read the CSV file from HDFS

val df = spark.read

    .option("header", "true")

    .option("inferSchema", "true")

    .csv(inputCsvPath)

// Write data to Hive table in ORC format

df.write

    .mode("overwrite")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

// Manually create a new record

val newDataSchema = StructType(Seq(

    StructField("id", IntegerType, false),

    StructField("name", StringType, true),

    StructField("age", IntegerType, true)

))

val newData = Seq(

    Row(101, "John Doe", 30),

    Row(102, "Jane Smith", 28)

)

val newDF = spark.createDataFrame(spark.sparkContext.parallelize(newData), newDataSchema)

// Append new records to Hive table in ORC format

newDF.write

    .mode("append")

    .format("orc")

    .saveAsTable(s"$hiveDatabase.$hiveTable")

8. Word Count in Spark

Here is a word count program written in Spark; the result is stored in an RDBMS table.

//Import the required libraries

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Define input text file path in HDFS

val inputTextPath = "hdfs:///path/to/input.txt"

// Read the text file from HDFS and perform word count

val textFile = spark.read.textFile(inputTextPath)

val wordCounts = textFile.rdd

    .flatMap(line => line.split(" "))

    .map(word => (word, 1))

    .reduceByKey(_ + _)

    .toDF("word", "count")

// MySQL connection parameters

val jdbcUrl = "jdbc:mysql://your-mysql-host:3306/your_database"

val dbTable = "word_count_table"

val dbUser = "your_username"

val dbPassword = "your_password"

// Write word count results to MySQL

wordCounts.write

    .format("jdbc")

    .option("url", jdbcUrl)

    .option("dbtable", dbTable)

    .option("user", dbUser)

    .option("password", dbPassword)

    .option("driver", "com.mysql.cj.jdbc.Driver")

    .mode("overwrite")

    .save()

9. Common transformations on DataFrames

Here is a Spark program covering the most common transformations on DataFrames.

//Import required libraries

import org.apache.spark.sql.{SparkSession, DataFrame}

import org.apache.spark.sql.functions._

// Step 1: Load a sample DataFrame (you can replace it with your own dataset)

val data = Seq(
  (1, "Alice", 29, "New York"),
  (2, "Bob", 31, "Los Angeles"),
  (3, "Catherine", 24, "Chicago"),
  (4, "David", 35, "New York"),
  (5, "Eva", 29, "Chicago")
)
val columns = Seq("id", "name", "age", "city")

// Create a DataFrame
val df: DataFrame = spark.createDataFrame(data).toDF(columns: _*)

// Step 2: Apply common transformations

// 2.1 Select specific columns
val selectedColumnsDF = df.select("name", "age")
selectedColumnsDF.show()

// 2.2 Filter data (e.g., select people older than 30)
val filteredDF = df.filter(col("age") > 30)
filteredDF.show()

// 2.3 Add a new column (e.g., increase age by 1)
val updatedDF = df.withColumn("age_plus_one", col("age") + 1)
updatedDF.show()

// 2.4 Group by and aggregate (e.g., count people by city)
val groupByCityDF = df.groupBy("city").count()
groupByCityDF.show()

// 2.5 Join with another DataFrame
val data2 = Seq(
  (1, "Engineer"),
  (2, "Doctor"),
  (3, "Artist"),
  (4, "Chef"),
  (5, "Teacher")
)
val columns2 = Seq("id", "profession")
val df2 = spark.createDataFrame(data2).toDF(columns2: _*)
val joinedDF = df.join(df2, "id")
joinedDF.show()

// 2.6 Drop a column
val droppedDF = df.drop("age")
droppedDF.show()

// 2.7 Rename a column
val renamedDF = df.withColumnRenamed("name", "full_name")
renamedDF.show()

// 2.8 Order by a column (e.g., order by age)
val orderedDF = df.orderBy(col("age").desc)
orderedDF.show()

// 2.9 Distinct values (remove duplicates)
val distinctDF = df.distinct()
distinctDF.show()

// 2.10 Create a new column using a conditional expression
val conditionalDF = df.withColumn(
  "age_group",
  when(col("age") < 30, "Young")
    .when(col("age") >= 30 && col("age") < 40, "Middle-aged")
    .otherwise("Older")
)
conditionalDF.show()

10. Common transformations on DataSets

//Import Required libraries

import org.apache.spark.sql.{SparkSession}

import org.apache.spark.sql.functions._

import org.apache.spark.sql.Dataset

// Step 1: Define case class for a strongly-typed Dataset

case class Person(id: Int, name: String, age: Int, city: String, profession: String)

// Step 2: Load data into Dataset (you can replace it with your own dataset)

import spark.implicits._

val data = Seq(

  Person(1, "Alice", 29, "New York", "Engineer"),

  Person(2, "Bob", 31, "Los Angeles", "Doctor"),

  Person(3, "Catherine", 24, "Chicago", "Artist"),

  Person(4,"David", 35, "New York", "Chef"),

  Person(5,"Eva", 29, "Chicago", "Teacher")

)

// Convert the sequence to a Dataset

val ds: Dataset[Person] = spark.createDataset(data)

// Step 3: Apply common transformations

// 3.1 Select specific columns (using Dataset API)

val selectedColumnsDS = ds.select($"name", $"age")

selectedColumnsDS.show()

// 3.2 Filter data (e.g., select people older than 30)

val filteredDS = ds.filter($"age" > 30)

filteredDS.show()

// 3.3 Add a new column (e.g., increase age by 1)

val updatedDS = ds.withColumn("age_plus_one", $"age" + 1)

updatedDS.show()

// 3.4 Group by and aggregate (e.g., count people by city)

val groupByCityDS = ds.groupBy($"city").count()

groupByCityDS.show()

// 3.5 Join with another Dataset (using Dataset API)

val data2 = Seq(

  (1,"Engineer"),

  (2,"Doctor"),

  (3,"Artist"),

  (4,"Chef"),

  (5,"Teacher"))

val columns2 = Seq("id", "profession")

val ds2 = data2.toDF("id", "profession").as[(Int, String)]

val joinedDS = ds.join(ds2, "id")

joinedDS.show()

// 3.6 Drop a column

val droppedDS = ds.drop("age")

droppedDS.show()

// 3.7 Rename a column

val renamedDS = ds.withColumnRenamed("name","full_name")

renamedDS.show()

// 3.8 Order by a column (e.g., order by age)

val orderedDS = ds.orderBy($"age".desc)

orderedDS.show()

// 3.9 Distinct values (remove duplicates)

val distinctDS = ds.distinct()

distinctDS.show()

// 3.10 Create a new column using a conditional expression

val conditionalDS = ds.withColumn("age_group", 

when($"age" < 30, "Young")  

.when($"age" >= 30 && $"age" < 40,"Middle-aged")

.otherwise("Older"))

conditionalDS.show()

11. Common transformations using Spark SQL

In this code snippet I will be rewriting the above program using Spark SQL.
 

    // Step 1: Import the required libraries

    import org.apache.spark.sql.{SparkSession}

    import org.apache.spark.sql.functions._

    // Step 2: Load data into a DataFrame (you can replace it with your own dataset)

    import spark.implicits._

    val data = Seq(

      (1, "Alice", 29, "New York", "Engineer"),

      (2, "Bob", 31, "Los Angeles", "Doctor"),

      (3, "Catherine", 24, "Chicago", "Artist"),

      (4, "David", 35, "New York", "Chef"),

      (5, "Eva", 29, "Chicago", "Teacher")

    )

    // Convert the sequence to a DataFrame

    val df = data.toDF("id", "name", "age", "city", "profession")

    // Step 3: Register the DataFrame as a temporary SQL table

    df.createOrReplaceTempView("people")

    // Step 4: Apply common transformations using Spark SQL

    // 4.1 Select specific columns

    println("Select specific columns:")

    spark.sql("SELECT name, age FROM people").show()

    // 4.2 Filter data (e.g., select people older than 30)

    println("Filter people older than 30:")

    spark.sql("SELECT * FROM people WHERE age > 30").show()

    // 4.3 Add a new column (e.g., increase age by 1)

    println("Add a new column (age + 1):")

    spark.sql("SELECT *, age + 1 AS age_plus_one FROM people").show()

    // 4.4 Group by and aggregate (e.g., count people by city)

    println("Group by and count people by city:")

    spark.sql("SELECT city, COUNT(*) AS count FROM people GROUP BY city").show()

    // 4.5 Join with another DataFrame (using Spark SQL)

    val data2 = Seq(

      (1, "Engineer"),

      (2, "Doctor"),

      (3, "Artist"),

      (4, "Chef"),

      (5, "Teacher")

    )

    // Convert the second dataset to a DataFrame

    val df2 = data2.toDF("id", "profession")

    // Register the second DataFrame as a temporary table

    df2.createOrReplaceTempView("professions")

    spark.sql("SELECT p.id, p.name, p.age, p.city, pr.profession FROM people p JOIN professions pr ON p.id = pr.id").show()

    // 4.6 Drop a column (Spark SQL doesn't support dropping columns directly, but we can select without it)

    spark.sql("SELECT id, name, city, profession FROM people").show()

    // 4.7 Rename a column (again, done by selecting with the new name)

    spark.sql("SELECT id, name AS full_name, age, city, profession FROM people").show()

    // 4.8 Order by a column (e.g., order by age)

    spark.sql("SELECT * FROM people ORDER BY age DESC").show()

    // 4.9 Distinct values (remove duplicates)

    spark.sql("SELECT DISTINCT * FROM people").show()

    // 4.10 Create a new column using a conditional expression

    spark.sql("""

      SELECT id, name, age, city, profession,

             CASE

               WHEN age < 30 THEN 'Young'

               WHEN age >= 30 AND age < 40 THEN 'Middle-aged'

               ELSE 'Older'

             END AS age_group

      FROM people

    """).show()

12. Common transformations using RDD

In this code snippet I will show some common transformations on RDDs.

    // Step 1: Import libraries  

    import org.apache.spark.sql.SparkSession

// Step 2: Create an RDD from a collection
val data = Seq(
  (1, "Alice", 29),
  (2, "Bob", 31),
  (3, "Catherine", 24),
  (4, "David", 35),
  (5, "Eva", 29)
)

// Create an RDD from a sequence of tuples
val rdd = spark.sparkContext.parallelize(data)

// Step 3: Apply common RDD transformations

// 3.1 Map operation (Transforming the data)
val namesRDD = rdd.map(record => record._2)
println("Names RDD:")
namesRDD.collect().foreach(println)

// 3.2 Filter operation (Filter out records where age is greater than 30)
val filteredRDD = rdd.filter(record => record._3 > 30)
println("Filtered RDD (age > 30):")
filteredRDD.collect().foreach(println)

// 3.3 FlatMap operation (Used to flatten results, e.g., splitting a string into words)
val wordsRDD = rdd.flatMap(record => record._2.split(" "))
println("FlatMap RDD (splitting names into words):")
wordsRDD.collect().foreach(println)

// 3.4 GroupBy operation (Group by age)
val groupedByAgeRDD = rdd.groupBy(record => record._3)
println("Grouped by Age RDD:")
groupedByAgeRDD.collect().foreach(println)

// 3.5 Reduce operation (Sum of all ages)
val sumOfAges = rdd.map(record => record._3).reduce((a, b) => a + b)
println(s"Sum of ages: $sumOfAges")

// 3.6 Aggregate operation (Calculate sum and count of ages)
val aggregatedRDD = rdd.map(record => record._3).aggregate((0, 0))(
  (acc, age) => (acc._1 + age, acc._2 + 1),              // combiner logic
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  // combiner logic for different partitions
)
println(s"Sum of ages: ${aggregatedRDD._1}, Count of records: ${aggregatedRDD._2}")

// 3.7 Union operation (Combine two RDDs)
val moreData = Seq(
  (6, "George", 28),
  (7, "Hannah", 32)
)
val moreDataRDD = spark.sparkContext.parallelize(moreData)
val unionRDD = rdd.union(moreDataRDD)
println("Union of RDDs:")
unionRDD.collect().foreach(println)

// 3.8 Intersection operation (Find common elements between two RDDs)
val intersectionRDD = rdd.intersection(moreDataRDD)
println("Intersection of RDDs:")
intersectionRDD.collect().foreach(println)

// 3.9 Distinct operation (Remove duplicates)
val distinctRDD = rdd.distinct()
println("Distinct RDD:")
distinctRDD.collect().foreach(println)

// 3.10 SortBy operation (Sort by age)
val sortedRDD = rdd.sortBy(record => record._3)
println("Sorted by age RDD:")
sortedRDD.collect().foreach(println)

// Step 4: Apply common RDD actions

// 4.1 Collect operation (Collect all elements of the RDD to the driver)
val collectedData = rdd.collect()
println("Collected RDD data:")
collectedData.foreach(println)

// 4.2 Count operation (Count the number of records)
val count = rdd.count()
println(s"Number of records in RDD: $count")

// 4.3 First operation (Get the first element of the RDD)
val firstElement = rdd.first()
println(s"First element in RDD: $firstElement")

// 4.4 Take operation (Take the first n elements)
val takeTwo = rdd.take(2)
println("First 2 elements in RDD:")
takeTwo.foreach(println)

// 4.5 Save operation (Save the RDD to a text file)
rdd.saveAsTextFile("output_rdd.txt")
println("RDD data has been saved to 'output_rdd.txt'")

Sunday, 2 February 2025

Installing and Configuring DbVisualizer in Ubuntu to Connect to a Remote Hive Warehouse

 



In this blog post we will discuss how database developers and data analysts can connect to a remote HIVE warehouse to perform querying and analysis of data in HDFS.

Step 1: Install Java 21 on the data analyst's system (a different user account)

Command:

sudo apt update 

sudo apt install openjdk-21-jdk

Step 2: Find the Java Installation Path

Command:

update-alternatives --list java

Step 3: Set Java 21 for Only One User

Command:

nano .bashrc

Add the below environment variables

#JAVA Related Options

export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64

export PATH=$PATH:$JAVA_HOME/bin

Refresh the profile

source ~/.bashrc

Step 4: Verify Java version

Command

java -version

Step 5: Download the DbVisualizer

Command

Go to https://www.dbvis.com/download/ and copy the link for the latest Linux distribution.


wget https://www.dbvis.com/product_download/dbvis-24.3.3/media/dbvis_linux_24_3_3.tar.gz

Extract the binaries:

tar xvfz dbvis_linux_24_3_3.tar.gz

Step 6: Configure the environment variables

Command:

nano .bashrc

Add the following lines to the end of .bashrc file

#DbVisualizer
export INSTALL4J_JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
export DB_VIS=/home/aksahoo/applications/DbVisualizer
export PATH=$PATH:$DB_VIS/

Refresh the profile:

source ~/.bashrc

Step 7: Start the DbVisualizer


Command:

dbvis

Step 8: Create a connection to Remote HIVE Warehouse








Provide the Hive Server 2 details.

Database Server: localhost
Database Port: 10000
Database: default
Database Userid: hdoop
Database Password: (leave blank)
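For reference, these fields correspond to a HiveServer2 JDBC URL of the form below (DbVisualizer assembles it from the fields above):


jdbc:hive2://localhost:10000/default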

Step 9: Allow User Impersonation in Hadoop Core Site (Hadoop user account)


Edit core-site.xml: nano hadoop/etc/hadoop/core-site.xml

Add the following configurations

<property>
  <name>hadoop.proxyuser.hdoop.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hdoop.groups</name>
  <value>*</value>
</property>

Step 10: Allow Impersonation in Hive Configuration (Hadoop user account)


Edit hive-site.xml: nano hive/conf/hive-site.xml

Add the following configurations

<property>
  <name>hive.server2.enable.doAs</name>
  <value>true</value>
</property>

Step 11: Restart Hadoop and ensure the Hadoop services are up and running (Hadoop user account, in this case hdoop)



Command:

cd /home/hdoop/hadoop/sbin
./stop-all.sh
./start-all.sh
jps

Step 12: Refresh the user-to-group mappings


Command:
hdfs dfsadmin -refreshUserToGroupsMappings


Step 13: Start the HIVE Metastore Service and HiveServer2

Command:
hive --service metastore &
hive --service hiveserver2 &


Step 14: Go to DbVisualizer and test connection






Step 15: Query HIVE tables from DbVisualizer

HQL: select * from employees;






Saturday, 1 February 2025

Spark and Hive Metastore Integration


Spark can be integrated with the Hive metastore to provide a common metastore layer between Hive and Spark. In this blog I will detail the steps to reuse the Hive metastore for the Spark engine.

Prerequisite:

1. Existing Hadoop installation

2. Existing Hive Installation

3. Existing Spark Installation: Steps to install Spark can be found here

Step 1: Copy the Hive Metastore RDBMS Driver from hive/lib to spark/jars folder

Command: cp hive/lib/mysql-connector-java-8.0.28.jar spark/jars/

Note: This assumes the Hive metastore is a MySQL database.




Step 2: Ensure MySQL and Hive Metastore Services are running

command:

sudo systemctl start mysql

hive --service metastore &

Step 3: Edit $SPARK_HOME/conf/spark-defaults.conf (create it if missing):


Add the following line.

spark.sql.catalogImplementation=hive


Step 4: Verify Spark-Hive Metastore Integration


Start the Spark shell: spark-shell

And execute below line at the Scala prompt: spark.sql("SHOW DATABASES").show()

If it shows all the hive databases, then the integration is successful.


Step 5: Make sure the Hadoop services are up and running; if not, start them.




Command:

To verify: jps

To start:
cd /home/hdoop/hadoop/sbin
./start-all.sh

Step 6: Run an HQL query to read the data (which is in HDFS) from a table (a minimal sketch follows)
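A minimal sketch from the spark-shell (your_hive_database and your_hive_table are placeholders for objects that already exist in your Hive warehouse):


spark.sql("USE your_hive_database")

spark.sql("SELECT * FROM your_hive_table LIMIT 10").show()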


 Step 7: Accessing HIVE Databases and Tables from spark-sql

If the above configuration is working fine, Hive databases and tables can be accessed directly from spark-sql.

Command: spark-sql
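For example, at the spark-sql prompt (the database and table names are placeholders):


SHOW DATABASES;

USE your_hive_database;

SELECT * FROM your_hive_table LIMIT 10;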









Apache Sqoop: A Comprehensive Guide to Data Transfer in the Hadoop Ecosystem

Introduction

In the era of big data, organizations deal with massive volumes of structured and unstructured data stored in various systems...