MapReduce for App Engine

MapReduce is a programming model for processing large amounts of data in a parallel and distributed fashion. It is useful for large, long-running jobs that cannot be handled within the scope of a single request, such as:

  • Analyzing application logs
  • Aggregating related data from external sources
  • Transforming data from one format to another
  • Exporting data for external analysis

With the App Engine MapReduce library, your code can run efficiently and scale automatically. App Engine takes care of the details of partitioning the input data, scheduling execution across a set of machines, handling failures, and reading from and writing to the Google Cloud Platform.

  1. Overview
  2. Stages
  3. Sharding: Parallel Processing
  4. Slicing: Distributed Scheduling and Fault Tolerance

Overview

App Engine MapReduce is an open-source library that is built on top of App Engine services, including Datastore and Task Queues. You must download the MapReduce library and include it with your application. The library provides:

  • A programming model for large-scale distributed data processing
  • Automatic parallelization and distribution within your existing codebase
  • Access to Google-scale data storage
  • I/O scheduling
  • Fault tolerance and exception handling
  • User-tunable settings to optimize for speed/cost
  • Tools for monitoring status

There are no usage charges associated with the MapReduce library. As with any App Engine application, you are charged for any App Engine resources that the library or your MapReduce code consumes (beyond the free quotas) while running your job. These can include instance hours, Datastore and Google Cloud Storage usage, network, and other storage.

Stages

A MapReduce job has three stages: map, shuffle, and reduce. Each stage in the sequence must complete before the next one can run. Intermediate data is stored temporarily between the stages.

The data flow for a MapReduce job looks like this:

[Figure: MapReduce pipeline]

Map

The MapReduce library includes a Mapper class that performs the map stage. The map stage uses an input reader that delivers data one record at a time. The library also contains a collection of Input classes that implement readers for common types of data. You can also create your own reader, if needed.

The map stage uses a map() function that you must implement. When the map stage runs, it repeatedly calls the reader to get one input record at a time and applies the map() function to the record.

The implementation of the map() function depends on the kind of job you are running. When used in a Map job, the map() function emits output values. When used in a MapReduce job, the map() function emits key-value pairs for the shuffle stage.

When emitting pairs for a MapReduce job, the keys do not have to be unique. The same key can appear in many pairs. For example, assume the input is a dog database that contains records listing license id, breed, and name:

14877 poodle muffy
88390 beagle dotty
73205 collie pancakes
95782 beagle esther
77865 collie lassie
75093 poodle albert
24798 poodle muffy
13334 collie lassie

A MapReduce job that computes the most popular name for each breed has a map() function that pairs each dog's name with its breed and emits these pairs:

(poodle, muffy)
(beagle, dotty)
(collie, pancakes)
(beagle, esther)
(collie, lassie)
(poodle, albert)
(poodle, muffy)
(collie, lassie)
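
The map step for the dog example can be sketched in plain Python. This is an illustration of the idea only, not the library's Mapper API: the names RECORDS, map_dog, and run_map are hypothetical, and the loop in run_map stands in for the input reader delivering one record at a time.

```python
from typing import Iterator, List, Tuple

# Input records from the example above: "license_id breed name".
RECORDS = [
    "14877 poodle muffy",
    "88390 beagle dotty",
    "73205 collie pancakes",
    "95782 beagle esther",
    "77865 collie lassie",
    "75093 poodle albert",
    "24798 poodle muffy",
    "13334 collie lassie",
]


def map_dog(record: str) -> Iterator[Tuple[str, str]]:
    """Emit one (breed, name) pair for a single input record."""
    _license_id, breed, name = record.split()
    yield breed, name


def run_map(records: List[str]) -> List[Tuple[str, str]]:
    """Stand-in for the map stage: apply map_dog to each record in turn."""
    pairs = []
    for record in records:
        pairs.extend(map_dog(record))
    return pairs


pairs = run_map(RECORDS)
for pair in pairs:
    print(pair)
```

Writing map_dog as a generator mirrors the "emit" wording: a map function may emit zero, one, or several pairs per record, and the keys it emits need not be unique.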

Shuffle

The shuffle stage first groups all the pairs with the same key together:

(poodle, muffy)
(poodle, albert)
(poodle, muffy)
(beagle, dotty)
(beagle, esther)
(collie, pancakes)
(collie, lassie)
(collie, lassie)

and then outputs a single list of values for each key:

(poodle, [muffy, albert, muffy])
(beagle, [dotty, esther])
(collie, [pancakes, lassie, lassie])

If the same key-value pair occurs more than once, the associated value will appear multiple times in the shuffle output for that key. Also note that the list of values is not sorted.
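
The grouping described above can be sketched in a few lines of Python. This is an in-memory illustration only: the function name shuffle and the MAP_OUTPUT list are assumptions made for the example, and the real shuffle stage works on sharded data in Google Cloud Storage rather than on a list in memory.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def shuffle(pairs: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Collect every value emitted for a key into one list.

    Duplicate values are kept, and no sorting is applied to the lists.
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


# The pairs emitted by the map stage in the dog example.
MAP_OUTPUT = [
    ("poodle", "muffy"),
    ("beagle", "dotty"),
    ("collie", "pancakes"),
    ("beagle", "esther"),
    ("collie", "lassie"),
    ("poodle", "albert"),
    ("poodle", "muffy"),
    ("collie", "lassie"),
]

shuffled = shuffle(MAP_OUTPUT)
for breed, names in shuffled.items():
    print(breed, names)
```

In this sketch the values happen to appear in the order they were emitted. That is a property of the sketch, not a guarantee of the library, so a reduce function should not depend on the order of the list.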

The shuffle stage uses a Google Cloud Storage bucket, either the default bucket or one that you can specify in your setup code.

Reduce

The MapReduce library includes a Reducer class that performs the reduce stage. The reduce stage uses a reduce() function that you must implement. When this stage executes, the reduce() function is called for each unique key in the shuffled intermediate data set. The reduce function takes a key and the list of values associated with that key and emits a new value based on the input. For example, a reduce function that determines the most popular name for each breed would take the input:

(collie, [pancakes, lassie, lassie])

count the number of times each unique name appears in the list, and output the string:

collie: lassie
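
A minimal Python sketch of such a reduce function follows. It is not the library's Reducer API: the name reduce_most_popular is hypothetical, and the tie-breaking rule (the name seen first wins) is an assumption, since the text above does not say how ties are resolved.

```python
from collections import Counter
from typing import List


def reduce_most_popular(breed: str, names: List[str]) -> str:
    """Return "breed: name" for the name that occurs most often in names.

    Assumption: on a tie, the name that appears first in the list wins.
    """
    counts = Counter(names)
    name, _count = counts.most_common(1)[0]
    return f"{breed}: {name}"


print(reduce_most_popular("collie", ["pancakes", "lassie", "lassie"]))
print(reduce_most_popular("poodle", ["muffy", "albert", "muffy"]))
```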

The reduce output is passed to the output writer. The MapReduce library includes a collection of Output classes that implement writers for common types of output targets. You can also create your own writer, if needed.

Sharding: Parallel Processing

Sharding divides the input of a stage into multiple data sets (shards) that are processed in parallel. This can significantly reduce the time it takes to run a stage. When running a MapReduce job, all the shards in a stage must finish before the next stage can run.

When a map stage runs, each shard is handled by a separate instance of the Mapper class, with its own input reader. Similarly, for a reduce stage, each shard is handled by a separate instance of the Reducer class with its own output writer. The shuffle stage also shards its input, but without using any user-specified classes.

The number of shards used in each stage can be different. The implementation of the input and output classes determines the number of map and reduce shards, respectively. The diagram below shows the map stage handling its input in three shards, and the reduce stage using two shards.

[Figure: Sharding]
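
To make the idea of sharding concrete, here is a rough Python sketch that splits the dog records into three shards and maps them in parallel with a thread pool. Everything in it is an assumption for illustration: make_shards, map_shard, and run_map_stage are hypothetical names, the shard count is passed in by hand (in the library it is determined by the input class), and threads stand in for App Engine instances.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

RECORDS = [
    "14877 poodle muffy", "88390 beagle dotty", "73205 collie pancakes",
    "95782 beagle esther", "77865 collie lassie", "75093 poodle albert",
    "24798 poodle muffy", "13334 collie lassie",
]


def make_shards(records: List[str], shard_count: int) -> List[List[str]]:
    """Split records into shard_count contiguous shards of near-equal size."""
    size, extra = divmod(len(records), shard_count)
    shards, start = [], 0
    for i in range(shard_count):
        end = start + size + (1 if i < extra else 0)
        shards.append(records[start:end])
        start = end
    return shards


def map_shard(shard: List[str]) -> List[Tuple[str, str]]:
    """Process one shard sequentially, emitting (breed, name) pairs."""
    return [(breed, name) for _id, breed, name in (r.split() for r in shard)]


def run_map_stage(records: List[str], shard_count: int = 3) -> List[Tuple[str, str]]:
    """Map all shards in parallel and return only when every shard is done."""
    shards = make_shards(records, shard_count)
    with ThreadPoolExecutor(max_workers=shard_count) as pool:
        # list() forces all results, so the stage ends only after the last shard.
        per_shard = list(pool.map(map_shard, shards))
    return [pair for output in per_shard for pair in output]


shard_sizes = [len(s) for s in make_shards(RECORDS, 3)]
stage_output = run_map_stage(RECORDS)
print(shard_sizes, len(stage_output))
```

Collecting every shard's result before returning reflects the rule that all shards in a stage must finish before the next stage can run.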

Slicing: Distributed Scheduling and Fault Tolerance

The data in a shard is processed sequentially. The job executes a consecutive series of tasks using an App Engine task queue, one task at a time per shard. When a task runs, it reads data from the associated shard and calls the appropriate function (map, shuffle, or reduce) as many times as possible in a configurable time interval. The data processed in a task is called a slice. The amount of data consumed in a slice can vary, depending on how quickly the function processes its input. When a slice is completed, another task is enqueued for the next slice on the shard. The process repeats until all data in the shard has been processed. The diagram below shows a task in the map stage consuming a slice of a shard with repeated read/map calls:

[Figure: Single Slice]
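
The slicing loop can be approximated in Python as follows. This is a sketch under stated assumptions, not the library's scheduler: run_slice and run_shard are hypothetical names, the time limit is an ordinary parameter, and the while loop in run_shard stands in for enqueueing one task queue task per slice.

```python
import time
from typing import Callable, List, Tuple


def run_slice(shard: List, start: int, fn: Callable, time_limit_s: float) -> Tuple[List, int]:
    """Process records from index start until time runs out or the shard ends.

    At least one record is handled per slice, so the shard always advances.
    Returns the outputs of this slice and the index where the next one starts.
    """
    deadline = time.monotonic() + time_limit_s
    outputs = []
    index = start
    while index < len(shard):
        outputs.append(fn(shard[index]))
        index += 1
        if time.monotonic() >= deadline:
            break
    return outputs, index


def run_shard(shard: List, fn: Callable, time_limit_s: float) -> Tuple[List, int]:
    """Run slices one after another until the whole shard is consumed."""
    outputs, index, slice_count = [], 0, 0
    while index < len(shard):
        slice_outputs, index = run_slice(shard, index, fn, time_limit_s)
        outputs.extend(slice_outputs)
        slice_count += 1
    return outputs, slice_count


# A zero time limit forces one record per slice; a long limit fits all in one.
tiny_outputs, tiny_slices = run_shard([1, 2, 3, 4, 5], lambda x: x * 2, 0.0)
big_outputs, big_slices = run_shard([1, 2, 3, 4, 5], lambda x: x * 2, 60.0)
print(tiny_slices, big_slices)
```

The two runs produce the same outputs with different slice counts, which illustrates why the amount of data in a slice depends on how fast the function runs relative to the time interval.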

The tasks for all shards are placed in a single task queue. App Engine dynamically determines how many instances of a module to spin up in order to handle the task load. The number of instances may change while a stage is running. The diagram below shows a moment in time during the map stage when only two instances of the module running the map stage are handling the three current tasks (t1, t2, t3) associated with the shards.

[Figure: Slicing]

The use of task queues, along with dynamic instance scaling, helps to distribute the workload efficiently and transparently. Dividing execution into slices also offers a level of fault tolerance. Without slicing, if an error occurs while processing a shard, that entire shard would need to be re-run. With execution broken into slices, the failure of a single slice can be detected and that slice re-run a number of times before the shard is declared failed. At that point the shard can be started again, or the entire job can be failed.
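
The retry behavior for a slice might look roughly like the Python sketch below. The names ShardFailed and run_slice_with_retries, and the default of three attempts, are assumptions made for this example. The library's actual retry limits and error handling are configured separately and are not shown here.

```python
from typing import Callable, Optional


class ShardFailed(Exception):
    """Raised when a slice keeps failing after all retry attempts."""


def run_slice_with_retries(slice_fn: Callable, max_attempts: int = 3):
    """Call slice_fn until it succeeds or max_attempts is exhausted."""
    last_error: Optional[Exception] = None
    for _attempt in range(max_attempts):
        try:
            return slice_fn()
        except Exception as exc:  # a real job would likely be more selective
            last_error = exc
    # Only this slice was retried; the caller decides whether to restart
    # the shard or fail the whole job.
    raise ShardFailed(f"slice failed {max_attempts} times") from last_error


# Hypothetical slice that fails twice with a transient error, then succeeds.
attempts = {"count": 0}


def flaky_slice():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient error")
    return [("poodle", "muffy")]


result = run_slice_with_retries(flaky_slice)
print(result, attempts["count"])
```

Because only the failed slice is repeated, the work done by earlier slices of the same shard is kept.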
