What is meant by MapReduce in Hadoop

Oliver Fischer

In order to operate their services, Internet companies have to store and efficiently process data volumes ranging from gigabytes to petabytes every day. Google's MapReduce, or its open source counterpart Hadoop, can handle the data processing by partitioning the input data into pieces that can then be processed as independent subtasks.

One architectural approach to dealing with large amounts of data is distributed computing in clusters: many smaller, less powerful systems work on a task simultaneously instead of a single large system processing it sequentially.

To operate their services, companies like Facebook, Google and Yahoo have to store and efficiently process data volumes ranging from gigabytes to petabytes every day. At Google, this led to the development of the Google File System (GFS), a distributed cluster file system that works on the master-slave principle. One of the main requirements for the GFS was fault tolerance in the event of hardware failure, since the GFS was to run on standard hardware. Google met this requirement through replication.

With that, Google had solved one part of the task: data storage. What remained open was the processing. Conventionally, the data to be processed is brought to the program: the program starts on one computer and fetches the required data from another source such as a database. With the data volumes Google had to process, this path was not feasible, because the network throughput for the data transfer turned out to be the bottleneck for the entire processing. Therefore, the data had to be processed where it was stored. This principle is called data locality.

For this purpose, Google developed the MapReduce programming model, which divides the data into small parts and distributes them to different computers for simultaneous processing. The end result is created by bringing together all the partial results. MapReduce thus offered a solution for processing distributed data and integrated well with the GFS. Google presented the Google File System in 2003 and MapReduce in 2004, paving the way for open source implementations. Google did not release its own implementations.

Doug Cutting implemented MapReduce for Nutch in 2005, laying the foundation for the open source Hadoop project, an implementation of GFS and MapReduce. After Hadoop started as a Lucene sub-project in 2006, the Apache Software Foundation (ASF) promoted it to a top-level project in 2008, and it is now home to sub-projects of its own. In July 2009, a cluster running the Hadoop framework took first place twice in the Terabyte Sort Benchmark, which determines the most powerful systems for data sorting in different categories. A Yahoo Hadoop cluster sorted 100 terabytes in two hours and 53 minutes.

Architecture and installation

Over the course of time, the Hadoop project has been restructured several times to take account of the ongoing development. Today, Hadoop essentially consists of the three components Hadoop Common, Hadoop Distributed File System (HDFS) and MapReduce and works according to the master-slave principle. One node in the cluster takes on the role of master, coordinating the work of the remaining nodes.

Hadoop Common provides the basic functions that all other components require. This includes an implementation-neutral file system interface, the interface for "Remote Procedure Call" communication in the cluster and libraries for the serialization of data. The HDFS is the standard file system of the Hadoop framework. In addition to HDFS, other systems can be used to store the data, including CloudStore and Amazon's S3. MapReduce provides everything needed to develop applications according to the programming model.

With the three components you can build a complete Hadoop cluster. The Hadoop documentation recommends Linux as the production operating system for the individual nodes of a Hadoop cluster, although Hadoop clusters can also be built on the basis of other Unix-like systems such as FreeBSD or Solaris. Sun's JDK version 1.6 or higher must be available for the selected platform. Operation with OpenJDK is not possible. The current release 0.20.2 can be downloaded from the project website. Hadoop users such as Yahoo, A9 (Amazon's search engine) or last.fm operate Hadoop clusters in sizes from 10 to several thousand nodes.

Yahoo's largest installation, for example, comprises 4000 nodes. Given this range, it is difficult to make a recommendation for sizing a Hadoop cluster, as that depends largely on the intended use. The exception is the ratio of the master node to the other nodes in the cluster: the master node must have more processing power and more memory than the other nodes. One of the reasons for this is that individual framework components running on the master keep all their information in main memory for performance reasons and thus avoid expensive I/O operations.

For the first steps, Hadoop supports a pseudo-distributed mode in which all components run on a single computer, each in a separate process. A normal workstation such as your own notebook is sufficient for this. This makes it easy to develop and test your own MapReduce applications, but this configuration allows no conclusions about the performance of the application in a cluster.

Detailed installation instructions can be found on the project website. Thanks to the project documentation, setting up a real cluster should not be a hurdle for users with a Unix background. The US company Cloudera provides RPM and Debian packages for simplified installation. For a first encounter with Hadoop, it offers a free VMware image that only requires the free VMware Player.

Those who decide to develop their own MapReduce applications as the next step, but cannot or do not want to provide their own cluster, can use Hadoop as a SaaS offering through Amazon's Elastic MapReduce, which is part of the Amazon Web Services. An individual Hadoop cluster can also be configured on Amazon's EC2 (Elastic Compute Cloud).

Hadoop Distributed File System

HDFS follows the example of the Google File System (GFS) and therefore shares the important properties described in Google's paper "The Google File System". When developing the GFS, Google assumed that the failure of individual nodes in a computer cluster is not an exception but the normal case when expensive special hardware is dispensed with in favor of cheaper standard hardware. Hence, tolerance of hardware failure was a major design goal for GFS. In addition, Google needed a distributed file system that could handle the high volume of data and that could easily be expanded by adding new nodes to the cluster.

As part of the master-slave principle, the role of the master in HDFS is taken over by the NameNode, which manages all metadata of the file system such as directory structures and files. The so-called DataNodes take on the role of slaves and are responsible for managing the user data in the HDFS on the individual cluster nodes.

To a user, the HDFS looks like a normal hierarchical file system, but its functions are not comparable to those of other distributed file systems such as the Andrew File System, because it is optimized for MapReduce applications. Like the GFS, HDFS assumes that files are written only once but read frequently, so read operations must be more efficient than write operations. Another special feature is the lack of operations for byte-precise access to files, as C programmers know them from the file functions of the standard library, among other places.

As the size of the cluster increases, so does the probability of a hardware failure. Under normal circumstances, this should not lead to data loss in GFS/HDFS clusters. For this reason, no file may be located on only one node, whose failure would then lead to data loss. HDFS achieves the required data safety by replicating all user data.

Instead of replicating entire files on different nodes, the developers of GFS/HDFS chose a block-oriented approach. With this approach, each file stored in HDFS is divided into individual blocks of a fixed byte size, which are stored on different cluster nodes under the direction of the NameNode. In addition, the NameNode ensures that each block is replicated multiple times on different nodes in the cluster.

In the standard HDFS configuration, the NameNode replicates each block three times. If a node in the cluster fails, only the blocks on the failed computer are lost and not an entire file, as the lost blocks can be replaced by their copies on other nodes and the NameNode can then provide the entire file.
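The block-oriented replication described above can be illustrated with a small in-memory sketch. It is not HDFS code: the function names, the round-robin placement policy and the tiny block size are assumptions chosen only to make the idea visible.

```python
def split_into_blocks(data, block_size):
    """Cut a byte string into fixed-size blocks (the last one may be shorter)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_replicas(num_blocks, nodes, replication=3):
    """Assign every block to `replication` distinct nodes.

    Round-robin placement is a hypothetical policy for this sketch only.
    """
    if replication > len(nodes):
        raise ValueError("need at least as many nodes as replicas")
    return {
        block_id: [nodes[(block_id + r) % len(nodes)] for r in range(replication)]
        for block_id in range(num_blocks)
    }


def readable_after_failure(placement, failed_node):
    """A file stays readable if every block has a copy on a surviving node."""
    return all(
        any(node != failed_node for node in holders)
        for holders in placement.values()
    )


nodes = ["node1", "node2", "node3", "node4", "node5"]
data = b"x" * 1000
blocks = split_into_blocks(data, 256)
placement = place_replicas(len(blocks), nodes)

for node in nodes:
    print(node, "fails, file still readable:", readable_after_failure(placement, node))
```

With three copies per block, the loss of any single node leaves at least two copies of every block, so the whole file can still be assembled.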

The NameNode, which knows all the directories and files that exist in HDFS, must therefore know for each file which blocks it was divided into, which cluster nodes these blocks are on and which copies of the individual blocks are available. The blocks on the cluster nodes are managed by the DataNodes running there. In contrast to the NameNode, of which there is only one per HDFS instance, the DataNodes only manage the individual blocks that were stored on their node. Only the NameNode has information about which blocks belong to which file.

If, for example, a client wants to read a file from the HDFS, it first establishes a connection to the NameNode. The NameNode determines which blocks belong to the requested file and which DataNodes they are on. From then on, the requesting client and the DataNodes are responsible for the data transfer, since transferring data is not part of the NameNode's task. This division ensures that the NameNode is not overloaded and that transfers can be carried out in a decentralized way, while the NameNode remains available for other tasks.
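The division of labor in the read path can be sketched as follows. The classes and the method name get_block_locations are hypothetical stand-ins for this illustration and do not reproduce the real Hadoop client API; the point is only that the NameNode answers with metadata while the bytes come from the DataNodes.

```python
class DataNode:
    """Holds the raw bytes of the blocks stored on one cluster node."""

    def __init__(self, name):
        self.name = name
        self.blocks = {}  # block id -> bytes


class NameNode:
    """Knows only metadata: which blocks form a file and where they live."""

    def __init__(self):
        self.files = {}      # path -> ordered list of block ids
        self.locations = {}  # block id -> names of DataNodes holding a copy

    def get_block_locations(self, path):
        return [(block_id, self.locations[block_id]) for block_id in self.files[path]]


def read_file(path, namenode, datanodes):
    """Ask the NameNode for locations, then fetch each block from a DataNode."""
    chunks = []
    for block_id, holders in namenode.get_block_locations(path):
        for name in holders:
            node = datanodes.get(name)  # None models an unreachable node
            if node is not None and block_id in node.blocks:
                chunks.append(node.blocks[block_id])
                break
        else:
            raise IOError(f"no reachable copy of {block_id}")
    return b"".join(chunks)


dn1, dn2 = DataNode("dn1"), DataNode("dn2")
dn1.blocks = {"blk_1": b"hello ", "blk_2": b"world"}
dn2.blocks = {"blk_1": b"hello ", "blk_2": b"world"}

namenode = NameNode()
namenode.files["/demo.txt"] = ["blk_1", "blk_2"]
namenode.locations = {"blk_1": ["dn1", "dn2"], "blk_2": ["dn2", "dn1"]}

content = read_file("/demo.txt", namenode, {"dn1": dn1, "dn2": dn2})
print(content)
```

Because the client walks the list of holders, the same call still succeeds when one of the two DataNodes is missing from the dictionary.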

The procedure is similar when transferring a file to HDFS. The client informs the NameNode that it should create a file, whereupon the NameNode creates an entry in its namespace for the new file and the client can start the data transfer. To do this, the client builds the byte blocks for the HDFS locally from the data to be transferred. As soon as a block has reached its specified size, the client requests storage space for it from the NameNode. The NameNode then determines a list of DataNodes on which the block and its copies are to be stored.

A process known as pipelining is then used for the actual transmission. The client transfers the block to the first DataNode from the list it received. As soon as that DataNode has successfully stored the block on its local system, it transfers the block to the next DataNode in the list. The replication process repeats until the required number of block copies is reached. However, this is transparent for the client, which in the meantime can build and transfer the next block for the HDFS. The transfer of the entire file is not complete until the NameNode has ensured that every block has been stored in the HDFS with a minimum number of copies.
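A minimal sketch of the pipelining idea, assuming an in-memory dictionary in place of real DataNodes: each node stores the block locally and then hands it to the next node in the list. The function name pipeline_write and the data layout are illustrative assumptions, not part of Hadoop.

```python
def pipeline_write(block_id, payload, pipeline, storage):
    """Store a block on the first node, which forwards it down the pipeline.

    `pipeline` is the list of DataNode names chosen by the NameNode;
    `storage` maps a node name to the blocks it holds.
    Returns the number of copies written.
    """
    if not pipeline:
        return 0
    head, rest = pipeline[0], pipeline[1:]
    storage.setdefault(head, {})[block_id] = payload  # store locally first
    return 1 + pipeline_write(block_id, payload, rest, storage)  # then forward


storage = {}
copies = pipeline_write("blk_1", b"abc", ["dn1", "dn2", "dn3"], storage)
print(copies, sorted(storage))
```

The client only talks to the first node in the list; the remaining copies are produced by the nodes themselves.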

In an HDFS cluster with up to several hundred nodes, hardware failure is an everyday occurrence, but it has to be detected. Therefore, each DataNode sends a heartbeat at configurable intervals to the central NameNode, which records the time of the last heartbeat for each DataNode. If the heartbeat of a DataNode is absent for a longer period of time, the NameNode declares that DataNode dead. To keep the presumed failure from pushing blocks below the minimum number of copies, which would increase the probability of data loss, the NameNode instructs the DataNodes holding the remaining copies to replicate them to other DataNodes.
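The heartbeat bookkeeping can be sketched in a few lines. The timeout value, the timestamps and the function names are made up for this illustration; the real thresholds are configuration settings of the cluster.

```python
def dead_nodes(last_heartbeat, now, timeout):
    """Nodes whose last heartbeat is older than `timeout` seconds."""
    return {node for node, seen in last_heartbeat.items() if now - seen > timeout}


def under_replicated(placement, dead, min_copies):
    """Blocks with fewer than `min_copies` live copies, mapped to the survivors."""
    result = {}
    for block_id, holders in placement.items():
        live = [node for node in holders if node not in dead]
        if len(live) < min_copies:
            result[block_id] = live
    return result


# Hypothetical timestamps in seconds; dn3 has been silent for 60 seconds.
last_heartbeat = {"dn1": 100.0, "dn2": 95.0, "dn3": 40.0, "dn4": 99.0}
placement = {
    "blk_1": ["dn1", "dn2", "dn3"],
    "blk_2": ["dn1", "dn2", "dn4"],
}

dead = dead_nodes(last_heartbeat, now=100.0, timeout=30.0)
todo = under_replicated(placement, dead, min_copies=3)
print(dead, todo)
```

For every entry in todo, the NameNode would pick one of the surviving holders and ask it to copy the block to a further DataNode.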

However, things look different if the NameNode fails. It exists only once per HDFS instance, which makes it a "single point of failure" in HDFS. The NameNode stores the metadata of the HDFS in the file system of its host. This image is called the FsImage and is loaded completely into main memory when the NameNode starts, which enables the NameNode to answer queries quickly. To avoid damaging the FsImage through write operations whenever the metadata changes, changes are not written to the FsImage directly but are recorded in a separate log, the EditLog. When a NameNode starts, it reads in both the last FsImage and the EditLog and applies all changes from the EditLog to the FsImage. The NameNode saves the updated FsImage and only then starts its work. The SecondaryNameNode, which accompanies each NameNode, periodically merges the FsImage and the EditLog in the same way so that the log does not grow without bound.
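The startup step, in which the changes from the EditLog are applied to the last FsImage, can be sketched as a simple log replay. The record format and the three operations used here are assumptions for the sake of the example and do not reflect the actual on-disk formats.

```python
def replay(fs_image, edit_log):
    """Return a new image with all logged changes applied in order."""
    image = dict(fs_image)  # leave the stored image untouched
    for op, path, *args in edit_log:
        if op == "create":
            image[path] = args[0]             # args[0]: list of block ids
        elif op == "delete":
            image.pop(path, None)
        elif op == "rename":
            image[args[0]] = image.pop(path)  # args[0]: new path
        else:
            raise ValueError(f"unknown operation: {op}")
    return image


fs_image = {"/logs/a.txt": ["blk_1", "blk_2"], "/tmp/x": ["blk_9"]}
edit_log = [
    ("create", "/logs/b.txt", ["blk_3"]),
    ("rename", "/logs/a.txt", "/logs/old.txt"),
    ("delete", "/tmp/x"),
]

current = replay(fs_image, edit_log)
print(current)
```

Saving the returned image and starting with an empty log corresponds to the checkpoint the text describes.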

In addition to the command line client provided, you can work with the HDFS from any Java application using the Java API.

Programming model and framework

The term MapReduce is used for two things: on the one hand for a programming model, on the other for MapReduce frameworks. The latter work according to the MapReduce model but differ in the choice of programming language and in implementation details. Google designed MapReduce as a programming model for highly parallel data processing in clusters. As a concept, it has its roots in Lisp, which has the two functions map and reduce that gave MapReduce its name. Lisp's map function is called with a function and a list of input values as parameters and applies the passed function to each value in the list.

The results of the function calls are returned as a list. The reduce function works similarly to map: it is called with a function and a list of input values and passes each value from the input list to the function. In contrast to map, reduce merges all results and returns a single scalar value.
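Python offers the same pair of functions, which makes the idea easy to try out. The following lines are only an illustration in Python, not Lisp code; the word list is made up.

```python
from functools import reduce

words = ["map", "reduce", "hadoop"]

# map applies the function to every element and yields one result per element.
lengths = list(map(len, words))

# reduce folds all elements into a single scalar value.
total = reduce(lambda acc, n: acc + n, lengths, 0)

print(lengths, total)
```

Here map turns three words into three lengths, and reduce merges those lengths into one number.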

The MapReduce programming model builds on this and divides the processing into a Map phase and a subsequent Reduce phase, which are carried out on all computers in a cluster. The programmer supplies the concrete implementations of the map and reduce functions and is free to implement them in any way. Since MapReduce was designed for use in computer clusters, workers can execute each of the two phases in parallel on several nodes of the cluster.

For this purpose, the underlying framework instantiates a certain number of workers with the specified map function, divides the data to be processed into parts called splits and assigns them to the workers. Each worker then executes the map function on the splits assigned to it. The Hadoop framework passes the input data to the map function as key-value pairs; the function processes them and can generate any number of output pairs, known as intermediate values. The following listing shows this as pseudocode for counting word lengths in text documents.

;; lineno: line number
;; line: line of text
map (key lineno, value line)
    for each word w in line:
        EmitIntermediatePair (w, length (w));

The framework receives the intermediate values and distributes them to the workers executing the reduce function. In doing so, it ensures that all intermediate pairs with the same key are combined and passed to the same reducer instance. The output of the reduce function is the result of the MapReduce application. A central master, which assigns the available workers to one of the two phases, coordinates and monitors the MapReduce applications. Figure 1 shows the processing and the individual phases.
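The interplay of map, grouping by key and reduce can be simulated in a single process. The sketch below is not Hadoop code and runs nothing in parallel. The map function follows the pseudocode listing, while the reduce function, which sums the emitted lengths per word, is an assumption added for illustration because the text does not specify one.

```python
from collections import defaultdict


def map_fn(lineno, line):
    """Emit one (word, length) pair per word, as in the pseudocode listing."""
    for word in line.split():
        yield word, len(word)


def reduce_fn(word, lengths):
    """Hypothetical reduce step: total characters contributed by this word."""
    return word, sum(lengths)


def run_job(lines):
    # Map phase: every line is one input record (key: line number).
    intermediate = []
    for lineno, line in enumerate(lines, start=1):
        intermediate.extend(map_fn(lineno, line))

    # Grouping: all pairs with the same key end up with the same reducer.
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)

    # Reduce phase: one call per distinct key.
    return dict(reduce_fn(key, values) for key, values in sorted(groups.items()))


result = run_job(["to be or not", "to be"])
print(result)
```

In a real cluster the map calls would run on the nodes holding the splits, and the grouping step would move the intermediate pairs across the network to the reducers.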

Hadoop's MapReduce framework works according to the model described above. Applications for Hadoop are called jobs, and the JobTracker, acting as master, is responsible for distributing them in the cluster and managing them afterwards. The actual work of a MapReduce job is carried out by the TaskTrackers, one instance of which runs on each cluster node. Usually a TaskTracker and a DataNode instance work as a pair on a node, which enables the interaction of HDFS and the MapReduce framework. Figure 2 gives an overview of the architecture and Figure 3 shows an example cluster with five nodes.

Monitoring of Hadoop clusters

To monitor a Hadoop cluster, the NameNode for HDFS and the JobTracker for MapReduce each provide a spartan web interface showing the most important parameters. For HDFS, these include information on storage capacity and its utilization as well as the number of active and failed nodes in the cluster. The interface of the JobTracker is likewise limited to a brief overview of the running jobs and those waiting to be executed, but for each individual job it provides detailed information on the overall progress, the tasks executed and the parameters used by Hadoop. The latter is a good source of information for later optimizations.

If you want to gain a deeper insight into the internal processes of a Hadoop cluster, you should take a look at the log files that the Hadoop framework writes via log4j for each server instance in the cluster.

Although this helps to understand the internal processes, it does not help to monitor a cluster itself. Hadoop therefore supports the Ganglia monitoring solution, but Nagios can also be used in conjunction with Hadoop's JMX support.


MapReduce ecosystem

In addition to Hadoop, there are other MapReduce implementations such as Qizmt, a .NET-based framework developed by MySpace and released under the GPL-3 license. The Greenplum Database, a commercial product with SQL and MapReduce support, takes a hybrid approach. Phoenix pursues a shared memory approach, and Disco is a framework written in Erlang.

Further projects have been founded around Hadoop, which, like Hadoop, either started out as free implementations of Google frameworks or pursue their own approaches. The latter group includes the Chukwa project, which enables the monitoring of large, distributed systems, and Hive, which offers a data warehouse solution with its own query language based on Hadoop. The group of projects modeled on Google includes HBase, a BigTable implementation for large, distributed databases. The Pig project allows MapReduce applications to be written in its own query language, which it translates into MapReduce jobs so that the user can concentrate on data analysis.

Mahout, which provides a framework for machine learning based on Hadoop, is located under the umbrella of the Lucene project. Many of these open source projects are driven forward largely by Yahoo employees, as the company is the largest Hadoop user. Doug Cutting also worked for Yahoo for a time before moving to Cloudera.


MapReduce is often incorrectly compared directly to relational database systems and presented as an inefficient way to manage data. This criticism overlooks the fact that MapReduce is not a database system but a programming model, which can be used to evaluate data but is not limited to that. In general terms, Hadoop and MapReduce can solve tasks whose input data can easily be partitioned into pieces that are then processed as independent sub-problems, which corresponds to the division of the data into individual blocks in HDFS. Examples are searching through text files or converting their format.

Hadoop is not suitable for tasks that cannot be partitioned because of dependencies during processing, such as weather simulation. In addition to its parallel approach, Hadoop's flexible architecture makes it well suited for processing unstructured data for which there is no schema, data whose preparation and import into a database or another system would be out of proportion to how often it is accessed, or data that exists only in raw format because it is processed repeatedly but each time with a different question in mind. Critics often overlook this when they compare MapReduce with relational database systems. As is so often the case, the task and the solution must match.

Initial experience and the widespread use of Hadoop have left the positive impression of a framework with which MapReduce applications can be developed reliably for production use. Anyone looking for an alternative approach to processing mass data should take a closer look at Hadoop.

A second article will provide a hands-on look at developing MapReduce applications.

Oliver Fischer
works in Berlin and can be reached via www.swe-blog.net. He thanks Isabel Drost for valuable criticism and additions to the article.