This page describes how to write the code for a MapReduce or Map job. Before you run a job, you must also add the MapReduce library and additional information to your app's configuration files. This is described in Configuring your Project.
Data processing classes
You must create instances of the classes that correspond to the various stages and data flow in a job. The MapReduce library defines a collection of classes that you can use directly, or subclass to customize the behavior for your particular application.
Every Map job requires these classes:
- Input - reads records from a particular type of input source. You can use one of the existing subclasses or write your own.
- MapOnlyMapper - performs the map stage. This is an abstract class; you must create your own subclass (a minimal sketch follows this list).
- Output - writes results to a particular type of output target. You can use one of the existing subclasses or write your own. The NoOutput class is provided for Map jobs that do not produce output.
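For illustration, here is a minimal sketch of a MapOnlyMapper subclass. The class and its behavior are hypothetical: it assumes a job that receives Long values from its input and writes corresponding datastore entities, in the spirit of the specification example later on this page.

import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.mapreduce.MapOnlyMapper;

// Hypothetical mapper: consumes Long values from the Input and
// emits datastore Entity objects for the Output to write.
public class ExampleEntityCreator extends MapOnlyMapper<Long, Entity> {
  private static final long serialVersionUID = 1L;

  @Override
  public void map(Long value) {
    Entity entity = new Entity("ExampleKind", value + 1); // numeric IDs start at 1
    entity.setProperty("value", value);
    emit(entity); // passes the entity along to the job's Output
  }
}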
Every MapReduce job requires these classes:
- Input - reads records from a particular type of input source. You can use one of the existing subclasses or write your own.
- Mapper - performs the map stage. This is an abstract class; you must create your own subclass (see the word-count sketch after this list).
- Reducer - performs the reduce stage. This is an abstract class; you must create your own subclass.
- Output - writes results to a particular type of output target. You can use one of the existing subclasses or write your own.
- Marshallers - handle the serialization of keys and values between the map and reduce stages. You can use one of the existing subclasses or write your own.
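To sketch how these pieces fit together, the following hypothetical word-count classes show a Mapper that emits (word, count) pairs and a Reducer that sums the counts for each word. The class names and type choices are assumptions for illustration, not part of the library.

import com.google.appengine.tools.mapreduce.Mapper;
import com.google.appengine.tools.mapreduce.Reducer;
import com.google.appengine.tools.mapreduce.ReducerInput;

// Hypothetical mapper: consumes lines of text, emits a (word, 1) pair per word.
public class WordCountMapper extends Mapper<String, String, Long> {
  private static final long serialVersionUID = 1L;

  @Override
  public void map(String line) {
    for (String word : line.split("\\s+")) {
      emit(word, 1L);
    }
  }
}

// Hypothetical reducer: receives all counts emitted for a word and sums them.
public class WordCountReducer extends Reducer<String, Long, String> {
  private static final long serialVersionUID = 1L;

  @Override
  public void reduce(String word, ReducerInput<Long> counts) {
    long total = 0;
    while (counts.hasNext()) {
      total += counts.next();
    }
    emit(word + ": " + total);
  }
}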
You must be sure that the classes you use can work together properly; the snippet after this list makes the constraints concrete. Specifically:
- The type of data produced by the input and consumed by the mapper must match.
- The type of data produced by the reducer (or by the mapper, for Map jobs) and consumed by the output must match.
- The types used for the keys and values that are emitted by the mapper and received by the reducer must match.
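Continuing the word-count sketch above, here is how the generic parameters of the classes would have to line up (the concrete types are illustrative assumptions):

Input<String>                 // produces String records (lines of text)
Mapper<String, String, Long>  // consumes String; emits (String, Long) key-value pairs
Reducer<String, Long, String> // receives (String, Long) pairs; produces String results
Output<String, Void>          // consumes String results; returns no job result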
Job specification
The job specification names the job and identifies the classes that implement its stages. There are separate specification classes for the two types of jobs; each has an associated Builder nested class that you use to create the specification.
For a Map job, use MapSpecification.Builder:
MapSpecification<Long, Entity, Void> spec = new MapSpecification.Builder<>(
new ConsecutiveLongInput(0, entities, shardCount),
new EntityCreator(datastoreType, bytesPerEntity),
new DatastoreOutput())
.setJobName("Create MapReduce entities")
.build();
For a MapReduce job, use MapReduceSpecification.Builder:
MapReduceSpecification<Long, Integer, Integer, ArrayList<Integer>, GoogleCloudStorageFileSet>
spec = new MapReduceSpecification.Builder<>(input, mapper, reducer, output)
.setKeyMarshaller(intermediateKeyMarshaller)
.setValueMarshaller(intermediateValueMarshaller)
.setJobName("DemoMapreduce")
.setNumReducers(shards)
.build();
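The input, mapper, reducer, output, and marshaller arguments in this example are constructed beforehand. For the marshallers, the library's Marshallers class provides ready-made implementations for common types; a plausible setup for the Integer intermediate keys and values shown above might look like this:

Marshaller<Integer> intermediateKeyMarshaller = Marshallers.getIntegerMarshaller();
Marshaller<Integer> intermediateValueMarshaller = Marshallers.getIntegerMarshaller();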
Note that the documentation for these methods specifies generic types in the argument declarations. This enforces the compatibility requirements between classes described above.
Job settings
The job settings specify parameters that control a job's performance. There are separate settings classes for the two types of jobs; each has an associated Builder nested class that you use to create the settings.
For a Map job, use MapSettings.Builder:
MapSettings settings = new MapSettings.Builder()
.setWorkerQueueName("mapreduce-workers")
.setModule("mapreduce")
.build();
For a MapReduce job, use MapReduceSettings.Builder:
MapReduceSettings settings = new MapReduceSettings.Builder()
    .setBucketName(bucket)
    .setWorkerQueueName(queue) // if queue is null, uses the current queue, or "default" if none
    .setModule(module)
    .build();
The Builder set methods that you'll use most often are:
- setModule() - specifies the module in which the MapReduce job will run. If you do not set a module, the job will run in the module that calls MapReduce.start(). Multiple jobs can use the same module.
- setWorkerQueueName() - specifies a task queue.
- setBucketName() - specifies a Google Cloud Storage bucket (used for MapReduce jobs only). If no bucket name is set, the job uses the GCS default bucket.
Running the job
Once the job's specification and settings are defined, you can create and start
the job in one step, using the static
start()
method on the appropriate job
class.
For Map jobs, use
MapJob.start()
:
String id = MapJob.start(mapSpec, settings);
For MapReduce jobs, use MapReduceJob.start():
String id = MapReduceJob.start(mapReduceSpec, settings);
The start call returns a job ID. You can use the ID in a URL of the form http://<app-id>.appspot.com/_ah/pipeline/status.html?root=<job-id> to display information about your job.
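For example, a request handler can start a job and redirect straight to that status page. The following is a minimal sketch, reusing the hypothetical ExampleEntityCreator mapper sketched earlier; the servlet name, queue name, and entity counts are placeholder assumptions.

import java.io.IOException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.mapreduce.MapJob;
import com.google.appengine.tools.mapreduce.MapSettings;
import com.google.appengine.tools.mapreduce.MapSpecification;
import com.google.appengine.tools.mapreduce.inputs.ConsecutiveLongInput;
import com.google.appengine.tools.mapreduce.outputs.DatastoreOutput;

public class StartJobServlet extends HttpServlet {
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws IOException {
    // Build a small Map job: 100 consecutive longs across 5 shards,
    // mapped to datastore entities by the hypothetical mapper from earlier.
    MapSpecification<Long, Entity, Void> spec = new MapSpecification.Builder<>(
        new ConsecutiveLongInput(0, 100, 5),
        new ExampleEntityCreator(),
        new DatastoreOutput())
        .setJobName("Example map job")
        .build();
    MapSettings settings = new MapSettings.Builder()
        .setWorkerQueueName("mapreduce-workers")
        .build();
    String id = MapJob.start(spec, settings);
    // Send the caller to the Pipelines status page for this job.
    resp.sendRedirect("/_ah/pipeline/status.html?root=" + id);
  }
}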