
PageRank and HITS Algorithm Analysis

The document outlines a series of experiments on algorithms and technologies for web page analysis and large-scale data management. It covers the PageRank and HITS algorithms for evaluating web link structures, caching algorithms for optimizing web performance, Hadoop installation and file management in HDFS, and MapReduce programs such as matrix multiplication, weather data analysis, word count and K-means clustering. Each experiment includes a theoretical explanation and a code implementation to illustrate the concepts discussed.

Uploaded by

Aviral Sharma
Copyright © All Rights Reserved

EXPERIMENT NUMBER 1

AIM OF EXPERIMENT:
Analyze the link structure of web pages using the PageRank algorithm.

THEORY:
PageRank is an algorithm developed by Larry Page and Sergey Brin, the founders of Google, to rank web pages in search engine results. It evaluates the importance of a web page from the link structure: a link from one page to another is treated as a vote for the target page. A page that receives more votes (links), especially from pages that are themselves important, is considered more important. To model a user who eventually stops following links, and to keep rank from being trapped in cycles, the algorithm introduces a damping factor d (typically around 0.85), which is the probability that the user continues clicking on links. The PageRank of a page A is calculated using the following formula:

$$PR(A) = (1 - d) + d \sum_{i=1}^{n} \frac{PR(B_i)}{L(B_i)}$$

In this formula, PR(A) is the PageRank of page A, d is the damping factor, B_1, ..., B_n are the pages that link to page A, PR(B_i) is the PageRank of page B_i, and L(B_i) is the number of outbound links from page B_i. The algorithm is widely applied in search engines for ranking web pages, in social networks for evaluating user influence, and in information retrieval systems to improve search results.

CODE:

# Mapper Function for PageRank
def mapper(url, page_info):
    """
    Distribute rank from the URL to all outgoing links.

    Parameters:
    url (str): The URL of the website.
    page_info (dict): Dictionary containing 'rank' of the page and 'outgoing_links'.
    """
    rank = page_info['rank']
    outgoing_links = page_info['outgoing_links']
    num_links = len(outgoing_links)

    # Distribute rank to each linked website (a page with no links distributes nothing)
    for linked_url in outgoing_links:
        yield (linked_url, rank / num_links)

    # Emit the website itself to ensure it appears in the final output
    yield (url, 0)


# Reducer Function for PageRank
def reducer(url, incoming_ranks):
    """
    Compute the new rank of each website based on incoming ranks.

    Parameters:
    url (str): The URL of the website.
    incoming_ranks (list): List of rank values from incoming links.
    """
    damping_factor = 0.85  # Probability that a user keeps following links
    new_rank = (1 - damping_factor) + damping_factor * sum(incoming_ranks)
    yield (url, new_rank)


# Example Data Input for 10 websites
pages = {
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'rank': 1.0, 'outgoing_links': ['[Link]', '[Link]']}
}

# Map Step
print("Mapping Step:")
mapped_data = []
for url, page_info in pages.items():
    url_contributions = list(mapper(url, page_info))
    mapped_data.extend(url_contributions)
    print(f"{url} -> {url_contributions}")

# Organize data by URL for reducing
from collections import defaultdict
grouped_data = defaultdict(list)
for url, rank_contribution in mapped_data:
    grouped_data[url].append(rank_contribution)

# Reduce Step
print("\nReducing Step:")
new_ranks = {}
for url, incoming_ranks in grouped_data.items():
    for output in reducer(url, incoming_ranks):
        new_ranks[output[0]] = output[1]
    print(f"{url} received contributions {incoming_ranks} -> New Rank: {new_ranks[url]:.4f}")

# Final PageRank Results
print("\nFinal Computed PageRank for each website:")
sorted_ranks = sorted(new_ranks.items(), key=lambda x: -x[1])
for url, rank in sorted_ranks:
    print(f"{url}: {rank:.4f}")

# Best and Least PageRank URLs
best_url = sorted_ranks[0]
least_url = sorted_ranks[-1]

print(f"\nBest URL: {best_url[0]} with PageRank: {best_url[1]:.4f}")
print(f"Least URL: {least_url[0]} with PageRank: {least_url[1]:.4f}")
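The mapper and reducer for Experiment 1 perform a single PageRank iteration starting from a rank of 1.0 for every page; in practice the update is repeated until the ranks stop changing. The sketch below does this in plain Python on a small, made-up three-page graph. The function name `pagerank` and the parameters `tol` and `max_iter` are illustrative assumptions; it uses the same non-normalized update as the reducer, so when no page is dangling the ranks always sum to the number of pages.

```python
def pagerank(links, damping=0.85, tol=1e-6, max_iter=100):
    """Repeat PR(A) = (1 - d) + d * sum(PR(B) / L(B)) until the ranks settle (sketch)."""
    pages = set(links)
    for targets in links.values():
        pages.update(targets)  # include pages that only appear as link targets
    rank = {p: 1.0 for p in pages}
    for _ in range(max_iter):
        incoming = {p: 0.0 for p in pages}
        # "Map": each page splits its current rank evenly among its outgoing links
        for page, targets in links.items():
            for target in targets:
                incoming[target] += rank[page] / len(targets)
        # "Reduce": apply the damped update to every page
        new_rank = {p: (1 - damping) + damping * incoming[p] for p in pages}
        delta = max(abs(new_rank[p] - rank[p]) for p in pages)
        rank = new_rank
        if delta < tol:  # stop once no rank changes by more than tol
            break
    return rank


if __name__ == "__main__":
    ranks = pagerank({"A": ["B", "C"], "B": ["C"], "C": ["A"]})
    for page, value in sorted(ranks.items(), key=lambda kv: -kv[1]):
        print(f"{page}: {value:.4f}")
```

For the graph A → B, C; B → C; C → A, page C ends with the highest rank (about 1.19) and B with the lowest (about 0.64), because C collects links from both other pages.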
EXPERIMENT NUMBER 2
AIM OF EXPERIMENT:
Analyze web links using the HITS algorithm.

THEORY:

The HITS (Hyperlink-Induced Topic Search) algorithm, introduced by Jon Kleinberg, identifies two types of nodes in a network: hubs and authorities. Hubs are pages that link to many good authorities, while authorities are pages that are linked to by many good hubs. The algorithm iteratively updates the hub and authority scores until they converge: a page's hub score is the sum of the authority scores of the pages it links to, and a page's authority score is the sum of the hub scores of the pages that link to it. After each round, both sets of scores are normalized. This can be represented by the following formulas:

$$h_i = \sum_{j \in L(i)} a_j \qquad a_j = \sum_{i \in B(j)} h_i$$

In these formulas, h_i is the hub score of page i, a_j is the authority score of page j, L(i) is the set of pages linked to by page i, and B(j) is the set of pages that link to page j. This method is useful in search engines for improving the relevance of search results, in recommendation systems based on item connectivity, and in topic detection and classification.

CODE:

import numpy as np

# HITS Algorithm Implementation
def compute_hits(pages):
    # Initialize authority and hub scores
    authority_scores = {url: 1.0 for url in pages}
    hub_scores = {url: 1.0 for url in pages}

    # Number of iterations for convergence
    num_iterations = 10

    for _ in range(num_iterations):
        # Update authority scores: sum of the hub scores of the pages that link TO this page
        for url in pages:
            authority_scores[url] = sum(hub_scores[src] for src in pages
                                        if url in pages[src]['outgoing_links'])

        # Update hub scores: sum of the authority scores of the pages this page links to
        # (links to pages outside the dataset contribute nothing)
        for url in pages:
            hub_scores[url] = sum(authority_scores.get(link, 0.0)
                                  for link in pages[url]['outgoing_links'])

        # Normalize authority scores (skipped if every score is zero)
        norm = np.sqrt(sum(score ** 2 for score in authority_scores.values()))
        if norm > 0:
            for url in authority_scores:
                authority_scores[url] /= norm

        # Normalize hub scores (skipped if every score is zero)
        norm = np.sqrt(sum(score ** 2 for score in hub_scores.values()))
        if norm > 0:
            for url in hub_scores:
                hub_scores[url] /= norm

    return authority_scores, hub_scores


# Example Data Input for Websites
pages = {
    '[Link]': {'outgoing_links': ['[Link]', '[Link]', '[Link]']},
    '[Link]': {'outgoing_links': ['[Link]', '[Link]', '[Link]']},
    '[Link]': {'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'outgoing_links': ['[Link]', '[Link]']},
    '[Link]': {'outgoing_links': []},
    '[Link]': {'outgoing_links': []},
    '[Link]': {'outgoing_links': ['[Link]']},
    '[Link]': {'outgoing_links': []},
    '[Link]': {'outgoing_links': []}
}

# Compute HITS scores
authority_scores, hub_scores = compute_hits(pages)

# Display the scores
print("Authority Scores:")
for url, score in sorted(authority_scores.items(), key=lambda x: -x[1]):
    print(f"{url}: {score:.4f}")

print("\nHub Scores:")
for url, score in sorted(hub_scores.items(), key=lambda x: -x[1]):
    print(f"{url}: {score:.4f}")

# Find best and least authority
best_authority = max(authority_scores.items(), key=lambda x: x[1])
least_authority = min(authority_scores.items(), key=lambda x: x[1])
# Find best and least hub
best_hub = max(hub_scores.items(), key=lambda x: x[1])
least_hub = min(hub_scores.items(), key=lambda x: x[1])

# Display best and least authority and hub
print(f"\nBest Authority: {best_authority[0]} with score {best_authority[1]:.4f}")
print(f"Least Authority: {least_authority[0]} with score {least_authority[1]:.4f}")

print(f"\nBest Hub: {best_hub[0]} with score {best_hub[1]:.4f}")
print(f"Least Hub: {least_hub[0]} with score {least_hub[1]:.4f}")
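The sketch below implements the two HITS update rules on a small made-up graph, iterating until the scores change by less than a tolerance instead of for a fixed number of rounds. It uses only the standard library; the function name `hits` and the parameters `max_iter` and `tol` are illustrative assumptions.

```python
import math


def hits(links, max_iter=100, tol=1e-8):
    """Return (authority, hub) scores for a dict mapping page -> pages it links to (sketch)."""
    pages = set(links)
    for targets in links.values():
        pages.update(targets)
    auth = {p: 1.0 for p in pages}
    hub = {p: 1.0 for p in pages}
    for _ in range(max_iter):
        # a_j = sum of the hub scores of the pages i that link to j
        new_auth = {p: 0.0 for p in pages}
        for i, targets in links.items():
            for j in targets:
                new_auth[j] += hub[i]
        # h_i = sum of the authority scores of the pages j that i links to
        new_hub = {i: sum(new_auth[j] for j in links.get(i, [])) for i in pages}
        # L2-normalise both vectors so the scores stay bounded
        for scores in (new_auth, new_hub):
            norm = math.sqrt(sum(s * s for s in scores.values())) or 1.0
            for p in scores:
                scores[p] /= norm
        delta = max(abs(new_auth[p] - auth[p]) + abs(new_hub[p] - hub[p]) for p in pages)
        auth, hub = new_auth, new_hub
        if delta < tol:
            break
    return auth, hub


if __name__ == "__main__":
    authority, hubness = hits({"A": ["B", "C"], "D": ["B"]})
    print("authorities:", {p: round(s, 4) for p, s in sorted(authority.items())})
    print("hubs:", {p: round(s, 4) for p, s in sorted(hubness.items())})
```

Here page B, linked to by both A and D, becomes the strongest authority, while A, which links to both B and C, becomes the strongest hub; pages that nobody links to keep an authority score of 0.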
EXPERIMENT NUMBER 3
AIM OF EXPERIMENT:
Explore web caching algorithms, such as LRU, FIFO, and LFU.

THEORY:

Caching is a technique that stores frequently accessed data in a temporary storage area to speed up retrieval and reduce latency. Among the common web caching algorithms, Least Recently Used (LRU) tracks the order in which cached pages are accessed and evicts the least recently used page when the cache is full. This algorithm adapts well to changing access patterns but requires additional memory for tracking.

First In, First Out (FIFO) is simpler, maintaining a queue of cached pages and removing the
oldest page when the cache reaches its limit. However, it may evict frequently used pages
regardless of their access patterns. Least Frequently Used (LFU) tracks how often pages are
accessed, evicting the least frequently used page when the cache is full. This makes it better
suited for stable access patterns, albeit at the cost of complexity.

Understanding and analyzing web link structures with algorithms like PageRank and HITS
enhances the relevancy and quality of search results, while caching algorithms optimize web
applications' performance through efficient data management. Together, these techniques are
essential for developing scalable and efficient web services and search engines.

CODE:

class LRUCache:
    def __init__(self, capacity):
        self.cache = {}
        self.order = []  # Least recently used URL first, most recently used last
        self.capacity = capacity

    def access_page(self, url):
        if url in self.cache:
            self.order.remove(url)  # Move to the end (most recently used)
            self.order.append(url)
        else:
            if len(self.cache) >= self.capacity:
                lru = self.order.pop(0)  # Remove least recently used
                del self.cache[lru]
            self.cache[url] = True
            self.order.append(url)


class FIFOCache:
    def __init__(self, capacity):
        self.cache = {}
        self.queue = []  # URLs in insertion order
        self.capacity = capacity

    def access_page(self, url):
        if url not in self.cache:
            if len(self.cache) >= self.capacity:
                fifo = self.queue.pop(0)  # Remove first added page
                del self.cache[fifo]
            self.cache[url] = True
            self.queue.append(url)


class LFUCache:
    def __init__(self, capacity):
        self.cache = {}
        self.frequency = {}  # Access count of each cached URL
        self.capacity = capacity

    def access_page(self, url):
        if url in self.cache:
            self.frequency[url] += 1  # Increment access count
        else:
            if len(self.cache) >= self.capacity:
                lfu = min(self.frequency, key=self.frequency.get)  # Find least frequently used
                del self.cache[lfu]
                del self.frequency[lfu]
            self.cache[url] = True
            self.frequency[url] = 1


# Example Usage
lru_cache = LRUCache(3)
fifo_cache = FIFOCache(3)
lfu_cache = LFUCache(3)

web_pages = [
    '[Link]',
    '[Link]',
    '[Link]',
    '[Link]',
    '[Link]',
    '[Link]',
    '[Link]'
]

# Accessing pages in various orders
print("LRU Cache Access:")
for page in web_pages:
    print(f"Accessing {page}:")
    lru_cache.access_page(page)
    print(lru_cache.cache)

print("\nFIFO Cache Access:")
for page in web_pages:
    print(f"Accessing {page}:")
    fifo_cache.access_page(page)
    print(fifo_cache.cache)

print("\nLFU Cache Access:")
for page in web_pages:
    print(f"Accessing {page}:")
    lfu_cache.access_page(page)
    print(lfu_cache.cache)
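To compare the three policies on the same request sequence, the sketch below replays a trace and counts cache hits. It uses `collections.OrderedDict` so that an LRU hit is an O(1) `move_to_end` instead of a list removal. The function name `simulate` and the sample trace are assumptions for illustration; LFU counts are kept only while a page is cached (as in `LFUCache`), and LFU ties are broken by URL so the result is deterministic.

```python
from collections import OrderedDict, deque


def simulate(policy, capacity, requests):
    """Replay a request trace and return the number of cache hits (sketch)."""
    hits = 0
    if policy == "lru":
        cache = OrderedDict()
        for url in requests:
            if url in cache:
                hits += 1
                cache.move_to_end(url)  # mark as most recently used
            else:
                if len(cache) >= capacity:
                    cache.popitem(last=False)  # evict the least recently used entry
                cache[url] = True
    elif policy == "fifo":
        cache, queue = set(), deque()
        for url in requests:
            if url in cache:
                hits += 1  # FIFO does not reorder anything on a hit
            else:
                if len(cache) >= capacity:
                    cache.discard(queue.popleft())  # evict the oldest insertion
                cache.add(url)
                queue.append(url)
    elif policy == "lfu":
        cache = {}  # url -> access count while the page is cached
        for url in requests:
            if url in cache:
                hits += 1
                cache[url] += 1
            else:
                if len(cache) >= capacity:
                    victim = min(cache, key=lambda u: (cache[u], u))  # lowest count, then URL
                    del cache[victim]
                cache[url] = 1
    else:
        raise ValueError(f"unknown policy: {policy}")
    return hits


if __name__ == "__main__":
    trace = ["a", "b", "a", "c", "a", "b"]
    for name in ("lru", "fifo", "lfu"):
        print(name, simulate(name, 2, trace))
```

With capacity 2 and the trace a, b, a, c, a, b, LRU and LFU each score 2 hits, while FIFO scores only 1: when c arrives, FIFO evicts the frequently used page a simply because it was inserted first.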
EXPERIMENT NUMBER 4
AIM OF EXPERIMENT:
Download, install, and configure Hadoop, covering modes, scripts, and configuration files.

THEORY:
Hadoop is an open-source framework designed for distributed storage and processing of large
datasets across clusters of computers using simple programming models. It is built to scale up
from a single server to thousands of machines, each offering local computation and storage.
Its two core components are the Hadoop Distributed File System (HDFS), which stores data across multiple machines, and MapReduce, a programming model for processing large data sets in a parallel and distributed manner; since Hadoop 2, YARN manages cluster resources and schedules these jobs. Hadoop replicates data blocks across machines and re-runs failed tasks, which gives it the fault tolerance needed to process very large datasets reliably. This makes it a widely used tool for big data analytics in many industries.

CODE:

1) Download Hadoop: Visit the official Apache Hadoop website and download the latest
stable release.
2) Install Java: Ensure Java is installed on your system, as Hadoop requires it. You can
check by running java -version.
3) Extract Hadoop: Use the command tar -xzvf [Link] to extract the
downloaded tarball.
4) Move Hadoop Directory: Move the extracted folder to a suitable directory, typically
/usr/local/hadoop.
5) Configure Environment Variables:

Edit the .bashrc file to set:


export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr/lib/jvm/java-x.x.x
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
Run source ~/.bashrc to apply the changes.

6) Configure Hadoop Files: Navigate to $HADOOP_HOME/etc/hadoop and edit the configuration files:
i) [Link]: Set the default filesystem.
ii) [Link]: Configure replication factors and storage directories.
iii) [Link]: Specify MapReduce framework settings.
7) Start Hadoop: Before starting HDFS for the first time, format the NameNode once with hdfs namenode -format (this erases any existing HDFS metadata). Then use the scripts in the sbin directory:
i) Run [Link] to start HDFS services.
ii) Run [Link] to start YARN services.
8) Verify Installation: Access the web interfaces to verify the setup:
i) HDFS: [Link]
ii) YARN ResourceManager: [Link]
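Hadoop's site configuration files share one layout: a `<configuration>` element containing `<property>` entries, each with a `<name>` and a `<value>`. The sketch below renders that layout with Python's standard `xml.etree.ElementTree`. The helper name `hadoop_config_xml` is hypothetical; the property names (`fs.defaultFS`, `dfs.replication`, `mapreduce.framework.name`) are standard Hadoop settings, but the values are assumptions typical of a single-node (pseudo-distributed) setup, and the `<?xml ...?>` declaration is omitted.

```python
import xml.etree.ElementTree as ET


def hadoop_config_xml(properties):
    """Render {name: value} pairs in the <configuration>/<property> layout (hypothetical helper)."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(root, encoding="unicode")


# Assumed single-node values; adjust host, port and replication for a real cluster
SITE_FILES = {
    "core-site.xml": {"fs.defaultFS": "hdfs://localhost:9000"},  # default filesystem
    "hdfs-site.xml": {"dfs.replication": 1},  # one copy of each block on a single node
    "mapred-site.xml": {"mapreduce.framework.name": "yarn"},  # run MapReduce jobs on YARN
}

if __name__ == "__main__":
    for filename, props in SITE_FILES.items():
        print(filename)
        print(hadoop_config_xml(props))
```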
EXPERIMENT NUMBER 5
AIM OF EXPERIMENT:
Perform file management tasks in Hadoop, including adding, retrieving, and deleting files within
HDFS.

THEORY:
HDFS (Hadoop Distributed File System) is a scalable distributed filesystem specifically
designed to accommodate petabytes of data while operating on top of the underlying filesystem
of the operating system. One of HDFS's key features is its ability to track the physical location of
data across a network by associating the name of each data block with the rack or network switch
it resides on. This association enables Hadoop to efficiently schedule processing tasks on nodes
that contain the data or are closest to it, thereby optimizing bandwidth utilization. HDFS
provides a set of command-line utilities that function similarly to traditional Linux file
commands, serving as the primary interface for users to interact with the filesystem. Common
file management tasks in HDFS include adding files and directories, retrieving files from HDFS
to the local filesystem, and deleting files from HDFS, allowing users to manage their data
effectively within the Hadoop ecosystem.

CODE:

Before you can run Hadoop programs on data stored in HDFS, you need to put the data into HDFS first. Here is how to perform some common file management tasks in HDFS.
Step 1: Creating a Directory and Adding Files to HDFS
To create a directory in HDFS and upload a file, use the following commands. For this illustration, we will use "chuck" as the username, but you should replace it with your actual username.
Create the directory using the command:
hadoop fs -mkdir /user/chuck
Upload a file (e.g., [Link]) to the created directory using the command:
hadoop fs -put [Link] /user/chuck

Step 2: Retrieving Files from HDFS
To copy files from HDFS back to the local filesystem, use the get command. For example, to retrieve [Link] into the current local directory, run:
hadoop fs -get /user/chuck/[Link] .
To print the file's contents to the terminal without copying it, run:
hadoop fs -cat /user/chuck/[Link]

Step 3: Deleting Files from HDFS
To delete a file in HDFS, use the rm command:
hadoop fs -rm /user/chuck/[Link]

Additional Commands
The hdfs dfs commands below are equivalent to hadoop fs when working with HDFS.
To create a directory in HDFS, use the command:
hdfs dfs -mkdir /lendicse
To add a local directory to HDFS, use the command:
hdfs dfs -put lendi_english /
Step 4: Copying Data from NFS to HDFS
To copy data from a local directory to HDFS, use the command:
hdfs dfs -copyFromLocal /home/lendi/Desktop/shakes/glossary /lendicse/
To view the contents of a file in HDFS, use the command:
hdfs dfs -cat /lendicse/glossary
To list items in HDFS, use the command:
hdfs dfs -ls hdfs://localhost:9000/
To delete a directory and everything in it, add the recursive option:
hdfs dfs -rm -r /kartheek

These commands will help you manage files within HDFS effectively.
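When these HDFS operations are scripted, it can help to build the command lines in one place. The sketch below is a hypothetical Python wrapper (the names `hdfs_cmd` and `run` are assumptions) that assembles `hdfs dfs` argument lists for the operations shown above and, by default, only prints them. Passing `dry_run=False` runs them through `subprocess.run`, which requires the Hadoop binaries on `PATH` and a running HDFS.

```python
import shlex
import subprocess

# Flags for the basic file-management operations of the hdfs dfs shell
FLAGS = {
    "mkdir": ["-mkdir", "-p"],  # -p also creates missing parent directories
    "put": ["-put"],
    "get": ["-get"],
    "cat": ["-cat"],
    "ls": ["-ls"],
    "rm": ["-rm"],
    "rm_r": ["-rm", "-r"],  # recursive delete for directories
}


def hdfs_cmd(action, *paths):
    """Build the argument list for one 'hdfs dfs' command (hypothetical helper)."""
    if action not in FLAGS:
        raise ValueError(f"unsupported action: {action}")
    return ["hdfs", "dfs", *FLAGS[action], *paths]


def run(action, *paths, dry_run=True):
    """Print the command (dry run) or execute it, raising if it fails."""
    cmd = hdfs_cmd(action, *paths)
    if dry_run:
        print(shlex.join(cmd))
        return 0
    return subprocess.run(cmd, check=True).returncode


if __name__ == "__main__":
    run("mkdir", "/lendicse")
    run("put", "lendi_english", "/")
    run("rm_r", "/kartheek")
```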
EXPERIMENT NUMBER 6
AIM OF EXPERIMENT:
Implement matrix multiplication using Hadoop MapReduce.

THEORY:
A matrix can be represented as a relation (table) in a relational database management system
(RDBMS), where each cell in the matrix corresponds to a record in the format (i, j, value). For
instance, consider a matrix with 5 rows and 6 columns, which contains a total of 30 values.
However, if we represent this matrix as a relation, we would need to store 30 row IDs, 30
column IDs, and 30 values, effectively tripling the amount of data stored. This raises an important question: why store data in this format? In practice, many large matrices are sparse, meaning that most of their cells are zero. The zero cells do not need to be stored in the database, which makes this representation much more efficient for sparse matrices.
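The relational representation can be illustrated without Hadoop. In the sketch below (function names are illustrative assumptions), a matrix is stored as (i, j, value) records for its non-zero cells only, and the product is computed in the style of a join followed by an aggregation: A cells (i, k) and B cells (k, j) are grouped by the shared index k, their products are emitted, and the partial products are summed per output cell (i, j). This is a simpler, unblocked approach than the block-based Java program in this experiment.

```python
from collections import defaultdict


def to_relation(matrix):
    """Keep only the non-zero cells of a dense matrix as (row, col, value) records."""
    return [(i, j, v) for i, row in enumerate(matrix) for j, v in enumerate(row) if v != 0]


def multiply_relations(a_cells, b_cells):
    """Multiply two matrices stored as (row, col, value) relations (join on k, then sum)."""
    # "Map": key every A cell (i, k) and every B cell (k, j) by the shared index k
    by_k = defaultdict(lambda: ([], []))
    for i, k, v in a_cells:
        by_k[k][0].append((i, v))
    for k, j, v in b_cells:
        by_k[k][1].append((j, v))
    # "Reduce": multiply matching pairs and accumulate the products per output cell (i, j)
    result = defaultdict(int)
    for a_list, b_list in by_k.values():
        for i, av in a_list:
            for j, bv in b_list:
                result[(i, j)] += av * bv
    return {cell: v for cell, v in result.items() if v != 0}


if __name__ == "__main__":
    A = [[1, 0], [0, 2]]
    B = [[0, 3], [4, 0]]
    print(to_relation(A))  # only the two non-zero cells are stored
    print(multiply_relations(to_relation(A), to_relation(B)))
```

For A = [[1, 0], [0, 2]] and B = [[0, 3], [4, 0]], only two records per matrix are stored, and the result {(0, 1): 3, (1, 0): 8} is the sparse form of [[0, 3], [8, 0]].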

CODE:

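import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixMultiplication {

    // Mapper class
    // Each input line is "row,col<TAB>m,value", where m = 0 for matrix A and m = 1 for matrix B
    public static class MatrixMapper extends Mapper<Text, Text, CompositeKey, Text> {
        private int IB;  // Block size for the rows of A (and C)
        private int KB;  // Block size for the columns of A / rows of B
        private int JB;  // Block size for the columns of B (and C)
        private int NIB; // Number of row blocks in A
        private int NJB; // Number of column blocks in B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            IB = conf.getInt("IB", 1);
            KB = conf.getInt("KB", 1);
            JB = conf.getInt("JB", 1);
            NIB = (conf.getInt("rowA", 1) - 1) / IB + 1;
            NJB = (conf.getInt("colB", 1) - 1) / JB + 1;
        }

        @Override
        protected void map(Text key, Text value, Context context) throws IOException,
                InterruptedException {
            String[] keys = key.toString().split(",");
            int row = Integer.parseInt(keys[0].trim());
            int col = Integer.parseInt(keys[1].trim());
            String[] values = value.toString().split(",");
            int m = Integer.parseInt(values[0].trim());
            int v = Integer.parseInt(values[1].trim());

            // Emit for matrix A: element A[i][k] is needed by every column block of C
            if (m == 0) {
                int i = row, k = col;
                for (int jb = 0; jb < NJB; jb++) {
                    context.write(new CompositeKey(i / IB, k / KB, jb, 0),
                            new Text(i % IB + "," + k % KB + "," + v));
                }
            }

            // Emit for matrix B: element B[k][j] is needed by every row block of C
            if (m == 1) {
                int k = row, j = col;
                for (int ib = 0; ib < NIB; ib++) {
                    context.write(new CompositeKey(ib, k / KB, j / JB, 1),
                            new Text(k % KB + "," + j % JB + "," + v));
                }
            }
        }
    }

    // Reducer class
    // Keys arrive sorted by (ib, kb, jb, m), so each A block (m = 0) is seen just before the
    // matching B block (m = 1). The running sums in C assume a single reduce task.
    public static class MatrixReducer extends Reducer<CompositeKey, Text, Text, IntWritable> {
        private int IB;
        private int KB;
        private int JB;
        private int[][] A;
        private String loadedA = null; // "ib,kb,jb" of the A block currently held in A
        private final Map<String, Integer> C = new HashMap<>(); // Running sums for cells of C

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            IB = conf.getInt("IB", 1);
            KB = conf.getInt("KB", 1);
            JB = conf.getInt("JB", 1);
        }

        @Override
        protected void reduce(CompositeKey key, Iterable<Text> values, Context context) throws
                IOException, InterruptedException {
            String block = key.getIb() + "," + key.getKb() + "," + key.getJb();
            if (key.getM() == 0) {
                // Load A block into a fresh array so no values leak from the previous block
                A = new int[IB][KB];
                for (Text value : values) {
                    String[] data = value.toString().split(",");
                    int i = Integer.parseInt(data[0]);
                    int k = Integer.parseInt(data[1]);
                    A[i][k] = Integer.parseInt(data[2]);
                }
                loadedA = block;
            } else if (key.getM() == 1) {
                // No A block for this (ib, kb, jb) means it is all zeros: nothing to add
                if (!block.equals(loadedA)) {
                    return;
                }
                // Load B block
                int[][] B = new int[KB][JB];
                for (Text value : values) {
                    String[] data = value.toString().split(",");
                    int k = Integer.parseInt(data[0]);
                    int j = Integer.parseInt(data[1]);
                    B[k][j] = Integer.parseInt(data[2]);
                }

                // Multiply the blocks and add the partial products to the cells of C
                int ibase = key.getIb() * IB;
                int jbase = key.getJb() * JB;
                for (int i = 0; i < IB; i++) {
                    for (int j = 0; j < JB; j++) {
                        int sum = 0;
                        for (int k = 0; k < KB; k++) {
                            sum += A[i][k] * B[k][j];
                        }
                        if (sum != 0) {
                            C.merge((ibase + i) + "," + (jbase + j), sum, Integer::sum);
                        }
                    }
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit every non-zero cell of C once the partial products of all kb blocks are summed
            for (Map.Entry<String, Integer> cell : C.entrySet()) {
                if (cell.getValue() != 0) {
                    context.write(new Text(cell.getKey()), new IntWritable(cell.getValue()));
                }
            }
        }
    }

    // Main method to configure and run the job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("IB", 2);   // Block size for the rows of A
        conf.setInt("KB", 2);   // Block size for the shared dimension
        conf.setInt("JB", 2);   // Block size for the columns of B
        conf.setInt("rowA", 2); // Number of rows in A
        conf.setInt("colA", 2); // Number of columns in A (= rows in B)
        conf.setInt("colB", 2); // Number of columns in B

        Job job = Job.getInstance(conf, "Matrix Multiplication");
        job.setJarByClass(MatrixMultiplication.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // Splits each line at the first tab
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1); // The reducer accumulates all of C in memory

        FileInputFormat.addInputPath(job, new Path(args[0]));   // Input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // Output path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}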

CompositeKey Class

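import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {
    private int ib;
    private int kb;
    private int jb;
    private int m;

    // Hadoop needs a no-argument constructor to deserialize keys
    public CompositeKey() {
    }

    public CompositeKey(int ib, int kb, int jb, int m) {
        this.ib = ib;
        this.kb = kb;
        this.jb = jb;
        this.m = m;
    }

    public int getIb() { return ib; }
    public int getKb() { return kb; }
    public int getJb() { return jb; }
    public int getM() { return m; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(ib);
        out.writeInt(kb);
        out.writeInt(jb);
        out.writeInt(m);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        ib = in.readInt();
        kb = in.readInt();
        jb = in.readInt();
        m = in.readInt();
    }

    @Override
    public int compareTo(CompositeKey o) {
        int result = Integer.compare(this.ib, o.ib);
        if (result == 0) {
            result = Integer.compare(this.kb, o.kb);
            if (result == 0) {
                result = Integer.compare(this.jb, o.jb);
                if (result == 0) {
                    result = Integer.compare(this.m, o.m);
                }
            }
        }
        return result;
    }

    // Hash on (ib, kb, jb) only, so an A block and its matching B block share a partition
    @Override
    public int hashCode() {
        return Objects.hash(ib, kb, jb);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof CompositeKey)) return false;
        CompositeKey o = (CompositeKey) obj;
        return ib == o.ib && kb == o.kb && jb == o.jb && m == o.m;
    }

    @Override
    public String toString() {
        return ib + "," + kb + "," + jb + "," + m;
    }
}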
EXPERIMENT NUMBER 7
AIM OF EXPERIMENT:
Create a MapReduce program for weather data analysis.

THEORY:
Climate change has garnered significant attention for an extended period due to its far-reaching
impacts across the globe. The detrimental effects of climate change are evident in various forms,
including rising sea levels, decreased rainfall, and increased humidity. To address some of these
challenges, our proposed system utilizes Big Data technology through Hadoop. The architecture
we propose enables the processing of offline data sourced from the National Climatic Data
Centre (NCDC). This allows us to analyze historical climate data to determine the maximum and
minimum temperatures for each year and make predictions about future weather conditions.
Ultimately, we visualize our findings by plotting graphs that illustrate the maximum and
minimum temperatures for each month of a given year. This predictive model leverages
historical weather data to forecast the climate for the upcoming year, providing valuable insights
into potential future scenarios.

CODE:

//WeatherDataMapper
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherDataMapper extends Mapper<Object, Text, Text, FloatWritable> {
    private final Text outKey = new Text();
    private final FloatWritable temperature = new FloatWritable();

    @Override
    public void map(Object key, Text value, Context context) throws IOException,
            InterruptedException {
        // Split the line by commas (assuming the format is: date,location,max_temp,min_temp)
        String[] fields = value.toString().split(",");

        // Check if we have the right number of fields
        if (fields.length == 4) {
            try {
                // Extract maximum and minimum temperatures
                float maxTemperature = Float.parseFloat(fields[2].trim()); // fields[2] is max_temp
                float minTemperature = Float.parseFloat(fields[3].trim()); // fields[3] is min_temp
                String location = fields[1].trim();

                // Emit each temperature under a location-specific key, so the reducer
                // averages daily maxima and daily minima separately
                outKey.set(location + "_max");
                temperature.set(maxTemperature);
                context.write(outKey, temperature);

                outKey.set(location + "_min");
                temperature.set(minTemperature);
                context.write(outKey, temperature);
            } catch (NumberFormatException e) {
                // Skip header lines and records with non-numeric temperatures
            }
        }
    }
}

//WeatherDataReducer
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherDataReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
    private FloatWritable averageTemperature = new FloatWritable();

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws
            IOException, InterruptedException {
        float sum = 0;
        int count = 0;

        // Calculate the sum of temperatures and count the entries
        for (FloatWritable val : values) {
            sum += val.get();
            count++;
        }

        // Calculate average temperature
        if (count > 0) {
            averageTemperature.set(sum / count);
            context.write(key, averageTemperature); // Output key (e.g. location_max) and average temperature
        }
    }
}

//WeatherDataAnalysis
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeatherDataAnalysis {
    public static void main(String[] args) throws Exception {
        // Check for valid arguments
        if (args.length != 2) {
            System.err.println("Usage: WeatherDataAnalysis <input path> <output path>");
            System.exit(-1);
        }

        // Create configuration and job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Weather Data Analysis");

        // Set job parameters
        job.setJarByClass(WeatherDataAnalysis.class);
        job.setMapperClass(WeatherDataMapper.class);
        job.setReducerClass(WeatherDataReducer.class);

        // Set output key and value types (the mapper emits the same types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // Set input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and wait for its completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
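The theory for this experiment describes finding the maximum and minimum temperature for each year, whereas the Java job averages readings per key. The sketch below simulates the per-year variant locally in Python, with a dictionary standing in for Hadoop's shuffle phase. It assumes the same `date,location,max_temp,min_temp` record layout as the mapper and, additionally, ISO dates (YYYY-MM-DD) so that the year is the first four characters; the function names and sample readings are made up for illustration.

```python
from collections import defaultdict


def map_line(line):
    """Map: emit (year, (max_temp, min_temp)) for one 'date,location,max,min' record."""
    fields = line.strip().split(",")
    if len(fields) != 4:
        return []
    try:
        year = fields[0][:4]  # assumes ISO dates such as 2020-01-31
        return [(year, (float(fields[2]), float(fields[3])))]
    except ValueError:
        return []  # skip the header line and malformed rows


def reduce_year(year, readings):
    """Reduce: highest maximum and lowest minimum recorded in one year."""
    return year, max(r[0] for r in readings), min(r[1] for r in readings)


def analyse(lines):
    grouped = defaultdict(list)  # stands in for Hadoop's shuffle-and-sort
    for line in lines:
        for year, reading in map_line(line):
            grouped[year].append(reading)
    return [reduce_year(year, grouped[year]) for year in sorted(grouped)]


if __name__ == "__main__":
    sample = [
        "date,location,max_temp,min_temp",
        "2020-01-01,StationA,20,5",
        "2020-06-01,StationA,44,30",
        "2021-01-01,StationA,18,3",
    ]
    for year, hi, lo in analyse(sample):
        print(f"{year}: max {hi}, min {lo}")
```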
EXPERIMENT NUMBER 8
AIM OF EXPERIMENT:
Run a Word Count MapReduce program to understand the MapReduce paradigm.

THEORY:
MapReduce is the core component of Hadoop, serving as the programming paradigm that
enables massive scalability across hundreds or thousands of servers within a Hadoop cluster. The
concept of MapReduce is relatively straightforward, particularly for those familiar with clustered
scale-out data processing solutions. The term "MapReduce" encompasses two distinct tasks that
Hadoop programs execute. The first task is the map job, which processes a dataset and
transforms it into another dataset, breaking down individual elements into tuples (key/value
pairs). The second task is the reduce job, which takes the output from the map as input and
aggregates these data tuples into a smaller, more manageable set of tuples. As suggested by the
name MapReduce, the reduce job always follows the completion of the map job, effectively
allowing for efficient data processing and aggregation in large-scale data environments.

CODE:

//WordCountMapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException,
            InterruptedException {
        // Split the line into words
        String[] words = value.toString().split("\\s+");

        // Emit each word with a count of 1
        for (String w : words) {
            if (w.isEmpty()) {
                continue; // Leading whitespace produces an empty first token
            }
            word.set(w);
            context.write(word, one);
        }
    }
}

//WordCountReducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
            IOException, InterruptedException {
        int sum = 0;

        // Sum the counts for the word
        for (IntWritable val : values) {
            sum += val.get();
        }

        // Emit the word and its total count
        result.set(sum);
        context.write(key, result);
    }
}

//WordCountDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Check for valid input and output paths
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(-1);
        }

        // Create a new job configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Count");

        // Set the main class and Mapper/Reducer classes
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
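The map, shuffle and reduce phases can be traced by hand with a small Python simulation of the same word count. This is a local sketch, not Hadoop code: the `defaultdict` plays the role of the shuffle phase, which groups all the 1s emitted for a word before the reducer sums them, and the function names are illustrative.

```python
from collections import defaultdict


def mapper(line):
    # Emit (word, 1) for every whitespace-separated token; str.split() drops empty tokens
    for word in line.split():
        yield word, 1


def reducer(word, counts):
    # Sum all the 1s collected for one word
    return word, sum(counts)


def word_count(lines):
    shuffled = defaultdict(list)  # groups values by key, as Hadoop's shuffle phase does
    for line in lines:
        for word, one in mapper(line):
            shuffled[word].append(one)
    # Hadoop hands keys to the reducer in sorted order
    return dict(reducer(word, shuffled[word]) for word in sorted(shuffled))


if __name__ == "__main__":
    print(word_count(["the cat sat", "  the hat "]))
```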
EXPERIMENT NUMBER 9
AIM OF EXPERIMENT:
Implement K-means clustering using MapReduce.

THEORY:
K-means clustering using Hadoop MapReduce is an effective way to handle large datasets
through distributed computing. The algorithm begins by randomly initializing K centroids from
the dataset. In the Map phase, each data point is assigned to the nearest centroid based on the
calculated distance, generating key-value pairs where the key represents the centroid and the
value is the data point. The Reduce phase aggregates these points for each centroid and
recalculates the new centroid by averaging the assigned points. This assignment and recalibration
process continues iteratively until the centroids stabilize or a set number of iterations is reached.
Implementing K-means in this way allows for efficient clustering of massive datasets while
leveraging Hadoop’s scalability and fault tolerance.

CODE:

#[Link]
#!/bin/bash

i=1
while :
do
hadoop jar ../../../../usr/lib/hadoop-mapreduce/[Link] \
-file [Link] \
-file ./[Link] \
-mapper ./[Link] \
-file ./[Link] \
-reducer ./[Link] \
-input /testMapReduce/dataset \
-output /testMapReduce/mapreduce-output$i

rm -f [Link]

hadoop fs -copyToLocal /testMapReduce/mapreduce-output$i/part-00000 [Link]

seeiftrue=$(python [Link])
if [ "$seeiftrue" = 1 ]; then
rm [Link]
hadoop fs -copyToLocal /testMapReduce/mapreduce-output$i/part-00000 [Link]
break
else
rm [Link]
hadoop fs -copyToLocal /testMapReduce/mapreduce-output$i/part-00000 [Link]
fi
i=$((i + 1))
done

#[Link]
__authors__ = "Vaggelis Malandrakis, Kleio Fragkedaki"

from mapper import getCentroids


# check if distance of centroids and centroids1 is less than 1
def checkCentroidsDistance(centroids, centroids1):
    f1x = abs(centroids[0][0] - centroids1[0][0]) < 1
    f1y = abs(centroids[0][1] - centroids1[0][1]) < 1
    f2x = abs(centroids[1][0] - centroids1[1][0]) < 1
    f2y = abs(centroids[1][1] - centroids1[1][1]) < 1
    f3x = abs(centroids[2][0] - centroids1[2][0]) < 1
    f3y = abs(centroids[2][1] - centroids1[2][1]) < 1

    if f1x and f1y and f2x and f2y and f3x and f3y:
        print(1)
    else:
        print(0)


if __name__ == "__main__":
    centroids = getCentroids('[Link]')
    centroids1 = getCentroids('[Link]')

    checkCentroidsDistance(centroids, centroids1)

#[Link]
#!/usr/bin/env python
""" [Link] """

__authors__ = "Vaggelis Malandrakis, Kleio Fragkedaki"

import sys
from math import sqrt


# get initial centroids from a txt file and add them in an array
def getCentroids(filepath):
    centroids = []

    with open(filepath) as fp:
        line = fp.readline()
        while line:
            try:
                line = line.strip()
                cord = line.split(', ')
                # cord[0] is x and cord[1] is y point of a centroid
                centroids.append([float(cord[0]), float(cord[1])])
            except (ValueError, IndexError):
                break
            line = fp.readline()

    return centroids


# create clusters based on initial centroids
def createClusters(centroids):
    # read the dataset from STDIN, one "x,y" point per line
    for line in sys.stdin:
        line = line.strip()
        cord = line.split(',')
        try:
            x = float(cord[0])
            y = float(cord[1])
        except (ValueError, IndexError):
            # not a valid point, so silently ignore/discard this line
            continue

        min_dist = float('inf')
        index = -1

        for i, centroid in enumerate(centroids):
            # euclidean distance from the point to this centroid
            cur_dist = sqrt(pow(x - centroid[0], 2) + pow(y - centroid[1], 2))

            # keep the centroid which is closest to the point
            if cur_dist <= min_dist:
                min_dist = cur_dist
                index = i

        var = "%s\t%s\t%s" % (index, x, y)
        print(var)


if __name__ == "__main__":
    centroids = getCentroids('[Link]')
    createClusters(centroids)

#[Link]
#!/usr/bin/env python
""" [Link] """

__authors__ = "Vaggelis Malandrakis, Kleio Fragkedaki"

import sys

def calculateNewCentroids():
    current_centroid = None
    sum_x = 0
    sum_y = 0
    count = 0

    # input comes from STDIN
    for line in [Link]:
        # parse the input of [Link] and convert x and y (currently strings) to float
        try:
            centroid_index, x, y = [Link]('\t')
            x = float(x)
            y = float(y)
        except ValueError:
            # malformed line or non-numeric coordinate: silently ignore/discard this line
            continue

        # this IF-switch only works because Hadoop sorts map output by key (here: centroid index)
        if current_centroid == centroid_index:
            count += 1
            sum_x += x
            sum_y += y
        else:
            if count != 0:
                # print the average of the finished cluster to get its new centroid
                print(str(sum_x / count) + ", " + str(sum_y / count))

            current_centroid = centroid_index
            sum_x = x
            sum_y = y
            count = 1

    # print the last cluster's centroid (count is still 0 if no valid line arrived)
    if count != 0:
        print(str(sum_x / count) + ", " + str(sum_y / count))

if __name__ == "__main__":
    calculateNewCentroids()
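The reducer's IF-switch relies on Hadoop delivering the mapper lines sorted by key. A minimal local sketch of one iteration makes this visible without a cluster. It assumes the same "x,y" points and tab-separated "index, x, y" mapper output as the scripts above; `sorted()` stands in for Hadoop's shuffle, and the function names and sample points are hypothetical:

```python
from itertools import groupby
from math import dist  # math.dist needs Python 3.8+


def map_points(points, centroids):
    """Mapper step: emit 'index<TAB>x<TAB>y' for the nearest centroid of each point."""
    for x, y in points:
        # min() keeps the first of several equally close centroids
        index = min(range(len(centroids)), key=lambda i: dist((x, y), centroids[i]))
        yield "%s\t%s\t%s" % (index, x, y)


def reduce_lines(sorted_lines):
    """Reducer step: average the points that share a key; lines must be sorted by key."""
    parsed = (line.split("\t") for line in sorted_lines)
    new_centroids = []
    for _key, group in groupby(parsed, key=lambda fields: fields[0]):
        coords = [(float(x), float(y)) for _idx, x, y in group]
        xs = [c[0] for c in coords]
        ys = [c[1] for c in coords]
        new_centroids.append([sum(xs) / len(xs), sum(ys) / len(ys)])
    return new_centroids


def one_iteration(points, centroids):
    """Map, then sort by key (the job Hadoop's shuffle does), then reduce."""
    mapped = list(map_points(points, centroids))
    shuffled = sorted(mapped, key=lambda line: line.split("\t")[0])
    return reduce_lines(shuffled)


if __name__ == "__main__":
    points = [(0.0, 0.0), (1.0, 0.0), (10.0, 10.0), (11.0, 10.0)]
    print(one_iteration(points, [[0.0, 1.0], [9.0, 9.0]]))
    # prints [[0.5, 0.0], [10.5, 10.0]]
```

Without the sort, `groupby` (like the reducer's IF-switch) would start a new group every time the key changes, so a cluster whose lines are not adjacent would be averaged more than once.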

#[Link]
import [Link] as plt
from [Link] import distance

centroids = []
# initialize lists for the points of the three clusters: X[i] = [x values, y values]
X = [[[], []], [[], []], [[], []]]
# initialize lists for the centroids: M = [x values, y values]
M = [[], []]

# import [Link] file
filepath = '[Link]'
with open(filepath) as fp:
    line = [Link]()
    while line:
        if line:
            # delete blanks for each line
            line = [Link]()
            # extract centroids coordinates
            cord = [Link](', ')
            [Link]((float(cord[0]), float(cord[1])))
            M[0].append(float(cord[0]))
            M[1].append(float(cord[1]))
        line = [Link]()

# import [Link] file
filepath = '[Link]'
with open(filepath) as fp:
    line = [Link]()
    while line:
        if line:
            # delete blanks for each line
            line = [Link]()
            # extract points coordinates
            cord = [Link](',')
            x = (float(cord[0]), float(cord[1]))
            # assign the point to its nearest centroid (a single k-means assignment
            # step; the centroids themselves are not updated here)
            dist = float('inf')
            selected_m = -1
            for m in centroids:
                test_distance = [Link](x, m)
                if test_distance < dist:
                    dist = test_distance
                    selected_m = [Link](m)

            X[selected_m][0].append(x[0])
            X[selected_m][1].append(x[1])
        line = [Link]()

# plot cluster 0 in red
[Link](X[0][0], X[0][1], 'ro')
# plot cluster 1 in green
[Link](X[1][0], X[1][1], 'go')
# plot cluster 2 in blue
[Link](X[2][0], X[2][1], 'bo')
# plot the centroids in yellow
[Link](M[0], M[1], 'yo')
[Link]([-22, 20, -50, 40])
[Link]()
EXPERIMENT NUMBER 10
AIM OF EXPERIMENT:
Install and practice with Hive.

THEORY:
Hive is a data warehousing tool that lets SQL developers write Hive Query Language (HQL)
statements, which closely resemble standard SQL. HQL supports fewer commands than full SQL, but
it remains a powerful tool for querying large datasets. When an HQL statement is executed, the
Hive service translates it into MapReduce jobs (newer Hive releases can also use Tez or Spark as
the execution engine), which then run across a Hadoop cluster. Although Hive looks like a
traditional SQL database to the user, its underlying architecture differs in important ways.
Hadoop is optimized for long, sequential scans of data, so Hive queries typically have high
latency and can take several minutes to complete. This makes Hive unsuitable for applications
that need rapid response times, unlike traditional databases such as DB2. Hive is also designed
mainly for reading and analysing data rather than for transaction processing, which typically
involves a high volume of write operations.
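As a rough illustration of that translation, a query such as SELECT status, COUNT(*) FROM logs GROUP BY status can run as a single MapReduce job: the map phase emits the grouping column as the key, the shuffle brings equal keys together, and the reduce phase counts each group. The Python below only mimics that data flow in memory; it is not Hive's actual implementation, and the `logs` table, `status` column and rows are hypothetical:

```python
from collections import defaultdict


def map_phase(rows, key_column):
    """Map: emit (group key, 1) for every input row."""
    for row in rows:
        yield row[key_column], 1


def shuffle_phase(pairs):
    """Shuffle: collect all values that share a key (Hadoop does this across nodes)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce: sum the 1s of each key, i.e. COUNT(*) per group, in sorted key order."""
    return {key: sum(values) for key, values in sorted(groups.items())}


if __name__ == "__main__":
    # Hypothetical rows of a table `logs` with a `status` column.
    logs = [{"status": 200}, {"status": 404}, {"status": 200}, {"status": 500}]
    # Same result as: SELECT status, COUNT(*) FROM logs GROUP BY status;
    print(reduce_phase(shuffle_phase(map_phase(logs, "status"))))
    # prints {200: 2, 404: 1, 500: 1}
```

Even this tiny query needs a full map, shuffle and reduce pass over the data, which is one reason Hive latency is measured in seconds to minutes rather than milliseconds.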

CODE:

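Install MySQL Server, which will hold the Hive metastore, and then set up Hive:

1. Install MySQL Server: sudo apt-get install mysql-server
2. Log in to MySQL as root: mysql -uroot -proot
3. Create a MySQL user for Hive: CREATE USER '<USER_NAME>' IDENTIFIED BY '<PASSWORD>';
4. Grant the user full privileges: GRANT ALL PRIVILEGES ON *.* TO '<USER_NAME>' WITH GRANT OPTION;
5. Extract the Hive archive: tar xvfz [Link]
6. Set the Hive home directory: export HIVE_HOME=/home/apache-hive
7. Add Hive to the PATH: export PATH=$PATH:$HIVE_HOME/bin
8. Add the following metastore properties inside the <configuration> element of
$HIVE_HOME/conf/hive-site.xml. The hadoop values in the last two properties are the metastore
user name and password and must match the MySQL user created in steps 3 and 4:
<property>
  <name>[Link]</name>
  <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>[Link]</name>
  <value>[Link]</value>
</property>
<property>
  <name>[Link]</name>
  <value>hadoop</value>
</property>
<property>
  <name>[Link]</name>
  <value>hadoop</value>
</property>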

//Create a Database:
CREATE DATABASE | SCHEMA [IF NOT EXISTS] <database_name>;
//Drop a Database:
DROP DATABASE [IF EXISTS] database_name [RESTRICT | CASCADE];

//Create a Table:
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
(col_name data_type [COMMENT col_comment], ...) [COMMENT table_comment]
[ROW FORMAT row_format] [STORED AS file_format];

//Load Data into a Table:
LOAD DATA LOCAL INPATH '<path>/[Link]' OVERWRITE INTO TABLE u_data;

//Alter a Table Name:
ALTER TABLE name RENAME TO new_name;

//Add Columns to a Table:
ALTER TABLE name ADD COLUMNS (col_spec[, col_spec ...]);

//Drop a Column from a Table:
Hive does not support ALTER TABLE ... DROP COLUMN. To drop a column, use REPLACE COLUMNS
(the last statement in this group) and list only the columns you want to keep.

//Change a Column Name:
ALTER TABLE name CHANGE column_name new_name new_type;

//Replace Columns in a Table:
ALTER TABLE name REPLACE COLUMNS (col_spec[, col_spec ...]);

//Create a View:
CREATE VIEW [IF NOT EXISTS] view_name [(column_name [COMMENT column_comment], ...)]
[COMMENT view_comment] AS SELECT ...;

//Drop a View:
DROP VIEW view_name;

//Create an Index (indexes were removed in Hive 3.0, so this and the other index statements
//below apply only to Hive 2.x and earlier):
CREATE INDEX index_name ON TABLE base_table_name (col_name, ...)
AS '[Link]' [WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)] [IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)] [ROW FORMAT ...] STORED AS ... | STORED BY ...
[LOCATION hdfs_path] [TBLPROPERTIES (...)];

//Alter and Rebuild an Index:
ALTER INDEX index_ip_address ON log_data REBUILD;

//Store Index Data in Metastore:
SET [Link]=/home/administrator/Desktop/big/metastore_db/tmp/index_ipaddress_result;

//Set Input Format for Indexing:
SET [Link]=[Link];

//Drop an Index:
DROP INDEX index_name ON table_name;
EXPERIMENT NUMBER 11
AIM OF EXPERIMENT:
Install HBase and Thrift, with practice examples.

THEORY:
HBase is a distributed, scalable, NoSQL database built on top of the Hadoop ecosystem,
designed to handle large amounts of structured and semi-structured data across clusters of
commodity hardware. It provides a fault-tolerant and highly available architecture, enabling
real-time random read and write access to large datasets. HBase is a wide-column store: data is
stored grouped by column family, and every value is addressed by its row key, column family,
column qualifier and timestamp. Rows are kept sorted by row key, so reading or updating a row by
its key is fast, which makes HBase particularly suitable for applications that require quick
lookups and updates. It stores its files in Hadoop's HDFS, so data is distributed and replicated
across multiple nodes for scalability and redundancy.
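HBase's logical data model can be pictured as a nested, sorted map: row key, then column family and qualifier, then timestamp, then value, with a read returning the newest version of each cell by default. The sketch below models only that lookup structure in plain Python (no persistence, regions, write-ahead log or HDFS); the class and method names are hypothetical and merely echo the shell's `put` and `get`:

```python
import time
from collections import defaultdict


class ToyHBaseTable:
    """In-memory picture of HBase's logical layout (illustration only).

    row key -> "family:qualifier" -> {timestamp: value}
    Column families are fixed when the table is created; qualifiers are free-form.
    """

    def __init__(self, name, families):
        self.name = name
        self.families = set(families)
        self.rows = defaultdict(lambda: defaultdict(dict))

    def put(self, row, column, value, ts=None):
        """Store a new version of one cell; its column family must already exist."""
        family = column.split(":", 1)[0]
        if family not in self.families:
            raise KeyError("unknown column family: %s" % family)
        if ts is None:
            ts = int(time.time() * 1000)  # milliseconds, like HBase's default timestamps
        self.rows[row][column][ts] = value

    def get(self, row):
        """Return the newest version of every cell in the row."""
        cells = self.rows.get(row, {})
        return {col: versions[max(versions)] for col, versions in sorted(cells.items())}

    def scan(self):
        """Yield (row key, newest cells) in sorted row-key order."""
        for row in sorted(self.rows):
            yield row, self.get(row)


if __name__ == "__main__":
    table = ToyHBaseTable("employers", ["empAnagrafical"])
    table.put("row1", "empAnagrafical:name", "andrea", ts=1)
    table.put("row1", "empAnagrafical:surname", "guidi", ts=1)
    table.put("row2", "empAnagrafical:name", "george", ts=1)
    table.put("row1", "empAnagrafical:name", "Andrea", ts=2)  # invented newer version
    for key, cells in table.scan():
        print(key, cells)
```

Only the column family is checked on `put`, which mirrors why families are declared in HBase's `create` command while qualifiers such as `name` and `surname` appear only when data is written.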
Thrift, on the other hand, is a software framework for scalable cross-language services
development, originally developed at Facebook. It provides a simple interface definition
language (IDL) and code generation tools to create services that can be easily accessed from
multiple programming languages. Thrift supports various protocols and transports, allowing
developers to build efficient, high-performance services that can communicate over different
networks and languages. HBase ships with a Thrift gateway server, so applications written in any
language that has a Thrift client can read and write HBase tables through it. This combination of
HBase and Thrift makes it easier to build data-intensive applications that need rapid access to
large datasets across a wide range of programming languages and platforms.
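To make the idea of a Thrift protocol concrete: Thrift's binary protocol writes integers in big-endian byte order and strings as a 4-byte length followed by the UTF-8 bytes; each struct field is preceded by a one-byte type code and a two-byte field id, and a zero byte marks the end of the struct. The hand-written encoder below reproduces that layout for structs whose fields are all strings. It is a teaching sketch, not a replacement for the generated code or the `thrift` library, and the name/surname struct is hypothetical:

```python
import struct

# Thrift type codes used by the binary protocol (subset).
T_STOP = 0
T_STRING = 11


def write_string(value):
    """Binary-protocol string: 4-byte signed big-endian length, then UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">i", len(data)) + data


def write_struct(fields):
    """Encode {field_id: str}: per field a type byte + i16 field id + payload, then STOP."""
    out = bytearray()
    for field_id, value in sorted(fields.items()):
        out += struct.pack(">bh", T_STRING, field_id)
        out += write_string(value)
    out += struct.pack(">b", T_STOP)
    return bytes(out)


if __name__ == "__main__":
    # Hypothetical struct: 1: string name, 2: string surname
    encoded = write_struct({1: "andrea", 2: "guidi"})
    print(encoded.hex())
```

Other protocols (for example a compact one) change only this byte layout; the IDL and the generated service interface stay the same, which is what lets Thrift swap protocols and transports independently.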

CODE:

Step 1: Download HBase

1. Download the latest stable version of HBase from the official Apache website.
2. Extract the downloaded file.
3. Move into the HBase directory.

Step 2: Configure HBase

1. Open the [Link] file for editing using a text editor.
2. Add the following configuration settings:
a. Set the root directory for HBase.
b. Specify the ZooKeeper quorum as localhost.
c. Set the ZooKeeper client port to 2181.
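The three settings above correspond to the HBase properties `hbase.rootdir`, `hbase.zookeeper.quorum` and `hbase.zookeeper.property.clientPort`. A small sketch that writes them in the Hadoop-style `<configuration>`/`<property>` XML layout that HBase reads from its site configuration file, using only the standard library; the root directory value `file:///home/hadoop/hbase` is an assumed example path, not a value taken from this manual:

```python
import xml.etree.ElementTree as ET


def build_site_xml(properties):
    """Return a Hadoop-style <configuration> document for the given name -> value pairs."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    settings = {
        "hbase.rootdir": "file:///home/hadoop/hbase",  # assumed path; adjust to your setup
        "hbase.zookeeper.quorum": "localhost",
        "hbase.zookeeper.property.clientPort": 2181,
    }
    print(build_site_xml(settings))
```

The output is a single line of XML; pasting its `<property>` elements into the existing `<configuration>` element of the file has the same effect as typing them by hand.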

Step 3: Start HBase

1. Start the HBase service by running bin/start-hbase.sh from the HBase directory.
2. Check that HBase is running by opening the HBase shell with bin/hbase shell. You should see
the HBase shell prompt if it started successfully.

Step 4: Download and Install Thrift

1. Install the Thrift compiler and libraries.
2. If you plan to use Thrift with other programming languages, install the respective Thrift
libraries for those languages.

Step 5: Build the Thrift Interface

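1. Create a file named [Link] for your Thrift interface, defining the service and
methods. For HBase you can use the Thrift IDL file that ships with the HBase source instead of
writing your own.
2. Compile the Thrift file to generate the necessary code for the service, for example for
Python: thrift --gen py [Link] (the generated modules are written to a gen-py directory).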

Step 6: Start the Thrift Server for HBase

1. Start the Thrift server from the HBase directory with bin/hbase thrift start (run it in the
operating-system shell, not inside the HBase shell). The default port for the Thrift server is
9090, but you can change it in the HBase configuration if needed.

Example:

Step 1: Create a Table in HBase

Open the HBase shell and create a table named employers with a column family
empAnagrafical.

hbase(main):001:0> create 'employers', 'empAnagrafical'

Step 2: Insert Data into the Table

You can insert data using the HBase shell as follows:

hbase(main):002:0> put 'employers', 'row1', 'empAnagrafical:name', 'andrea'
hbase(main):003:0> put 'employers', 'row1', 'empAnagrafical:surname', 'guidi'
hbase(main):004:0> put 'employers', 'row2', 'empAnagrafical:name', 'george'
hbase(main):005:0> put 'employers', 'row2', 'empAnagrafical:surname', 'clooney'

Step 3: Set Up Thrift for HBase

Start the Thrift server. This is typically done by running the following command in your
HBase installation directory:

$ bin/hbase thrift start

Step 4: Access HBase Using Thrift in Python

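Now you can use a Python client to connect to HBase via Thrift. Ensure the Thrift Python library
is installed, and that the directory containing the Python code generated from the HBase Thrift
file (gen-py by default) is on your PYTHONPATH, because the client imports it with
from hbase import Hbase.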

pip install thrift


Step 5: Example Python Code to Access HBase

Here is a simple Python example to access the HBase table using Thrift.

from thrift import Thrift
from [Link] import TSocket
from [Link] import TTransport
from [Link] import TBinaryProtocol
from hbase import Hbase

# Create a connection to the Thrift server
transport = [Link]('localhost', 9090)
transport = [Link](transport)
protocol = [Link](transport)
client = [Link](protocol)

# Open the connection
[Link]()

# Retrieve data from HBase
table_name = 'employers'
row1 = [Link](table_name, 'row1')
row2 = [Link](table_name, 'row2')

# Print the results
print(f"Row 1: {row1}")
print(f"Row 2: {row2}")

# Close the connection
[Link]()
