This document presents an overview of the MapReduce Python API. It consists of the following sections:
- Downloading the MapReduce library
- Features and capabilities
- Instantiating a MapReduce pipeline
- Starting a MapReduce job
- Showing the MapReduce status monitor
- Determining when a MapReducePipeline job is complete
Downloading the MapReduce library
Currently, to use the MapReduce API, you can either download the package from PyPi and install it into your app's library folder as follows:
pip install GoogleAppEngineMapReduce -t <your_app_directory/lib>
Or you can use SVN Checkout as follows:
svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce
To enable MapReduce framework in the app add the following include to the app.yaml configuration file:
includes:
- mapreduce/include.yaml
Features and capabilities
The App Engine adaptation of Google's MapReduce model is optimized for the needs of the App Engine environment, where resource quota management is a key consideration. This release of the MapReduce API provides the following features and capabilities:
- Automatic sharding for faster execution, allowing you to use as many workers as you need to get your results faster
- Standard data input readers for iterating over blob and datastore data.
- Standard output writers
- Status pages to let you see how your jobs are running
- Processing rate limiting to slow down your mapper functions and space out the work, helping you avoid exceeding your resource quotas
Instantiating a MapReduce pipeline
In your code, you instantiate a
MapReducePipeline
object inside the
run
method of a
PipelineBase
object as follows:
class WordCountPipeline(base_handler.PipelineBase):
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"word_count",
"main.word_count_map",
"main.word_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.FileOutputWriter",
mapper_params={
"input_reader": {
"blob_key": blobkey,
},
},
reducer_params={
"output_writer": {
"mime_type": "text/plain",
"output_sharding": "input",
"filesystem": "blobstore",
},
},
shards=16)
yield StoreOutput("WordCount", filekey, output)
The following arguments are supplied to the
MapReducePipeline
object's
run
method:
- The name of the MapReduce job, for display in the user interface and in any logs
- The mapper function to use
- The reducer function to use
- The input reader to use to supply the mapper function with data
- The output writer for the reducer function to use
- The parameters (if any) to supply to the input reader
- The parameters (if any) to supply to the output writer
- The number of shards (workers) to use for the MapReduce job
Note the use of the
output_sharding
parameter in the example above. This enables the
shard
retry feature
.
You must write your own mapper and reducer functions. (The shuffler feature is built in and you don't invoke it explicitly.) You can use the standard data input readers and output writers (
BlobstoreZipInputReader
and
FileOutputWriter
in the example).
Starting a MapReduce job
To start a MapReduce job using the
MapReducePipeline
object, you invoke the
Pipeline
base class's
start
method on it, as shown below:
def post(self):
filekey = self.request.get("filekey")
blob_key = self.request.get("blobkey")
if self.request.get("word_count"):
pipeline = WordCountPipeline(filekey, blob_key)
pipeline.start()
Showing the MapReduce status monitor
If you wish, you can display a status monitor for your MapReduce jobs, as follows:
def post(self):
filekey = self.request.get("filekey")
blob_key = self.request.get("blobkey")
if self.request.get("word_count"):
pipeline = WordCountPipeline(filekey, blob_key)
pipeline.start()
redirect_url = "%s/?status?root=%s" % (pipeline.base_path,
pipeline.pipeline_id)
self.redirect(redirect_url)
Determining when a MapReducePipeline job is complete
To find out whether your MapReduce job is complete, you need to save the pipeline ID when you start the MapReduce job, as shown in the following MapReduce pipeline code:
class StartMapreduce(webapp2.RequestHandler):
def get(self):
pipeline = mapreduce_pipeline.MapreducePipeline(arguments)
pipeline.start()
self.redirect("/wait?pipeline=" + pipeline.pipeline_id)
Notice the redirect above where the pipeline ID is saved.
Then in the handler where you want to do some work when the MapReduce job is complete, you get the MapReduce pipeline using the saved pipeline ID, and you check it to determine whether it is done.
class WaitHandler(webapp2.RequestHandler):
def get(self):
pipeline_id = self.request.get("pipeline")
pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
if pipeline.has_finalized:
# MapreducePipeline has completed
pass
else:
# MapreducePipeline is still running
pass
As shown above, the MapReducePipeline
has_finalized
method is used to check for a completed job.