This page describes the major classes in the MapReduce library.
The Input class
When you define a job, you specify an Input class that reads a particular type of input and returns records to the map stage. The MapReduce library contains several Input classes that handle data from different types of sources.
Each type of Input class is paired with a corresponding subclass of InputReader. The MapReduce library calls the Input class method createReaders(), which determines how many map shards are needed (the shard count) and creates an instance of InputReader for each shard. Each reader is initialized to process the non-overlapping subset of data from its shard. An Input class may explicitly set the shard count, or it can compute the count by analyzing the input data.
The available Input classes can be found in the MapReduce library's inputs package. Some of the most often-used classes are:
- BlobstoreInput - Reads an object from your app’s blobstore, separating the input into records (byte arrays) that are delimited by a separator, which is a single byte. You declare the shard count, and the input is divided into equal-sized shards. A byte array is passed to the Mapper. (A similar class, CloudStorageLineInput, shards files in Google Cloud Storage on separator boundaries.)
- DatastoreInput - Reads all entities of the specified kind from your app’s datastore. The shard count is given by the user (see the sketch after this list). An Entity object is passed to the Mapper.
- ConsecutiveLongInput - Generates a specified number of consecutive long numbers, with each shard using a different starting offset to prevent duplicates. A Long object is passed to the Mapper. This is useful in testing applications.
- RandomLongInput - Generates a specified number of random long values across the specified number of shards. This is useful in testing applications. By specifying the random seed, the job can be reproduced to aid debugging. A Long object is passed to the Mapper.
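For instance, a job that processes every entity of one kind might plug in a DatastoreInput. This is a minimal sketch: the constructor arguments shown (entity kind and shard count) follow the description above but are assumptions, so check the inputs package reference for the exact signature.
// Sketch only: read every "UserInfo" entity using 10 map shards.
// Constructor arguments are assumed from the description above, not taken from the class reference.
Input<Entity> input = new DatastoreInput("UserInfo", 10);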
Writing your own Input class
If the existing Input classes don't serve your purpose, you can write your own, along with an associated InputReader class, to read and parse other types of input.
Subclassing Input
The Input class must define the createReaders() method:
List<? extends InputReader<I>> createReaders() throws IOException;
This method determines how many map shards are needed and creates and initializes an instance of InputReader for each shard. The number of readers may be explicitly set by the user, or the method could analyze the input and determine how many readers are needed to partition the data efficiently.
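As a concrete illustration, here is a minimal sketch of an Input that serves a fixed in-memory list of strings, split into a caller-chosen number of shards. The class and its reader are hypothetical (the reader, StringListReader, is sketched in the next section); only the createReaders() contract comes from this page, and the imports assume the library's usual packages.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.appengine.tools.mapreduce.Input;
import com.google.appengine.tools.mapreduce.InputReader;

// Hypothetical Input over an in-memory list of strings.
public class StringListInput extends Input<String> {
  private static final long serialVersionUID = 1L;
  private final List<String> data;
  private final int shardCount;

  public StringListInput(List<String> data, int shardCount) {
    this.data = new ArrayList<>(data);  // copy into a serializable list
    this.shardCount = shardCount;
  }

  @Override
  public List<? extends InputReader<String>> createReaders() throws IOException {
    List<StringListReader> readers = new ArrayList<>(shardCount);
    int shardSize = (data.size() + shardCount - 1) / shardCount;  // ceiling division
    for (int i = 0; i < shardCount; i++) {
      int from = Math.min(i * shardSize, data.size());
      int to = Math.min(from + shardSize, data.size());
      // Each reader is initialized with a non-overlapping slice of the data.
      readers.add(new StringListReader(data.subList(from, to)));
    }
    return readers;
  }
}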
Subclassing InputReader
The InputReader class implements the Serializable interface; make sure your implementation can actually be serialized. The class must implement the next() method:
public I next() throws IOException, NoSuchElementException;
This method behaves like the Iterator interface: next() is called repeatedly and returns a new record each time, until the data source is exhausted, at which point it throws NoSuchElementException.
InputReader may also override these methods; see the class reference for more details:
public Double getProgress();
public void beginSlice() throws IOException;
public void endSlice() throws IOException;
public void beginShard() throws IOException;
public void endShard() throws IOException;
public long estimateMemoryRequirement();
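Continuing the hypothetical StringListInput sketch from the previous section, the matching reader only needs to track its position in its slice of the data and throw NoSuchElementException when that slice is exhausted. Only next() and the overridable methods listed above come from this page; everything else is illustrative.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import com.google.appengine.tools.mapreduce.InputReader;

// Hypothetical reader for the StringListInput sketch above.
public class StringListReader extends InputReader<String> {
  private static final long serialVersionUID = 1L;
  private final List<String> records;
  private int position;  // serialized with the reader, so progress survives slice boundaries

  public StringListReader(List<String> records) {
    this.records = new ArrayList<>(records);  // copy into a serializable list
  }

  @Override
  public String next() throws IOException, NoSuchElementException {
    if (position >= records.size()) {
      throw new NoSuchElementException();  // signals that this shard's data is exhausted
    }
    return records.get(position++);
  }

  @Override
  public Double getProgress() {
    return records.isEmpty() ? 1.0 : (double) position / records.size();
  }
}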
To see an example of an Input implementation, take a look at the source code for ConsecutiveLongInput. This file defines the ConsecutiveLongInput class and its InputReader class, Reader, which is nested.
The Mapping classes
There are two different classes for defining mappers for the two types of jobs.
The Mapper class
The map stage of a MapReduce job must implement the Mapper class, which defines the map() method:
public abstract void map(I value);
The library calls map() once for each record produced by an InputReader. The method can call Mapper.emit() to emit zero or more key-value pairs. The size of each key-value pair must be less than 1MB:
protected void emit(K key, V value);
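For example, a mapper that counts occurrences of a string property on datastore entities might look like the following sketch. It assumes that Mapper takes its input, key, and value types as generic parameters, as the two signatures above suggest; the entity kind and property name are illustrative.
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.mapreduce.Mapper;

// Hypothetical mapper: emits (domain, 1L) for each entity that has a "domain" property.
public class DomainCountMapper extends Mapper<Entity, String, Long> {
  private static final long serialVersionUID = 1L;

  @Override
  public void map(Entity value) {
    String domain = (String) value.getProperty("domain");  // illustrative property name
    if (domain != null) {
      emit(domain, 1L);  // key-value pair handed to the shuffle stage
    }
  }
}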
The MapOnlyMapper class
The map stage of a Map job must implement the MapOnlyMapper class, which defines the map() method:
public abstract void map(I value);
The library calls map() once for each record produced by an InputReader. The method can call MapOnlyMapper.emit() to emit zero or more values. You must be sure that the size of the emitted value is valid for the job's output type:
protected void emit(O output);
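A sketch of a map-only mapper that turns each entity into a line of text, assuming MapOnlyMapper takes its input and output types as generic parameters as the signatures above suggest; the entity property and output format are illustrative.
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.tools.mapreduce.MapOnlyMapper;

// Hypothetical map-only mapper: emits one CSV line per entity.
public class EntityToCsvMapper extends MapOnlyMapper<Entity, String> {
  private static final long serialVersionUID = 1L;

  @Override
  public void map(Entity value) {
    // One output value per input record; the job's Output class decides where it is written.
    emit(value.getKey().getName() + "," + value.getProperty("domain"));
  }
}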
Note: you may use an instance of the Mapper class in a Map job instead, provided its key type is Void. You can then pass the Mapper instance to the MapSpecification.Builder.setMapper() method.
A map() method should be simple and small. If you need to do a lot of work related to one input item, try re-thinking what the input should be. It is usually possible to formulate a job so the library does the hard work.
Both types of mappers handle input data in zero or more shards; each shard is subdivided into zero or more slices. A mapper may be serialized and deserialized before processing its first shard (or first slice in a shard), after processing its last shard (or last slice in a shard), or between any consecutive shards (or slices). To make it easy to initialize, save, and restore state, a mapper has these methods, which can be overridden:
public void beginShard();
public void endShard();
public void beginSlice();
public void endSlice();
public long estimateMemoryRequirement();
Since a Mapper may be serialized many times, avoid saving references to large objects or to any data you can easily reconstruct; instead, reconstruct such data in the beginSlice() method. If you use IOC frameworks to inject members into your mapper, be sure the frameworks work with MapReduce serialization. If this is not done properly, its members may not be preserved when the class is serialized between slices.
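For instance, a mapper that needs a non-serializable helper (here, the App Engine memcache service) can mark the field transient and rebuild it at the start of every slice. A minimal sketch with hypothetical names:
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.memcache.MemcacheService;
import com.google.appengine.api.memcache.MemcacheServiceFactory;
import com.google.appengine.tools.mapreduce.Mapper;

// Hypothetical mapper that rebuilds a non-serializable helper at the start of each slice.
public class CachingMapper extends Mapper<Entity, String, Long> {
  private static final long serialVersionUID = 1L;

  // transient: not serialized between slices, so it must be recreated in beginSlice()
  private transient MemcacheService memcache;

  @Override
  public void beginSlice() {
    memcache = MemcacheServiceFactory.getMemcacheService();
  }

  @Override
  public void map(Entity value) {
    String domain = (String) value.getProperty("domain");  // illustrative property name
    if (domain != null && memcache.get(domain) == null) {
      emit(domain, 1L);
    }
  }
}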
The Reducer class
MapReduce jobs must provide an implementation of the Reducer class, which defines the reduce() method:
public abstract void reduce(K key, ReducerInput<V> values);
The library calls reduce() once for each unique key. The values argument contains all the values produced by every Mapper instance for the same key. The method evaluates all the values and generates the final output for the key. Note that the type of the values (ReducerInput) is an Iterator. The reduce() method calls Reducer.emit() to emit the final output for the key.
protected void emit(O value);
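Continuing the hypothetical domain-count example, a reducer that sums the counts for each key could look like the sketch below. It assumes Reducer takes its key, value, and output types as generic parameters, and that KeyValue is the library's key-value pair type; verify both against the class reference.
import com.google.appengine.tools.mapreduce.KeyValue;
import com.google.appengine.tools.mapreduce.Reducer;
import com.google.appengine.tools.mapreduce.ReducerInput;

// Hypothetical reducer: sums the Long values emitted by the mappers for each key.
public class SumReducer extends Reducer<String, Long, KeyValue<String, Long>> {
  private static final long serialVersionUID = 1L;

  @Override
  public void reduce(String key, ReducerInput<Long> values) {
    long total = 0;
    while (values.hasNext()) {   // ReducerInput iterates over all values for this key
      total += values.next();
    }
    emit(KeyValue.of(key, total));  // final output for this key
  }
}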
Like the Mapper class, Reducer handles its input in zero or more shards; each shard is subdivided into zero or more slices. A Reducer may be serialized and deserialized before processing its first shard (or first slice in a shard), after processing its last shard (or last slice in a shard), or between any consecutive shards (or slices). To make it easy to initialize, save, and restore state, Reducer has these methods, which can be overridden:
public void beginShard();
public void endShard();
public void beginSlice();
public void endSlice();
public long estimateMemoryRequirement();
As with map(), the reduce() method should be simple and small. If you need to do a lot of work related to one output item, try re-thinking what the output should be. Let the library do the hard work.
Since a Reducer may be serialized many times, avoid saving references to large objects or to any data you can easily reconstruct; instead, reconstruct such data in the beginSlice() method. If you use IOC frameworks to inject members into your reducer, be sure the frameworks work with MapReduce serialization. If this is not done properly, its members may not be preserved when the class is serialized between slices.
The MapReduce library contains a few Reducer classes; they can be found in the MapReduce library's reducers package.
The Output class
When you define a job, you specify an Output class that takes the data emitted by the final stage of the job (the map stage of a Map job, or the reduce stage of a MapReduce job) and writes it to a specific type of destination.
The MapReduce library contains several Output classes that write to different types of output destinations. Each type of Output class is paired with a corresponding subclass of OutputWriter. The MapReduce library calls the Output class method createWriters(numShards) with the number of shards requested in the MapReduceSpecification.
The available Output classes can be found in the MapReduce library's outputs package. Some of the most often-used classes are:
- GoogleCloudStorageFileOutput - Writes the data emitted by reduce to a GCS file. There is no delimiter inserted between records. This allows the reducer to define its own format.
- BlobFileOutput - Produces a Blobstore file for each reducer shard. Each file name is generated using a given format string that includes a zero-based integer parameter that is replaced with the shard number. The user specifies the number of shards.
- GoogleCloudStorageLevelDbOutput - Writes the data emitted by reduce to a GCS file. Each item is written as a single record in LevelDb format. This is useful for chaining jobs.
- MarshallingOutput - This can be used in conjunction with either of the above. It allows the reducer to write any type of record for which a Marshaller can be provided, which greatly simplifies coding.
- InMemoryOutput - Used to return an arbitrary Java object in memory. This is normally used for testing.
- NoOutput - Useful for Map jobs that do not need to produce any output.
Writing your own Output class
If the existing Output classes don't serve your purpose, you can write your own, along with its associated OutputWriter class.
Subclassing Output
The Output class must define these methods:
List<? extends OutputWriter<O>> createWriters(int numShards);
R finish(Collection<? extends OutputWriter<O>> writers) throws IOException;
createWriters(numShards) creates and initializes an instance of OutputWriter for each shard. The finish() method returns the job's result (of type R), or null. In many cases it returns a pointer to the data written by the Output class.
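As an illustration, here is a minimal sketch of an Output that collects every emitted string into a single list and returns that list as the job result. The class names are hypothetical, and the generic parameters assume Output is parameterized by the value type O and result type R as the signatures above suggest; the matching writer is sketched in the next section.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.google.appengine.tools.mapreduce.Output;
import com.google.appengine.tools.mapreduce.OutputWriter;

// Hypothetical Output: gathers all values written by its writers into one List<String>.
public class StringListOutput extends Output<String, List<String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public List<? extends OutputWriter<String>> createWriters(int numShards) {
    List<StringListWriter> writers = new ArrayList<>(numShards);
    for (int i = 0; i < numShards; i++) {
      writers.add(new StringListWriter());  // one writer per output shard
    }
    return writers;
  }

  @Override
  public List<String> finish(Collection<? extends OutputWriter<String>> writers) throws IOException {
    // Combine the per-shard buffers into the job's overall result (type R = List<String>).
    List<String> result = new ArrayList<>();
    for (OutputWriter<String> writer : writers) {
      result.addAll(((StringListWriter) writer).getValues());
    }
    return result;
  }
}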
Subclassing OutputWriter
The OutputWriter class must define the write() method, which writes a value to the output.
OutputWriter may also override these methods; see the class reference for more details:
public void beginSlice() throws IOException;
public void endSlice() throws IOException;
public void beginShard() throws IOException;
public void endShard() throws IOException;
public long estimateMemoryRequirement();
public boolean allowSliceRetry();
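Continuing the hypothetical StringListOutput example, the matching writer simply buffers values in memory. The exact write() signature is an assumption (check the class reference); the allowSliceRetry() override illustrates one of the optional methods listed above.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.appengine.tools.mapreduce.OutputWriter;

// Hypothetical writer for the StringListOutput sketch above.
public class StringListWriter extends OutputWriter<String> {
  private static final long serialVersionUID = 1L;
  private final List<String> values = new ArrayList<>();

  @Override
  public void write(String value) throws IOException {
    values.add(value);  // buffered values are serialized along with the writer between slices
  }

  @Override
  public boolean allowSliceRetry() {
    // A retried slice would re-add values already in the buffer, so disallow slice retries.
    return false;
  }

  public List<String> getValues() {
    return values;
  }
}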
To see an example of an Output implementation, take a look at the source code for DatastoreOutput. This file defines the DatastoreOutput class and the DatastoreOutputWriter class, which is nested.
The Marshaller class
After mapping, each key-value pair goes into the shuffle stage and then into a Reducer. Between each stage the data is temporarily stored, which means the keys and values are serialized and deserialized by marshallers. You must provide the MapReduceSpecification with a marshaller for keys and a marshaller for values; you may use the same Marshaller for both.
The MapReduce library provides several marshallers for you to use. They are private classes; to use them, you call a getXXXMarshaller() method in the Marshallers class:
- StringMarshaller - Takes Java strings and encodes them using UTF-8.
- SerializationMarshaller - Handles any Java object that implements the Serializable interface.
- LongMarshaller - An optimized marshaller for long values. Unlike the SerializationMarshaller, which uses Java’s built-in serialization, the LongMarshaller produces a more compact representation (since it knows every entry is a long). This marshaller does not accept null values.
- IntegerMarshaller - Just like the LongMarshaller, except for integers.
- ByteBufferMarshaller - A marshaller for ByteBuffers.
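For example, to obtain marshallers for String keys and Long values (the method names follow the getXXXMarshaller() pattern described above; verify them against the Marshallers class reference):
Marshaller<String> keyMarshaller = Marshallers.getStringMarshaller();
Marshaller<Long> valueMarshaller = Marshallers.getLongMarshaller();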
If the SerializationMarshaller is not efficient enough for your types, you can write your own marshaller. It will need to implement the Marshaller interface, which requires two methods:
public ByteBuffer toBytes(T object);
public T fromBytes(ByteBuffer b) throws IOException;
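As a sketch, here is a hypothetical marshaller that stores a java.util.Date as its millisecond timestamp, which is far more compact than generic Java serialization. It is written as implementing the Marshaller interface described above; if your library version declares Marshaller as an abstract class, extend it instead.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;
import com.google.appengine.tools.mapreduce.Marshaller;

// Hypothetical marshaller: encodes a Date as a single long (its millisecond timestamp).
public class DateMarshaller implements Marshaller<Date> {
  private static final long serialVersionUID = 1L;

  @Override
  public ByteBuffer toBytes(Date object) {
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    buffer.putLong(object.getTime());
    buffer.flip();  // prepare the buffer for reading
    return buffer;
  }

  @Override
  public Date fromBytes(ByteBuffer b) throws IOException {
    if (b.remaining() != Long.BYTES) {
      throw new IOException("Expected " + Long.BYTES + " bytes, got " + b.remaining());
    }
    return new Date(b.getLong());
  }
}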
The Counter class
You may find it convenient to use the Counter class. A counter is an integer variable that is aggregated across multiple shards. Counters can be used to do statistical calculations. You acquire a counter by calling getCounter() on the context from the map() or reduce() function. For example:
Counter c = getContext().getCounter("name");
This retrieves an existing counter or creates one if it doesn't already exist.
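A counter is typically incremented as records are processed, for example inside map(); the increment() method shown here is an assumption, so check the Counter class reference:
// Inside a map() or reduce() method (sketch; increment() is assumed to take a delta).
Counter skipped = getContext().getCounter("skipped-records");
if (value == null) {
  skipped.increment(1);
}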
Size limits
The total size of all the instances of Mapper, InputReader, OutputWriter, and Counter must be less than 1MB between slices. This is because these instances are serialized and saved to the datastore between slices.