Reading the Google paper on map-reduce really piqued my interest in learning more about it. The Apache Hadoop website has some good reading, and Google of course turns up lots of other info. Reading is all well and good, but the best way to learn about a concept or algorithm is to actually implement it.
As a programmer, nothing helps you learn a subject better than implementing it. Want to understand the PNG image format? Write an encoder/decoder. 3DS Max ASCII export format? Implement a scene playback engine for ASE files. A* pathfinding? Code a maze game with some AI-controlled Minotaur that wanders around it. I decided that I needed to write a map-reduce system: not just a map-reduce program to plug into an existing engine, but the actual runtime code that executes a map-reduce function over a set of files.
So, here it is, in Java, from a blank page to a working (albeit naive) map-reduce implementation in under 500 lines of code. If you want to grab the complete source, here it is.
Outline
After I finished writing this post, I looked at it and thought, crikey, that’s a lot of text to wade through! So I drew a diagram of the process my code implements. Hopefully, it’ll be a useful reference if you need to visualise any of the data structures or the steps in the process.

MapReduce Outline
First is the functional outline of the MapReduce class, built around a single go () method. I know it will need to handle some command line arguments to create a list of files to process, then coordinate the map and reduce phases.
// Imports needed by the snippets in this post
import java.io.File;
import java.util.*;
public class MapReduce implements Runnable
{
/** All the files to process, as specified on the command line */
public static File [] files;
/** A hashtable of MapResult objects that are created by values emitted for a given key */
public static Map <String, MapResult> map_results;
/** The main method is the static entry point for command line Java programs */
public static void main(String [] args)
{
// Usage information
if (args == null || args.length == 0)
{
System.out.println ("Usage: java MapReduce <filespec>");
System.out.println (" Process a set of files listed by <filespec> using a trivial MapReduce implementation.");
System.exit (1);
}
// Out of the static context and off we go
MapReduce me = new MapReduce ();
me.go (args);
}
/** Empty constructor */
public MapReduce ()
{
}
/** The go method gets a list of files to process and invokes the map and reduce phases */
public void go (String [] args)
{
GetFiles (args);
MapPhase ();
ReducePhase ();
PrintResults ();
}
/** Create a worker thread to process each input file and run the map function over the data */
public void MapPhase ()
{
// implement the map phase of the processing
}
/** Create a set of worker threads to perform the reduce function over the intermediate results of the map operation */
public void ReducePhase ()
{
// implement the reduce phase of the processing
}
}
GetFiles just processes the command line arguments and comes up with an array of java.io.File objects. The map phase needs to take each of these files, read its contents and pass the contents to a set of worker tasks that execute the map () method on each file’s data. The first cornerstone of map-reduce is this splitting of the input files and processing by a distinct set of separate worker tasks. A full implementation would have worker tasks partitioned across multiple processors in a cluster, and a distributed filesystem, but that’s a bit beyond an afternoon’s programming!
I’m simply going to create java.lang.Thread objects, one for each of the input files, to perform the map function. The reduce phase needs a bit more work to determine the number of threads to use, but I’ll again create a bunch of them for the job. Notice that the MapReduce class implements the java.lang.Runnable interface: I’m going to use the class as the basis for both the map and reduce worker threads. To do that, the class will need a run () method, and a way to designate running threads as map or reduce workers so that the code can branch separately for each type.
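GetFiles isn’t shown in this post, so here is a minimal sketch of what such a method could look like. It assumes each command line argument is a path to a file or a directory, and that a directory expands to the regular files directly inside it; the class and method names (FileListSketch, getFiles) are made up for illustration and are not taken from the original source.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

final class FileListSketch
{
    /** Hypothetical stand-in for GetFiles: expands each argument into plain files. */
    static File [] getFiles (String [] args)
    {
        List<File> found = new ArrayList<File> ();
        for (String arg : args)
        {
            File f = new File (arg);
            if (f.isFile ())
            {
                found.add (f);
            }
            else if (f.isDirectory ())
            {
                // listFiles () returns null on an I/O error, so guard against that
                File [] children = f.listFiles ();
                if (children != null)
                {
                    for (File child : children)
                    {
                        if (child.isFile ())
                            found.add (child);
                    }
                }
            }
            // Anything else (a missing path, a special file) is silently skipped
        }
        return found.toArray (new File [0]);
    }
}
```

Skipping unreadable paths silently is a choice made for brevity here; a real tool would probably want to report them.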
//
// Worker task methods
//
/** Worker task will perform the map function */
public static final int FUNCTION_MAP = 0;
/** Worker task will perform the reduce function */
public static final int FUNCTION_REDUCE = 1;
/** Variable set to either FUNCTION_MAP or FUNCTION_REDUCE */
protected int function;
/** For a map worker, this is the file to operate on */
protected File file;
/** Results of the map function calls to EmitIntermediate (), stored in a table */
protected Map<String, MapResult> results;
public MapReduce (int _function, File _file)
{
function = _function;
file = _file;
}
public MapReduce (int _function, Map<String, MapResult> _results)
{
function = _function;
results = _results;
}
public void run ()
{
switch (function)
{
case FUNCTION_MAP:
// invoke the user's map function here
break;
case FUNCTION_REDUCE:
// invoke the user's reduce function here
break;
}
}
You can see from the constructors that the first is meant for the map phase, accepting a File object along with the function type, and the second is for the reduce phase, accepting a java.util.Map object. Don’t worry about the MapResult class yet; I’ll get to that shortly. I also don’t really need to pass the function type to the constructor here, since it could be deduced from the other argument, but it makes the code more obvious in the MapPhase () and ReducePhase () methods.
Finally we need the user map and reduce methods, and the output methods these use to emit results. The map function uses an EmitIntermediate method to output key-value pairs as it processes the data in its input file. The reduce function uses an Emit method to output a single result given all the previously emitted values for a given key. Like the Google paper, all these methods use String objects for keys and values, and we can use the handy java.util.Iterator for the list of values for the reduce method.
public void EmitIntermediate (String key, String value)
{
// Store the intermediate key-value pair for later sorting and 'reduction'.
// This is done by getting a MapResult object from the map_results hashtable,
// or creating a new one, and adding this emitted value to it.
}
public void Emit (String result)
{
// Output this result for the key the reduce function is currently handling.
// Do this by getting the MapResult object from the map_results table
// for the current key and setting its result.
}
// key: document name
// value: document contents
public void map (String key, String value)
{
// Split value into words
String [] words = value.split (" ");
// Emit '1' associated with each word
for (String word: words)
{
EmitIntermediate (word, "1");
}
}
// key: a word
// values: a list of counts
public void reduce (String key, Iterator<String> values)
{
int result = 0;
while (values.hasNext ())
{
String v = values.next ();
result += Integer.parseInt (v);
}
Emit (result + "");
}
I’ve implemented the simple word count map-reduce example in my map () and reduce () methods. The map function splits the input data into words and emits the value “1” for each one. The reduce function then takes the emitted values associated with any given key (word) and sums them together, finally emitting the resulting value.
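To make the data flow concrete, here is a single-threaded sketch that squashes the same three steps (emit, group by key, sum) into one method. It is only an illustration of the word count logic, not the threaded runtime described in this post; the class name WordCountSketch is invented, and a TreeMap is used purely so that the printed order is predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WordCountSketch
{
    static Map<String, Integer> count (String contents)
    {
        // "Map" step: emit the value "1" for every word, grouped by key
        Map<String, List<String>> intermediate = new TreeMap<String, List<String>> ();
        for (String word : contents.split (" "))
        {
            List<String> values = intermediate.get (word);
            if (values == null)
            {
                values = new ArrayList<String> ();
                intermediate.put (word, values);
            }
            values.add ("1");
        }
        // "Reduce" step: sum the emitted values for each key
        Map<String, Integer> totals = new TreeMap<String, Integer> ();
        for (Map.Entry<String, List<String>> entry : intermediate.entrySet ())
        {
            int sum = 0;
            for (String v : entry.getValue ())
                sum += Integer.parseInt (v);
            totals.put (entry.getKey (), sum);
        }
        return totals;
    }

    public static void main (String [] args)
    {
        // Prints {and=1, cat=1, hat=1, the=2}
        System.out.println (count ("the cat and the hat"));
    }
}
```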
Finer Details
So, if you’ve got this far, you should have the basic outline of my MapReduce implementation. Now to fill in the details for the empty methods and sections I’ve so far glossed over. Just to recap, we need:
- A MapResult class that will link a specific key with a list of intermediate values from the map phase, and then the final result from the reduce phase.
- The MapPhase method that divides the input fileset up between map worker tasks.
- The worker task invocation of the user map () method, in the FUNCTION_MAP case section of the run () method.
- The EmitIntermediate method that allows the user map method to return values to the processor.
- The ReducePhase method that takes the intermediate keys and values and divides them up between a number of reduce worker tasks.
- The worker task invocation of the user reduce () method, in the FUNCTION_REDUCE case section of the run () method.
- The Emit method that allows the user reduce method to output the final result value for a given key.
- A PrintResults method to display all the output.
MapResult
public class MapResult
{
/** The key associated with the result set. Although not strictly necessary to store this here,
it's useful at a couple of points. */
private String key;
/** This list of values is the result of the map phase. Any time a map () method calls
EmitIntermediate (key, value), the value is added to this list.*/
private List <String> values;
/** This result is the combined result of all the values, as output by the reduce phase,
from the Emit (result) method call associated with this result set. */
private String result;
public MapResult (String _key)
{
key = _key;
values = Collections.synchronizedList (new LinkedList <String> ());
}
public void addValue (String _value)
{
values.add (_value);
}
public void setResult (String _result)
{
result = _result;
}
public String getKey ()
{
return key;
}
public String getResult ()
{
return result;
}
public Iterator<String> iterator ()
{
return values.iterator ();
}
public String toString ()
{
return key + ":"+result;
}
}
The getters and setters aside, the crux of this class is that it links together a single ‘key’ with a list of ‘values’. When EmitIntermediate (String key, String value) is called with a new key, a new MapResult object is created and added to the map_results hashtable. If EmitIntermediate is called with an existing key, the value is added to the list of the existing MapResult grabbed from the hashtable.
MapPhase
public void MapPhase ()
{
// Create the synchronised hashtable to handle all the map and reduce results
map_results = Collections.synchronizedMap (new HashMap<String, MapResult> ());
// For each input file, start a worker thread to perform the 'map' processing
int num_map_workers = files.length;
// Array to hold all the Thread objects in order to wait for them
Thread [] worker_threads = new Thread [num_map_workers];
// Kick off all the map threads, note the use of FUNCTION_MAP and the file object
for (int i = 0; i < num_map_workers; i++)
{
MapReduce worker = new MapReduce (FUNCTION_MAP, files [i]);
worker_threads [i] = new Thread (worker);
worker_threads [i].start ();
}
// Now wait for all the threads to complete
try
{
for (int i = 0; i < num_map_workers; i++)
{
worker_threads [i].join ();
}
}
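catch (InterruptedException e)
{
// Don't swallow the interrupt: restore the flag so callers can see it
Thread.currentThread ().interrupt ();
}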
}
The MapPhase method actually turns out to be pretty simple, due mostly to the decision to create one worker thread per input file. Java threads are pretty easy to work with, but when several of them share Java Collections Framework objects like Map and List, it’s important to make sure you create synchronised ones (for example with Collections.synchronizedMap) where necessary.
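One thread per input file is fine for a handful of files, but it won’t scale to thousands. As a possible variation (not what the code in this post does), the same fan-out and wait pattern can be sketched with a fixed-size thread pool from java.util.concurrent. The class name, the poolSize parameter and the stand-in "work" of recording each input’s length are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PooledMapPhaseSketch
{
    /** Runs one task per input on at most poolSize threads and returns what they recorded. */
    static List<Integer> run (List<String> inputs, int poolSize)
    {
        // Shared between worker threads, so it must be a synchronised list
        final List<Integer> results = Collections.synchronizedList (new ArrayList<Integer> ());
        ExecutorService pool = Executors.newFixedThreadPool (poolSize);
        for (final String input : inputs)
        {
            pool.execute (new Runnable ()
            {
                public void run ()
                {
                    // Stand-in for the real map work on one file
                    results.add (input.length ());
                }
            });
        }
        // No new tasks accepted; wait for the queued ones to finish
        pool.shutdown ();
        try
        {
            if (!pool.awaitTermination (1, TimeUnit.MINUTES))
                throw new IllegalStateException ("map tasks did not finish in time");
        }
        catch (InterruptedException e)
        {
            Thread.currentThread ().interrupt ();
            throw new IllegalStateException ("interrupted while waiting for map tasks", e);
        }
        return results;
    }
}
```

The results come back in whatever order the tasks happen to finish, so callers shouldn’t rely on them lining up with the input order.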
run () with FUNCTION_MAP
case FUNCTION_MAP:
// Load the file into a string
String contents = GetFileContents (file);
// Invoke the map function
map (file.getName (), contents);
break;
So, with a set of worker threads now all running the run () method, and branching into the FUNCTION_MAP section of the case statement, each one needs to invoke the user map () function on the contents of one file.
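GetFileContents is another helper that isn’t listed in this post. A minimal sketch, assuming the input files are UTF-8 text and small enough to load into memory in one go, might look like the following. The class name and the decision to return an empty string on failure are choices made for this example, not necessarily what the original code does.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class FileContentsSketch
{
    /** Hypothetical stand-in for GetFileContents: returns "" if the file can't be read. */
    static String getFileContents (File file)
    {
        try
        {
            // Read the whole file at once; fine for source files, not for huge inputs
            byte [] bytes = Files.readAllBytes (file.toPath ());
            return new String (bytes, StandardCharsets.UTF_8);
        }
        catch (IOException e)
        {
            System.err.println ("Could not read " + file + ": " + e.getMessage ());
            return "";
        }
    }
}
```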
EmitIntermediate
public void EmitIntermediate (String key, String value)
{
MapResult bucket;
synchronized (map_results)
{
bucket = map_results.get (key);
if (bucket == null)
{
bucket = new MapResult (key);
map_results.put (key, bucket);
}
}
bucket.addValue (value);
}
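This is the last section of the map phase. First the map_results table is checked for a MapResult object matching the key; if one isn’t found, it’s created. The check and the creation sit inside a single synchronized block so that two workers can’t both create a bucket for the same key. Finally the value is added to the results list.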
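On Java 8 or later, the same get-or-create step could also be written without an explicit synchronized block, using ConcurrentHashMap.computeIfAbsent. This is an alternative sketch rather than the code from this post: the EmitSketch class and its bare List<String> buckets (in place of MapResult) are simplifications invented for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class EmitSketch
{
    /** One bucket of emitted values per key, shared by all map workers. */
    private final ConcurrentMap<String, List<String>> buckets =
        new ConcurrentHashMap<String, List<String>> ();

    void emitIntermediate (String key, String value)
    {
        // computeIfAbsent creates the bucket at most once per key, atomically,
        // then the value is appended to the (synchronised) list it returns
        buckets.computeIfAbsent (key, k -> Collections.synchronizedList (new ArrayList<String> ()))
               .add (value);
    }

    /** Returns the values emitted for key so far, or null if the key was never emitted. */
    List<String> valuesFor (String key)
    {
        return buckets.get (key);
    }
}
```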
ReducePhase
Now things get a bit more complicated. This is probably the bit that took me the longest to get my head around, and indeed, I re-wrote the method twice before I was happy with it. It took a bit of debugging too to catch all the edge cases, and there’s some pretty heavy use of generic collection classes to handle.
public static final int NUM_REDUCE_WORKERS = 4;
public void ReducePhase ()
{
int i;
// Split the map_results hashtable up into individual chunks for a number of workers to reduce
int num_reduce_workers = NUM_REDUCE_WORKERS;
// Nothing to reduce if the map phase emitted no keys (this also avoids a divide by zero below)
if (map_results.isEmpty ())
return;
// Catch the too many workers case
if (num_reduce_workers > map_results.size ())
num_reduce_workers = map_results.size ();
// I'll call each worker's section of the results table a 'chunk'.
// Round the chunk size up so the records never spill into more chunks than there are workers
int chunk_size = (map_results.size () + num_reduce_workers - 1) / num_reduce_workers;
int records_remaining = map_results.size ();
// We need to iterate over all the elements in map_results
Iterator<MapResult> iterator = map_results.values ().iterator ();
// Each worker will have its own section of the map_results table to work on
List<Map<String, MapResult>> reduce_chunks = new ArrayList<Map<String, MapResult>> (num_reduce_workers);
// Split up the map_results table into chunks for each worker to process.
int current_worker = 0;
while (records_remaining > 0)
{
// Catch the last worker case
if (records_remaining < chunk_size)
{
chunk_size = records_remaining;
}
Map<String, MapResult> worker_map = new HashMap<String, MapResult> (chunk_size);
reduce_chunks.add (worker_map);
for (int record = 0; record < chunk_size; record++)
{
MapResult m = iterator.next ();
worker_map.put (m.getKey (), m);
}
++current_worker;
records_remaining -= chunk_size;
}
// Now kick off the reduce workers, one per chunk actually created
// (rounding can leave fewer chunks than the worker count chosen above)
num_reduce_workers = reduce_chunks.size ();
Thread [] worker_threads = new Thread [num_reduce_workers];
for (i = 0; i < num_reduce_workers; i++)
{
MapReduce worker = new MapReduce (FUNCTION_REDUCE, reduce_chunks.get (i));
worker_threads [i] = new Thread (worker);
worker_threads [i].start ();
}
// Now wait for all the threads to complete
try
{
for (i = 0; i < num_reduce_workers; i++)
{
worker_threads [i].join ();
}
}
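catch (InterruptedException e)
{
// Don't swallow the interrupt: restore the flag so callers can see it
Thread.currentThread ().interrupt ();
}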
}
What this method boils down to is splitting up the complete set of MapResult objects from the map_results table into appropriately sized chunks to pass to a fixed number of worker tasks. It’s handy that all the values for a given key are stored in a single MapResult object: each reduce task automatically gets all the results for a single key, so we don’t need to perform any sorting here.
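The chunking arithmetic is the easiest part to get subtly wrong, so here it is pulled out into a standalone sketch that can be tested by itself. It works on a plain list rather than the map_results table, and the ChunkSketch class and split method are names invented for this example. The chunk size is rounded up, which guarantees the items never need more than maxChunks chunks, although there can be fewer.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkSketch
{
    /** Splits items into at most maxChunks consecutive chunks. */
    static <T> List<List<T>> split (List<T> items, int maxChunks)
    {
        List<List<T>> chunks = new ArrayList<List<T>> ();
        if (items.isEmpty () || maxChunks <= 0)
            return chunks;
        // Round up so that maxChunks chunks are always enough to hold every item
        int chunkSize = (items.size () + maxChunks - 1) / maxChunks;
        for (int start = 0; start < items.size (); start += chunkSize)
        {
            // The last chunk may be shorter than the others
            int end = Math.min (start + chunkSize, items.size ());
            chunks.add (new ArrayList<T> (items.subList (start, end)));
        }
        return chunks;
    }

    public static void main (String [] args)
    {
        List<String> keys = new ArrayList<String> ();
        for (int i = 0; i < 10; i++)
            keys.add ("key" + i);
        // Ten keys over four workers gives chunks of 3, 3, 3 and 1
        for (List<String> chunk : split (keys, 4))
            System.out.println (chunk.size () + " " + chunk);
    }
}
```

Note that nine keys over four workers gives only three chunks of three, so the number of worker threads should be taken from the number of chunks produced rather than from the requested worker count.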
run () with FUNCTION_REDUCE
case FUNCTION_REDUCE:
Set <Map.Entry<String, MapResult>> results_set = results.entrySet ();
Iterator<Map.Entry<String, MapResult>> results_iterator = results_set.iterator ();
while (results_iterator.hasNext ())
{
Map.Entry<String, MapResult> result_entry = results_iterator.next ();
// current_result is a MapResult member variable of this worker, read later by Emit ()
current_result = result_entry.getValue ();
reduce (current_result.getKey (), current_result.iterator ());
}
break;
Each reduce worker gets a bunch of MapResult objects to process, and the user reduce () method needs to be called for each one. There are plenty more generic collections here, but other than that, the interesting thing is the use of the member variable current_result, a MapResult field on the worker object. This allows the Emit method to tie the emitted result to the current MapResult object.
Emit
public void Emit (String result)
{
current_result.setResult (result);
}
Not much to say really.
PrintResults
Finally, after all the reduce workers have finished, we can see the output. The reduced result for each key will be contained within its MapResult object in the map_results table. As a test case, I ran the code over some Java source files, and my output function prints out the number of times the keyword “void” appears.
public void PrintResults ()
{
// Iterate over all the results once again
Iterator<MapResult> iterator = map_results.values ().iterator ();
while (iterator.hasNext ())
{
MapResult result = iterator.next ();
if (result.getKey ().replaceAll(" ", "").equals ("void"))
{
System.out.println (result.getKey () + ": " + result.getResult ());
}
}
}
It’s interesting to note that if I wanted to speed up the process, I could do the keyword test in the map () function, and only emit intermediate results for keys equal to “void”. Then PrintResults could simply output the single MapResult object, and the reduce phase would only need a single worker thread.
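A sketch of that filtering idea follows. To keep it self-contained, the map function here returns the pairs it would have emitted as a list instead of calling EmitIntermediate; that return type and the FilteredMapSketch class name are inventions for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class FilteredMapSketch
{
    // key: document name
    // value: document contents
    static List<String []> map (String key, String value)
    {
        List<String []> emitted = new ArrayList<String []> ();
        for (String word : value.split (" "))
        {
            // Only the keyword of interest is emitted; every other word is dropped here
            if (word.equals ("void"))
                emitted.add (new String [] { word, "1" });
        }
        return emitted;
    }
}
```

With this filter in place, the intermediate table holds at most one key, so there is very little left for the reduce phase to do.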
Final Thoughts
So there you have it, a naive implementation of the map-reduce algorithm in Java. The things I learnt from writing the code were:
- Generic classes and the Java Collections Framework are your friends.
- In a map-reduce implementation, great care must be taken in the initial file partitioning, and the intermediate result partitioning, before the user map and reduce methods are invoked.
If you’d like the complete source code, you can grab an archive here. I added some timing code to see how long things took, and a few more println calls. Maybe someone could use it as the basis of some utility or other in Java: simply implement your own map (), reduce () and PrintResults () methods.
A project I’m working on at the moment has a fairly complex build process in Java. In particular, the resource generation step processes a large set of 3D Studio Max files and generates scene data. Given a multicore machine, the resource generator performance might be improved by a framework like this, hmm…