Big Data (CS-3032)
Kalinga Institute of Industrial Technology
Deemed to be University
Bhubaneswar-751024
School of Computer Engineering
Strictly for internal circulation (within KIIT) and reference only. Not for outside circulation without permission
3 Credit Lecture Note
Course Contents
Sr # | Major and Detailed Coverage Area | Hrs
3 | Hadoop Ecosystem: Introduction to Hadoop, Hadoop Ecosystem, Hadoop Distributed File System, MapReduce | 8
Introduction to Hadoop
Hadoop is an open-source project of the Apache Software Foundation. Apache Hadoop is
written in Java and is a collection of open-source software utilities that facilitate
using a network of many computers to solve problems involving massive
amounts of data and computation. It provides a software framework for
distributed storage and processing of big data, and its design is based on Google’s
MapReduce and Google File System (GFS).
Hadoop
Apache open-source software framework
Inspired by:
- Google MapReduce
- Google File System
Hadoop provides various tools and technologies, collectively termed the Hadoop
ecosystem, to enable the development and deployment of Big Data solutions. It
accomplishes two tasks: i) massive data storage and ii) faster data
processing.
Hadoop Components
Hadoop Core Components:
HDFS
Storage component
Distributes data across several nodes
Natively redundant
MapReduce
Computational framework
Splits a task across multiple nodes
Processes data in parallel
Hadoop Ecosystem: These are supporting projects that enhance the functionality of the
Hadoop core components. The projects are as follows:
Hive, Pig, Sqoop, Flume, Oozie, HBase, Mahout
Hadoop Ecosystem
[Figure: Hadoop ecosystem layers: Data Management, Data Access, Data Processing, Data Storage]
How are Files Stored
Files are split into blocks
Blocks are distributed across many machines at load time
Different blocks from the same file will be stored on
different machines
Blocks are replicated across multiple machines
The NameNode keeps track of which blocks make
up a file and where they are stored
HDFS Architecture
[Figure: HDFS architecture showing the Client, NameNode, Secondary NameNode and DataNodes, with cluster membership]
NameNode: Maps a file to a file-id and a list of DataNodes
DataNode: Maps a block-id to a physical location on disk
Secondary NameNode: Periodically merges the transaction log (EditLog) into the file system image
Running on a single machine, the NameNode daemon determines and
tracks where the various blocks of a data file are stored.
The DataNode daemon manages the data stored on each machine.
If a client application wants to access a particular file stored in HDFS, the
application contacts the NameNode.
NameNode provides the application with the locations of the various
blocks for that file.
The application then communicates with the appropriate DataNodes to
access the file.
Each DataNode periodically builds a report about the blocks stored on
the DataNode and sends the report to the NameNode.
For performance reasons, the NameNode keeps its metadata in the
machine’s memory.
Because the NameNode is critical to the operation of HDFS, any
unavailability or corruption of the NameNode results in a data
unavailability event on the cluster.
Thus, the NameNode is viewed as a single point of failure in the
Hadoop environment.
To minimize the chance of a NameNode failure and to improve
performance, the NameNode is typically run on a dedicated
machine.
A third daemon is the Secondary NameNode.
It provides the capability to perform some of the
NameNode tasks to reduce the load on the NameNode.
Such tasks include updating the file system image with
the contents of the file system edit logs.
In the event of a NameNode outage, the NameNode
must be restarted and initialized with the last file system
image file and the contents of the edit logs.
Data Replication
Default replication is 3-fold
Data Retrieval
When a client wants to retrieve data
Communicates with the NameNode to determine
which blocks make up a file and on which data nodes
those blocks are stored
Then communicates directly with the DataNodes to
read the data
Hadoop
Versions of Hadoop
There are three major versions of Hadoop: Hadoop 1.x, Hadoop 2.x and Hadoop 3.x.
Hadoop 1.x vs. Hadoop 2.x
Layer | Hadoop 1.x | Hadoop 2.x
Data processing | MapReduce (data processing and resource management) | MapReduce and other data processing frameworks
Resource management | (handled by MapReduce) | YARN
Distributed file storage | HDFS (redundant, reliable storage) | HDFS2 (redundant, highly-available, reliable storage)
Hadoop 2.x vs. Hadoop 3.x
Characteristics | Hadoop 2.x | Hadoop 3.x
Minimum supported Java | Java 7 | Java 8
Fault tolerance | Handled by replication (which wastes space) | Handled by erasure coding (replication is still supported)
Data balancing | Uses the HDFS balancer | Uses the intra-DataNode balancer, which is invoked via the HDFS disk balancer CLI
Storage scheme | Uses a 3X replication scheme. E.g. 6 blocks occupy the space of 18 blocks because of replication | Supports erasure coding in HDFS. E.g. 6 blocks occupy the space of 9 blocks: 6 for data and 3 for parity
Scalability | Scales up to 10,000 nodes per cluster | Scales to more than 10,000 nodes per cluster
Erasure coding (EC) is a method of data protection in which data is broken into fragments, expanded and encoded with redundant data pieces, and stored across a set of different locations or storage media.
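To make the storage-scheme row concrete, here is a small Python sketch that recomputes the table's example. It assumes a Reed-Solomon style layout of 6 data units plus 3 parity units per stripe, matching the 6 + 3 split in the example, and it counts only whole stripes; `replication_footprint` and `erasure_coding_footprint` are hypothetical helper names, not Hadoop APIs.

```python
def replication_footprint(data_blocks: int, replication_factor: int = 3) -> int:
    """Blocks of raw space used when every block is stored replication_factor times."""
    return data_blocks * replication_factor


def erasure_coding_footprint(data_blocks: int, data_units: int = 6, parity_units: int = 3) -> int:
    """Blocks of raw space used by a (data_units + parity_units) erasure-coding layout.

    Assumes data_blocks is a whole number of stripes (a multiple of data_units).
    """
    if data_blocks % data_units != 0:
        raise ValueError("this sketch handles whole stripes only")
    stripes = data_blocks // data_units
    return data_blocks + stripes * parity_units


for name, used in [("3x replication", replication_footprint(6)),
                   ("EC (6 data + 3 parity)", erasure_coding_footprint(6))]:
    overhead = (used - 6) / 6 * 100  # extra space as a percentage of the 6 data blocks
    print(f"{name}: {used} blocks, {overhead:.0f}% storage overhead")
# 3x replication: 18 blocks, 200% storage overhead
# EC (6 data + 3 parity): 9 blocks, 50% storage overhead
```

The trade-off is in what survives a failure: with 6 + 3 coding any 3 of the 9 units in a stripe can be lost, while with 3x replication any 2 of the 3 copies can be lost, at four times the storage overhead.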
High Level Hadoop 2.0 Architecture
Hadoop has a distributed master-slave architecture.
Distributed data storage (HDFS):
HDFS master node: Active NameNode, Standby NameNode, Secondary NameNode
HDFS slave nodes: DataNode 1 … DataNode n
Distributed data processing (YARN):
YARN master node: Resource Manager
YARN slave nodes: Node Manager 1 … Node Manager n
The client interacts with both HDFS and YARN.
High Level Hadoop 2.0 Architecture cont’d
[Figure: cluster view with the YARN layer (one Resource Manager and several Node Managers) on top of the HDFS layer (one NameNode and several DataNodes)]
Hadoop HDFS
The Hadoop Distributed File System (HDFS) is the primary data storage
system used by Hadoop applications.
HDFS holds a very large amount of data and employs a NameNode and
DataNode architecture to implement a distributed file system that provides
high-performance access to data across highly scalable Hadoop clusters.
To store such huge data, the files are stored across multiple machines.
These files are stored in a redundant fashion to protect the system from
possible data loss in case of failure.
It runs on commodity hardware.
Unlike many other distributed file systems, HDFS is highly fault-tolerant and is designed
to be deployed on low-cost hardware.
Hadoop HDFS Key points
Some key points of HDFS are as follows:
1. Storage component of Hadoop.
2. Distributed File System.
3. Modeled after Google File System.
4. Optimized for high throughput (HDFS leverages a large block size and moves
computation to where the data is stored).
5. A file can be replicated a configured number of times, which makes HDFS tolerant
of both software and hardware failures.
6. Automatically re-replicates the data blocks that were stored on failed nodes.
7. Sits on top of the native file system.
HDFS Daemons
Key components of HDFS are as follows:
1. NameNode
2. DataNodes
3. Secondary NameNode
4. Standby NameNode
Blocks: User data is stored in the files of HDFS. HDFS breaks a large file into
smaller pieces called blocks; the block is the unit in which HDFS stores and
replicates data. By default the block size is 128 MB in Hadoop 2.x and 64 MB in
Hadoop 1.x, but it can be changed as needed in the HDFS configuration.
Example: a 200 MB file ([Link]) is stored in Hadoop 2.x as Block 1 (128 MB) and
Block 2 (72 MB); the last block occupies only 72 MB on disk, not a full 128 MB.
With the 64 MB blocks of Hadoop 1.x, the same file needs four blocks (64 + 64 + 64 + 8 MB).
Why is the block size large?
1. To reduce the cost of seek time.
2. To make proper use of storage space.
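The block arithmetic in the example can be checked with a short Python sketch. It assumes sizes in binary megabytes and models only how a file's length is cut into blocks, not where HDFS stores them; `split_into_blocks` is a hypothetical helper.

```python
MB = 1024 * 1024  # sizes below are binary megabytes


def split_into_blocks(file_size: int, block_size: int) -> list[int]:
    """Return the size of each block for a file of file_size bytes.

    Every block is full except possibly the last one, which holds only the
    remaining bytes (it is not padded up to block_size).
    """
    if file_size <= 0:
        return []
    full, rest = divmod(file_size, block_size)
    return [block_size] * full + ([rest] if rest else [])


for version, block_size in [("Hadoop 2.x", 128 * MB), ("Hadoop 1.x", 64 * MB)]:
    blocks = split_into_blocks(200 * MB, block_size)
    print(version, [b // MB for b in blocks])
# Hadoop 2.x [128, 72]
# Hadoop 1.x [64, 64, 64, 8]
```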
Rack
A rack is a collection of 30 or 40 nodes that are physically located close together
and are all connected to the same network switch. Network bandwidth between
any two nodes in the same rack is greater than the bandwidth between two nodes on
different racks. A Hadoop cluster is a collection of racks.
[Figure: Rack 1, Rack 2 … Rack N, each containing Node 1 … Node N connected to the rack’s own switch]
NameNode
1. NameNode is the centerpiece of HDFS.
2. NameNode is also known as the Master.
3. NameNode stores only the metadata of HDFS (the directory tree of all files in the
file system) and tracks the files across the cluster.
4. NameNode does not store the actual data or the dataset. The data itself is
stored in the DataNodes.
5. NameNode knows the list of blocks and their locations for any given file in HDFS.
With this information, NameNode knows how to construct the file from its blocks.
6. NameNode is usually configured with a lot of memory (RAM).
7. NameNode is so critical to HDFS that when it is down, the HDFS/Hadoop
cluster is inaccessible and considered down.
8. NameNode is a single point of failure in a Hadoop cluster that has no Standby NameNode.
Configuration
Processors: 2 Quad Core CPUs running @ 2 GHz
RAM: 128 GB
Disk: 6 x 1TB SATA
Network: 10 Gigabit Ethernet
NameNode Metadata
1. Metadata stored about the file consists of file name, file path, number of
blocks, block Ids, replication level.
2. This metadata information is stored on the local disk. The NameNode uses two
files for storing it: FsImage and EditLog.
3. The NameNode also keeps in its memory the locations of the DataNodes
that store the blocks of any given file. These block locations are not persisted
to disk; they are rebuilt from the block reports that DataNodes send. Using this
information, the NameNode can reconstruct the whole file by getting the
locations of all the blocks of a given file.
Example
(File Name, numReplicas, rack-ids, machine-ids, block-ids, …)
/user/in4072/data/part-0, 3, r:3, M3, {1, 3}, …
/user/in4072/data/part-1, 3, r:2, M1, {2, 4, 5}, …
/user/in4072/data/part-2, 3, r:1, M2, {6, 9, 8}, …
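The split between persistent metadata and in-memory block locations can be sketched in Python. In this toy model (all names hypothetical, not NameNode internals) the namespace maps each path to its ordered block IDs, reusing the block IDs of the example above, while block locations are filled in only from simulated block reports; dn1, dn2 and dn3 are made-up DataNode names.

```python
from collections import defaultdict

# Persistent namespace (kept via FsImage/EditLog): file path -> ordered block ids.
namespace = {
    "/user/in4072/data/part-0": [1, 3],
    "/user/in4072/data/part-1": [2, 4, 5],
}

# In-memory block map: block id -> DataNodes holding it, built only from block reports.
block_locations: dict[int, set[str]] = defaultdict(set)


def receive_block_report(datanode: str, block_ids: list[int]) -> None:
    """Record that `datanode` currently holds every block in `block_ids`."""
    for block_id in block_ids:
        block_locations[block_id].add(datanode)


def locate_file(path: str) -> list[tuple[int, list[str]]]:
    """Return (block id, sorted DataNodes holding it) for each block of the file, in order."""
    return [(b, sorted(block_locations[b])) for b in namespace[path]]


receive_block_report("dn1", [1, 2, 3])
receive_block_report("dn2", [1, 4, 5])
receive_block_report("dn3", [2, 3, 4, 5])
print(locate_file("/user/in4072/data/part-0"))
# [(1, ['dn1', 'dn2']), (3, ['dn1', 'dn3'])]
```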
DataNode
1. DataNode is responsible for storing the actual data in HDFS.
2. DataNode is also known as the Slave
3. NameNode and DataNode are in constant communication.
4. When a DataNode starts up, it announces itself to the NameNode along with
the list of blocks it is responsible for.
5. When a DataNode is down, it does not affect the availability of data or the
cluster. The NameNode arranges for re-replication of the blocks managed by
the unavailable DataNode.
6. DataNode is usually configured with a lot of hard disk space, because the
actual data is stored on the DataNodes.
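Point 5 can be illustrated with a simplified Python sketch of how a NameNode might notice a missing DataNode and plan re-replication. Everything here is an assumption for illustration: the heartbeat timestamps, the `timeout` value and the helper names are hypothetical; real HDFS derives its dead-node timeout from configurable heartbeat settings and also applies rack awareness when choosing targets.

```python
def find_dead_nodes(last_heartbeat: dict[str, float], now: float, timeout: float) -> set[str]:
    """DataNodes whose most recent heartbeat is older than `timeout` seconds."""
    return {dn for dn, t in last_heartbeat.items() if now - t > timeout}


def plan_re_replication(block_locations: dict[int, set[str]], live_nodes: set[str],
                        target: int = 3) -> dict[int, list[str]]:
    """For each under-replicated block, pick live DataNodes to receive a new copy.

    Targets are picked in sorted order for determinism; a block with no live
    copy left cannot be re-replicated and is skipped.
    """
    plan = {}
    for block_id, holders in sorted(block_locations.items()):
        live_holders = holders & live_nodes
        missing = target - len(live_holders)
        if missing > 0 and live_holders:
            candidates = sorted(live_nodes - live_holders)
            plan[block_id] = candidates[:missing]
    return plan


heartbeats = {"dn1": 100.0, "dn2": 100.0, "dn3": 40.0, "dn4": 99.0}  # hypothetical times (s)
dead = find_dead_nodes(heartbeats, now=100.0, timeout=30.0)
live = set(heartbeats) - dead
blocks = {1: {"dn1", "dn2", "dn3"}, 2: {"dn1", "dn2", "dn4"}}
print(dead, plan_re_replication(blocks, live))
# {'dn3'} {1: ['dn4']}
```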
Configuration
Processors: 2 Quad Core CPUs running @ 2 GHz
RAM: 64 GB
Disk: 12-24 x 1TB SATA
Network: 10 Gigabit Ethernet
Secondary NameNode
1. The Secondary NameNode in Hadoop is more of a helper to the NameNode; it is not
a backup NameNode server that can quickly take over in case of
NameNode failure.
2. EditLog: All the file system write operations done by client applications are first
recorded in the EditLog.
3. FsImage: This file holds the complete file system metadata as of the time the
NameNode starts. All the operations after that are recorded in the EditLog.
4. When the NameNode is restarted, it first loads the metadata from the FsImage
and then applies all the transactions recorded in the EditLog.
NameNode restarts do not happen frequently, so the EditLog grows quite
large. This means that merging the EditLog into the FsImage at startup
takes a long time, keeping the whole file system offline during that process.
5. The Secondary NameNode takes over this job of merging the FsImage and EditLog
and keeps the FsImage current, which saves a lot of time at restart. Its main
function is to checkpoint the file system metadata stored on the NameNode.
Secondary NameNode cont’d
The process followed by the Secondary NameNode to periodically merge the
FsImage and EditLog files is as follows:
1. Secondary NameNode pulls the latest FsImage and EditLog files from the
primary NameNode.
2. Secondary NameNode applies each transaction from EditLog file to FsImage
to create a new merged FsImage file.
3. Merged FsImage file is transferred back to primary NameNode.
[Figure: the Secondary NameNode tells the NameNode “It’s been an hour, provide your metadata” and then performs steps 1 to 3 above]
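Steps 1 to 3 amount to replaying the EditLog on top of the last FsImage. The Python sketch below models that merge with a dictionary standing in for the namespace; the operation tuples and function names are hypothetical simplifications, not the real FsImage or EditLog formats.

```python
def apply_edit(image: dict[str, int], op: tuple) -> None:
    """Apply one EditLog transaction to a namespace image (path -> replication factor).

    Hypothetical operation shapes: ("create", path, replication),
    ("set_replication", path, replication), ("delete", path), ("rename", old, new).
    """
    kind = op[0]
    if kind in ("create", "set_replication"):
        image[op[1]] = op[2]
    elif kind == "delete":
        image.pop(op[1], None)
    elif kind == "rename":
        image[op[2]] = image.pop(op[1])
    else:
        raise ValueError(f"unknown operation {kind!r}")


def checkpoint(fsimage: dict[str, int], edit_log: list[tuple]) -> dict[str, int]:
    """Replay every EditLog entry on a copy of the FsImage and return the merged image."""
    merged = dict(fsimage)  # the original image is left untouched
    for op in edit_log:
        apply_edit(merged, op)
    return merged


old_image = {"/data/a.txt": 3}
edits = [("create", "/data/b.txt", 3), ("set_replication", "/data/a.txt", 2),
         ("rename", "/data/b.txt", "/data/c.txt"), ("delete", "/data/missing.txt")]
new_image = checkpoint(old_image, edits)
print(new_image)
# {'/data/a.txt': 2, '/data/c.txt': 3}
```

After the merged image is sent back (step 3), the NameNode can start a fresh, short EditLog, which is what keeps restarts fast.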
Standby NameNode
With Hadoop 2.0, HDFS has built-in support for automated failover to a hot
standby NameNode, with full-stack resiliency.
1. Automated Failover: Hadoop proactively detects NameNode host and
process failures and automatically switches to the Standby NameNode to
maintain availability of the HDFS service (automatic failover is configured
with Apache ZooKeeper). No human intervention is needed in the process,
so system administrators can sleep in peace!
2. Hot Standby: Both the Active and Standby NameNodes have up-to-date HDFS
metadata, ensuring seamless failover even for large clusters, which means
no downtime for the cluster.
3. Full Stack Resiliency: The entire Hadoop stack (MapReduce, Hive, Pig,
HBase, Oozie, etc.) has been certified to handle a NameNode failure scenario
without losing data or job progress. This is vital to ensure that long-running
jobs that must complete on schedule are not adversely affected
by a NameNode failure.
Replication
HDFS provides a reliable way to store huge data in a distributed environment as
data blocks. The blocks are also replicated to provide fault tolerance. The default
replication factor is 3, which is configurable. Therefore, a 128 MB file stored
in HDFS with the default configuration occupies 384 MB (3 × 128 MB), as the
block is replicated three times and each replica resides on a different DataNode.
Rack Awareness
All machines in a rack are connected to the same network switch, and if that
switch goes down then all machines in that rack are out of service; the rack is
down. Rack Awareness was introduced by Apache Hadoop to overcome this issue.
With Rack Awareness, the NameNode chooses DataNodes on the same rack or a nearby
rack. The NameNode maintains the rack ID of each DataNode to obtain rack
information and chooses DataNodes based on it. The NameNode also ensures that
all the replicas of a block are not stored on the same rack. The default
replication factor is 3. Therefore, according to the Rack Awareness algorithm:
When the Hadoop framework creates a new block, it places the first replica on the
local node, the second replica on a node in a different rack, and the third replica
on a different node in the same remote rack.
When re-replicating a block, if the number of existing replicas is one, place
the second on a different rack.
When the number of existing replicas is two and both are in the same rack,
place the third one on a different rack.
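The new-block rule can be sketched in Python as follows. This is a minimal sketch, assuming the writer is itself a DataNode and at least one remote rack has two or more nodes; `place_replicas` and the topology dictionary are hypothetical, and real HDFS additionally weighs factors such as node load and free space.

```python
import random


def place_replicas(writer: str, topology: dict[str, str], rng: random.Random) -> list[str]:
    """Choose three DataNodes for a new block following the default rule above.

    `topology` maps each DataNode name to its rack id (hypothetical names).
    """
    local_rack = topology[writer]
    # Group nodes by rack so we only pick a remote rack that can hold two replicas.
    nodes_per_rack: dict[str, list[str]] = {}
    for node, rack in sorted(topology.items()):
        nodes_per_rack.setdefault(rack, []).append(node)
    remote_racks = [rack for rack, nodes in sorted(nodes_per_rack.items())
                    if rack != local_rack and len(nodes) >= 2]
    if not remote_racks:
        raise ValueError("this sketch needs a remote rack with at least two DataNodes")

    first = writer                                   # 1st replica: the local node
    remote_rack = rng.choice(remote_racks)           # 2nd replica: a different rack
    second, third = rng.sample(nodes_per_rack[remote_rack], 2)  # 3rd: other node, same remote rack
    return [first, second, third]


topology = {"dn1": "rack1", "dn2": "rack1", "dn3": "rack2", "dn4": "rack2", "dn5": "rack3"}
replicas = place_replicas("dn1", topology, random.Random(42))
print(replicas, [topology[dn] for dn in replicas])
```

Because rack3 holds only one node in this example, the sketch can only use rack2 for the second and third replicas, so the output always pairs dn1 with dn3 and dn4 in some order.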
Rack Awareness & Replication
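A file made of three blocks (B1, B2, B3), each stored as three replicas across three racks:
Rack 1: DN 1 = B3, DN 2 = B1, DN 3 = B3, DN 4 = (empty)
Rack 2: DN 1 = B1, DN 2 = B2, DN 3 = B1, DN 4 = (empty)
Rack 3: DN 1 = B2, DN 2 = B3, DN 3 = B2, DN 4 = (empty)
Each block has one replica on one rack and two replicas on two different nodes of another rack.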
Rack Awareness Advantages
Provides higher bandwidth and lower latency: This policy
maximizes network bandwidth by transferring blocks within a rack
rather than between racks. YARN is able to optimize MapReduce
job performance by assigning tasks to nodes that are closer to their
data in terms of network topology.
Provides data protection against rack failure: The NameNode assigns
the second and third replicas of a block to nodes in a different rack
from the first replica. Thus, it provides data protection even against
rack failure. However, this is possible only if Hadoop was configured
with knowledge of its rack configuration.
Minimizes the writing cost and maximizes read speed: The rack
awareness policy directs read/write requests to replicas in the same
rack where possible. Thus, it minimizes writing cost and maximizes
reading speed.
Anatomy of File Write
HDFS follows a write-once, read-many model. Files already stored in HDFS cannot
be edited, but data can be appended by reopening the file.
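[Figure: on the client node, the client JVM uses DistributedFileSystem to talk to the NameNode and an FSDataOutputStream to write; packets are written (4. Write Packet) along a pipeline DataNode1 → DataNode2 → DataNode3, acknowledgements flow back (5. Acknowledge Packet), and the client finally reports 7. Complete to the NameNode]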
Anatomy of File Write cont’d
1. The client calls create() on DistributedFileSystem (a subclass of
FileSystem) to create a file.
2. DistributedFileSystem makes an RPC call to the NameNode to create a new
file. The NameNode performs various checks (for example, that the file does not
already exist) before creating it. Initially, the NameNode creates the file without
associating any data blocks with it. DistributedFileSystem then returns an
FSDataOutputStream (which wraps a DFSOutputStream) to the client to perform the write.
3. As the client writes data, DFSOutputStream (a class) splits it into packets,
which are written to an internal queue called the data queue.
DataStreamer (a class) consumes the data queue. The DataStreamer
asks the NameNode to allocate new blocks by selecting a list of suitable
DataNodes to store the replicas. The list of DataNodes forms a pipeline. With
the default replication factor of 3, there will be 3 nodes in the pipeline for
the first block.
Anatomy of File Write cont’d
4. DataStreamer streams the packets to the first DataNode in the pipeline, which
stores each packet and forwards it to the second DataNode in the pipeline. In the
same way, the second DataNode stores the packet and forwards it to the third
DataNode in the pipeline.
5. In addition to the data queue, DFSOutputStream also manages an “ack
queue” of packets that are waiting to be acknowledged by the DataNodes.
A packet is removed from the ack queue only when it has been acknowledged
by all the DataNodes in the pipeline.
6. When the client finishes writing the file, it calls close() on the stream.
7. This flushes all the remaining packets to the DataNode pipeline and waits
for their acknowledgements before contacting the NameNode to signal that
the file is complete.
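Steps 3 to 7 can be traced with a single-threaded Python toy model of the data queue and the ack queue. The function name is made up, the `down` parameter is a hypothetical failure switch added for illustration, and the model omits the pipeline recovery that real HDFS performs when a DataNode fails.

```python
from collections import deque


def write_through_pipeline(data: bytes, packet_size: int, pipeline: list[str],
                           down: frozenset = frozenset()):
    """Toy model of steps 3-7 of a file write.

    Returns (bytes stored per DataNode, sequence numbers still waiting in the ack queue).
    DataNodes listed in `down` never store or acknowledge anything.
    """
    # Step 3: split the client's data into numbered packets in the data queue.
    data_queue = deque(enumerate(data[i:i + packet_size]
                                 for i in range(0, len(data), packet_size)))
    ack_queue = deque()
    stored = {dn: bytearray() for dn in pipeline}

    while data_queue:
        seqno, packet = data_queue.popleft()
        ack_queue.append(seqno)          # the packet now waits for acknowledgements
        acked_by = set()
        # Step 4: DataNode1 stores the packet and forwards it to DataNode2, and so on.
        for dn in pipeline:
            if dn in down:
                break                    # the pipeline is broken; later nodes never see it
            stored[dn].extend(packet)
            acked_by.add(dn)
        # Step 5: remove the packet from the ack queue only if every DataNode acked it.
        if acked_by == set(pipeline):
            ack_queue.remove(seqno)

    # Steps 6-7: close() may only report completion once the ack queue is empty.
    return {dn: bytes(buf) for dn, buf in stored.items()}, list(ack_queue)


stored, pending = write_through_pipeline(b"hello hdfs!", 4, ["dn1", "dn2", "dn3"])
print(stored["dn3"], pending)
# b'hello hdfs!' []
```

With a DataNode marked as down, no packet collects acknowledgements from the whole pipeline, so every sequence number stays in the ack queue and the write could not complete.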
Anatomy of File Read
[Figure: on the client node, the client JVM uses DistributedFileSystem to contact the NameNode and an FSDataInputStream to read blocks from DataNode1, DataNode2 and DataNode3 in turn (4.1, 4.2 and 4.3 read)]
Anatomy of File Read cont’d
1. The client opens the file that it wishes to read by calling open() on
DistributedFileSystem.
2. DistributedFileSystem communicates with the NameNode to get the
locations of the data blocks. The NameNode returns the addresses of the
DataNodes on which the blocks are stored. DistributedFileSystem then
returns an FSDataInputStream (which wraps a DFSInputStream) to the client to
read from the file.
3. The client then calls read() on the stream. DFSInputStream, which holds the
addresses of the DataNodes for the first few blocks of the file, connects to the
closest DataNode for the first block in the file.
4. The client calls read() repeatedly to stream the data from the DataNode.
5. When the end of the block is reached, DFSInputStream closes the
connection with the DataNode and finds the best DataNode for the next
block, repeating this for each subsequent block.
6. When the client has finished reading the file, it calls close() on the
FSDataInputStream to close the connection.
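Choosing the closest DataNode in steps 3 and 5 can be sketched in Python using the usual Hadoop convention for network distance (0 for the same node, 2 for the same rack, 4 for a different rack in the same data center). The block names, locations and the `fetch` callable are hypothetical stand-ins for real DataNode connections.

```python
def distance(a: tuple[str, str], b: tuple[str, str]) -> int:
    """Network distance between two (rack, node) locations: 0 same node, 2 same rack, 4 otherwise."""
    if a == b:
        return 0
    if a[0] == b[0]:
        return 2
    return 4


def read_file(client, block_replicas, fetch) -> bytes:
    """Read blocks in order, each from the replica closest to the client.

    block_replicas: list of (block_id, [replica locations]); fetch(location, block_id) -> bytes.
    """
    parts = []
    for block_id, replicas in block_replicas:
        closest = min(replicas, key=lambda loc: distance(client, loc))
        parts.append(fetch(closest, block_id))
    return b"".join(parts)


storage = {("rack1", "dn1"): {"blk_1": b"Hello "},
           ("rack1", "dn2"): {"blk_2": b"HDFS"},
           ("rack2", "dn3"): {"blk_1": b"Hello ", "blk_2": b"HDFS"}}
layout = [("blk_1", [("rack2", "dn3"), ("rack1", "dn1")]),
          ("blk_2", [("rack2", "dn3"), ("rack1", "dn2")])]
client = ("rack1", "dn1")  # the reader runs on DataNode dn1
print(read_file(client, layout, lambda loc, blk: storage[loc][blk]))
# b'Hello HDFS'
```

Here blk_1 is read from the client's own node and blk_2 from a node in the same rack, so neither read crosses to rack2.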
HDFS Commands
To get the list of directories and files at the root of HDFS
hadoop fs -ls /
To create a directory (say, sample) in HDFS
hadoop fs -mkdir /sample
To copy a file from the local file system to HDFS
hadoop fs -put /root/sample/[Link] /sample/[Link]
To copy a file from HDFS to the local file system
hadoop fs -get /sample/[Link] /root/sample/[Link]
To display the contents of an HDFS file on the console
hadoop fs -cat /sample/[Link]
To copy a file from one directory to another on HDFS
hadoop fs -cp /sample/[Link] /sample1
To remove a directory from HDFS
hadoop fs -rm -r /sample1
HDFS Example
Let’s assume that this [Link] file contains a few lines of text. The content of the file is
as follows:
Hello I am expert in Big Data
How can I help you
How can I assist you
Are you an engineer
Are you looking for coding
Are you looking for interview questions
what are you doing these days
what are your strengths
Hence, the above 8 lines are the content of the file. Let’s assume that while storing this
file in Hadoop, HDFS broke it into four parts and named each part [Link],
[Link], [Link], and [Link]. The file is thus divided into four equal parts, each
containing 2 lines: the first two lines in [Link], the next two in [Link], the next two
in [Link], and the last two in [Link]. (This is a simplification for illustration; in
practice HDFS splits a file into blocks by byte size, not by lines.) All these parts are
stored in DataNodes, and the NameNode holds the metadata about them. All of this is the task of HDFS.
Data Processing with Hadoop
1. MapReduce is a processing technique and a programming model for
distributed computing; Hadoop’s implementation is written in Java. It is
based on the divide-and-conquer approach.
2. In MapReduce programming, the input dataset is split into
independent chunks.
3. It contains two important tasks, namely Map and Reduce.
4. Map takes a set of data and converts it into another set of
data, where individual elements are broken down into tuples
(key/value pairs). The processing primitive is called the mapper.
The processing is done in parallel. The output
produced by the map tasks serves as intermediate data and is
stored on the local disk of that server.
Data Processing with Hadoop
1. The reduce task takes the output from the map tasks as its input and
combines those data tuples into a smaller set of tuples. The
processing primitive is called the reducer. The input and output are
stored in a file system.
2. The reduce task is always performed after the map job.
3. The major advantage of MapReduce is that it is easy to scale
data processing over multiple computing nodes, while the framework takes
care of other tasks such as scheduling, monitoring and re-executing
failed tasks.
Data Processing with Hadoop cont’d
MapReduce program executes in three stages: map stage, shuffle & sorting
stage, and reduce stage.
Map Stage: The map or mapper’s job is to process the input data. Generally,
the input data is in the form of a file or directory and is stored in the Hadoop
file system (HDFS). The input file is passed to the mapper function line by
line. The mapper processes the data and creates several small chunks of
data in the form of intermediate key/value pairs.
Shuffle & Sorting Stage: Shuffle phase in Hadoop transfers the map output
from Mapper to a Reducer in MapReduce. Sort phase in MapReduce
covers the merging and sorting of map outputs.
Reducer Stage: The Reducer’s job is to process the data that comes from
the mapper. After processing, it produces a new set of output, which will be
stored in the HDFS.
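The three stages can be simulated in a single Python process with the classic word-count example. This sketch only mirrors the data flow (map, then shuffle and sort, then reduce); it is not Hadoop’s Java Mapper/Reducer API, and `map_reduce`, `word_count_mapper` and `word_count_reducer` are hypothetical names.

```python
from collections import defaultdict
from typing import Callable, Iterable


def map_reduce(lines: Iterable[str],
               mapper: Callable[[str], Iterable[tuple[str, int]]],
               reducer: Callable[[str, list[int]], tuple[str, int]]) -> list[tuple[str, int]]:
    # Map stage: each input line is passed to the mapper, which emits key/value pairs.
    intermediate = [pair for line in lines for pair in mapper(line)]
    # Shuffle & sort stage: group all values by key, then visit keys in sorted order.
    groups: dict[str, list[int]] = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)
    # Reduce stage: one reducer call per distinct key.
    return [reducer(key, groups[key]) for key in sorted(groups)]


def word_count_mapper(line: str):
    for word in line.split():
        yield word, 1


def word_count_reducer(word: str, counts: list[int]):
    return word, sum(counts)


print(map_reduce(["big data", "big hadoop"], word_count_mapper, word_count_reducer))
# [('big', 2), ('data', 1), ('hadoop', 1)]
```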
How Does MapReduce Work?
At the crux of MapReduce are two functions: Map and Reduce. They are
sequenced one after the other.
The Map function takes input from the disk as <key,value> pairs, processes
them, and produces another set of intermediate <key,value> pairs as output.
The Reduce function also takes inputs as <key,value> pairs, and produces
<key,value> pairs as output.
Working of MapReduce
The types of keys and values differ based on the use case. All inputs and outputs
are stored in the HDFS. While the map is a mandatory step to filter and sort the
initial data, the reduce function is optional.
<k1, v1> -> Map() -> list(<k2, v2>)
<k2, list(v2)> -> Reduce() -> list(<k3, v3>)
Mappers and Reducers are the Hadoop servers that run the Map and Reduce
functions respectively. It doesn’t matter if these are the same or different
servers.
Map: The input data is first split into smaller blocks. Each block is then
assigned to a mapper for processing. For example, if a file has 100 records
to be processed, 100 mappers can run together to process one record each.
Or maybe 50 mappers can run together to process two records each. The
Hadoop framework decides how many mappers to use based on the size of
the data to be processed and the input split size (by default one split per
HDFS block, each handled by one mapper).
Working of MapReduce cont’d
Reduce: After all the mappers complete processing, the framework shuffles
and sorts the results before passing them on to the reducers. A reducer
cannot start while a mapper is still in progress. All the map output values
that have the same key are assigned to a single reducer, which then
aggregates the values for that key.
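How the framework makes sure that all values with the same key reach a single reducer can be sketched with hash partitioning. The formula follows Hadoop’s default HashPartitioner, (hash & Integer.MAX_VALUE) % numReduceTasks, but this Python sketch uses zlib.crc32 as a deterministic stand-in for Java’s hashCode(), so its reducer numbers will not match a real Hadoop job; `partition` and `shuffle` are hypothetical names.

```python
import zlib
from collections import defaultdict


def partition(key: str, num_reducers: int) -> int:
    """Pick the reducer for a key: (hash & 0x7FFFFFFF) % num_reducers.

    zlib.crc32 stands in for Java's hashCode(); Python's built-in hash() of
    strings is randomized per process, so it would not be reproducible.
    """
    return (zlib.crc32(key.encode("utf-8")) & 0x7FFFFFFF) % num_reducers


def shuffle(map_output, num_reducers):
    """Route every (key, value) pair to its reducer, grouping values by key."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in map_output:
        reducers[partition(key, num_reducers)][key].append(value)
    return reducers


map_output = [("Dog", 1), ("Cat", 1), ("Rat", 1), ("Dog", 1), ("Rat", 1)]
for i, inbox in enumerate(shuffle(map_output, 2)):
    print(f"reducer {i}:", dict(inbox))
```

Whatever the hash values turn out to be, both ("Dog", 1) pairs land in the same reducer’s inbox, which is what lets that reducer aggregate the key on its own.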
Class Exercise 1
Draw the MapReduce process to
count the number of words for the
input:
Dog Cat Rat
Car Car Rat
Dog car Rat
Rat Rat Rat
Question 1.
Peter Piper Picked a peck of pickled
pepers. A peck of pickled peppers Peter
Piper picked. If Peter Piper picked a peck
of pickled pepers. Where’s the peck of
pickled peppers Peter Piper Picked?
Draw the MapReduce process to find the maximum
electrical consumption for each year.
[Figure: input file of yearly electrical consumption records]
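Since the input file is not reproduced here, the following Python sketch assumes a hypothetical line format: a year followed by that year’s consumption readings, separated by spaces. The mapper emits (year, reading) pairs and the reducer keeps the maximum per year; the function names are made up for illustration.

```python
from collections import defaultdict


def max_consumption_mapper(line: str):
    """Assumed (hypothetical) format: '<year> <reading> <reading> ...'.
    Emits (year, reading) for every reading on the line."""
    if not line.strip():
        return  # skip blank lines
    year, *readings = line.split()
    for reading in readings:
        yield year, int(reading)


def max_consumption_reducer(year: str, values: list[int]):
    """Keep only the largest reading seen for the year."""
    return year, max(values)


def run(lines):
    groups = defaultdict(list)
    for line in lines:                                   # map stage
        for year, value in max_consumption_mapper(line):
            groups[year].append(value)                   # shuffle: group values by year
    # sort by year, then one reduce call per year
    return [max_consumption_reducer(y, groups[y]) for y in sorted(groups)]
```

A common refinement is to have each mapper emit only the largest reading on its line, which shrinks the data shuffled to the reducers without changing the result, because the maximum of the per-line maxima is the overall maximum.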
Perform Map Reduce
Example: Build an inverted index for a set of tweets based on their
hashtags, and show how the solution maps onto MapReduce.
Input Data:
“It’s not too late to vote. #ElectionDay”
“Midtown polling office seeing a steady flow of voters!
#PrimaryDay”
“Today's the day. Be a voter! #ElectionDay”
“Happy #PrimaryDay”
“Say NO to corruption & vote! #ElectionDay”
“About to go cast my vote...first time #ElectionDay”
MapReduce mapping:
map: tweet -> (hashtag, tweet)
reduce: (hashtag, list(tweet)) -> (hashtag, list(tweet))
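The map and reduce signatures above can be run on the six sample tweets with a small Python sketch. It assumes a hashtag is '#' followed by word characters, emits each distinct hashtag of a tweet once, and uses an identity reducer, as the reduce signature suggests; the function names are hypothetical.

```python
import re
from collections import defaultdict

HASHTAG = re.compile(r"#\w+")


def mapper(tweet: str):
    """map: tweet -> (hashtag, tweet), once per distinct hashtag in the tweet."""
    for tag in sorted(set(HASHTAG.findall(tweet))):
        yield tag, tweet


def reducer(tag: str, tweets: list[str]):
    """reduce: (hashtag, list(tweet)) -> (hashtag, list(tweet)); an identity step here."""
    return tag, tweets


def inverted_index(tweets):
    groups = defaultdict(list)
    for tweet in tweets:
        for tag, t in mapper(tweet):
            groups[tag].append(t)          # shuffle: group tweets by hashtag
    return dict(reducer(tag, groups[tag]) for tag in sorted(groups))


tweets = [
    "It's not too late to vote. #ElectionDay",
    "Midtown polling office seeing a steady flow of voters! #PrimaryDay",
    "Today's the day. Be a voter! #ElectionDay",
    "Happy #PrimaryDay",
    "Say NO to corruption & vote! #ElectionDay",
    "About to go cast my vote...first time #ElectionDay",
]
index = inverted_index(tweets)
for tag, tagged in index.items():
    print(tag, len(tagged))
# #ElectionDay 4
# #PrimaryDay 2
```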