HADOOP DISTRIBUTED FILE SYSTEM
MODULE 2
HDFS ARCHITECTURE
HDFS is Hadoop’s implementation of a
distributed filesystem.
It is designed to hold a large amount of
data, and provide access to this data to
many clients distributed across a network.
The HDFS design is based on the design of
the Google File System (GFS).
Its implementation addresses a number of problems found in other distributed filesystems, such as the Network File System (NFS).
The implementation of HDFS addresses the following:
• To be able to store a very large amount of data (terabytes or petabytes), HDFS is designed to spread the data across a large number of machines, and to support much larger file sizes than distributed filesystems such as NFS.
• To store data reliably, and to cope with the malfunctioning or loss of individual machines in the cluster, HDFS uses data replication.
• To better integrate with Hadoop’s MapReduce, HDFS allows data to be read and processed locally.
HDFS is restricted to a particular class of
applications — it is not a general-purpose
distributed filesystem.
A large number of additional decisions and trade-offs govern HDFS architecture and implementation, including the following:
• HDFS is optimized for high streaming read performance, and this comes at the expense of random seek performance. This means that an application reading from HDFS should avoid (or at least minimize) seeks. Sequential reads are the preferred way to access HDFS files.
• HDFS supports only a limited set of operations on files: writes, deletes, appends, and reads, but not updates. It assumes that data will be written to HDFS once and then read multiple times.
• HDFS does not provide a mechanism for local caching of data. The overhead of caching is large enough that data should simply be re-read from the source, which is not a problem for applications that mostly perform sequential reads of large data files.
• HDFS is implemented as a block-structured filesystem. Individual files are broken into blocks of a fixed size, which are stored across a Hadoop cluster. A file can be made up of several blocks, which are stored on different DataNodes (individual machines in the cluster).
The DataNode stores each HDFS data block in a separate file on its local filesystem, with no knowledge about the HDFS files themselves. To improve throughput even further, the DataNode does not create all files in the same directory. Instead, it uses heuristics to determine the optimal number of files per directory, and creates subdirectories appropriately.
One of the requirements for such a block-structured filesystem is the capability to store, manage, and access file metadata (information about files and blocks) reliably, and to provide fast access to the metadata store. Unlike HDFS files themselves (which are accessed in a write-once, read-many model), the metadata structures can be modified by a large number of clients concurrently.
It is important that this information never gets
out of sync. HDFS solves this problem by
introducing a dedicated special machine,
called the NameNode, which stores all the
metadata for the file system across the cluster.
This means that HDFS implements a
master/slave architecture. A single NameNode
(which is a master server) manages the file
system namespace and regulates access to
files by clients.
The existence of a single master in a cluster
greatly simplifies the architecture of the
system. The NameNode serves as a single
arbitrator and repository for all HDFS
metadata.
Because the amount of metadata per file is relatively low (it only tracks filenames, permissions, and the locations of each block), the NameNode stores all of the metadata in main memory, which allows fast random access. The metadata storage is designed to be compact.
As a result, a NameNode with 4 GB
of RAM is capable of supporting a
huge number of files and directories.
The default size of an HDFS block is 64 MB, orders of magnitude larger than the block size of most other block-structured filesystems. An additional benefit of the large data block is that it allows HDFS to keep large amounts of data stored on disk sequentially, which supports fast streaming reads of data.
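To make the block arithmetic concrete, here is a minimal sketch in plain Java (not Hadoop's API) that computes the sizes of the blocks a file is split into, assuming the 64 MB default block size given above. Every block is full except possibly the last one.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: sizes of the blocks a file of a given length is split into.
// Illustrative arithmetic only, not Hadoop code.
class BlockSplitter {
    static final long MB = 1024L * 1024L;

    static List<Long> split(long fileLength, long blockSize) {
        List<Long> blocks = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            // Every block is full except possibly the last one.
            blocks.add(Math.min(blockSize, fileLength - offset));
        }
        return blocks;
    }

    public static void main(String[] args) {
        // A 200 MB file with 64 MB blocks.
        for (long b : split(200 * MB, 64 * MB)) {
            System.out.println(b / MB + " MB");
        }
    }
}
```

For a 200 MB file this yields three 64 MB blocks and one 8 MB block.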
The downside of HDFS file organization is that
several DataNodes are involved in the serving of
a file, which means that a file can become
unavailable in the case where any one of those
machines is lost. To avoid this problem, HDFS
replicates each block across a number of
machines (three, by default).
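The effect of replication on availability can be checked with a simplified model. This is not HDFS's placement logic: blocks are assigned to `replication` distinct nodes in round-robin order purely for illustration, and the check asks whether every block still has a live replica after one node is lost.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model: place each block on `replication` distinct nodes in
// round-robin order, then test whether the file survives the loss of one node.
class ReplicationModel {
    // placement.get(b) lists the node indexes that hold a replica of block b.
    static List<List<Integer>> place(int blocks, int nodes, int replication) {
        List<List<Integer>> placement = new ArrayList<>();
        for (int b = 0; b < blocks; b++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replication; r++) {
                replicas.add((b + r) % nodes);   // distinct nodes as long as replication <= nodes
            }
            placement.add(replicas);
        }
        return placement;
    }

    // The file is readable only if every block keeps at least one replica on a live node.
    static boolean availableWithout(List<List<Integer>> placement, int lostNode) {
        for (List<Integer> replicas : placement) {
            boolean survives = false;
            for (int node : replicas) {
                if (node != lostNode) {
                    survives = true;
                    break;
                }
            }
            if (!survives) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(availableWithout(place(4, 5, 3), 2));   // true
        System.out.println(availableWithout(place(4, 5, 1), 2));   // false
    }
}
```

With a replication factor of 3, losing any single node leaves every block readable; with a factor of 1, the block stored on the lost node becomes unavailable, and so does the file.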
Data replication in HDFS is implemented as part
of a write operation in the form of a data
pipeline.
When a client is writing data to an HDFS file, this
data is first written to a local file. When the local
file accumulates a full block of data, the client
consults the NameNode to get a list of
DataNodes that are assigned to host replicas of
that block.
The client then writes the data block from its local storage to the first DataNode (see Figure) in 4 KB portions. The DataNode stores each received portion in its local filesystem, and forwards that portion of data to the next DataNode in the list.
The same operation is repeated by each subsequent DataNode until the last node in the replica set receives the data. This DataNode stores the data locally without sending it any further.
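The pipeline can be modeled in a few lines. The sketch below is a simplified, sequential, single-process model, not the HDFS client or DataNode code: each `DataNode` object (a hypothetical stand-in) stores a 4 KB portion and forwards it to the next node in the list, and the last node stores the data without forwarding it.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sequential, single-process model of the HDFS write pipeline (illustration only).
class PipelineSketch {
    static final int PORTION = 4 * 1024;   // 4 KB portions, as described in the text

    // Hypothetical stand-in for a DataNode: it just accumulates the bytes it stores.
    static class DataNode {
        final String name;
        final ByteArrayOutputStream stored = new ByteArrayOutputStream();

        DataNode(String name) {
            this.name = name;
        }
    }

    static List<DataNode> nodes(String... names) {
        List<DataNode> list = new ArrayList<>();
        for (String n : names) {
            list.add(new DataNode(n));
        }
        return list;
    }

    // A DataNode stores the portion locally, then forwards it to the next node, if any.
    static void receive(List<DataNode> pipeline, int index, byte[] portion) {
        pipeline.get(index).stored.write(portion, 0, portion.length);
        if (index + 1 < pipeline.size()) {
            receive(pipeline, index + 1, portion);
        }
        // The last node in the pipeline stores the data without forwarding it.
    }

    // The client sends the block to the first DataNode, one 4 KB portion at a time.
    static void writeBlock(byte[] block, List<DataNode> pipeline) {
        for (int off = 0; off < block.length; off += PORTION) {
            int end = Math.min(block.length, off + PORTION);
            receive(pipeline, 0, Arrays.copyOfRange(block, off, end));
        }
    }

    public static void main(String[] args) {
        List<DataNode> pipeline = nodes("dn1", "dn2", "dn3");
        writeBlock(new byte[10_000], pipeline);
        for (DataNode dn : pipeline) {
            System.out.println(dn.name + " stored " + dn.stored.size() + " bytes");
        }
    }
}
```

After a 10,000-byte block passes through a three-node pipeline, each node holds all 10,000 bytes. In a real cluster the transfers overlap in time across the network; this model only shows the store-and-forward order.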
If one of the DataNodes fails while the block
is being written, it is removed from the
pipeline. In this case, when the write
operation on the current block completes,
the NameNode re-replicates it to make up for
the missing replica caused by the failed
DataNode.
When a file is closed, the remaining data in
the temporary local file is pipelined to the
DataNodes. The client then informs the
NameNode that the file is closed. At this
point, the NameNode commits the file
creation operation into a persistent store. If
the NameNode dies before the file is closed,
the file is lost.
One of the most powerful features of HDFS is
optimization of replica placement, which is
crucial to HDFS reliability and performance.
All decisions regarding replication of blocks are made by the NameNode, which periodically receives a heartbeat (every 3 seconds by default) and a block report from each of the DataNodes. Block reports are sent much less frequently than heartbeats. A heartbeat is used to ensure proper functioning of DataNodes, and a block report allows verifying that the list of blocks on a DataNode corresponds to the NameNode’s information.
One of the first things that a DataNode does on startup is to send a block report to the NameNode. This allows the NameNode to rapidly form a picture of the block distribution across the cluster.
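As a rough illustration (the report format and class name are assumptions, not the actual DataNode protocol), the NameNode's picture of block distribution can be modeled as a map from block ID to the set of DataNodes that reported holding it:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of the NameNode's block map, built from DataNode block reports.
class BlockMapSketch {
    private final Map<Long, Set<String>> locations = new HashMap<>();

    // A block report here is just the list of block IDs a DataNode currently stores.
    void processReport(String dataNode, List<Long> blockIds) {
        for (long id : blockIds) {
            locations.computeIfAbsent(id, k -> new TreeSet<>()).add(dataNode);
        }
    }

    // DataNodes known to hold the block (empty if no report mentioned it).
    Set<String> locationsOf(long blockId) {
        return locations.getOrDefault(blockId, new TreeSet<>());
    }

    public static void main(String[] args) {
        BlockMapSketch nameNode = new BlockMapSketch();
        nameNode.processReport("dn1", List.of(1L, 2L));
        nameNode.processReport("dn2", List.of(2L, 3L));
        nameNode.processReport("dn3", List.of(1L, 2L, 3L));
        System.out.println(nameNode.locationsOf(2L));   // [dn1, dn2, dn3]
    }
}
```

This sketch only adds locations; a fuller model would also drop locations that a DataNode stops reporting, which is how a block report lets the NameNode verify its information.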
An important characteristic of the
data replication in HDFS is rack
awareness. Large HDFS instances
run on a cluster of computers that
is commonly spread across many
racks. Typically, network
bandwidth (and consequently
network performance) between
machines in the same rack is
greater than network bandwidth
between machines in different
racks.
The NameNode determines the rack
ID that each DataNode belongs to via
the Hadoop Rack Awareness process.
A simple policy is to place replicas on
unique racks. This policy prevents
losing data when an entire rack is
lost, and evenly distributes replicas
in the cluster. It also allows using
bandwidth from multiple racks when
reading data. But because a write
must, in this case, transfer blocks to
multiple racks, the performance of
writes suffers.
An optimization of this rack-aware policy is to cut inter-rack write traffic (and consequently improve write performance) by placing the replicas on fewer racks than there are replicas.
For example, when a replication
factor is three, two replicas are
placed on one rack, and the third
one is on a different rack.
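A minimal sketch of the two-racks-for-three-replicas idea, under stated assumptions: the topology is a hypothetical map from rack ID to node names, the first replica goes on the writer's node, and the second and third go on two different nodes of one other rack. Hadoop's actual placement policy applies further checks; this only illustrates the rack arithmetic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of rack-aware placement for a replication factor of 3:
// replica 1 on the writer's node, replicas 2 and 3 on two nodes of one remote rack.
class RackAwarePlacement {
    static List<String> choose(Map<String, List<String>> racks, String writerRack, String writerNode) {
        List<String> targets = new ArrayList<>();
        targets.add(writerNode);                        // first replica: local node
        for (Map.Entry<String, List<String>> rack : racks.entrySet()) {
            if (rack.getKey().equals(writerRack) || rack.getValue().size() < 2) {
                continue;                               // need a different rack with >= 2 nodes
            }
            targets.add(rack.getValue().get(0));        // second replica: remote rack
            targets.add(rack.getValue().get(1));        // third replica: same remote rack
            return targets;
        }
        throw new IllegalStateException("no remote rack with two nodes available");
    }

    public static void main(String[] args) {
        Map<String, List<String>> racks = Map.of(
                "/rack1", List.of("dn1", "dn2"),
                "/rack2", List.of("dn3", "dn4"));
        System.out.println(choose(racks, "/rack1", "dn1"));   // [dn1, dn3, dn4]
    }
}
```

The three replicas span two racks, so losing either rack still leaves at least one copy, while a pipeline written in this order crosses racks only once per block.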
Each DataNode periodically sends a
heartbeat message to the NameNode (see
Figure 2-1), which is used by the NameNode
to discover DataNode failures (based on
missing heartbeats).
The NameNode marks DataNodes without
recent heartbeats as dead, and does not
dispatch any new I/O requests to them.
Because data located at a dead DataNode is
no longer available to HDFS, DataNode death
may cause the replication factor of some
blocks to fall below their specified values.
The NameNode constantly tracks which
blocks must be re-replicated, and initiates
replication whenever necessary.
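Dead-node detection and the resulting re-replication bookkeeping can be sketched as follows. The timeout value and all names are assumptions chosen for illustration, not Hadoop's configuration defaults.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified NameNode-side heartbeat tracking and under-replication detection.
class HeartbeatMonitor {
    private final long timeoutMillis;   // hypothetical timeout, not Hadoop's default
    private final Map<String, Long> lastHeartbeat = new HashMap<>();
    private final Map<Long, Set<String>> blockLocations = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void heartbeat(String dataNode, long nowMillis) {
        lastHeartbeat.put(dataNode, nowMillis);
    }

    void addReplica(long blockId, String dataNode) {
        blockLocations.computeIfAbsent(blockId, k -> new HashSet<>()).add(dataNode);
    }

    // DataNodes whose most recent heartbeat is older than the timeout are considered dead.
    Set<String> deadNodes(long nowMillis) {
        Set<String> dead = new HashSet<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                dead.add(e.getKey());
            }
        }
        return dead;
    }

    // Blocks whose number of replicas on live DataNodes is below the target factor.
    Set<Long> underReplicated(long nowMillis, int replication) {
        Set<String> dead = deadNodes(nowMillis);
        Set<Long> result = new HashSet<>();
        for (Map.Entry<Long, Set<String>> e : blockLocations.entrySet()) {
            long live = e.getValue().stream().filter(dn -> !dead.contains(dn)).count();
            if (live < replication) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        HeartbeatMonitor nameNode = new HeartbeatMonitor(30_000);
        for (String dn : new String[] {"dn1", "dn2", "dn3"}) {
            nameNode.heartbeat(dn, 0);
            nameNode.addReplica(42L, dn);
        }
        nameNode.heartbeat("dn1", 40_000);   // dn3 stops sending heartbeats
        nameNode.heartbeat("dn2", 40_000);
        System.out.println(nameNode.deadNodes(45_000));           // [dn3]
        System.out.println(nameNode.underReplicated(45_000, 3));  // [42]
    }
}
```

When `dn3` stops sending heartbeats, block 42 drops to two live replicas and shows up as under-replicated, which is the kind of list from which the NameNode would schedule new copies.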
HDFS supports a traditional hierarchical file organization similar to most other existing filesystems.
It supports creation and removal of files within a directory, moving files between directories, and so on. It also supports user quotas and read/write permissions.
Using HDFS Files
User applications access the HDFS filesystem using an HDFS client, a library that exposes the HDFS filesystem interface and hides most of the complexities of the HDFS implementation described earlier.
Access to HDFS is through an instance of the FileSystem object. The FileSystem class is an abstract base class for a generic filesystem. (In addition to HDFS, Apache Hadoop provides implementations of FileSystem for other filesystems, including KosmosFileSystem, NativeS3FileSystem, RawLocalFileSystem, and S3FileSystem.)
It may be implemented as a distributed filesystem, or as a “local” one that uses the locally connected disk. The local version exists for small Hadoop instances and for testing. All user code that may potentially use HDFS should be written to use a FileSystem object.
You can create an instance of the FileSystem object by passing a new Configuration object to the static FileSystem.get() method. Assuming that the Hadoop configuration files are available on the class path, the code snippet shown in Listing 2-1 creates an instance of the FileSystem object.
(Configuration files are always available if the execution is done on one of the Hadoop cluster’s nodes. If execution is done on a remote machine, the configuration files must be explicitly added to the application class path.)
Another important HDFS object is
Path, which represents names of files
or directories in a filesystem. A Path
object can be created from a string
representing the location of the
file/directory on the HDFS. A
combination of FileSystem and Path
objects allows for many
programmatic operations on HDFS
files and directories. Listing 2-2
shows an example.
The last two lines in Listing 2-2
show how to create
FSDataInputStream and
FSDataOutputStream objects based
on a file path.
These two objects are subclasses of
DataInputStream and
DataOutputStream from the Java
I/O package, which means that they
support standard I/O operations.
HDFS also introduced several specialized file types (such as SequenceFile, MapFile, SetFile, ArrayFile, and BloomMapFile) that provide much richer functionality, which often simplifies data processing.
SequenceFile provides a persistent data structure for binary key/value pairs. Here, all keys must be instances of the same Java class, as must all values, but individual keys and values can have different sizes. Similar to other Hadoop files, SequenceFiles are append-only.
The use of SequenceFile provides a storage
mechanism natively supporting key/value
structure, thus making implementations
using this data layout much simpler.
SequenceFile has three available formats: Uncompressed, Record-Compressed, and Block-Compressed. The first two are stored in a record-based format, while the third uses a block-based format.
The choice of a specific format for a sequence file determines the size of the file on the hard drive. Block-Compressed files are typically the smallest, while Uncompressed files are the largest.
The header contains general information about the SequenceFile. A record contains the actual data for keys and values, along with their lengths.
In the block-based format, the header and sync serve the same purpose as in the record-based SequenceFile format. The actual data is contained in the blocks.
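The record layout described above (key and value data stored together with their lengths) can be illustrated with plain java.io streams. This is a simplified sketch, not the real SequenceFile format: it has no header, sync, or compression, and the field order (record length, key length, key bytes, value bytes) is an assumption of this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified record-based key/value layout: [recordLength][keyLength][key][value].
// Not the real SequenceFile format: no header, no sync, no compression.
class KeyValueRecords {
    static byte[] write(Map<String, String> pairs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<String, String> entry : pairs.entrySet()) {
                byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8);
                byte[] value = entry.getValue().getBytes(StandardCharsets.UTF_8);
                out.writeInt(key.length + value.length);  // record length (key + value bytes)
                out.writeInt(key.length);                 // key length
                out.write(key);
                out.write(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, String> read(byte[] data) {
        Map<String, String> pairs = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                int recordLength = in.readInt();
                int keyLength = in.readInt();
                byte[] key = new byte[keyLength];
                byte[] value = new byte[recordLength - keyLength];
                in.readFully(key);
                in.readFully(value);
                pairs.put(new String(key, StandardCharsets.UTF_8), new String(value, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, String> pairs = new LinkedHashMap<>();
        pairs.put("apple", "3");
        pairs.put("banana", "12");
        byte[] data = write(pairs);
        System.out.println(data.length + " bytes -> " + read(data));
    }
}
```

Because each record carries its own lengths, keys and values of different sizes can be read back without any separator characters.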
The Design of HDFS
HDFS is a filesystem designed for
storing very large files with streaming
data access patterns, running on
clusters of commodity hardware.
Very large files: “Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.
Streaming data access: HDFS is built
around the idea that the most efficient
data processing pattern is a write-
once, read-many-times pattern.
A dataset is typically generated or
copied from source, then various
analyses are performed on that
dataset over time.
Each analysis will involve a large
proportion, if not all, of the dataset, so
the time to read the whole dataset is
more important than the latency in
reading the first record.
Commodity hardware: Hadoop doesn’t require expensive, highly reliable hardware to run on. It’s designed to run on clusters of commodity hardware (commonly available hardware that can be obtained from multiple vendors) for which the chance of node failure across the cluster is high, at least for large clusters.
HDFS is designed to carry on working
without a noticeable interruption to
the user in the face of such failure.
It is also worth examining the applications for which using HDFS does not work so well. While this may change in the future, these are areas where HDFS is not a good fit today:
Low-latency data access: Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember, HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.
Lots of small files: Since the namenode
holds filesystem metadata in memory,
the limit to the number of files in a
filesystem is governed by the amount
of memory on the namenode. As a rule
of thumb, each file, directory, and
block takes about 150 bytes.
So, for example, if you had one million
files, each taking one block, you would
need at least 300 MB of memory. While
storing millions of files is feasible,
billions is beyond the capability of
current hardware.
Multiple writers, arbitrary file
modifications: Files in HDFS may
be written to by a single writer.
Writes are always made at the
end of the file. There is no
support for multiple writers, or for
modifications at arbitrary offsets
in the file. (These might be
supported in the future, but they
are likely to be relatively
inefficient.)
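The small-files rule of thumb above is simple arithmetic. This sketch assumes, as the text does, roughly 150 bytes of NameNode memory per file, directory, and block object; actual usage varies.

```java
// Rough NameNode memory estimate from the ~150 bytes-per-object rule of thumb.
class NameNodeMemoryEstimate {
    static final long BYTES_PER_OBJECT = 150;   // rule of thumb from the text, not a measured value

    // Every file, block, and directory counts as one object in the NameNode's memory.
    static long estimateBytes(long files, long blocksPerFile, long directories) {
        long objects = files + files * blocksPerFile + directories;
        return objects * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        long million = estimateBytes(1_000_000L, 1, 0);
        long billion = estimateBytes(1_000_000_000L, 1, 0);
        System.out.println(million / 1_000_000 + " MB");        // 300 MB
        System.out.println(billion / 1_000_000_000 + " GB");    // 300 GB
    }
}
```

One million single-block files give two million objects, or about 300 MB; a billion such files would need about 300 GB of NameNode memory, which is why billions of files are out of reach.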
HDFS Concepts
Blocks:
A disk has a block size, which is the minimum
amount of data that it can read or write.
Filesystems for a single disk build on this by
dealing with data in blocks, which are an integral
multiple of the disk block size.
Filesystem blocks are typically a few kilobytes in
size, while disk blocks are normally 512 bytes.
This is generally transparent to the filesystem
user who is simply reading or writing a file—of
whatever length.
However, there are tools to perform filesystem
maintenance, such as df and fsck, that operate
on the filesystem block level.
HDFS, too, has the concept of a block,
but it is a much larger unit—64 MB by
default.
Like in a filesystem for a single disk,
files in HDFS are broken into block-
sized chunks, which are stored as
independent units. Unlike a filesystem
for a single disk, a file in HDFS that is
smaller than a single block does not
occupy a full block’s worth of
underlying storage.
When unqualified, the term “block” in
this book refers to a block in HDFS.
Block abstraction for a distributed
filesystem brings several benefits:
The first benefit is the most obvious: a file can be larger than any single disk in the network. There’s nothing that requires the blocks from a file to be stored on the same disk, so they can take advantage of any of the disks in the cluster.
Second, making the unit of abstraction a block rather than a file simplifies the storage subsystem.
Simplicity is something to strive for in all systems, but it is especially important for a distributed system in which the failure modes are so varied.
Blocks fit well with replication for
providing fault tolerance and availability.
To insure against corrupted blocks and
disk and machine failure, each block is
replicated to a small number of physically
separate machines.
If a block becomes unavailable, a copy
can be read from another location in a
way that is transparent to the client.
A block that is no longer available due to
corruption or machine failure can be
replicated from its alternative locations to
other live machines to bring the
replication factor back to the normal level.
Namenodes & DataNodes
An HDFS cluster has two types of node operating in
a master-worker pattern: a namenode (the master)
and a number of datanodes (workers).
The namenode manages the filesystem namespace.
It maintains the filesystem tree and the metadata
for all the files and directories in the tree.
This information is stored persistently on the local
disk in the form of two files: the namespace image
and the edit log.
The namenode also knows the datanodes on which all the blocks for a given file are located; however, it does not store block locations persistently, since this information is reconstructed from datanodes when the system starts.
A client accesses the filesystem on behalf of
the user by communicating with the
namenode and datanodes. The client
presents a POSIX-like filesystem interface, so
the user code does not need to know about
the namenode and datanode to function.
Datanodes are the workhorses of the
filesystem. They store and retrieve blocks
when they are told to (by clients or the
namenode), and they report back to the
namenode periodically with lists of blocks
that they are storing.
Without the namenode, the filesystem
cannot be used. In fact, if the machine
running the namenode were
obliterated, all the files on the
filesystem would be lost since there
would be no way of knowing how to
reconstruct the files from the blocks
on the datanodes. For this reason, it is
important to make the namenode
resilient to failure, and Hadoop
provides two mechanisms for this.
The first way is to back up the files
that make up the persistent state
of the filesystem metadata.
Hadoop can be configured so that
the namenode writes its persistent
state to multiple filesystems.
These writes are synchronous and
atomic. The usual configuration
choice is to write to local disk as
well as a remote NFS mount.
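A minimal sketch of synchronous mirrored metadata writes using java.nio, under assumptions: the metadata directories are given as local paths (one of which could be an NFS mount), each change is a text line appended to a file named `edits` (a hypothetical name), and every copy is forced to disk before the method returns. This is not Hadoop's edit-log format, and unlike the mechanism described above it makes no attempt to be atomic across directories.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Sketch: append each metadata change to a log file in every configured
// directory, forcing it to disk before returning. Not atomic across directories.
class MirroredEditLog {
    private final List<Path> dirs;

    MirroredEditLog(List<Path> dirs) {
        this.dirs = List.copyOf(dirs);
    }

    void logEdit(String record) {
        byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
        for (Path dir : dirs) {
            try (FileChannel ch = FileChannel.open(dir.resolve("edits"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                ByteBuffer buf = ByteBuffer.wrap(bytes);
                while (buf.hasRemaining()) {
                    ch.write(buf);
                }
                ch.force(true);   // synchronous: this copy is on stable storage before we continue
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Reads back one directory's log, e.g. to check that the copies match.
    static String contents(Path dir) {
        try {
            return new String(Files.readAllBytes(dir.resolve("edits")), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the method only returns after every copy has been forced to disk, a caller that sees it return knows each configured directory holds the change.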
It is also possible to run a secondary namenode, which
despite its name does not act as a namenode. Its main
role is to periodically merge the namespace image with
the edit log to prevent the edit log from becoming too
large.
The secondary namenode usually runs on a separate
physical machine, since it requires plenty of CPU and as
much memory as the namenode to perform the merge. It
keeps a copy of the merged namespace image, which
can be used in the event of the namenode failing.
However, the state of the secondary namenode lags that
of the primary, so in the event of total failure of the
primary, data loss is almost certain. The usual course of
action in this case is to copy the namenode’s metadata
files that are on NFS to the secondary and run it as the
new primary.
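The merge performed by the secondary namenode can be pictured with a toy model. The types and operation names are hypothetical, not Hadoop's fsimage or edit-log formats: the namespace image is a map from path to metadata, the edit log is a list of operations, and a checkpoint replays the log onto a copy of the image so that the log can then start over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy checkpoint: replay edit-log operations onto a copy of the namespace image.
// Types and operation names are hypothetical, not Hadoop's fsimage/edits formats.
class CheckpointSketch {
    static final class Edit {
        final String op;        // "CREATE" or "DELETE" in this sketch
        final String path;
        final String metadata;  // e.g. replication factor; unused for DELETE

        Edit(String op, String path, String metadata) {
            this.op = op;
            this.path = path;
            this.metadata = metadata;
        }
    }

    static Map<String, String> merge(Map<String, String> image, List<Edit> editLog) {
        Map<String, String> merged = new TreeMap<>(image);   // the old image is left untouched
        for (Edit e : editLog) {
            if ("CREATE".equals(e.op)) {
                merged.put(e.path, e.metadata);
            } else if ("DELETE".equals(e.op)) {
                merged.remove(e.path);
            } else {
                throw new IllegalArgumentException("unknown operation: " + e.op);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> image = new TreeMap<>();
        image.put("/user/tom/a.txt", "replication=1");
        List<Edit> editLog = new ArrayList<>();
        editLog.add(new Edit("CREATE", "/user/tom/b.txt", "replication=1"));
        editLog.add(new Edit("DELETE", "/user/tom/a.txt", null));

        Map<String, String> newImage = merge(image, editLog);
        editLog.clear();   // after the checkpoint, the edit log can start over
        System.out.println(newImage);   // {/user/tom/b.txt=replication=1}
    }
}
```

The sketch also shows why the secondary's copy lags: any edits logged after the last merge exist only in the primary's edit log.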
Basic Filesystem Operations
The filesystem is ready to be used,
and we can do all of the usual
filesystem operations such as reading
files, creating directories, moving files,
deleting data, and listing directories.
You can type hadoop fs -help to get
detailed help on every command.
Start by copying a file from the local filesystem to HDFS:
% hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt
This command invokes Hadoop’s filesystem shell command fs, which supports a number of subcommands; in this case, we are running -copyFromLocal.
The local file quangle.txt is copied to the file /user/tom/quangle.txt on the HDFS instance running on localhost.
In fact, we could have omitted the scheme and host of the URI and picked up the default, hdfs://localhost, as specified in core-site.xml:
% hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt
We could also have used a relative path and copied the file to our home directory in HDFS, which in this case is /user/tom:
% hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt
Let’s copy the file back to the local filesystem and check whether it’s the same:
% hadoop fs -copyToLocal quangle.txt quangle.copy.txt
% md5 input/docs/quangle.txt quangle.copy.txt
MD5 (input/docs/quangle.txt) = a16f231da6b05e2ba7a339320e7dacd9
MD5 (quangle.copy.txt) = a16f231da6b05e2ba7a339320e7dacd9
The MD5 digests are the same,
showing that the file survived its trip
to HDFS and is back intact.
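The same integrity check can be done from Java with the standard java.security.MessageDigest class. This sketch streams each file through an MD5 digest and compares the hex strings; the file names in the usage comment are placeholders.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Compare two files by MD5 digest, like the md5 command above.
class Md5Check {
    static String md5Hex(InputStream in) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);   // stream the data; the whole file never sits in memory
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    static String md5Hex(Path file) {
        try (InputStream in = Files.newInputStream(file)) {
            return md5Hex(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Usage: java Md5Check original.txt copy.txt
        String a = md5Hex(Paths.get(args[0]));
        String b = md5Hex(Paths.get(args[1]));
        System.out.println(a.equals(b) ? "digests match: " + a : "digests differ");
    }
}
```

Streaming the input through the digest keeps memory use constant, which matters for files of HDFS scale.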
Finally, let’s look at an HDFS file listing. We create a directory first just to see how it is displayed in the listing:
% hadoop fs -mkdir books
% hadoop fs -ls .
Found 2 items
drwxr-xr-x   - tom supergroup          0 2009-04-02 22:41 /user/tom/books
-rw-r--r--   1 tom supergroup        118 2009-04-02 22:29 /user/tom/quangle.txt
The information returned is very similar to the Unix command ls -l, with a few minor differences.
The first column shows the file mode. The second column is the replication factor of the file (something a traditional Unix filesystem does not have). We set the default replication factor in the site-wide configuration to 1, which is why we see the same value here.
The entry in this column is a dash (-) for directories, since the concept of replication does not apply to them: directories are treated as metadata and stored by the namenode, not the datanodes.
The third and fourth columns
show the file owner and group.
The fifth column is the size of the
file in bytes, or zero for
directories.
The sixth and seventh columns
are the last modified date and
time.
Finally, the eighth column is the
absolute name of the file or
directory.
Hadoop Filesystems
Hadoop has an abstract notion of filesystem, of which HDFS is just one implementation. The Java abstract class org.apache.hadoop.fs.FileSystem represents a filesystem in Hadoop, and there are several concrete implementations.
Interfaces
Hadoop is written in Java, and all
Hadoop filesystem interactions are
mediated through the Java API.
HTTP:
There are two ways of accessing HDFS
over HTTP: directly, where the HDFS
daemons serve HTTP requests to
clients; and via a proxy (or proxies),
which accesses HDFS on the client’s
behalf using the usual
DistributedFileSystem API.
The original direct HTTP interface (HFTP and HSFTP) was read-only, while the newer WebHDFS implementation supports all filesystem operations, including Kerberos authentication.
WebHDFS must be enabled by setting dfs.webhdfs.enabled to true before you can use webhdfs URIs.
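With WebHDFS enabled, files are addressed by REST URLs of the form http://<host>:<port>/webhdfs/v1/<path>?op=<OPERATION>. The sketch below only builds such URLs. The host name is a placeholder, and port 50070 (the NameNode's default HTTP port in Hadoop 1.x and 2.x) is an assumption to adjust for your cluster.

```java
import java.net.URI;

// Builds WebHDFS REST URLs: http://<host>:<port>/webhdfs/v1/<path>?op=<OPERATION>
class WebHdfsUrls {
    static URI url(String host, int port, String hdfsPath, String op) {
        if (!hdfsPath.startsWith("/")) {
            throw new IllegalArgumentException("WebHDFS paths must be absolute: " + hdfsPath);
        }
        // No percent-encoding: paths with spaces or special characters would need it.
        return URI.create("http://" + host + ":" + port + "/webhdfs/v1" + hdfsPath + "?op=" + op);
    }

    public static void main(String[] args) {
        String host = "localhost";   // placeholder NameNode host
        int port = 50070;            // assumed NameNode HTTP port
        System.out.println(url(host, port, "/user/tom/quangle.txt", "OPEN"));
        System.out.println(url(host, port, "/user/tom", "LISTSTATUS"));
    }
}
```

An HTTP client can then issue a GET for the OPEN URL; the NameNode normally answers with a redirect to a DataNode that serves the data, so the client must follow redirects.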
The second way of accessing HDFS over HTTP
relies on one or more standalone proxy servers.
(The proxies are stateless so they can run
behind a standard load balancer.)
All traffic to the cluster passes through the
proxy. This allows for stricter firewall and
bandwidth limiting policies to be put in place.
It’s common to use a proxy for transfers
between Hadoop clusters located in different
data centers.
C:
Hadoop provides a C library
called libhdfs that mirrors the
Java FileSystem interface (it was
written as a C library for
accessing HDFS, but despite its
name it can be used to access
any Hadoop filesystem). It works
using the Java Native Interface
(JNI) to call a Java filesystem
client.
The Java Interface
Reading Data from a Hadoop URL:
One of the simplest ways to read a file from a Hadoop filesystem is by using a java.net.URL object to open a stream to read the data from. The general idiom is:
InputStream in = null;
try {
    in = new URL("hdfs://host/path").openStream();
    // process in
} finally {
    IOUtils.closeStream(in);
}
There’s a little bit more work required to make Java recognize Hadoop’s hdfs URL scheme.
This is achieved by calling the setURLStreamHandlerFactory method on URL with an instance of FsUrlStreamHandlerFactory.
This method can only be called once per JVM, so it is typically executed in a static block. This limitation means that if some other part of your program (perhaps a third-party component outside your control) sets a URLStreamHandlerFactory, you won’t be able to use this approach for reading data from Hadoop.
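Example 3-1. Displaying files from a Hadoop filesystem on standard output using a URLStreamHandler
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class URLCat {
    // Register Hadoop's handler so java.net.URL understands hdfs:// URLs (once per JVM).
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
            in = new URL(args[0]).openStream();
            // Copy to standard output with a 4 KB buffer, without closing the streams.
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}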
We make use of the handy IOUtils class that comes with Hadoop for closing the stream in the finally clause, and also for copying bytes between the input stream and the output stream (System.out in this case).
The last two arguments to the copyBytes method are the buffer size used for copying and whether to close the streams when the copy is complete. We close the input stream ourselves, and System.out doesn’t need to be closed.
Reading Data Using the FileSystem API:
Sometimes it is impossible to set a URLStreamHandlerFactory for your application. In this case, you will need to use the FileSystem API to open an input stream for a file.
A file in a Hadoop filesystem is represented by a Hadoop Path object (and not a java.io.File object, since its semantics are too closely tied to the local filesystem).
You can think of a Path as a Hadoop filesystem URI, such as hdfs://localhost/user/tom/quangle.txt.
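A Configuration object encapsulates a client or server’s configuration, which is set using configuration files read from the classpath, such as conf/core-site.xml. FileSystem provides several static factory methods for retrieving an instance:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException, InterruptedException
The first method returns the default filesystem (as specified in the file conf/core-site.xml, or the default local filesystem if not specified there).
The second uses the given URI’s scheme and authority to determine the filesystem to use, falling back to the default filesystem if no scheme is specified in the given URI.
The third retrieves the filesystem as the given user.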
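Example 3-2. Displaying files from a Hadoop filesystem on standard output by using the FileSystem directly
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        // Pick the filesystem implementation from the URI's scheme (e.g. hdfs://).
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}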
Directories
FileSystem provides a method to create a directory:
public boolean mkdirs(Path f) throws IOException
This method creates all of the necessary parent directories if they don’t already exist, just like the java.io.File’s mkdirs() method. It returns true if the directory (and all parent directories) was successfully created.
Deleting Data:
Use the delete() method on FileSystem
to permanently remove files or
directories:
public boolean delete(Path f, boolean recursive) throws IOException
If f is a file or an empty directory, then
the value of recursive is ignored. A
nonempty directory is only deleted,
along with its contents, if recursive is
true (otherwise an IOException is
thrown).
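The mkdirs() and delete() rules above can be checked against a toy in-memory namespace. This is not Hadoop's implementation: the class, its string paths, and the wrapping of the IOException in an UncheckedIOException are simplifications for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy namespace mapping absolute paths to "dir" or "file", used only to
// illustrate mkdirs()/delete() semantics. Not Hadoop's implementation.
class ToyNamespace {
    private final Map<String, String> entries = new HashMap<>();

    ToyNamespace() {
        entries.put("/", "dir");
    }

    // Like mkdirs(): creates the directory and any missing parents.
    boolean mkdirs(String path) {
        String current = "";
        for (String part : path.substring(1).split("/")) {
            current = current + "/" + part;
            String existing = entries.putIfAbsent(current, "dir");
            if ("file".equals(existing)) {
                return false;                      // a path component is a regular file
            }
        }
        return true;
    }

    // Creates a file; assumes its parent directory already exists.
    void createFile(String path) {
        entries.put(path, "file");
    }

    boolean exists(String path) {
        return entries.containsKey(path);
    }

    // Like delete(path, recursive): recursive only matters for non-empty directories.
    boolean delete(String path, boolean recursive) {
        if (!entries.containsKey(path)) {
            return false;
        }
        String prefix = path + "/";
        boolean hasChildren = entries.keySet().stream().anyMatch(p -> p.startsWith(prefix));
        if (hasChildren && !recursive) {
            // The real method throws IOException; wrapped here to keep the sketch unchecked.
            throw new UncheckedIOException(new IOException("Directory is not empty: " + path));
        }
        entries.keySet().removeIf(p -> p.equals(path) || p.startsWith(prefix));
        return true;
    }
}
```

Deleting /user/tom with recursive set to false fails while it still contains books/a.txt; with recursive set to true, the directory and everything under it is removed, after which the now-empty /user can be deleted non-recursively.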