
Hadoop File Management Tasks: Separate Blocks Instead of A Flowing Answer Continuity Marks Smooth, Connected Narrative

The document outlines the implementation of file management operations in Hadoop's HDFS, including creating directories, uploading, retrieving, and deleting files. It also details the MapReduce framework, specifically through a Word Count program and a Max Temperature program, illustrating their execution steps and source code. The document emphasizes Hadoop's distributed architecture and its efficiency in processing large datasets.

HADOOP FILE MANAGEMENT TASKS

1. Aim
The aim of this experiment is to understand and implement file management operations in
Hadoop Distributed File System (HDFS), including creating directories, uploading files,
retrieving files, and deleting files using Hadoop commands.

2. Software Requirements
To perform this experiment, the following software setup is required. A system with Apache
Hadoop (version 2.x or 3.x) installed and configured is essential. Java (JDK 8 or above) must
be available since Hadoop is Java-based. A Linux/Ubuntu environment is preferred for
smooth execution, along with access to a terminal or command line interface.
3. Introduction to Hadoop

Apache Hadoop is an open-source framework designed to store and process large datasets in
a distributed environment. Instead of relying on a single system, Hadoop distributes data
across multiple machines, allowing efficient storage and parallel processing.

The Hadoop ecosystem mainly consists of HDFS for storage, YARN for resource
management, and MapReduce for data processing. When a file is stored in Hadoop, it is
divided into smaller blocks and distributed across different machines, ensuring both
scalability and fault tolerance.

This design also makes Hadoop highly reliable: because every block is stored on more than one
node, the data can still be retrieved from the remaining copies if one node fails.

4. HDFS Architecture
HDFS follows a master-slave architecture. At the center of this system is the NameNode,
which acts as the master and manages all metadata such as file names, directory structure,
and block locations.

The actual data is stored in DataNodes, which are worker nodes responsible for storing and
retrieving data blocks. Each DataNode periodically sends heartbeat signals to the NameNode
(every 3 seconds by default) to indicate that it is functioning properly.

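When a file is stored, it is split into fixed-size blocks (128 MB by default in Hadoop 2.x and
3.x), and each block is replicated across multiple DataNodes (three copies by default,
controlled by the dfs.replication property). This replication ensures fault tolerance and data
reliability.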
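The block arithmetic can be sketched in plain Java. This is a minimal illustration, not HDFS code. The class name HdfsBlockMath is hypothetical, and the sketch assumes the Hadoop 2.x/3.x default block size of 128 MB and the default replication factor of 3. It shows that a 300 MB file occupies three blocks (128 MB + 128 MB + 44 MB). The last block only uses as much disk space as the data it holds, while replication triples the raw storage.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: how a file is cut into HDFS-style blocks and what replication costs. */
final class HdfsBlockMath {
    static final long MB = 1024L * 1024L;

    /** Sizes of the blocks that a file of fileBytes would occupy. */
    static List<Long> blockSizes(long fileBytes, long blockBytes) {
        List<Long> blocks = new ArrayList<>();
        long remaining = fileBytes;
        while (remaining > 0) {
            long size = Math.min(blockBytes, remaining); // the last block may be partial
            blocks.add(size);
            remaining -= size;
        }
        return blocks;
    }

    /** Raw cluster storage consumed when every block is stored `replication` times. */
    static long rawStorage(long fileBytes, int replication) {
        return fileBytes * replication;
    }

    public static void main(String[] args) {
        long file = 300 * MB;
        List<Long> blocks = blockSizes(file, 128 * MB);
        System.out.println("blocks: " + blocks.size());                              // 3 (128 + 128 + 44 MB)
        System.out.println("last block MB: " + blocks.get(2) / MB);                   // 44
        System.out.println("raw MB at replication 3: " + rawStorage(file, 3) / MB);   // 900
    }
}
```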

Understanding this architecture is important because all file management operations
internally depend on the interaction between the NameNode and DataNodes.

5. File Management Operations in HDFS

All file operations in HDFS are performed through the Hadoop file system shell, using the
general form:

hdfs dfs -<command> [arguments]

These operations can be understood step by step in a logical flow, starting from creating
directories, then adding files, retrieving them, and finally deleting them.

i) Adding Files and Directories


The first step in file management is creating a directory structure in HDFS, similar to how
folders are created in a local system.
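To create a directory in HDFS, the following commands are used:

hdfs dfs -mkdir /user/hadoop
hdfs dfs -mkdir /user/hadoop/input

The -p option creates any missing parent directories in a single step:

hdfs dfs -mkdir -p /user/hadoop/input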

Once the directory is created, files can be uploaded from the local system into HDFS using:

hdfs dfs -put [Link] /user/hadoop/input/

Alternatively, the same operation can be performed using:

hdfs dfs -copyFromLocal [Link] /user/hadoop/input/

At this stage, Hadoop splits the file into blocks and distributes them across multiple
DataNodes. These blocks are also replicated to ensure data safety.

ii) Retrieving Files


After storing data in HDFS, the next logical operation is retrieving it back to the local system.

To download a file from HDFS, the following command is used:

hdfs dfs -get /user/hadoop/input/[Link] /home/user/

This operation can also be performed using:

hdfs dfs -copyToLocal /user/hadoop/input/[Link] /home/user/

If the user wants to directly view the content of a file without downloading it, the command
is:

hdfs dfs -cat /user/hadoop/input/[Link]

During retrieval, the NameNode identifies where the file blocks are stored, and the
DataNodes send those blocks to the client, where they are combined to reconstruct the
original file.
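The read path described above can be modeled as a toy in plain Java. Everything here is hypothetical: ToyHdfsRead, the block IDs, and the "dataNode/blockId" storage keys are illustrative names, not the HDFS client API. The NameNode's metadata is reduced to two maps. The client reads each block from the first live replica and appends the blocks in file order. Skipping an unreachable DataNode shows how replication lets a read succeed even when a node has failed.

```java
import java.util.*;

/** Toy model of an HDFS read (hypothetical names, not the real HDFS client API). */
final class ToyHdfsRead {

    /**
     * @param blockIds       ordered block IDs of the file (NameNode metadata)
     * @param blockLocations block ID -> DataNodes holding a replica (NameNode metadata)
     * @param storage        "dataNode/blockId" -> block contents (what each DataNode stores)
     * @param deadNodes      DataNodes that are currently unreachable
     */
    static String read(List<String> blockIds, Map<String, List<String>> blockLocations,
                       Map<String, String> storage, Set<String> deadNodes) {
        StringBuilder file = new StringBuilder();
        for (String blockId : blockIds) {
            String data = null;
            for (String node : blockLocations.getOrDefault(blockId, List.of())) {
                if (deadNodes.contains(node)) {
                    continue;                       // replica unavailable, try the next one
                }
                data = storage.get(node + "/" + blockId);
                if (data != null) {
                    break;                          // first live replica wins
                }
            }
            if (data == null) {
                throw new IllegalStateException("no live replica for " + blockId);
            }
            file.append(data);                      // blocks are joined in file order
        }
        return file.toString();
    }

    public static void main(String[] args) {
        Map<String, List<String>> locations = Map.of(
                "blk_1", List.of("dn1", "dn2"),
                "blk_2", List.of("dn2", "dn3"));
        Map<String, String> storage = Map.of(
                "dn1/blk_1", "Hello ", "dn2/blk_1", "Hello ",
                "dn2/blk_2", "HDFS", "dn3/blk_2", "HDFS");
        // dn2 is down, yet both blocks are still readable from their other replicas
        System.out.println(read(List.of("blk_1", "blk_2"), locations, storage, Set.of("dn2")));
    }
}
```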

iii) Listing Files (Supporting Operation)


Before performing retrieval or deletion, it is often necessary to view the files present in
HDFS.

This can be done using:

hdfs dfs -ls /user/hadoop/


To list all files recursively:

hdfs dfs -ls -R /user/hadoop/

This step helps in verifying file locations and directory structure.

iv) Deleting Files and Directories


The final step in file management is deleting files or directories when they are no longer
needed.

To delete a file:

hdfs dfs -rm /user/hadoop/input/[Link]

To remove an empty directory:

hdfs dfs -rmdir /user/hadoop/input

If the directory contains files, it can be deleted using:

hdfs dfs -rm -r /user/hadoop/input

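If the HDFS trash feature is enabled (fs.trash.interval greater than 0), deleted files are first
moved to the user's .Trash directory. To delete a file permanently without moving it to the
trash:

hdfs dfs -rm -skipTrash /user/hadoop/input/[Link]

When a file is deleted, the NameNode removes it from the namespace metadata. The
corresponding blocks are then deleted by the DataNodes, which receive the deletion
instructions in their heartbeat replies.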

6. Result
Thus, the file management operations in Hadoop HDFS were successfully performed,
including creating directories, uploading files, retrieving files, listing contents, and deleting
files.

7. Conclusion
From this experiment, it can be concluded that Hadoop provides an efficient and reliable way
to manage large-scale data using HDFS. The system’s distributed architecture ensures high
availability, fault tolerance, and scalability. Using simple commands, users can perform
complex file operations seamlessly in a distributed environment.

AIM:
To run a basic Word Count MapReduce program and understand the MapReduce paradigm in
Hadoop.

Software Requirements:
 Windows / Linux Operating System
 Cloudera (Hadoop Environment)
 Eclipse IDE
 Java (JDK 8 or above)

Definition of MapReduce:
MapReduce is the processing framework of Apache Hadoop used for handling large-scale
data in a distributed environment. It is based on two main operations: Map and Reduce.

The Map phase converts input data into key-value pairs, and the Reduce phase processes
these pairs to generate the final output. This approach enables parallel processing, making it
efficient for handling big data.

Components Involved in Execution:

The following components actively participate during the execution of a MapReduce job:

 Mapper – Processes input data and generates intermediate key-value pairs
 Reducer – Aggregates intermediate data and produces final output
 NameNode – Maintains metadata and manages file locations in HDFS
 DataNode – Stores actual data and performs operations on data blocks
 Master Node – Controls job execution and scheduling (in Hadoop 2.x and 3.x, the YARN
ResourceManager allocates resources and a per-job ApplicationMaster schedules the tasks)
 Worker Nodes – Execute mapping and reducing tasks
 Job – The complete execution of the Map and Reduce tasks for one program run
 Task – A single unit of work (one map or one reduce) assigned to a node
Stages of MapReduce:
The MapReduce process consists of multiple stages that ensure efficient distributed
processing.

1. Input Splitting
The input file stored in HDFS is divided into logical chunks called input splits (by default,
one split per HDFS block). Each split is processed independently by its own map task,
enabling parallel execution.
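The size of each split can be sketched as follows. This is a simplified Java model based on our understanding of Hadoop's FileInputFormat. In that model, the split size is max(minSize, min(maxSize, blockSize)), and the final split may be up to 10% larger than the others (a slop factor of 1.1) rather than leaving a tiny extra split. SplitSketch and its methods are hypothetical names. With the default minimum and maximum split sizes, a 300 MB file and a 128 MB block size give three map tasks, while a 140 MB file gives just one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of input-split sizing, modeled on FileInputFormat's logic. */
final class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // the last split may be up to 10% larger than splitSize

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Lengths of the splits produced for a file of fileLength bytes. */
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(remaining);             // tail split, possibly up to 1.1 x splitSize
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1, Long.MAX_VALUE);
        System.out.println(splitLengths(300 * mb, splitSize).size()); // 3 map tasks
        System.out.println(splitLengths(140 * mb, splitSize).size()); // 1 map task (within slop)
    }
}
```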

2. Mapping Phase
In this phase, each split is processed by a Mapper. The Mapper reads the data and converts it
into key-value pairs.

For the Word Count problem:

 Input → Text data
 Output → (word, 1)

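3. Shuffling and Sorting
This phase transfers the intermediate pairs from the Mappers to the Reducers, sorts them by
key, and groups all values that share the same key. Every occurrence of the same word is
therefore collected together, for example (hello, [1, 1]), and prepared for reduction.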
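How intermediate pairs reach a particular Reducer can be sketched in Java. The formula in partition() is the one used by Hadoop's default HashPartitioner: (hash & Integer.MAX_VALUE) % numReduceTasks. Everything else is a hypothetical stand-in, including the use of String.hashCode in place of Text.hashCode, so real partition numbers on a cluster will differ. A TreeMap per Reducer plays the role of the sort-and-group step, which guarantees that all occurrences of a word end up in the same Reducer.

```java
import java.util.*;

/** Hypothetical sketch of partitioning plus shuffle and sort for (word, 1) pairs. */
final class PartitionSketch {

    /** HashPartitioner-style formula; String.hashCode stands in for Text.hashCode. */
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; // mask keeps it non-negative
    }

    /** Routes each (word, 1) pair to a reducer, where keys are kept sorted and values grouped. */
    static List<SortedMap<String, List<Integer>>> shuffle(List<String> words, int numReduceTasks) {
        List<SortedMap<String, List<Integer>>> reducers = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            reducers.add(new TreeMap<>());
        }
        for (String w : words) {
            reducers.get(partition(w, numReduceTasks))
                    .computeIfAbsent(w, k -> new ArrayList<>())
                    .add(1);
        }
        return reducers;
    }

    public static void main(String[] args) {
        List<String> words = List.of("deer", "bear", "river", "car", "car", "river");
        List<SortedMap<String, List<Integer>>> reducers = shuffle(words, 2);
        for (int i = 0; i < reducers.size(); i++) {
            System.out.println("reducer " + i + " receives " + reducers.get(i));
        }
    }
}
```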

4. Reducing Phase
In this phase, grouped data is processed to produce final results. The Reducer sums up values
for each key and generates the total count.

5. Output Phase
The final output is written back into HDFS as result files.

The execution of the Word Count program follows a complete workflow:

1. Input file is stored in HDFS
2. File is split into blocks and distributed across DataNodes
3. Each input split (normally one block) is processed by a separate Mapper
4. Mapper generates intermediate (word, 1) pairs
5. Shuffle phase groups identical words together
6. Reducer aggregates counts for each word
7. Final output is written into HDFS

This workflow demonstrates parallel processing and distributed computation.
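The whole workflow can be simulated in a single JVM to make each phase visible. This is a plain-Java sketch with no Hadoop dependency; WordCountSimulation and its methods are hypothetical names. It tokenizes lines the same way as the Mapper in the source code, groups the (word, 1) pairs in a sorted map as the shuffle would, and sums them as the Reducer does. On a cluster, the map calls would run in parallel on different nodes.

```java
import java.util.*;

/** Plain-Java simulation (no Hadoop) of the Word Count flow: map, shuffle/sort, reduce. */
final class WordCountSimulation {

    // Map phase: one line -> list of (word, 1) pairs
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);   // same tokenizer as the Mapper
        while (itr.hasMoreTokens()) {
            pairs.add(Map.entry(itr.nextToken(), 1));
        }
        return pairs;
    }

    // Shuffle and sort: group values by key; TreeMap keeps the keys sorted
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Reduce phase: sum the values collected for each key
    static SortedMap<String, Integer> reduce(SortedMap<String, List<Integer>> grouped) {
        SortedMap<String, Integer> result = new TreeMap<>();
        grouped.forEach((word, ones) -> result.put(word, ones.stream().mapToInt(Integer::intValue).sum()));
        return result;
    }

    static SortedMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (String line : lines) {
            all.addAll(map(line));                          // each line could go to a different Mapper
        }
        return reduce(shuffle(all));
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello world hello", "hadoop pig hadoop")));
        // {hadoop=2, hello=2, pig=1, world=1}
    }
}
```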

SOURCE CODE:

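import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: emits (word, 1) for every token in the input line
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: sums the counts received for each word
    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver: configures and submits the job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);   // local pre-aggregation on the map side
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory in HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not already exist

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}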

Steps to Create Project in Eclipse:


1. Open Eclipse → Click on File → New → Java Project
2. Enter project name (e.g., WordCount) → Click Next
3. Go to Libraries tab → Click Add External JARs
4. Navigate to Hadoop directory:
o Select required JAR files from /usr/lib/hadoop
o Also add JARs from /usr/lib/hadoop/client
5. Click Finish to create the project

Creating and Exporting JAR File:

 Create a new class and write the program
 Save the file
 Right-click project → Export
 Select Java → JAR file
 Provide name ([Link])
 Click Finish

Steps to Execute in Cloudera:

Step 1: Start Cloudera Environment

 Launch Cloudera QuickStart VM
 Open terminal

Step 2: Create Input Directory

hdfs dfs -mkdir /user/cloudera/input

Step 3: Upload Input File

hdfs dfs -put [Link] /user/cloudera/input/

Step 4: Run the MapReduce Program

hadoop jar [Link] WordCount /user/cloudera/input /user/cloudera/output

The output directory must not exist before the job runs. If it is left over from an earlier run,
remove it first with hdfs dfs -rm -r /user/cloudera/output.

Step 5: View Output

hdfs dfs -cat /user/cloudera/output/part-r-00000

Step 6: (Optional) Using Hue Interface

 Open browser → Hue
 Navigate to File Browser
 View output file

Result:
The Word Count MapReduce program was successfully executed, and the frequency of each
word was generated and stored in HDFS.

Conclusion:
Thus, the MapReduce paradigm was successfully implemented using the Word Count
program. It demonstrates how Hadoop processes large datasets in a distributed and parallel
manner, ensuring efficiency and scalability.

Additional Content for Max Temperature Program

Program Description
The Max Temperature program is a MapReduce application used to find the maximum
temperature for each year from a given dataset.

The input data is expected in the format:

year,temperature

Example:

2020,35
2020,40
2021,38

The program processes this data and outputs the maximum temperature for each year.

Working Steps of the Program

The execution of the Max Temperature program follows these steps:

Step 1: Input Data in HDFS

 The dataset is stored in HDFS
 Each line contains year and temperature values

Step 2: Input Splitting

 The input file is divided into smaller splits
 Each split is assigned to a Mapper

Step 3: Mapping Phase

 Each Mapper reads a line of input
 Extracts year and temperature
 Emits intermediate key-value pairs

👉 Output format:

(year, temperature)

Example:

(2020, 35)
(2020, 40)
(2021, 38)

Step 4: Shuffle and Sort

 All values with the same year are grouped together
 Data is sorted and sent to Reducers

Example:

(2020, [35, 40])
(2021, [38])

Step 5: Reducing Phase

 Reducer processes each group
 Finds the maximum temperature for each year

👉 Output:

(2020, 40)
(2021, 38)

Step 6: Output Storage

 Final results are stored in HDFS
 Output is written in part files
Algorithm
1. Start the program
2. Load dataset into HDFS
3. Split input into smaller chunks
4. Mapper emits (year, temperature) pairs
5. Shuffle groups data by year
6. Reducer finds maximum temperature
7. Store results in HDFS
8. End program
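The algorithm, and the reason a Combiner is safe for this job, can be sketched in plain Java. MaxTempSimulation and its methods are hypothetical names, and no Hadoop classes are used. Taking a maximum is associative and commutative. As a result, computing per-split maxima first and then the maximum of those gives the same answer as computing over all records at once. mergeMaxima() demonstrates this on the sample data.

```java
import java.util.*;

/** Plain-Java sketch of the Max Temperature job and of combiner safety. */
final class MaxTempSimulation {

    // Mapper logic: "year,temperature" -> (year, temperature); malformed lines yield nothing
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(Map.entry(parts[0].trim(), Integer.parseInt(parts[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();                 // e.g. a header line
        }
    }

    // Map + reduce over a group of lines: keep the highest temperature seen per year
    static SortedMap<String, Integer> maxPerYear(List<String> lines) {
        SortedMap<String, Integer> maxByYear = new TreeMap<>();
        for (String line : lines) {
            parse(line).ifPresent(e -> maxByYear.merge(e.getKey(), e.getValue(), Integer::max));
        }
        return maxByYear;
    }

    // Reducer applied to combiner outputs: the maximum of the per-split maxima
    static SortedMap<String, Integer> mergeMaxima(List<SortedMap<String, Integer>> partials) {
        SortedMap<String, Integer> result = new TreeMap<>();
        for (SortedMap<String, Integer> partial : partials) {
            partial.forEach((year, t) -> result.merge(year, t, Integer::max));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("2020,35", "2020,40", "2021,38");
        System.out.println(maxPerYear(input));       // {2020=40, 2021=38}
    }
}
```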

Sample Input and Output


Input:
2020,35
2020,40
2021,38

Output:
2020 40
2021 38

Applications
 Weather data analysis
 Climate monitoring systems
 Time-series data processing
 Data aggregation tasks

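Source Code

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemp {

    // Mapper: parses "year,temperature" and emits (year, temperature)
    public static class MaxTempMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] parts = line.split(",");

            // Skip lines that do not have exactly two fields
            if (parts.length == 2) {
                try {
                    String year = parts[0].trim();
                    int temp = Integer.parseInt(parts[1].trim());
                    context.write(new Text(year), new IntWritable(temp));
                } catch (NumberFormatException e) {
                    // Skip header lines or non-numeric temperatures
                }
            }
        }
    }

    // Reducer: keeps the maximum temperature for each year
    public static class MaxTempReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable val : values) {
                if (val.get() > max) {
                    max = val.get();
                }
            }
            context.write(key, new IntWritable(max));
        }
    }

    // Driver
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Max Temperature");
        job.setJarByClass(MaxTemp.class);

        // Set classes
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);

        // Combiner (optional): safe here because max is associative and commutative
        job.setCombinerClass(MaxTempReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}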

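Exercise 4
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryGender {

    // Mapper: input line "id name gender salary" -> (gender, salary)
    public static class MyMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on any run of whitespace so extra spaces do not shift the fields
            String[] data = value.toString().trim().split("\\s+");
            if (data.length < 4) {
                return; // skip blank or malformed lines
            }
            String gender = data[2];
            int salary = Integer.parseInt(data[3]);
            context.write(new Text(gender), new IntWritable(salary));
        }
    }

    // Reducer: total and average salary per gender
    public static class MyReducer
            extends Reducer<Text, IntWritable, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;   // long avoids overflow for large totals
            int count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            double avg = (double) sum / count;
            context.write(key,
                    new Text("Total Salary = " + sum + ", Average Salary = " + avg));
        }
    }

    // Driver/Main Method
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Salary by Gender");
        job.setJarByClass(SalaryGender.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Map output types differ from the final output types, so both must be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}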

Input ([Link])
1 John Male 50000
2 Anita Female 60000
3 Ravi Male 45000
4 Priya Female 55000
5 Arun Male 70000
Compile
hadoop [Link] [Link]
jar cf [Link] SalaryGender*.class

Run
hadoop jar [Link] SalaryGender input output

Output
Female Total Salary = 115000, Average Salary = 57500.0
Male Total Salary = 165000, Average Salary = 55000.0
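The driver above registers no Combiner. A short Java sketch shows why reusing the Reducer as a Combiner would be unsafe: an average of per-split averages is not the overall average. The class AverageCombinerSketch is hypothetical. Take the male salaries from the sample input and split them unevenly into [50000, 45000] and [70000]. Averaging the two partial averages gives 58750.0 instead of the correct 55000.0. Merging partial (sum, count) pairs gives the right result, which is the usual pattern when a Combiner is wanted for an average.

```java
import java.util.*;

/** Hypothetical sketch: why partial averages cannot be combined, but (sum, count) pairs can. */
final class AverageCombinerSketch {

    /** Partial aggregate for one split, returned as {sum, count}. */
    static long[] sumAndCount(List<Integer> values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return new long[] {sum, values.size()};
    }

    /** Correct: merge (sum, count) pairs first, divide once at the end. */
    static double averageFromPartials(List<long[]> partials) {
        long sum = 0;
        long count = 0;
        for (long[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return (double) sum / count;
    }

    /** Incorrect shortcut: the average of per-split averages. */
    static double averageOfAverages(List<List<Integer>> splits) {
        double total = 0;
        for (List<Integer> s : splits) {
            total += (double) sumAndCount(s)[0] / s.size();
        }
        return total / splits.size();
    }

    public static void main(String[] args) {
        List<Integer> split1 = List.of(50000, 45000);
        List<Integer> split2 = List.of(70000);
        System.out.println(averageOfAverages(List.of(split1, split2)));                        // 58750.0 (wrong)
        System.out.println(averageFromPartials(List.of(sumAndCount(split1), sumAndCount(split2)))); // 55000.0
    }
}
```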


AIM:
To implement Word Count using Apache Pig and understand data processing using Pig Latin
scripts.
Software Requirements:
 Hadoop (Cloudera Environment)
 Apache Pig
 Linux Terminal
 HDFS

Introduction to Apache Pig


Apache Pig is a high-level platform used for analyzing large datasets in Hadoop. It uses a
scripting language called Pig Latin, which simplifies complex data processing tasks.
Instead of writing lengthy MapReduce programs in Java, users can write simple Pig scripts
that are internally converted into MapReduce jobs. This makes development faster, easier,
and more readable.

Pig Architecture (Detailed)


Apache Pig follows a layered architecture:
1. Pig Latin Script Layer
 User writes scripts using Pig Latin
 Includes operations like LOAD, FOREACH, GROUP, STORE

2. Parser
 Checks syntax of the script
 Validates structure and correctness

3. Logical Plan
 Converts script into logical operations
 Example: LOAD → TOKENIZE → GROUP → COUNT

4. Optimizer
 Optimizes execution plan
 Reduces unnecessary data movement
 Improves performance

5. Physical Plan
 Converts logical plan into executable steps

6. Execution Engine
 Converts Pig script into MapReduce jobs
 Executes on Hadoop cluster using HDFS

Pig Execution Modes


Apache Pig can run in two modes:
1. Local Mode
 Runs on a single machine
 Uses local file system
 Suitable for testing small datasets
pig -x local

2. MapReduce Mode (Hadoop Mode)


 Runs on Hadoop cluster
 Uses HDFS for storage
 Suitable for large datasets
pig -x mapreduce
👉 If no -x option is given, Pig starts in MapReduce mode, which is how it runs in the Cloudera environment.

Types of Data in Pig


Pig supports various data types:
 Atomic Types – int, float, chararray
 Tuple – Ordered set of fields
 Bag – Collection of tuples
 Map – Key-value pairs
Example (schema of a relation with a tuple field and a bag field):
(student:tuple(name:chararray, age:int), marks:bag{t:tuple(sub:chararray, score:int)})

Operators and Built-in Functions Used

 LOAD → Reads data from HDFS into a relation
 TOKENIZE() → Splits a string into a bag of words
 FLATTEN() → Un-nests a bag so that each word becomes a separate record
 GROUP → Groups data based on a key
 COUNT() → Counts the number of tuples in a bag
 STORE → Saves output into HDFS

Working of Pig Script

Step 1: Load Data


 Reads input file from HDFS
 Stores data as a relation

Step 2: Tokenize Words


 Splits each line into individual words
 FLATTEN converts nested output into simple format

Step 3: Group Words


 Groups identical words together

Step 4: Count Words


 Counts occurrences of each word

Step 5: Store Output


 Saves final result into HDFS

Data Flow of Pig Execution


The execution flow of Pig is as follows:
1. Data is loaded from HDFS
2. Script is parsed and validated
3. Logical plan is created
4. Plan is optimized
5. Converted into MapReduce jobs
6. Executed on Hadoop cluster
7. Output stored in HDFS

Pig Script (Word Count)

-- TextLoader reads each line whole, so commas inside a line are not treated as field separators
data = LOAD '/user/cloudera/[Link]' USING TextLoader() AS (line:chararray);

words = FOREACH data GENERATE FLATTEN(TOKENIZE(line)) AS word;

grouped_words = GROUP words BY word;

word_count = FOREACH grouped_words GENERATE group AS word, COUNT(words) AS count;

STORE word_count INTO '/user/cloudera/word_count_output' USING PigStorage(',');
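Each statement of the script can be mirrored with Java streams to show what it produces. This is an illustrative sketch only, and PigWordCountMirror is a hypothetical name. It assumes that TOKENIZE's default delimiters are space, double quote, comma, parentheses, and asterisk, as listed in the Pig documentation. FLATTEN corresponds to flatMap, and GROUP followed by COUNT corresponds to groupingBy with a counting collector.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Java-streams mirror of the Pig Word Count script (hypothetical helper). */
final class PigWordCountMirror {
    // Assumed to match TOKENIZE's default delimiters: space, double quote, comma, parentheses, asterisk
    static final String DELIMS = " \",()*";

    static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()                                       // data = LOAD ... AS (line)
                .flatMap(line -> {                                  // FLATTEN(TOKENIZE(line))
                    List<String> words = new ArrayList<>();
                    StringTokenizer t = new StringTokenizer(line, DELIMS);
                    while (t.hasMoreTokens()) {
                        words.add(t.nextToken());
                    }
                    return words.stream();
                })
                .collect(Collectors.groupingBy(w -> w, TreeMap::new,  // GROUP words BY word
                        Collectors.counting()));                      // COUNT(words)
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("hello world hello", "hadoop pig hadoop")));
    }
}
```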

HDFS Commands
1. Upload Input File
hdfs dfs -put [Link] /user/cloudera/

2. Verify File
hdfs dfs -ls /user/cloudera

3. Remove Old Output


hdfs dfs -rm -r /user/cloudera/word_count_output
4. View Output
hdfs dfs -cat /user/cloudera/word_count_output/part-r-00000

Execution Steps
1. Start Pig using terminal:
pig
2. Enter Pig script commands
3. Execute script
4. Output is generated in HDFS

Sample Input and Output


Input:
hello world hello
hadoop pig hadoop

Output:
hello,2
world,1
hadoop,2
pig,1

Applications
 Log file analysis
 Text processing
 Data transformation
 ETL operations
 Big data analytics

Result
The Word Count operation was successfully implemented using Apache Pig, and the
frequency of words was generated and stored in HDFS.

Conclusion
Apache Pig provides a simple and efficient way to process large datasets using high-level
scripts. It reduces the complexity of writing MapReduce programs and enables faster
development of big data applications.

🧪 Experiment No: 7

Title:
Write Pig Latin scripts to perform filter, group, join, aggregation, and sorting operations
on datasets.

🎯 Aim

To design and execute Pig Latin scripts that perform data selection, transformation,
aggregation, integration, and ordering using large datasets stored in HDFS, thereby
understanding how high-level data flow operations are executed in Hadoop using Apache
Pig.

📚 In-Depth Theory of Operations

🔹 1. Filtering (Data Selection Phase)

Filtering is a data reduction operation used to extract only relevant records from a dataset
based on a condition.
 It minimizes data size before further processing
 Improves performance by avoiding unnecessary computation
 Equivalent to WHERE clause in SQL
👉 In this experiment:
We filter employees where age > 30, meaning only senior employees are selected for further
analysis.

🔹 2. Grouping (Data Organization Phase)


Grouping rearranges data into collections based on a key.
 Produces a bag of tuples for each key
 Required before aggregation
 Equivalent to GROUP BY in SQL
👉 Here:
Employees are grouped based on dept_id, so all employees belonging to the same
department are processed together.

🔹 3. Aggregation (Analytical Phase)

Aggregation computes summary values from grouped data.


 Requires grouping as a prerequisite
 Common functions: COUNT, SUM, AVG, MIN, MAX
 Used in analytics and reporting
👉 In this experiment:
We calculate the number of employees in each department, which helps in organizational
analysis.

🔹 4. Join (Data Integration Phase)

Join combines two or more datasets based on a common attribute.


 Used to enrich data
 Similar to relational database joins
 Supports inner joins as well as left, right, and full outer joins
👉 Here:
Employee data is joined with department data using dept_id, allowing us to attach
department names to employees.

🔹 5. Sorting (Data Ordering Phase)

Sorting arranges records in a specified order.


 Helps in ranking and prioritization
 Supports ascending (ASC) and descending (DESC)
 Equivalent to ORDER BY in SQL
👉 In this experiment:
Employees are sorted by age in descending order, identifying the oldest employees first.

💻 Pig Script with Detailed Outputs & Explanation

🔹 Step 1: Load Data into Pig

employees = LOAD '/pigdata/[Link]'
    USING PigStorage(',')
    AS (id:int, name:chararray, age:int, dept_id:int);

departments = LOAD '/pigdata/[Link]'
    USING PigStorage(',')
    AS (dept_id:int, dept_name:chararray);
👉 This step defines schema and loads structured data from HDFS into Pig relations.

🔹 Step 2: Display Employee Data

DUMP employees;
Output:
(1,John,30,101)
(2,Jane,28,102)
(3,Mark,35,101)
(4,Susan,25,103)
(5,Paul,40,102)
👉 Confirms successful loading and schema correctness.

🔹 Step 3: Filtering Employees (age > 30)

filtered = FILTER employees BY age > 30;
DUMP filtered;
Output:
(3,Mark,35,101)
(5,Paul,40,102)
👉 Only records satisfying the condition are retained.
👉 Reduces dataset size → improves efficiency of further operations.

🔹 Step 4: Grouping by Department

grouped = GROUP employees BY dept_id;
DUMP grouped;
Output:
(101,{(3,Mark,35,101),(1,John,30,101)})
(102,{(5,Paul,40,102),(2,Jane,28,102)})
(103,{(4,Susan,25,103)})
👉 Each department ID now maps to a bag of employee tuples.
👉 This structure is necessary for performing aggregation.

🔹 Step 5: Aggregation (Count Employees per Department)

counts = FOREACH grouped
    GENERATE group AS dept_id, COUNT(employees) AS total;

DUMP counts;
Output:
(101,2)
(102,2)
(103,1)
👉 Shows how many employees exist in each department.
👉 Demonstrates Pig’s capability for analytical queries.

🔹 Step 6: Join Employees with Departments

joined = JOIN employees BY dept_id,
    departments BY dept_id;

DUMP joined;
Output:
(1,John,30,101,101,HR)
(3,Mark,35,101,101,HR)
(2,Jane,28,102,102,Finance)
(5,Paul,40,102,102,Finance)
(4,Susan,25,103,103,Engineering)
👉 Combines employee details with department names.
👉 Transforms raw data into meaningful business information.
🔹 Step 7: Sorting Employees by Age (Descending)

sorted = ORDER employees BY age DESC;
DUMP sorted;
Output:
(5,Paul,40,102)
(3,Mark,35,101)
(1,John,30,101)
(2,Jane,28,102)
(4,Susan,25,103)
👉 Data is arranged from highest age to lowest.
👉 Useful for ranking and analysis.

🔹 Step 8: Store Final Output

STORE joined INTO '/pigdata/output'
    USING PigStorage(',');
👉 Saves processed data into HDFS for future use.
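The results of Steps 3 to 7 can be cross-checked with an equivalent Java streams sketch on the same five employee records. PigOpsMirror and its methods are hypothetical. The department names HR, Finance, and Engineering are taken from the JOIN output above, since the contents of the departments file are not listed. The mapping is as follows:
- FILTER maps to filter().
- GROUP with COUNT maps to groupingBy() with counting().
- The inner JOIN maps to a lookup that drops unmatched rows.
- ORDER BY age DESC maps to a descending comparator.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Java-streams mirror of the FILTER, GROUP/COUNT, JOIN and ORDER steps (hypothetical helper). */
final class PigOpsMirror {
    static final class Employee {
        final int id; final String name; final int age; final int deptId;
        Employee(int id, String name, int age, int deptId) {
            this.id = id; this.name = name; this.age = age; this.deptId = deptId;
        }
    }

    static final List<Employee> EMPLOYEES = List.of(
            new Employee(1, "John", 30, 101), new Employee(2, "Jane", 28, 102),
            new Employee(3, "Mark", 35, 101), new Employee(4, "Susan", 25, 103),
            new Employee(5, "Paul", 40, 102));
    // Department names as they appear in the JOIN output of the experiment
    static final Map<Integer, String> DEPARTMENTS = Map.of(101, "HR", 102, "Finance", 103, "Engineering");

    static List<String> filterOlderThan30() {                 // FILTER employees BY age > 30
        return EMPLOYEES.stream().filter(e -> e.age > 30).map(e -> e.name).collect(Collectors.toList());
    }

    static Map<Integer, Long> countPerDept() {                // GROUP BY dept_id, then COUNT
        return EMPLOYEES.stream()
                .collect(Collectors.groupingBy(e -> e.deptId, TreeMap::new, Collectors.counting()));
    }

    static List<String> joinWithDepartments() {               // JOIN ... BY dept_id (inner)
        return EMPLOYEES.stream()
                .filter(e -> DEPARTMENTS.containsKey(e.deptId))   // inner join drops unmatched rows
                .map(e -> e.name + "," + DEPARTMENTS.get(e.deptId))
                .collect(Collectors.toList());
    }

    static List<String> sortByAgeDesc() {                     // ORDER employees BY age DESC
        return EMPLOYEES.stream()
                .sorted((a, b) -> Integer.compare(b.age, a.age))
                .map(e -> e.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filterOlderThan30());
        System.out.println(countPerDept());
        System.out.println(joinWithDepartments());
        System.out.println(sortByAgeDesc());
    }
}
```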

🌟 WEEK–8 LAB RECORD

Experiment: Creating Hive Tables and Loading Data

Aim
To create internal and external Hive tables and load data into them using CSV and Sqoop.

Theory
🔹 Introduction to Hive

Hive is a data warehouse tool built on top of Hadoop that is used to process structured data with
an SQL-like language called HiveQL.
🔹 Hive Architecture

1. Hive Clients:
Apache Hive supports different types of client applications for running queries. These
clients can be categorized into three types:
 Thrift Clients: Because the Hive server is based on Apache Thrift, it can serve requests from
any programming language that supports Thrift.
 JDBC Clients: Hive allows Java applications to connect to it using the JDBC driver,
which is defined in the class [Link].
 ODBC Clients: The Hive ODBC driver allows applications that support the ODBC
protocol to connect to Hive. (Like the JDBC driver, the ODBC driver uses Thrift to
communicate with the Hive server.)
2. Hive Services:
Hive provides the following services:
 Hive CLI (Command Line Interface): This is the default shell provided by the Hive
where you can execute your Hive queries and commands directly.
 Apache Hive Web Interfaces: Apart from the command line interface, Hive also
provides a web based GUI for executing Hive queries and commands.
 Hive Server: The Hive server is built on Apache Thrift and is therefore also referred to as the
Thrift Server. It allows different clients to submit requests to Hive and retrieve the
final result.
 Apache Hive Driver: It is responsible for receiving the queries submitted by a client through
the CLI, the web UI, or the Thrift, ODBC or JDBC interfaces. The driver then passes the
query to the compiler, where parsing, type checking and semantic analysis take place with
the help of the schema stored in the metastore. Next, an optimized logical plan is
generated in the form of a DAG (Directed Acyclic Graph) of MapReduce tasks and HDFS
tasks. Finally, the execution engine executes these tasks in the order of their
dependencies, using Hadoop.
 Metastore: The metastore can be thought of as a central repository for all Hive metadata.
Hive metadata includes the structure of tables and partitions, along with column names,
column types, and the serializer/deserializer (SerDe) required to read and write the data
stored in HDFS. The metastore consists of two fundamental units:
o A service that provides metastore access to other Hive services.

o Disk storage for the metadata which is separate from HDFS storage.

🔹 Types of Tables

Internal Table
 Managed by Hive
 Data deleted when table is dropped
External Table
 Only metadata managed
 Data remains even if table is dropped

Procedure
🔹 Step 1: Create Internal Table

Query
CREATE TABLE employee (
id INT,
name STRING,
salary FLOAT,
dept STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
Output
OK
Time taken: 0.532 seconds

🔹 Step 2: Show Tables

Query
SHOW TABLES;
Output
employee

🔹 Step 3: Describe Table

Query
DESCRIBE employee;
Output
id int
name string
salary float
dept string

🔹 Step 4: Create External Table

Query
CREATE EXTERNAL TABLE employee_ext (
id INT,
name STRING,
salary FLOAT,
dept STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/external/employee';
Output
OK
Time taken: 0.421 seconds

🔹 Step 5: Load CSV Data

Query
LOAD DATA LOCAL INPATH '/home/hive/[Link]'
INTO TABLE employee;
Output
Loading data to table [Link]
OK
Time taken: 1.24 seconds

🔹 Step 6: View Data

Query
SELECT * FROM employee;
Output
1 Alice 50000 HR
2 Bob 60000 IT
3 John 55000 Finance
4 Sam 62000 IT
5 Meera 70000 HR

🌟 WEEK–9 LAB RECORD

Experiment: Hive Operations and Joins

Aim
To perform filtering, updating, and join operations in Hive.

Theory
🔹 Hive Operations

Filtering
 Uses WHERE clause to retrieve specific data
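Updating
 Row-level UPDATE works only on transactional (ACID) tables, which plain text-file tables
such as employee are not
 For such tables, data is modified by rewriting it with INSERT OVERWRITE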

🔹 Joins in Hive

Inner Join
 Returns matching records
Left Outer Join
 Returns all records from left table
Right Outer Join
 Returns all records from right table
Full Outer Join
 Returns all records from both tables
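The difference between the four join types can be sketched in plain Java on two small hypothetical tables. JoinTypesSketch is a hypothetical class. To keep it short, it assumes each key appears at most once per table; a real join also pairs up duplicate keys. The two flags decide whether unmatched rows from the left or right table are kept. The missing side is filled with null, just as Hive fills it with NULL.

```java
import java.util.*;

/** Hypothetical sketch of inner, left, right and full outer joins over keyed tables. */
final class JoinTypesSketch {

    /** Joins on equal keys; the flags keep unmatched left and/or right rows (outer joins). */
    static List<String> join(Map<Integer, String> left, Map<Integer, String> right,
                             boolean keepUnmatchedLeft, boolean keepUnmatchedRight) {
        SortedSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<String> rows = new ArrayList<>();
        for (int k : keys) {
            String l = left.get(k);
            String r = right.get(k);
            boolean matched = l != null && r != null;
            if (matched || (l != null && keepUnmatchedLeft) || (r != null && keepUnmatchedRight)) {
                rows.add(l + " " + r);   // a missing side prints as "null", like Hive's NULL
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<Integer, String> emp = Map.of(1, "Alice", 2, "Bob", 4, "Sam");
        Map<Integer, String> dept = Map.of(1, "HR", 2, "IT", 3, "Finance");
        System.out.println("inner: " + join(emp, dept, false, false));
        System.out.println("left:  " + join(emp, dept, true, false));
        System.out.println("right: " + join(emp, dept, false, true));
        System.out.println("full:  " + join(emp, dept, true, true));
    }
}
```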

Procedure

🔹 Step 1: Filtering Data

Query
SELECT * FROM employee
WHERE salary > 55000;
Output
2 Bob 60000 IT
4 Sam 62000 IT
5 Meera 70000 HR

🔹 Step 2: Filtering with Multiple Conditions

Query
SELECT * FROM employee
WHERE dept='IT' AND salary > 60000;
Output
4 Sam 62000 IT

🔹 Step 3: Filtering using LIKE

Query
SELECT * FROM employee
WHERE name LIKE 'A%';
Output
1 Alice 50000 HR
🔹 Step 4: Update Data

Query
INSERT OVERWRITE TABLE employee
SELECT id, name, salary+1000, dept
FROM employee;
Output
Query executed successfully

🔹 Step 5: Verify Update

Query
SELECT * FROM employee;
Output
1 Alice 51000 HR
2 Bob 61000 IT
3 John 56000 Finance
4 Sam 63000 IT
5 Meera 71000 HR

🔹 Step 6: Create Department Table

Query
CREATE TABLE department (
dept_id INT,
dept_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Output
OK
🔹 Step 7: Insert Data

Query
INSERT INTO department VALUES
(1,'HR'),
(2,'IT'),
(3,'Finance');
Output
3 rows inserted

🔹 Step 8: Inner Join

Query
SELECT e.name, d.dept_name
FROM employee e
JOIN department d
ON e.id = d.dept_id;
Output
Alice HR
Bob IT
John Finance

🔹 Step 9: Left Outer Join

Query
SELECT e.name, d.dept_name
FROM employee e
LEFT OUTER JOIN department d
ON e.id = d.dept_id;
Output
Alice HR
Bob IT
John Finance
Sam NULL
Meera NULL

🔹 Step 10: Right Outer Join

Query
SELECT e.name, d.dept_name
FROM employee e
RIGHT OUTER JOIN department d
ON e.id = d.dept_id;
Output
Alice HR
Bob IT
John Finance

🔹 Step 11: Full Outer Join

Query
SELECT e.name, d.dept_name
FROM employee e
FULL OUTER JOIN department d
ON e.id = d.dept_id;
Output
Alice HR
Bob IT
John Finance
Sam NULL
Meera NULL

Result
Thus, Hive operations like filtering, updating, and different join operations were successfully
performed.

**WEEK-10**

**Title:** Implement the following PySpark Operations


**i)** SparkSession
**ii)** SparkContext
**iii)** Ways to Create RDD

### **1. Driver**

The driver is the process that runs the application's main program and controls the execution of parallel operations on the cluster. It is responsible for:

- Converting user programs into tasks.
- Scheduling tasks on executors.
- Coordinating between executors.
- Tracking the progress of tasks and handling failures.

The driver also holds the SparkContext, which is the primary entry point for Spark functionality.

### **2. SparkContext**

SparkContext is the gateway to all Spark functionality. It is responsible for:

- Connecting to the cluster manager.
- Acquiring resources (executors) on the cluster.
- Creating RDDs and running transformations and actions on them.

In older Spark code, the first step of an application was to create a SparkContext directly. In modern PySpark you usually create a SparkSession, which creates the SparkContext for you; it is then available as `spark.sparkContext`.

### **3. Cluster Manager**

The cluster manager allocates resources across the cluster. Spark supports several cluster managers:

- **Standalone Cluster Manager:** a simple cluster manager, included with Spark, that makes it easy to set up a cluster.
- **Apache Mesos:** a general cluster manager that can also run Hadoop MapReduce and other applications (deprecated since Spark 3.2).
- **Hadoop YARN:** the resource manager introduced in Hadoop 2.0 that manages the resources of a Hadoop cluster.

### **4. Executors**

Executors are processes launched on the worker nodes of the cluster that run individual tasks. Each application has its own executors, which:

- Execute the code assigned to them by the driver.
- Report the status of computation and storage back to the driver.

Executors also store the partitions of RDDs that user programs cache with `rdd.cache()` or `rdd.persist()`.

### **5. Tasks**

Tasks are the smallest unit of work in Spark. Each action triggers a job, which is divided into stages. Each stage is further divided into tasks, one per partition, that run on the executors. Tasks are assigned to executors based on data locality and resource availability.

## **AIM**
To study and implement the basic operations of **PySpark**, namely **SparkSession**,
**SparkContext**, and different **ways of creating RDDs (Resilient Distributed Datasets)**
for distributed data processing.

---

## **SOFTWARE REQUIREMENTS**
- **Operating System:** Windows / Linux / macOS
- **Tool:** Google Colab / Jupyter Notebook
- **Language:** Python
- **Technology:** Apache Spark using PySpark
- **Libraries Required:** pyspark

---

## **DESCRIPTION**

### **Introduction to PySpark**


PySpark is the Python API for **Apache Spark**, which is an open-source distributed
computing framework used for processing large amounts of data quickly and efficiently. It
allows users to perform big data processing using Python in an easy and understandable way.
PySpark supports distributed computation, fault tolerance, in-memory processing, and
parallel execution, making it much faster than traditional data processing techniques for
large-scale datasets.

PySpark is widely used in **data engineering, machine learning, data analysis, and real-time
data processing**. It provides powerful components such as **SparkSession**,
**SparkContext**, **RDDs**, **DataFrames**, and **Spark SQL**.

In this experiment, the focus is on understanding the **basic building blocks of PySpark**,
which are essential for working with Spark applications.

---

### **1. SparkSession**


**SparkSession** is the main entry point to work with PySpark. It is used to create Spark
applications and provides a single unified interface for interacting with Spark. Introduced in
Spark 2.0, it replaces the older **SQLContext** and **HiveContext** entry points and wraps a
**SparkContext**, which remains available as `spark.sparkContext`.

Using SparkSession, we can:


- Start a Spark application
- Create DataFrames
- Read structured data from files
- Execute SQL queries
- Configure Spark settings

In simple terms, **SparkSession acts like the gateway to Spark functionality**.

**Purpose of SparkSession:**
- To initialize the Spark environment
- To provide access to DataFrame and SQL operations
- To simplify Spark application development

---

### **2. SparkContext**


**SparkContext** is the core engine of a Spark application. It establishes the connection
between the Python program and the Spark cluster. It is responsible for resource
management, task scheduling, and execution of operations across distributed systems.

SparkContext allows us to:


- Create RDDs
- Distribute data across the cluster
- Perform parallel operations
- Manage Spark execution

Although SparkSession is commonly used in modern PySpark, **SparkContext remains the
backbone for low-level Spark operations**, especially for working with **RDDs**.

**Purpose of SparkContext:**
- To connect the application to Spark
- To control distributed execution
- To create and manipulate RDDs

---

### **3. RDD (Resilient Distributed Dataset)**


An **RDD** is the fundamental data structure in Spark. It is a distributed collection of data
stored across multiple nodes, which can be processed in parallel. RDDs are fault-tolerant,
meaning that if a part of the data is lost, Spark can recreate it using lineage information.

RDDs are called:


- **Resilient** because they can recover from failures
- **Distributed** because data is spread across different nodes
- **Dataset** because they represent a collection of data

RDDs support two main types of operations:


- **Transformations** – such as `map()`, `filter()`, `flatMap()`
- **Actions** – such as `collect()`, `count()`, `take()`

---
## **Ways to Create RDD**
There are multiple ways to create an RDD in PySpark. Some of the common methods are:

### **a) Creating RDD from a Python Collection**


A list, tuple, or other Python collection can be converted into an RDD using the `parallelize()`
method of SparkContext.

**Example:**
A Python list like `[10, 20, 30, 40]` can be distributed as an RDD.

**Use:**
This method is useful for small datasets and learning purposes.

---

### **b) Creating RDD from an External Text File**


RDDs can be created by reading data from an external file such as a text file using the
`textFile()` method.

**Example:**
A text file stored in local storage or HDFS can be read line by line into an RDD.

**Use:**
This method is used when data is available in files and needs distributed processing.

---

### **c) Creating RDD from Existing DataFrame**


An RDD can also be created from a DataFrame by using its `.rdd` property, which returns an RDD of `Row` objects.

**Example:**
A DataFrame created from structured data can be converted into an RDD for lower-level
operations.

**Use:**
This is useful when switching from structured processing to RDD-based transformations.

---

### **d) Creating RDD using Empty RDD**


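An empty RDD can also be created when no initial data is available, using `sparkContext.emptyRDD()` or `sparkContext.parallelize([])`.

**Use:**
This is useful as a starting value that later results are combined into, or for testing. RDDs are immutable, so "adding" data means creating a new RDD, for example with `union()`.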

---

**WEEK-11**

**Title:** Perform the following PySpark Transformations


**i)** Convert RDD to DataFrame
**ii)** PySpark Read CSV File into DataFrame
**iii)** drop() and dropDuplicates()
**iv)** PySpark orderBy() and sort()
**v)** map and flatMap
**vi)** PySpark GroupBy, sort and join

---
## **AIM**
To perform and understand various **PySpark transformations and DataFrame operations**
such as converting an **RDD into a DataFrame**, reading a **CSV file into a DataFrame**,
using **drop()** and **dropDuplicates()**, applying **orderBy()** and **sort()**, and
performing **map**, **flatMap**, **groupBy**, and **join** operations for distributed
data processing.

---

## **SOFTWARE REQUIREMENTS**
- **Operating System:** Windows / Linux / macOS
- **Tool:** Google Colab / Jupyter Notebook
- **Programming Language:** Python
- **Technology:** Apache Spark using PySpark
- **Library Required:** pyspark

---

## **DESCRIPTION**

### **Introduction to PySpark Transformations**


PySpark provides powerful operations for handling large volumes of data in a distributed
environment. It supports both **RDD-based transformations** and **DataFrame-based
operations**, making it suitable for large-scale data analysis and data preprocessing.

Transformations in PySpark are operations that create a **new dataset from an existing
dataset**. They are evaluated lazily: Spark only records them and does not execute anything
until an action (such as `show()`, `count()`, or `collect()`) is called.

In this experiment, commonly used PySpark transformations and DataFrame operations are
performed to understand how data can be converted, cleaned, sorted, grouped, and combined.

---

### **1. Convert RDD to DataFrame**


An **RDD (Resilient Distributed Dataset)** is the basic distributed data structure in Spark,
while a **DataFrame** is a higher-level structured data format with rows and columns.

RDDs can be converted into DataFrames to make data processing easier and more efficient.
DataFrames carry a schema (column names and types), which lets Spark's Catalyst optimizer
plan queries more efficiently, and they are easier to use for structured data analysis.

**Purpose:**
- To convert unstructured or semi-structured distributed data into tabular form
- To perform SQL-like operations on distributed data
- To improve readability and performance

**Example Concept:**
A list of employee records stored as an RDD can be converted into a DataFrame with column
names such as **ID**, **Name**, and **Department**.

---

### **2. PySpark Read CSV File into DataFrame**


PySpark allows reading data from external files such as CSV files directly into a DataFrame.

A CSV file is a common format used for storing structured data. By reading a CSV file into a
DataFrame, the data can be processed, filtered, sorted, and analyzed easily.

**Purpose:**
- To load structured data from external storage
- To perform distributed processing on file-based data
- To simplify analysis using DataFrame operations

**Example Concept:**
A student details CSV file containing columns like **Roll No**, **Name**, and **Marks**
can be loaded into a PySpark DataFrame.

---

### **3. drop() and dropDuplicates()**


Data cleaning is an important step in data processing. PySpark provides methods to remove
unnecessary columns and duplicate rows.

#### **drop()**
The **drop()** function is used to remove one or more columns from a DataFrame.

**Purpose:**
- To eliminate unwanted columns
- To simplify the dataset
- To keep only relevant information

#### **dropDuplicates()**
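The **dropDuplicates()** function is used to remove duplicate rows from a DataFrame. By default all columns are compared; a list of column names can be passed to compare only those columns.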

**Purpose:**
- To clean repeated data
- To improve data quality
- To ensure unique records in analysis

**Example Concept:**
If the same student record appears multiple times, **dropDuplicates()** removes the
repeated entries.

---

### **4. PySpark orderBy() and sort()**


PySpark provides sorting functions to arrange data in ascending or descending order.

#### **orderBy()**
The **orderBy()** function is used to sort a DataFrame based on one or more columns.

#### **sort()**
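The **sort()** function is an alias of **orderBy()** in PySpark, so both produce the same result. Each accepts column names or column expressions such as `col("Marks").desc()`.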

Both are commonly used to organize data for reporting and analysis.

**Purpose:**
- To arrange records systematically
- To identify highest or lowest values
- To improve clarity in output presentation

**Example Concept:**
Student records can be sorted by **marks** in ascending or descending order.

---

### **5. map and flatMap**


These are common **RDD transformations** used for processing elements.

#### **map()**
The **map()** function applies a given function to each element of the RDD and returns a
new RDD containing exactly one output element per input element.

**Purpose:**
- To transform each element individually
- To modify data values
- To create a new dataset with changed output

**Example Concept:**
If an RDD contains numbers, **map()** can square each number.

#### **flatMap()**
The **flatMap()** function is similar to **map()**, but the function returns zero or more values
(an iterable) for each input element, and the results are flattened into a single RDD.

**Purpose:**
- To split data into multiple parts
- To extract words from lines of text
- To flatten nested output into a simple list

**Example Concept:**
If an RDD contains sentences, **flatMap()** can split each sentence into words.

---

### **6. PySpark GroupBy, sort and join**


These operations are used for data aggregation, arrangement, and combining datasets.

#### **groupBy()**
The **groupBy()** function groups rows having the same values in one or more columns.

**Purpose:**
- To categorize data into groups
- To perform aggregate operations such as count, sum, and average
- To summarize data effectively

**Example Concept:**
Students can be grouped by **department** to count how many students belong to each
department.

#### **sort()**
After grouping, the results can be sorted for better presentation.

**Purpose:**
- To arrange grouped results in an ordered way
- To make analysis easier

#### **join()**
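The **join()** function combines two DataFrames based on a common column or join condition. The join type is chosen with the `how` argument (`"inner"` by default, or `"left"`, `"right"`, `"outer"`, and others).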

**Purpose:**
- To merge related datasets
- To combine information from multiple tables
- To perform relational-style operations

**Example Concept:**
A student DataFrame and a marks DataFrame can be joined using **Student ID**.

---

**WEEK-12**
**Title:** Build a Machine Learning Model using PySpark ML Library

---

## **AIM**
To build and implement a **Machine Learning model using the PySpark ML library** by
creating a dataset, converting input columns into feature vectors, training the model, and
generating predictions in a distributed environment.

---

## **SOFTWARE REQUIREMENTS**
- **Operating System:** Windows / Linux / macOS
- **Tool:** Google Colab / Jupyter Notebook
- **Programming Language:** Python
- **Technology:** Apache Spark using PySpark
- **Libraries Required:** pyspark (including the `pyspark.ml` package), numpy (required by `pyspark.ml`)

---

## **DESCRIPTION**

### **Introduction to PySpark ML Library**


PySpark provides a powerful **Machine Learning library**, called **`pyspark.ml`**, which
is used to build scalable machine learning applications on large datasets. It supports important
stages of machine learning such as **data preparation, feature transformation, model training,
prediction, and evaluation**.

Unlike traditional machine learning tools that work on a single system, PySpark performs
processing in a **distributed environment**, making it suitable for handling large-scale data
efficiently.

In this experiment, a machine learning model is built using the PySpark ML library. The
given program creates a dataset, prepares the feature columns, trains a classification model,
and predicts the output.

---

### **Machine Learning Model Used in the Program**


The program uses a **classification model** from the PySpark ML library. Since the dataset
contains a **label column** and two input columns **x1** and **x2**, the model learns
the relationship between the input features and the target output.

The labels in the dataset are represented as **0 and 1**, which indicates that the problem
belongs to **binary classification**. For this purpose, the program uses the **Logistic
Regression algorithm** provided by PySpark.

---

### **Explanation of the Program**

#### **1. Creating Spark Session**


The program begins by creating a **SparkSession**.

SparkSession is the entry point for working with PySpark. It initializes the Spark application
and allows the user to perform DataFrame operations and machine learning tasks.

**Purpose:**
- To start the PySpark environment
- To enable DataFrame and ML operations
- To act as the main interface for Spark programming

---

#### **2. Creating the Dataset**


A small sample dataset is created using `createDataFrame()`.

The dataset contains:


- **label** → target/output column
- **x1, x2** → input feature columns

This dataset is used for training and prediction.

**Purpose:**
- To provide structured input data for the model
- To represent the features and corresponding class labels
- To make the data ready for machine learning processing

---

#### **3. Feature Transformation using VectorAssembler**


Machine learning algorithms in PySpark do not take multiple individual columns directly as
input. They require all input columns to be combined into a **single feature vector
column**.

This is done using **VectorAssembler**.

In the given program:


- `x1` and `x2` are combined
- a new column called **features** is created

**Purpose:**
- To transform multiple feature columns into one vector column
- To prepare the dataset in the format required by PySpark ML
- To simplify model training
---

#### **4. Model Training**


After creating the feature vector, the model is trained using the prepared dataset.

The program uses **Logistic Regression** from `pyspark.ml.classification`. The model
learns patterns from the input data and builds a relationship between the feature vector and
the label column.

**Purpose:**
- To train the machine learning model using the input data
- To identify the relationship between features and output
- To prepare the model for prediction

---

#### **5. Prediction**


Once the model is trained, it is applied to the dataset to generate predictions.

The output shows:


- **features**
- **prediction**

This indicates the class predicted by the model for each input record.

**Purpose:**
- To test the working of the trained model
- To generate the predicted class labels
- To observe how the machine learning model behaves on the data
---

### **How the Program Satisfies the Aim**


The aim is to **build any machine learning model using the PySpark ML library**.

The given program satisfies this aim because it includes all the essential stages of machine
learning in PySpark:

- Spark initialization
- Data creation
- Feature preparation
- Model building
- Prediction generation

Thus, although the title allows any machine learning model, the implemented program (a
Logistic Regression classifier) fulfils it.

---

## **ARCHITECTURE DIAGRAM (PYSPARK ML WORKFLOW)**

```
+----------------------+
|    Input Dataset     |
|    label, x1, x2     |
+----------+-----------+
           |
           v
+----------------------+
|     SparkSession     |
| Initialize Spark App |
+----------+-----------+
           |
           v
+----------------------+
|   Create DataFrame   |
|  Structured Dataset  |
+----------+-----------+
           |
           v
+----------------------+
|   VectorAssembler    |
|  x1, x2 -> features  |
+----------+-----------+
           |
           v
+----------------------+
|  ML Model Training   |
| Logistic Regression  |
+----------+-----------+
           |
           v
+----------------------+
|      Prediction      |
|    Output Classes    |
+----------------------+
```
