Archive for June, 2009

The Forest of Unconstrained Functionality

Steve on Jun 24th 2009

Where do artists and designers go at the very early stages of a project, when they’re creating designs for a new game?

Gambolling

As the plans and designs for a new project are being drawn up, the programmers are still hard at work on the tail end of their current project, which will be the testing (QA) phase.  This is usually when programmers (or the lead programmer)  have the least spare time to cast their technical eyes over new designs or game proposals.

Off go the creative fellows, gambolling in the forests of unconstrained functionality.  The iPhone seems to have a particularly lush and thick forest, populated with accelerometer trees, gesture recognition glades, multi-touch creeks and camera roll clearings.  If you know you’re the technical lead on an upcoming project, make sure you keep an eye on what’s being promised to your clients and publishers.  New and innovative features may make a game stand out, but they’re going to be high risk and time consuming.  If you read a design document two months into the development cycle, as you sit down to implement a feature for your new game, only to discover what you thought would take 3 days requires gesture recognition and motion detection (and there’s no money for 3rd party libraries), you’ve only got yourself to blame!

Thanks to Fergus McNeill for the awesome poster.

Filed in General | No responses yet

The bleeding edge

Steve on Jun 13th 2009

Only one post in May, crikey!  Sorry about that, but I’ve been so insanely busy at work that it’s really hard to find the time to add to the blog.  I would humbly compare my bloginess to that of Jeff Minter.  His blog is updated often with charming photos of sheep and days out in Wales when he’s not busy crunching code.  When he’s working, blog updates become few and far between, but who can blame him?  In my case, I hope the massive Map-Reduce article makes up for it a bit, and I thought I’d follow up on my new PC build too.

Building a new PC - Update

I don’t usually class myself as an early adopter in most areas, but when it comes to building a new PC, one might as well get the latest kit.  I thought I was following a recommended list of hardware, but it turned out the RAM I ordered wasn’t exactly the same as the set Custom PC magazine had listed.  After two sets of the Corsair 6GB kits wouldn’t work with the Gigabyte motherboard, but an uber cheap Dabs value 1GB stick booted fine, I knew the memory wasn’t compatible.  So, after browsing some forums, I plumped for the somewhat more expensive, but known to work, Crucial Ballistix Tracer 6GB kit, and everything works a treat now.  I also bought a second 1TB Samsung hard drive since they are such good value, and now have a RAID 0 configuration for performance.  Ubuntu Linux 9.04 installed from my USB drive without a hiccup and it runs like a demon.  I love the fact that the Gnome CPU speed taskbar applet opened up 8 tabs for all the CPU cores, bonkers!  The flashing lights on the RAM also provide a nice ambience where they shine out of the case vents and fan holes.


MapReduce Implementation

Steve on Jun 3rd 2009

Reading the Google paper on map-reduce really piqued my interest in learning more about it.  The Apache Hadoop website has some good reading, and Google of course turns up lots of other info.  Reading is all well and good, but the best way to learn about a concept or algorithm is to actually implement it.

As a programmer, nothing helps you learn a subject better than implementing it.  Want to understand the PNG image format?  Write an encoder/decoder.  3DS Max ASCII export format?  Implement a scene playback engine for ASE files.  A* pathfinding?  Code a maze game with some AI controlled Minotaur that wanders around it.  I decided that I needed to write a map-reduce system: not just a map-reduce program to plug into an existing engine, but the actual runtime code that executes a map-reduce function over a set of files.

So, here it is, in Java, from a blank page to a working (albeit naive) map-reduce implementation in under 500 lines of code. If you want to grab the complete source, here it is.

Outline

After I finished writing this post, I looked at it and thought, crikey, that’s a lot of text to wade through!  So I drew a diagram of the process my code implements.  Hopefully, it’ll be a useful reference if you need to visualise any of the data structures or the steps in the process.

MapReduce Outline


First is the functional outline of the MapReduce class, built around a single go () method.  I know it will need to handle some command line arguments to create a list of files to process, then coordinate the map and reduce phases.

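// Imports needed by the listings shown in this post
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MapReduce implements Runnable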
{
   /** All the files to process, as specified on the command line */
   public static File [] files;

   /** A hashtable of MapResult objects that are created by values emitted for a given key */
   public static Map <String, MapResult> map_results;

   /** The main method is the static entry point for command line Java programs */
   public static void main(String [] args)
   {
      // Usage information
      if (args == null || args.length == 0)
      {
         System.out.println ("Usage: java MapReduce <filespec>");
         System.out.println ("       Process a set of files listed by <filespec> using a trivial MapReduce implementation.");
         System.exit (1);
      }

      // Out of the static context and off we go
      MapReduce me = new MapReduce ();
      me.go (args);
   }

   /** Empty constructor */
   public MapReduce ()
   {
   }

   /** The go method gets a list of files to process and invokes the map and reduce phases */
   public void go (String [] args)
   {
      GetFiles (args);
      MapPhase ();
      ReducePhase ();
      PrintResults ();
   }

   /** Create a worker thread to process each input file and run the map function over the data */
   public void MapPhase ()
   {
      // implement the map phase of the processing
   }

   /** Create a set of worker threads to perform the reduce function over the intermediate results of the map operation */
   public void ReducePhase ()
   {
      // implement the reduce phase of the processing
   }
}

GetFiles just processes the command line arguments and comes up with an array of java.io.File objects.  The map phase needs to take each of these files, read its contents and pass the contents to a set of worker tasks that execute the map () method on each file’s data.  The first cornerstone of map-reduce is this splitting of the input files and processing by a distinct set of separate worker tasks.  A full implementation would have worker tasks partitioned across multiple processors in a cluster, and a distributed filesystem, but that’s a bit beyond an afternoon’s programming!
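GetFiles itself isn’t listed in this post, so here is a minimal sketch of what such a method might look like.  The class name, method name and the decision to skip unreadable paths are all my assumptions for illustration, not the code from the archive.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for GetFiles: the real method is not shown in this post.
class GetFilesSketch
{
   /** Turn command line arguments into File objects, skipping anything that is not a readable file */
   static File [] getFiles (String [] args)
   {
      List<File> found = new ArrayList<File> ();
      for (String arg : args)
      {
         File f = new File (arg);
         if (f.isFile () && f.canRead ())
         {
            found.add (f);
         }
      }
      return found.toArray (new File [0]);
   }

   public static void main (String [] args)
   {
      // Print each path that would be handed to a map worker
      for (File f : getFiles (args))
      {
         System.out.println (f.getPath ());
      }
   }
}
```

This relies on the shell expanding a wildcard filespec into individual arguments before Java ever sees them.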

I’m simply going to create java.lang.Thread objects, one for each of the input files, to perform the map function.  The reduce phase needs a bit more work to determine the number of threads to use, but I’ll again create a bunch of them for the job.  Notice that the MapReduce class implements the java.lang.Runnable interface: I’m going to use the class as the basis for both the map and reduce worker threads.  To do that, the class will need a run () method, and a way to designate running threads as map or reduce workers so the code branch can be distinct for each type.

   //
   // Worker task methods
   //

   /** Worker task will perform the map function */
   public static final int FUNCTION_MAP = 0;

   /** Worker task will perform the reduce function */
   public static final int FUNCTION_REDUCE = 1;

   /** Variable set to either FUNCTION_MAP or FUNCTION_REDUCE */
   protected int function;

   /** For a map worker, this is the file to operate on */
   protected File file;

   /** Results of the map function calls to EmitIntermediate (), stored in a table */
   protected Map<String, MapResult> results;

   public MapReduce (int _function, File _file)
   {
      function = _function;
      file = _file;
   }

   public MapReduce (int _function, Map<String, MapResult> _results)
   {
      function = _function;
      results = _results;
   }

   public void run ()
   {
      switch (function)
      {
         case FUNCTION_MAP:
            // invoke user's map function here
            break;

         case FUNCTION_REDUCE:
            // invoke user's reduce function here
            break;
      }
   }

You can see from the constructors there that the first is meant for the map phase, accepting a File object along with the function type, and the second is for the reduce phase, accepting a java.util.Map object.  Don’t worry about the MapResult class yet; I’ll get to that shortly.  I also don’t really need to pass the function type to the constructor here, since it could be deduced, but it makes the code more obvious in the MapPhase () and ReducePhase () methods.

Finally we need the user map and reduce methods, and the output methods these use to emit results.  The map function uses an EmitIntermediate method to output key-value pairs as it processes the data in its input file.  The reduce function uses an Emit method to output a single result given all the previously emitted values for a given key.  Like the Google paper, all these methods use String objects for keys and values, and we can use the handy java.util.Iterator for the list of values for the reduce method.

   public void EmitIntermediate (String key, String value)
   {
      // Store the intermediate key-value pair for later sorting and 'reduction'.
      // This is done by getting a MapResult object from the map_results hashtable,
      // or creating a new one, and adding this emitted value to it.
   }
   public void Emit (String result)
   {
      // Output this result for the key the reduce function is currently handling.
      // Do this by getting the MapResult object from the map_results table
      // for the current key and setting its result.
   }
   // key: document name
   // value: document contents
   public void map (String key, String value)
   {
      // Split value into words
      String [] words = value.split (" ");
      // Emit '1' associated with each word
      for (String word: words)
      {
         EmitIntermediate (word, "1");
      }
   }
   // key: a word
   // values: a list of counts
   public void reduce (String key, Iterator<String> values)
   {
      int result = 0;
      while (values.hasNext ())
      {
         String v = values.next ();
         result += Integer.parseInt (v);
      }
      Emit (result + "");
   }

I’ve implemented the simple word count map-reduce example in my map () and reduce () methods.  The map function splits the input data into words and emits the value “1” for each one.  The reduce function then takes the emitted values associated with any given key (word) and sums them together, finally emitting the resulting value.
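To make the data flow concrete, here is a small single-threaded sketch of the same word count, with no worker threads involved.  The class and method names (WordCountSketch, mapStep, reduceStep) are hypothetical and exist only for this illustration; the grouping map stands in for the table of intermediate results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-threaded illustration only; the names here are hypothetical, not from the post's source.
class WordCountSketch
{
   /** Map step: emit the value "1" for every word, grouping the emitted values by key */
   static Map<String, List<String>> mapStep (String contents)
   {
      Map<String, List<String>> grouped = new TreeMap<String, List<String>> ();
      for (String word : contents.split (" "))
      {
         List<String> values = grouped.get (word);
         if (values == null)
         {
            values = new ArrayList<String> ();
            grouped.put (word, values);
         }
         values.add ("1");
      }
      return grouped;
   }

   /** Reduce step: sum the emitted values for one key */
   static int reduceStep (List<String> values)
   {
      int result = 0;
      for (String v : values)
      {
         result += Integer.parseInt (v);
      }
      return result;
   }

   public static void main (String [] args)
   {
      Map<String, List<String>> grouped = mapStep ("the cat sat on the mat");
      for (Map.Entry<String, List<String>> e : grouped.entrySet ())
      {
         System.out.println (e.getKey () + ": " + reduceStep (e.getValue ()));
      }
   }
}
```

In the sample sentence “the” appears twice, so it collects two “1” values in the map step and they reduce to 2; every other word reduces to 1.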

Finer Details

So, if you’ve got this far, you should have the basic outline of my MapReduce implementation.  Now to fill in the details for the empty methods and sections I’ve so far glossed over.  Just to recap, we need:

  1. A MapResult class that will link a specific key with a list of intermediate values from the map phase, and then the final result from the reduce phase.
  2. The MapPhase method that divides the input fileset up between map worker tasks.
  3. The worker task invocation of the user map () method, in the FUNCTION_MAP case section of the run () method.
  4. The EmitIntermediate method that allows the user map method to return values to the processor.
  5. The ReducePhase method that takes the intermediate keys and values and divides them up between a number of reduce worker tasks.
  6. The worker task invocation of the user reduce () method, in the FUNCTION_REDUCE case section of the run () method.
  7. The Emit method that allows the user reduce method to output the final result value for a given key.
  8. A PrintResults method to display all the output.

MapResult

   public class MapResult
   {
      /** The key associated with the result set.  Although not strictly necessary to store this here,
          it's useful at a couple of points. */
      private String key;

      /** This list of values is the result of the map phase.  Any time a map () method calls
          EmitIntermediate (key, value), the value is added to this list.*/
      private List <String> values;

      /** This result is the combined result of all the values, as output by the reduce phase,
          from the Emit (result) method call associated with this result set. */
      private String result;

      public MapResult (String _key)
      {
         key = _key;
         values = Collections.synchronizedList (new LinkedList <String> ());
      }

      public void addValue (String _value)
      {
         values.add (_value);
      }

      public void setResult (String _result)
      {
         result = _result;
      }

      public String getKey ()
      {
         return key;
      }

      public String getResult ()
      {
         return result;
      }

      public Iterator<String> iterator ()
      {
         return values.iterator ();
      }

      public String toString ()
      {
         return key + ":"+result;
      }
   }

The getters and setters aside, the crux of this class is that it links together a single ‘key’ with a list of ‘values’.  When EmitIntermediate (String key, String value) is called with a new key, a new MapResult object is created and added to the map_results hashtable.  If EmitIntermediate is called with an existing key, the value is added to the list of the existing MapResult grabbed from the hashtable.

MapPhase

   public void MapPhase ()
   {
      // Create the synchronised hashtable to handle all the map and reduce results
      map_results = Collections.synchronizedMap (new HashMap<String, MapResult> ());

      // For each input file, start a worker thread to perform the 'map' processing
      int num_map_workers = files.length;

      // Array to hold all the Thread objects in order to wait for them
      Thread [] worker_threads = new Thread [num_map_workers];

      // Kick off all the map threads, note the use of FUNCTION_MAP and the file object
      for (int i = 0; i < num_map_workers; i++)
      {
         MapReduce worker = new MapReduce (FUNCTION_MAP, files [i]);
         worker_threads [i] = new Thread (worker);
         worker_threads [i].start ();
      }

      // Now wait for all the threads to complete
      try
      {
         for (int i = 0; i < num_map_workers; i++)
         {
            worker_threads [i].join ();
         }
      }
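      catch (InterruptedException e)
      {
         // Restore the interrupt flag rather than silently swallowing the interruption
         Thread.currentThread ().interrupt ();
      }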
   }

The MapPhase method actually turns out to be pretty simple, due mostly to the decision to create one worker thread per input file.  Java threads are pretty easy to work with, but when you add in Java Collections Framework types like Hashtable, Map and List, it’s important to make sure you create synchronised ones wherever several threads share them.

run () with FUNCTION_MAP

         case FUNCTION_MAP:

            // Load the file into a string
            String contents = GetFileContents (file);

            // Invoke the map function
            map (file.getName (), contents);

            break;

So, with a set of worker threads now all running the run () method, and branching into the FUNCTION_MAP section of the case statement, each one needs to invoke the user map () function on the contents of one file.
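GetFileContents isn’t listed in this post either.  A minimal sketch of such a helper could look like the following, assuming UTF-8 text files and a newer JDK than I was using here (java.nio.file.Files needs Java 7 or later).  The class name and the choice to return an empty string on failure are my assumptions.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical stand-in for GetFileContents: the real helper is not shown in this post.
class GetFileContentsSketch
{
   /** Read a whole file into a String, assuming UTF-8; returns "" if the file cannot be read */
   static String getFileContents (File file)
   {
      try
      {
         byte [] bytes = Files.readAllBytes (file.toPath ());
         return new String (bytes, StandardCharsets.UTF_8);
      }
      catch (IOException e)
      {
         System.err.println ("Could not read " + file + ": " + e.getMessage ());
         return "";
      }
   }

   public static void main (String [] args)
   {
      // Report the length of each file named on the command line
      for (String arg : args)
      {
         System.out.println (arg + ": " + getFileContents (new File (arg)).length () + " chars");
      }
   }
}
```

Reading the whole file in one go keeps the map worker simple, at the cost of holding each input file fully in memory.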

EmitIntermediate

   public void EmitIntermediate (String key, String value)
   {
      MapResult bucket;

      synchronized (map_results)
      {
         bucket = map_results.get (key);
         if (bucket == null)
         {
            bucket = new MapResult (key);
            map_results.put (key, bucket);
         }
      }

      bucket.addValue (value);
   }

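That’s the last section of the map phase.  First the map_results table is checked for a MapResult object matching the key.  If one isn’t found, it’s created and stored, all inside the synchronized block so that two threads can’t both create a bucket for the same key.  Finally the value is added to the bucket’s list of values.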
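As an aside, the same get-or-create step can be written without an explicit lock on a newer JDK.  This is an alternative sketch rather than what my code does: it swaps in java.util.concurrent.ConcurrentHashMap and its computeIfAbsent method (Java 8 or later), and uses a plain list of values per key instead of a MapResult.  All the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Alternative technique for illustration; not the implementation described in this post.
class EmitSketch
{
   /** Intermediate values grouped by key, shared by all worker threads */
   static final Map<String, List<String>> buckets = new ConcurrentHashMap<String, List<String>> ();

   /** Atomically fetch or create the bucket for a key, then append the value */
   static void emitIntermediate (String key, String value)
   {
      buckets.computeIfAbsent (key, k -> Collections.synchronizedList (new ArrayList<String> ()))
             .add (value);
   }

   /** Emit from several threads at once and return how many values ended up in the bucket */
   static int emitFromThreads (String key, int threads, int emitsPerThread)
   {
      Thread [] workers = new Thread [threads];
      for (int i = 0; i < threads; i++)
      {
         workers [i] = new Thread (() -> {
            for (int n = 0; n < emitsPerThread; n++)
            {
               emitIntermediate (key, "1");
            }
         });
         workers [i].start ();
      }
      for (Thread t : workers)
      {
         try
         {
            t.join ();
         }
         catch (InterruptedException e)
         {
            Thread.currentThread ().interrupt ();
         }
      }
      return buckets.get (key).size ();
   }

   public static void main (String [] args)
   {
      System.out.println (emitFromThreads ("void", 4, 1000));
   }
}
```

The bucket list still has to be a synchronised one, because several map workers can add values for the same key at the same time.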

ReducePhase

Now things get a bit more complicated.  This is probably the bit that took me the longest to get my head around, and indeed, I re-wrote the method twice before I was happy with it.  It took a bit of debugging too to catch all the edge cases, and there’s some pretty heavy use of generic collection classes to handle.

   public static final int NUM_REDUCE_WORKERS = 4;
   public void ReducePhase ()
   {
      int i;

      // Nothing to reduce if the map phase emitted no keys (this also avoids dividing by zero below)
      if (map_results.isEmpty ())
         return;

      // Split the map_results hashtable up into individual chunks for a number of workers to reduce
      int num_reduce_workers = NUM_REDUCE_WORKERS;

      // Catch the too many workers case
      if (num_reduce_workers > map_results.size ())
         num_reduce_workers = map_results.size ();

      // I'll call each worker's section of the results table a 'chunk'.
      // Round up, so that the chunks can never outnumber the workers.
      int chunk_size = (map_results.size () + num_reduce_workers - 1) / num_reduce_workers;
      int records_remaining = map_results.size ();

      // We need to iterate over all the elements in map_results
      Iterator<MapResult> iterator = map_results.values ().iterator ();

      // Each worker will have its own section of the map_results table to work on
      List<Map<String, MapResult>> reduce_chunks = new ArrayList<Map<String, MapResult>> (num_reduce_workers);

      // Split up the map_results table into chunks for each worker to process.
      int current_worker = 0;
      while (records_remaining > 0)
      {
         // Catch the last worker case
         if (records_remaining < chunk_size)
         {
            chunk_size = records_remaining;
         }

         Map<String, MapResult> worker_map = new HashMap<String, MapResult> (chunk_size);
         reduce_chunks.add (worker_map);

         for (int record = 0; record < chunk_size; record++)
         {
            MapResult m = iterator.next ();
            worker_map.put (m.getKey (), m);
         }
         ++current_worker;
         records_remaining -= chunk_size;
      }

      // Rounding up can leave fewer chunks than workers, so start exactly one worker per chunk
      num_reduce_workers = reduce_chunks.size ();

      // Now kick off the reduce workers
      Thread [] worker_threads = new Thread [num_reduce_workers];

      for (i = 0; i < num_reduce_workers; i++)
      {
         MapReduce worker = new MapReduce (FUNCTION_REDUCE, reduce_chunks.get (i));
         worker_threads [i] = new Thread (worker);
         worker_threads [i].start ();
      }

      // Now wait for all the threads to complete
      try
      {
         for (i = 0; i < num_reduce_workers; i++)
         {
            worker_threads [i].join ();
         }
      }
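      catch (InterruptedException e)
      {
         // Restore the interrupt flag rather than silently swallowing the interruption
         Thread.currentThread ().interrupt ();
      }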
   }

What this method boils down to is splitting up the complete set of MapResult objects from the map_results table into appropriately sized chunks to pass to a fixed number of worker tasks.  It’s handy that all the values for a given key are stored in a single MapResult object: each reduce task automatically gets all the results for a single key, so we don’t need to perform any sorting here.
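The chunking arithmetic is the fiddly part, so here it is pulled out into a standalone sketch.  The ChunkSketch class and its split method are hypothetical names for illustration; the idea is to round the chunk size up so that the number of chunks never exceeds the number of workers, and to return no chunks at all for an empty input.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the partitioning step; the names are hypothetical.
class ChunkSketch
{
   /** Split items into at most maxWorkers chunks of near-equal size */
   static <T> List<List<T>> split (List<T> items, int maxWorkers)
   {
      List<List<T>> chunks = new ArrayList<List<T>> ();
      if (items.isEmpty () || maxWorkers <= 0)
      {
         return chunks;
      }

      // Never use more workers than there are items
      int workers = Math.min (maxWorkers, items.size ());

      // Round up so the last chunk absorbs the remainder instead of spilling into an extra chunk
      int chunkSize = (items.size () + workers - 1) / workers;

      for (int start = 0; start < items.size (); start += chunkSize)
      {
         int end = Math.min (start + chunkSize, items.size ());
         chunks.add (new ArrayList<T> (items.subList (start, end)));
      }
      return chunks;
   }

   public static void main (String [] args)
   {
      List<Integer> keys = new ArrayList<Integer> ();
      for (int i = 0; i < 10; i++)
      {
         keys.add (i);
      }
      // Ten keys over four workers
      for (List<Integer> chunk : split (keys, 4))
      {
         System.out.println (chunk.size ());
      }
   }
}
```

With ten keys and four workers this gives chunks of 3, 3, 3 and 1.  With nine keys it gives only three chunks of 3, which is why the number of threads to start should come from the number of chunks actually created, not from the requested worker count.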

run () with FUNCTION_REDUCE

         case FUNCTION_REDUCE:

            Set <Map.Entry<String, MapResult>> results_set = results.entrySet ();
            Iterator<Map.Entry<String, MapResult>> results_iterator = results_set.iterator ();

            while (results_iterator.hasNext ())
            {
               Map.Entry<String, MapResult> result_entry = results_iterator.next ();
               current_result = result_entry.getValue ();
               reduce (current_result.getKey (), current_result.iterator ());
            }

            break;

Each reduce worker gets a bunch of MapResult objects to process, and the user reduce () method needs to be called for each one.  There are plenty more generic collections again, but other than that, the interesting thing here is the use of the object member variable current_result, a MapResult.  Each worker object has its own copy, which allows the Emit method to tie the emitted result to the MapResult currently being reduced.

Emit

   public void Emit (String result)
   {
      current_result.setResult (result);
   }

Not much to say really :-)

PrintResults

Finally, after all the reduce workers have finished, we can see the output.  The reduced result for each key will be contained within its MapResult object in the map_results table.  As a test case, I ran the code over some Java source files, and my output function prints out the number of times the keyword “void” appears.

   public void PrintResults ()
   {
      // Iterate over all the results once again
      Iterator<MapResult> iterator = map_results.values ().iterator ();

      while (iterator.hasNext ())
      {
         MapResult result = iterator.next ();
         if (result.key.replaceAll(" ", "").equals ("void"))
         {
            System.out.println (result.getKey () + ": " + result.getResult ());
         }
      }
   }

It’s interesting to note that if I wanted to speed up the process, I could do the keyword test in the map () function, and only Emit intermediate results for keys that equalled “void”.  Then PrintResults could simply output the single MapResult object, and the reduce phase would only have needed a single worker thread.
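A rough sketch of that filtering idea, kept self-contained by collecting the emitted values in a list rather than calling EmitIntermediate.  FilteredMapSketch and mapOnly are hypothetical names, and the keyword is passed in as a parameter purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of filtering inside the map step; the names are hypothetical.
class FilteredMapSketch
{
   /** Emit "1" only for words equal to the keyword, so just one key ever reaches the reduce phase */
   static List<String> mapOnly (String keyword, String contents)
   {
      List<String> emitted = new ArrayList<String> ();
      for (String word : contents.split (" "))
      {
         if (word.equals (keyword))
         {
            emitted.add ("1");
         }
      }
      return emitted;
   }

   public static void main (String [] args)
   {
      System.out.println (mapOnly ("void", "public void go and public void run").size ());
   }
}
```

The trade-off is that the map function is no longer a general word count: the intermediate table holds a single key, so there is far less to store and reduce, but nothing else can be reported afterwards.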

Final Thoughts

So there you have it, a naive implementation of the map-reduce algorithm in Java.  The things I learnt from writing the code were:

  1. Generic classes and the Java Collections Framework are your friends.
  2. In a map-reduce implementation, great care must be taken in the initial file partitioning, and the intermediate result partitioning, before the user map and reduce methods are invoked.

If you’d like the complete source code, you can grab an archive here.  I added some timing code to see how long things took, and a few more println’s.  Maybe someone could use it as the basis of some utility or other in Java: simply implement your own map (), reduce () and PrintResults () methods.

A project I’m working on at the moment has a fairly complex build process in Java.  In particular, the resource generation step processes a large set of 3D Studio Max files and generates scene data.  Given a multicore machine, the resource generator performance might be improved by a framework like this, hmm…
