


Using the MapReduce Library

This document presents an overview of the MapReduce Python API. It consists of the following sections:

  1. Downloading the MapReduce library
  2. Features and capabilities
  3. Instantiating a MapReduce pipeline
  4. Starting a MapReduce job
  5. Showing the MapReduce status monitor
  6. Determining when a MapreducePipeline job is complete

Downloading the MapReduce library

Currently, to use the MapReduce API, you can either download the package from PyPI and install it into your app's library folder as follows:

pip install GoogleAppEngineMapReduce -t <your_app_directory>/lib

Or you can use an SVN checkout as follows:

svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

To enable the MapReduce framework in your app, add the following include to the app.yaml configuration file:

includes:
- mapreduce/include.yaml
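
If you install the library into a local lib directory (as in the pip command above), that directory also needs to be on your app's Python path. Below is a minimal sketch of an appengine_config.py that does this; the directory name lib is an assumption matching the command shown earlier:

# appengine_config.py -- a minimal sketch, assuming the library was installed
# into a local "lib" directory with the pip command shown above.
from google.appengine.ext import vendor

# Add any libraries installed in the "lib" folder to the import path.
vendor.add('lib')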

Features and capabilities

The App Engine adaptation of Google's MapReduce model is optimized for the needs of the App Engine environment, where resource quota management is a key consideration. This release of the MapReduce API provides the following features and capabilities:

  • Automatic sharding for faster execution, allowing you to use as many workers as you need
  • Standard data input readers for iterating over blob and datastore data
  • Standard output writers
  • Status pages that let you see how your jobs are running
  • Processing rate limiting to slow down your mapper functions and space out the work, helping you avoid exceeding your resource quotas

Instantiating a MapReduce pipeline

In your code, you instantiate a MapreducePipeline object inside the run method of a PipelineBase object as follows:

class WordCountPipeline(base_handler.PipelineBase):

    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "word_count",
            "main.word_count_map",
            "main.word_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.FileOutputWriter",
            mapper_params={
                "input_reader": {
                    "blob_key": blobkey,
                },
            },
            reducer_params={
                "output_writer": {
                    "mime_type": "text/plain",
                    "output_sharding": "input",
                    "filesystem": "blobstore",
                },
            },
            shards=16)
        yield StoreOutput("WordCount", filekey, output)

The following arguments are supplied when instantiating the MapreducePipeline object:

  • The name of the MapReduce job, for display in the user interface and in any logs
  • The mapper function to use
  • The reducer function to use
  • The input reader to use to supply the mapper function with data
  • The output writer for the reducer function to use
  • The parameters (if any) to supply to the input reader
  • The parameters (if any) to supply to the output writer
  • The number of shards (workers) to use for the MapReduce job

Note the use of the output_sharding parameter in the example above. This enables the shard retry feature.

You must write your own mapper and reducer functions. (The shuffle step is built in, and you don't invoke it explicitly.) You can use the standard data input readers and output writers (BlobstoreZipInputReader and FileOutputWriter in the example).
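
For illustration, here is a minimal sketch of what the main.word_count_map and main.word_count_reduce functions referenced above might look like. BlobstoreZipInputReader passes each mapper call a (zipinfo, text_fn) tuple, where text_fn returns the file's contents; the tokenization below is simplified for illustration:

def word_count_map(data):
    """Mapper: receives one (zipinfo, text_fn) tuple from BlobstoreZipInputReader."""
    (entry, text_fn) = data
    text = text_fn()
    # Emit a (key, value) pair for every word; the value is unused here.
    for word in text.lower().split():
        yield (word, "")

def word_count_reduce(key, values):
    """Reducer: receives a word and the list of values emitted for that word."""
    yield "%s: %d\n" % (key, len(values))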

Starting a MapReduce job

To start a MapReduce job using the MapreducePipeline object, you invoke the Pipeline base class's start method on it, as shown below:

def post(self):
    filekey = self.request.get("filekey")
    blob_key = self.request.get("blobkey")

    if self.request.get("word_count"):
        pipeline = WordCountPipeline(filekey, blob_key)
        pipeline.start()
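
By default the pipeline's tasks run on the default task queue. If you want them to run on a dedicated queue, the Pipeline start method accepts a queue_name argument; the queue name below is just an example and must be defined in your queue.yaml:

    if self.request.get("word_count"):
        pipeline = WordCountPipeline(filekey, blob_key)
        # "mapreduce-queue" is an example queue name defined in queue.yaml.
        pipeline.start(queue_name="mapreduce-queue")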

Showing the MapReduce status monitor

If you wish, you can display a status monitor for your MapReduce jobs, as follows:

def post(self):
    filekey = self.request.get("filekey")
    blob_key = self.request.get("blobkey")

    if self.request.get("word_count"):
        pipeline = WordCountPipeline(filekey, blob_key)
        pipeline.start()

        redirect_url = "%s/status?root=%s" % (pipeline.base_path,
                                              pipeline.pipeline_id)
        self.redirect(redirect_url)

Determining when a MapreducePipeline job is complete

To find out whether your MapReduce job is complete, you need to save the pipeline ID when you start the MapReduce job, as shown in the following MapReduce pipeline code:

class StartMapreduce(webapp2.RequestHandler):
    def get(self):
        pipeline = mapreduce_pipeline.MapreducePipeline(arguments)
        pipeline.start()
        self.redirect("/wait?pipeline=" + pipeline.pipeline_id)

Notice that the redirect above carries the pipeline ID in the URL's query string.

Then, in the handler where you want to do some work when the MapReduce job is complete, retrieve the pipeline using the saved pipeline ID and check whether it is done:

class WaitHandler(webapp2.RequestHandler):
    def get(self):
        pipeline_id = self.request.get("pipeline")
        pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
        if pipeline.has_finalized:
            # MapreducePipeline has completed
            pass
        else:
            # MapreducePipeline is still running
            pass

As shown above, MapreducePipeline's has_finalized property is used to check for a completed job.
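
Once has_finalized is true, you may also want to read the job's result. Below is a minimal sketch, assuming the Pipeline API's default output slot holds the value returned by MapreducePipeline (in the word count example above, the list of files written by FileOutputWriter); the handler name ResultHandler is hypothetical:

import webapp2

from mapreduce import mapreduce_pipeline

class ResultHandler(webapp2.RequestHandler):
    def get(self):
        pipeline_id = self.request.get("pipeline")
        pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
        if pipeline.has_finalized:
            # Assumption for illustration: the default output slot is filled
            # once the pipeline finalizes and holds the MapreducePipeline result.
            output_files = pipeline.outputs.default.value
            self.response.out.write("Output files: %s" % output_files)
        else:
            self.response.out.write("MapreducePipeline is still running")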
