MapReduce
The MapReduce framework was inspired by the map and reduce concepts of functional programming and was introduced by Google in 2004
to support distributed processing of large data sets spread over clusters of computers.
It was designed to solve large-data computational problems, and specifically
to run on commodity hardware.
It is based on divide-and-conquer principles: the input data sets are split into independent chunks,
which are processed by the mappers in parallel.
Additionally, execution of the maps is typically co-located with the data.
The framework then sorts the outputs of the maps and uses them as input to the reducers.
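To make this data flow concrete, here is a minimal single-process sketch in plain Java (no Hadoop dependency; all class and method names are hypothetical). Each string in the input list plays the role of one independent chunk: the chunks are mapped in parallel on a thread pool, the intermediate pairs are grouped and sorted by key, and reduce is called once per key. Data locality, fault tolerance, and distribution across machines are not modeled.

```java
import java.util.*;
import java.util.concurrent.*;

// Single-process simulation of the MapReduce data flow (plain Java, not Hadoop's API):
// independent chunks -> parallel map -> sort/group by key -> reduce per key.
class WordCountSimulation {

    // Map: emit (word, 1) for every whitespace-separated token in one chunk.
    static List<Map.Entry<String, Integer>> map(String chunk) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : chunk.split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(Map.entry(token, 1));
            }
        }
        return out;
    }

    // Reduce: combine all values emitted for a single key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static SortedMap<String, Integer> run(List<String> chunks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Divide: one map task per chunk, all running in parallel.
            List<Future<List<Map.Entry<String, Integer>>>> mapTasks = new ArrayList<>();
            for (String chunk : chunks) {
                mapTasks.add(pool.submit(() -> map(chunk)));
            }
            // Sort/group: the TreeMap keeps keys ordered, standing in for the framework's sort.
            SortedMap<String, List<Integer>> grouped = new TreeMap<>();
            for (Future<List<Map.Entry<String, Integer>>> task : mapTasks) {
                for (Map.Entry<String, Integer> kv : task.get()) {
                    grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
                }
            }
            // Conquer: one reduce call per distinct key.
            SortedMap<String, Integer> result = new TreeMap<>();
            grouped.forEach((key, values) -> result.put(key, reduce(key, values)));
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, `WordCountSimulation.run(List.of("a b a", "b c"))` returns a sorted map with a=2, b=2, c=1.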
Slide 1
Functionality of Mappers and Reducers
Single Hadoop Job = Mapper + [Reducer] + Driver
where the Driver is the main application that configures the job and controls aspects of its execution. The Reducer is in brackets because it is optional: a map-only job has no reducer.
Slide 2
Responsibility of MapReduce Framework
1. Choosing appropriate machines (nodes) for running mappers
2. Starting and monitoring the mapper’s execution
3. Choosing appropriate locations for the reducer’s execution
4. Sorting and shuffling the output of mappers and delivering the output to reducer nodes
5. Starting and monitoring the reducer’s execution
In addition, the framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.
Slide 3
The Three Key Methods in the Mapper Class
The Mapper class has three key methods that you can override:
setup, cleanup, and map.
Both the setup and cleanup methods are invoked only once during a mapper's life cycle, at the
beginning and at the end of the mapper's execution, respectively.
The setup method is used to implement the mapper's initialization (for example, reading shared resources,
connecting to HBase tables, and so on), whereas cleanup is used for cleaning up the mapper's resources
and, optionally, if the mapper implements an associative array or counter, to write out the accumulated information.
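The cleanup use case mentioned above (a mapper that keeps an associative array and writes it out at the end) is often called in-mapper combining. The following plain-Java sketch mimics that life cycle without Hadoop: LifecycleContext is a hypothetical stand-in for Hadoop's Mapper.Context, and runOver plays the framework's role by calling setup once, map once per line, and cleanup once.

```java
import java.util.*;

// Hypothetical stand-in for Hadoop's Mapper.Context: only the write() callback is modeled.
interface LifecycleContext {
    void write(String key, int value);
}

// Plain-Java sketch of a mapper life cycle with in-mapper combining (not Hadoop's API).
class InMapperCombiningWordCount {
    private Map<String, Integer> counts;  // associative array kept across map() calls

    // Called once, before the first record: initialize per-task state.
    void setup(LifecycleContext context) {
        counts = new HashMap<>();
    }

    // Called once per record: accumulate locally instead of emitting (word, 1) each time.
    void map(long offset, String line, LifecycleContext context) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
    }

    // Called once, after the last record: write out the accumulated counts, then release state.
    void cleanup(LifecycleContext context) {
        counts.forEach((word, count) -> context.write(word, count));
        counts = null;
    }

    // Plays the framework's role: setup once, map per line, cleanup once.
    static Map<String, Integer> runOver(List<String> lines) {
        Map<String, Integer> emitted = new TreeMap<>();
        LifecycleContext context = (key, value) -> emitted.put(key, value);
        InMapperCombiningWordCount mapper = new InMapperCombiningWordCount();
        mapper.setup(context);
        long offset = 0;
        for (String line : lines) {
            mapper.map(offset, line, context);
            offset += line.length() + 1;  // approximate byte offset (ASCII text, '\n' separators)
        }
        mapper.cleanup(context);
        return emitted;
    }
}
```

Compared with emitting (word, 1) for every token, this mapper writes each distinct word only once, at cleanup time.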
The business functionality (that is, the application-specific logic) of the mapper is implemented in the map
function. Typically, given a key/value pair, this method processes the pair and writes out (using a context
object) one or more resulting key/value pairs.
Slide 4
Hollywood Principle in the Mapper Class
A context object passed to the map method allows it to get additional information about the
execution environment and to report on its execution. An important thing to note is that a map function does
not read data. Instead, following the "Hollywood principle", it is invoked every time a reader reads (and optionally
parses) a new record, and the record's data is passed to it (through the context) by the reader.
Hollywood principle: the "Don't call us, we'll call you" principle
It is a useful software development technique in which an object's (or component's) initialization and
ongoing life cycle are handled by its environment, rather than by the object itself. This principle is typically used
for implementing a class or component that must fit into the constraints of an existing framework.
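A minimal plain-Java sketch of this inversion of control follows. The framework-side run method owns the loop and the mapper only supplies callbacks. It is modeled loosely on the shape of Hadoop's Mapper.run() (setup, then map for every record, then cleanup in a finally block), but CallbackMapper and HollywoodDemo are hypothetical names, and an Iterator stands in for the record reader.

```java
import java.util.*;

// "Don't call us, we'll call you": user code fills in callbacks, the framework drives them.
abstract class CallbackMapper<K, V> {
    protected void setup() {}
    protected abstract void map(K key, V value);
    protected void cleanup() {}

    // Framework-owned loop: the mapper never reads input itself.
    final void run(Iterator<Map.Entry<K, V>> recordReader) {
        setup();
        try {
            while (recordReader.hasNext()) {
                Map.Entry<K, V> record = recordReader.next();
                map(record.getKey(), record.getValue());
            }
        } finally {
            cleanup();  // always called, even if map() throws
        }
    }
}

class HollywoodDemo {
    // Records the order in which the "framework" invokes the user's callbacks.
    static List<String> trace(List<Map.Entry<Long, String>> records) {
        List<String> calls = new ArrayList<>();
        CallbackMapper<Long, String> mapper = new CallbackMapper<Long, String>() {
            @Override protected void setup() { calls.add("setup"); }
            @Override protected void map(Long key, String value) { calls.add("map:" + value); }
            @Override protected void cleanup() { calls.add("cleanup"); }
        };
        mapper.run(records.iterator());
        return calls;
    }
}
```

For two records "a" and "b", the trace is setup, map:a, map:b, cleanup: the subclass never decides when it runs.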
Slide 5
MapReduce Execution Architecture
Slide 6
MapReduce Execution Architecture (Contd.)
Slide 7
Main Components of MapReduce Execution Pipelines
1. Driver
2. Context
3. InputData
4. InputFormat
5. InputSplit
6. RecordReader
7. Mapper
8. Partition
9. Shuffle
10. Sort
11. Reducer
12. OutputFormat
13. RecordWriter
14. [Combiner]
15. [DistributedCache]
Slide 8
Main Components of MapReduce Execution Pipelines (Contd.)
1. Driver - The main program
- It defines the job-specific configuration and
specifies all components.
2. Context - A coordinator object that manages the phases
of the process running on different machines
- It provides system-wide and job-wide
information.
3. InputData - Incoming data (often tens of GBs) is
split and stored in HDFS / HBase / other storage.
4. InputFormat - It is invoked by the job driver to split the
input data, which decides the number and location of
map task executions.
- It creates the InputSplits into which
the input data is divided.
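As an illustration of how an InputFormat can decide the number of map tasks, the sketch below computes the splits for a single file. It borrows the sizing rule used by Hadoop's FileInputFormat (split size = max(minSize, min(maxSize, blockSize)), and the last split may be up to 10% larger than the split size instead of producing a tiny extra split), but it is a simplified plain-Java model with hypothetical names: it ignores compression, non-splittable files, and the host locations a real InputSplit carries.

```java
import java.util.*;

// Simplified sketch of file-based split computation (plain Java, not Hadoop's classes).
// Sizing rule borrowed from Hadoop's FileInputFormat:
//   splitSize = max(minSize, min(maxSize, blockSize))
// and the last split may be up to SPLIT_SLOP (10%) larger than splitSize.
class SplitCalculator {
    static final double SPLIT_SLOP = 1.1;

    // One unit of work for one map task: a byte range of the file.
    record Split(long start, long length) {}

    static List<Split> computeSplits(long fileLength, long blockSize, long minSize, long maxSize) {
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        List<Split> splits = new ArrayList<>();
        long remaining = fileLength;
        // Cut full-size splits while the remainder is clearly larger than one split.
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits.add(new Split(fileLength - remaining, splitSize));
            remaining -= splitSize;
        }
        // The tail (up to 1.1 x splitSize) becomes the last split; empty files yield none here.
        if (remaining != 0) {
            splits.add(new Split(fileLength - remaining, remaining));
        }
        return splits;
    }
}
```

For a 300 MB file with 128 MB blocks (sizes in MB, minSize 1, maxSize unbounded), this yields three splits, (0, 128), (128, 128), and (256, 44), so three map tasks would be launched. A 140 MB file stays a single split, because 140/128 is below the 1.1 allowance.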
Slide 9
Main Components of MapReduce Execution Pipelines (Contd.)
5. InputSplit - It defines a unit of work for a
single map task in a MapReduce program.
6. RecordReader - It reads the data from its source.
- It resides inside the mapper task.
- It converts the data into key/value pairs and
delivers them to the map method.
7. Mapper - The first phase of user-defined work
- It takes input data in the form of a series of
key/value pairs (k1, v1), each of which is used for an individual
map invocation.
- It typically transforms each input pair into
output pairs (k2, v2), which are used as input for
shuffle and sort.
- Individual mappers cannot communicate with each
other.
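The RecordReader and Mapper roles above can be sketched in plain Java as follows. The record format mirrors the key/value types of Hadoop's TextInputFormat (the byte offset of the line as the key, the line text as the value); everything else, including the class name, is an illustrative assumption, and lines that cross split boundaries are not handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// Plain-Java sketch of a line-oriented RecordReader feeding a mapper (not Hadoop's classes).
class RecordReaderSketch {

    // RecordReader: convert raw bytes into (k1 = byte offset, v1 = line text) pairs.
    static List<Map.Entry<Long, String>> read(byte[] data) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        int lineStart = 0;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == '\n') {
                records.add(Map.entry((long) lineStart,
                        new String(data, lineStart, i - lineStart, StandardCharsets.UTF_8)));
                lineStart = i + 1;
            }
        }
        if (lineStart < data.length) {  // last line without a trailing newline
            records.add(Map.entry((long) lineStart,
                    new String(data, lineStart, data.length - lineStart, StandardCharsets.UTF_8)));
        }
        return records;
    }

    // Mapper: transform one (k1, v1) pair into zero or more (k2, v2) pairs.
    // It sees only its own record, so no communication with other mappers is needed.
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }
}
```

For the input "hello world\nhi\n", the reader produces (0, "hello world") and (12, "hi"), and mapping the first record yields (hello, 1) and (world, 1).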
Slide 10
Main Components of MapReduce Execution Pipelines (Contd.)
8. Partition - Each map task may emit key/value pairs (k2, v2)
to any partition; a partition is a subset of the
intermediate key space.
- The Partitioner class computes a hash value
for the key and, based on this result, assigns the pair
to the partition of a specific reducer.
9. Shuffle - Moving the map outputs to the reducers
is known as shuffling.
10. Sort - The set of intermediate key/value pairs
for a given reducer is automatically sorted by
Hadoop to form (k2, {v2, v2, …}) before they are
presented to the reducer.
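The partition, shuffle, and sort steps can be simulated in a few lines of plain Java. getPartition uses the same formula as Hadoop's default HashPartitioner, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; the rest (one TreeMap per reducer standing in for the framework's sort and grouping) is a simplified, single-process assumption with hypothetical names.

```java
import java.util.*;

// Plain-Java sketch of partition -> shuffle -> sort (not Hadoop's classes).
class PartitionShuffleSortSketch {

    // Hash partitioning: the mask clears the sign bit, so negative hash codes
    // still produce a valid index in [0, numReduceTasks).
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Shuffle: route every (k2, v2) pair from every map task to its reducer's bucket.
    // Sort: each bucket is a TreeMap, so keys are ordered and values grouped as (k2, [v2, ...]).
    static List<SortedMap<String, List<Integer>>> shuffleAndSort(
            List<List<Map.Entry<String, Integer>>> mapOutputs, int numReduceTasks) {
        List<SortedMap<String, List<Integer>>> reducerInputs = new ArrayList<>();
        for (int r = 0; r < numReduceTasks; r++) {
            reducerInputs.add(new TreeMap<>());
        }
        for (List<Map.Entry<String, Integer>> oneMapTask : mapOutputs) {
            for (Map.Entry<String, Integer> kv : oneMapTask) {
                int partition = getPartition(kv.getKey(), numReduceTasks);
                reducerInputs.get(partition)
                        .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                        .add(kv.getValue());
            }
        }
        return reducerInputs;
    }
}
```

The mask matters: using Math.abs(key.hashCode()) instead would not be safe, because Math.abs(Integer.MIN_VALUE) is still negative. Because the partition depends only on the key, all values for a key reach the same reducer, regardless of which map task emitted them.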
Slide 11
YARN – MapReduce Phases
Slide 12
Main Components of MapReduce Execution Pipelines (Contd.)
11. Reducer - For each key assigned to a given reducer, the framework calls reduce() once.
- The iterator over the values associated with a key returns them in an undefined order.
- The reducer typically transforms the input key/value pairs into output pairs
(k3, v3).
12. OutputFormat - It defines the location of the output data and the RecordWriter used to store the resulting
data.
13. RecordWriter - It defines how individual output records are written.
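The sketch below shows the division of responsibilities between a reducer and a RecordWriter in plain Java. The reducer sums the values of one key, and the writer alone decides how records are formatted. Writing each record as key, tab, value mirrors the default line format of Hadoop's TextOutputFormat; SimpleRecordWriter, TabSeparatedWriter, and SumReducerSketch are hypothetical names.

```java
import java.io.*;
import java.util.*;

// Hypothetical RecordWriter-like interface: decides how each (k3, v3) record is written.
interface SimpleRecordWriter<K, V> extends Closeable {
    void write(K key, V value) throws IOException;
}

// Writes "key<TAB>value" lines, similar to the default line format of TextOutputFormat.
class TabSeparatedWriter implements SimpleRecordWriter<String, Integer> {
    private final Writer out;

    TabSeparatedWriter(Writer out) {
        this.out = out;
    }

    @Override
    public void write(String key, Integer value) throws IOException {
        out.write(key + "\t" + value + "\n");
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}

class SumReducerSketch {
    // Called once per key; the result must not depend on the (undefined) order of values.
    static void reduce(String key, Iterable<Integer> values,
                       SimpleRecordWriter<String, Integer> writer) throws IOException {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        writer.write(key, sum);
    }

    // Plays the framework's role: one reduce() call per key, in sorted key order.
    static String runAll(SortedMap<String, List<Integer>> grouped) {
        StringWriter buffer = new StringWriter();
        try (TabSeparatedWriter writer = new TabSeparatedWriter(buffer)) {
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                reduce(e.getKey(), e.getValue(), writer);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return buffer.toString();
    }
}
```

With grouped input a=[3] and b=[1, 1], the output is the two lines "a\t3" and "b\t2". Swapping the writer changes the output format without touching the reducer.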
Slide 13
Main Components of MapReduce Execution Pipelines (Contd.)
14. [Combiner] - If present, a combiner runs after the
mapper and before the reducer.
- It receives the data emitted by a map task
as input and tries to combine values with the
same key, thus decreasing the number of key/value
pairs (not necessarily the amount of data) that
must be sorted.
15. [DistributedCache] - It enables data to be shared
globally by all nodes in the cluster.
- The shared data can be a library to be accessed by
each task, a global lookup file holding key/value
pairs, jar files (or archives) containing executable
code, and so on.
- The cache copies the file(s) to the
machines where the actual execution occurs and
makes them available for local use.
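The effect of a combiner can be seen in a small plain-Java sketch of one map task's word count output. combine applies reducer-style summing locally, so fewer pairs have to be sorted and shuffled. Because Hadoop does not guarantee how many times (zero, one, or more) a combiner runs, its operation must leave the final result unchanged; summing counts is associative and commutative, so it qualifies. The class and method names are illustrative.

```java
import java.util.*;

// Plain-Java sketch of a combiner applied to a single map task's output.
class CombinerSketch {

    // Map output of one task: (word, 1) for every token.
    static List<Map.Entry<String, Integer>> mapOutput(String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // Combiner: sum values per key within this task's output only.
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : pairs) {
            sums.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        sums.forEach((k, v) -> out.add(Map.entry(k, v)));
        return out;
    }
}
```

For the text "a b a a", the map output has four pairs and the combined output has two, (a, 3) and (b, 1); combining that result again changes nothing. In a Hadoop driver, a combiner is registered with Job.setCombinerClass(...); for word count, the reducer class can usually be reused for this.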
Slide 14
Features of MapReduce
1. It completely hides the complexity of managing a large distributed cluster of machines and of coordinating
job execution between these nodes.
2. The developer's programming model is very simple: implement the mapper and reducer functionality, as
well as a driver that brings them together as a single job and configures the required parameters.
3. All of the user's code is then packaged into a single jar file (in reality, the MapReduce framework can operate on
multiple jar files) that can be submitted for execution on the MapReduce cluster.
Slide 15
Runtime Coordination and Task Management in MapReduce
Scheduling, Synchronization, and Error and fault handling
Scheduling - The framework ensures that multiple tasks from multiple jobs are executed on the cluster.
Different schedulers provide different scheduling strategies, for example:
1. First come, first served (FIFO): jobs run in the order in which they were submitted.
2. Fair sharing: all the jobs from all users get their fair share of the cluster's execution.
Speculative execution - If a task runs unexpectedly slowly, the framework can start another instance of the same task
on a different node and use the result of whichever instance finishes first. This ensures that non-anticipated slowness
of a given machine will not slow down execution of the task. It is enabled by default ([Link] = true;).
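The core idea of speculative execution, racing a backup attempt of the same task against a slow one and keeping the first result, can be illustrated with ExecutorService.invokeAny from the Java standard library, which returns the result of the first task to complete successfully and cancels the rest. This is only an analogy with hypothetical names: Hadoop decides when to launch a backup attempt based on task progress, which is not modeled, and the sleep delays stand in for a slow and a fast machine.

```java
import java.util.*;
import java.util.concurrent.*;

// Plain-Java analogy for speculative execution: two attempts of the same task, first one wins.
class SpeculativeExecutionSketch {

    static String runWithBackup(long primaryDelayMillis, long backupDelayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> primary = () -> attempt("primary", primaryDelayMillis);
            Callable<String> backup = () -> attempt("backup", backupDelayMillis);
            // invokeAny returns the first successful result and cancels the other attempt.
            return pool.invokeAny(List.of(primary, backup));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();  // interrupts the losing attempt if it is still running
        }
    }

    // Simulated attempt. Real attempts compute the same output; the label only shows who won.
    private static String attempt(String label, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);  // stands in for a slow or fast machine
        return label;
    }
}
```

With a primary that takes 2000 ms and a backup that takes 10 ms, runWithBackup(2000, 10) returns "backup" almost immediately instead of waiting for the slow machine.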
Slide 16
Runtime Coordination and Task Management in MapReduce (Contd.)
Synchronization - The reduce phase cannot start until all of the maps' key/value pairs have been emitted.
So, it requires synchronization between the map and reduce phases of processing. At this point, the
intermediate key/value pairs are grouped by key and sorted.
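The synchronization point between the phases can be sketched with ExecutorService.invokeAll, which blocks until every submitted task has finished, so the reduce step below cannot begin early; the event log records the order of operations. This is a plain-Java analogy with hypothetical names. Hadoop may begin copying map output to reducers before all maps finish, but reduce() itself runs only once all map outputs are available.

```java
import java.util.*;
import java.util.concurrent.*;

// Plain-Java sketch of the map/reduce barrier.
class MapReduceBarrierSketch {

    static List<String> run(List<String> chunks) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> mapTasks = new ArrayList<>();
            for (String chunk : chunks) {
                mapTasks.add(() -> {
                    int words = chunk.split("\\s+").length;  // toy map work: count tokens
                    events.add("map-done:" + chunk);
                    return words;
                });
            }
            int total = 0;
            // Barrier: invokeAll returns only after every map task has completed.
            for (Future<Integer> done : pool.invokeAll(mapTasks)) {
                total += done.get();
            }
            events.add("reduce:" + total);  // guaranteed to come after all map-done events
            return new ArrayList<>(events);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Whatever order the map tasks finish in, the "reduce" event is always last.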
Error and fault handling - To accomplish job execution in the environment where errors and faults are the
norm, the JobTracker attempts to restart failed task executions.
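Re-executing failed tasks can be sketched as a bounded retry loop. In this plain-Java version (hypothetical names), a task is attempted up to maxAttempts times, and the failure is reported only after the last attempt fails, which in Hadoop would fail the job. Hadoop's own limit is configurable; in Hadoop 2, for example, mapreduce.map.maxattempts defaults to 4.

```java
import java.util.function.Supplier;

// Plain-Java sketch of re-executing failed task attempts.
class TaskRetrySketch {

    static <T> T runWithRetries(Supplier<T> task, int maxAttempts) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();  // success: no further attempts needed
            } catch (RuntimeException e) {
                lastFailure = e;    // record the failure and try again
            }
        }
        throw new IllegalStateException("task failed after " + maxAttempts + " attempts", lastFailure);
    }
}
```

A transient failure (for example, a lost node) is absorbed by a later attempt, while a task that fails deterministically, such as one hitting a corrupt record, still fails once the attempt budget is used up.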
Slide 17
MapReduce Execution
Slide 18
MapReduce Execution
Initial Process
- Job Driver: uses the InputFormat to partition the data and communicates with the scheduler to get the map details;
the job is then handed to the Job Client.
Regular Process
- Job Tracker: receives the job and the InputSplits from the Job Driver and submits them to the Task Tracker on the
allocated node. It then tracks the job submitted by the Job Client, monitors the Task Trackers to learn the job status,
and also creates the set of reducer tasks.
- Task Tracker: processes its allocated tasks and starts a loop that sends periodic heartbeat messages.
- Task Runner: uses the distributed cache and creates a child JVM for the mapper or reducer.
- Job Tracker: analyses the received heartbeat messages.
- Scheduler: updates the cluster status information.
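The heartbeat loop can be sketched as a liveness monitor. The plain-Java sketch below (hypothetical names) passes in an explicit clock value instead of reading real time, so it is deterministic: each node reports in, and the monitor treats a node as lost once it has been silent for longer than a timeout, at which point a real JobTracker would reschedule that node's tasks. In classic Hadoop, heartbeats also carry task status and are used to assign new tasks; that part is not modeled.

```java
import java.util.*;

// Plain-Java sketch of heartbeat-based liveness tracking.
class HeartbeatMonitor {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat message arrives from a node's task tracker.
    void onHeartbeat(String node, long nowMillis) {
        lastSeen.put(node, nowMillis);
    }

    // Nodes silent for longer than the timeout; their tasks would need rescheduling.
    SortedSet<String> lostNodes(long nowMillis) {
        SortedSet<String> lost = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                lost.add(e.getKey());
            }
        }
        return lost;
    }
}
```

With a 10-second timeout, a node that last reported at t = 0 s is considered lost at t = 15 s, while a node that reported again at t = 9 s is still healthy.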
Slide 19
MapReduce Execution (Contd.)
A single node can run multiple map tasks and multiple reduce tasks.
A single JVM can be allocated to a single task or reused for multiple tasks, as set in the configuration file by [Link]:
= 1: one task per JVM (no reuse)
= -1: an unlimited number of tasks per JVM
= n > 1: up to n tasks per JVM, which can also be set with [Link]().setInt(Job.JVM_Num_Tasks.To_Run, int)
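The effect of the JVM-reuse setting on how many JVMs are started can be worked out with simple arithmetic. The sketch below is a rough model with hypothetical names. It assumes all tasks belong to the same job and run one after another in a single task slot on one node; with several slots, each slot would need its own JVM.

```java
// Rough model of JVMs launched on one node for one job's tasks under JVM reuse.
// 1 = new JVM per task, -1 = one JVM for all tasks, n > 1 = at most n tasks per JVM.
class JvmReuseSketch {

    static long jvmsLaunched(long tasks, int tasksPerJvm) {
        if (tasks == 0) {
            return 0;
        }
        if (tasksPerJvm == -1) {
            return 1;  // unlimited reuse
        }
        if (tasksPerJvm < 1) {
            throw new IllegalArgumentException("expected -1 or a positive value");
        }
        return (tasks + tasksPerJvm - 1) / tasksPerJvm;  // ceiling division
    }
}
```

For 10 tasks, the model gives 10 JVMs with a setting of 1, 1 JVM with -1, and 4 JVMs with 3 (three full JVMs plus one for the last task), which is where reuse saves JVM start-up time for many short tasks.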
Slide 20
MapReduce Application:
Simple implementation of a word count MapReduce job - 1
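import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;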
Slide 21
MapReduce Application:
Simple implementation of a word count MapReduce job - 2
public class WordCount extends Configured implements Tool {

    // Mapper: for each input line, emit (word, 1) for every token
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }
Slide 22
MapReduce Application:
Simple implementation of a word count MapReduce job - 3
    // Reducer: sum all counts emitted for the same word
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
Slide 23
MapReduce Application:
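Simple implementation of a word count MapReduce job - 4
    public int run(String[] args) throws Exception {
        // getConf() returns the configuration prepared by ToolRunner (including -D options)
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Word Count");
        job.setJarByClass(WordCount.class);
        // Set up the input
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Mapper
        job.setMapperClass(Map.class);
        // Reducer
        job.setReducerClass(Reduce.class);
        // Output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Execute and wait, printing progress
        boolean res = job.waitForCompletion(true);
        if (res) return 0;
        else return -1;
    }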
Slide 24
MapReduce Application:
Simple implementation of a word count MapReduce job - 5
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCount(), args);
        System.exit(res);
    }
}
Slide 25
MapReduce Job Execution – Worker-driven Load Balancing Approach
In this approach, all of the execution requests are written to a queue. Each worker reads a new
request from the queue and executes it. Once execution is complete, the worker reads the next
request. This type of load balancing is called worker-driven load balancing. The requester
does not know anything about the workers' execution capabilities, or even the number of workers. Because a worker reads a
new request only after the current one is completed, resources are used effectively.
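Worker-driven load balancing maps naturally onto a shared queue that workers pull from. In the plain-Java sketch below (hypothetical names), the requester only enqueues work, and each worker thread takes a new request only after finishing its current one; squaring a number stands in for executing a request. Because all requests are enqueued up front, workers stop when poll() finds the queue empty; a long-running service would block on take() instead.

```java
import java.util.*;
import java.util.concurrent.*;

// Plain-Java sketch of pull-based (worker-driven) load balancing.
class PullBasedWorkers {

    static List<Integer> process(List<Integer> requests, int workerCount) {
        // Requester side: just enqueue; no knowledge of the workers is needed.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(requests);
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            Thread t = new Thread(() -> {
                Integer request;
                // Pull the next request only after the previous one is done.
                while ((request = queue.poll()) != null) {
                    results.add(request * request);  // stand-in for executing the request
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Integer> sorted = new ArrayList<>(results);
        Collections.sort(sorted);
        return sorted;
    }
}
```

For example, process(List.of(3, 1, 2), 2) returns [1, 4, 9]. A faster worker simply goes back to the queue sooner, so it ends up handling more requests without any central assignment.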
Slide 26
Implementing InputFormat for Multiple HBase Tables
For MapReduce jobs leveraging HBase-based data sources, HBase provides TableInputFormat. The
limitation of this implementation is that it supports only a single table.
Because this TableInputFormat implementation supports a single table/scan, all of the information
about the table and scan is contained in the TableInputFormat implementation and does not need to be
defined in the InputSplit class. With multiple tables, however, different splits can refer to different table/scan pairs. As a result,
you must extend the table split class to contain not only table-related information (name, start and end row,
region server location), but also a scan for a given table.
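The extended split described above can be sketched in plain Java. It carries the table name, start and end rows, and region server location, plus the scan for its own table, and serializes them with the write/readFields pattern used by Hadoop's Writable interface (the class does not implement that interface, to avoid a Hadoop dependency). Representing the scan as a String is a simplifying assumption; a real implementation would serialize an HBase Scan. Later HBase releases also include a MultiTableInputFormat for this purpose.

```java
import java.io.*;

// Plain-Java sketch of a split for a multi-table InputFormat (hypothetical class).
class MultiTableSplitSketch {
    String tableName = "";
    byte[] startRow = new byte[0];
    byte[] endRow = new byte[0];
    String regionLocation = "";
    String scan = "";  // hypothetical serialized form of this table's scan

    // Serialize the split so the framework could ship it to the map task.
    void write(DataOutput out) throws IOException {
        out.writeUTF(tableName);
        writeBytes(out, startRow);
        writeBytes(out, endRow);
        out.writeUTF(regionLocation);
        out.writeUTF(scan);
    }

    // Rebuild the split on the task side, reading fields in the same order.
    void readFields(DataInput in) throws IOException {
        tableName = in.readUTF();
        startRow = readBytes(in);
        endRow = readBytes(in);
        regionLocation = in.readUTF();
        scan = in.readUTF();
    }

    private static void writeBytes(DataOutput out, byte[] b) throws IOException {
        out.writeInt(b.length);
        out.write(b);
    }

    private static byte[] readBytes(DataInput in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return b;
    }

    // Helper: serialize and deserialize a split in memory.
    static MultiTableSplitSketch roundTrip(MultiTableSplitSketch split) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            split.write(out);
            out.flush();
            MultiTableSplitSketch copy = new MultiTableSplitSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because every split carries its own table name and scan, the record reader for each map task can open the right table without consulting shared InputFormat state.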
Slide 27