Motivation

The key-value pairs generated by mapper or gatherer tasks can be aggregated according to their keys by the MapReduce framework. All the values associated with a given key can be processed by a reducer task to emit final, aggregated key-value pairs.

This makes reducers an important component of the KijiMR workflow: A gatherer can output key-value pairs for each row processed in isolation, but to compute aggregate statistics for the entire table, gatherers must be complemented with appropriate reducers.
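The grouping step described above can be sketched in plain Java. This is only an illustration of the idea, not KijiMR or Hadoop code: the class ShuffleSketch, its methods, and the sample (zip code, household size) pairs are all hypothetical, and summation stands in for whatever aggregation a real reducer would perform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of the group-by-key step; not KijiMR or Hadoop code. */
final class ShuffleSketch {
  /** Groups (key, value) pairs by key, as the framework does before reducing. */
  static Map<Integer, List<Integer>> groupByKey(List<int[]> pairs) {
    final Map<Integer, List<Integer>> grouped = new TreeMap<>();
    for (int[] pair : pairs) {
      grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
    }
    return grouped;
  }

  /** Stand-in for a reducer: collapses all the values of one key into their sum. */
  static int reduce(List<Integer> values) {
    int sum = 0;
    for (int value : values) {
      sum += value;
    }
    return sum;
  }

  public static void main(String[] args) {
    // Hypothetical gatherer output: one (zip code, household size) pair per row.
    final List<int[]> pairs = Arrays.asList(
        new int[] {94110, 2}, new int[] {10001, 4}, new int[] {94110, 3});
    for (Map.Entry<Integer, List<Integer>> entry : groupByKey(pairs).entrySet()) {
      System.out.println(entry.getKey() + " -> " + reduce(entry.getValue()));
    }
  }
}
```

Each pair is produced from a single row, yet the reduced value for a key reflects every row that shares it, which is the aggregation a gatherer cannot perform on its own.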

Classes Overview

The Kiji framework provides two abstract base classes for reducers. org.kiji.mapreduce.KijiReducer is the base class to use when implementing a reducer that emits key-value pairs. org.kiji.mapreduce.KijiTableReducer is the base class to use when implementing a reducer that writes cells to a Kiji table.

Using the API

All Kiji reducers extend the base class KijiReducer and must implement three methods:

  • Class getOutputKeyClass() and Class getOutputValueClass() to specify the classes of the output keys and values the reducer emits;
  • void reduce(InputKey key, Iterable<InputValue> values, Context context) to implement the logic that reduces the set of input values for one input key. The types of the input keys and input values are specific to each reducer. The reduce() method emits reduced (key, value) pairs using the reducer context with context.write(key, value).

Optionally, a reducer may use setup() and cleanup() to initialize and finalize resources that can be reused during the reduce task. Each task calls these methods once: setup() before processing any input record, and cleanup() after the task has finished processing its input.
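The call order can be pictured with a small plain-Java imitation of one reduce task. Nothing here is the KijiMR API: LifecycleSketch and its runTask() driver are hypothetical stand-ins for the framework, and the StringBuilder merely plays the role of a resource worth allocating once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java imitation of a reduce task's call order; not the KijiMR API. */
final class LifecycleSketch {
  /** Records the order in which the hooks run. */
  final List<String> calls = new ArrayList<>();

  /** Reusable resource: allocated once in setup(), shared by all reduce() calls. */
  private StringBuilder buffer;

  void setup() {
    buffer = new StringBuilder();
    calls.add("setup");
  }

  void reduce(String key, Iterable<Integer> values) {
    buffer.setLength(0);  // Reuse the buffer instead of allocating one per key.
    buffer.append("reduce(").append(key).append(')');
    calls.add(buffer.toString());
  }

  void cleanup() {
    buffer = null;  // Release the resource once all input has been processed.
    calls.add("cleanup");
  }

  /** Hypothetical driver standing in for the framework running one task. */
  static List<String> runTask(Map<String, List<Integer>> groupedInput) {
    final LifecycleSketch task = new LifecycleSketch();
    task.setup();
    for (Map.Entry<String, List<Integer>> entry : groupedInput.entrySet()) {
      task.reduce(entry.getKey(), entry.getValue());
    }
    task.cleanup();
    return task.calls;
  }

  public static void main(String[] args) {
    final Map<String, List<Integer>> input = new LinkedHashMap<>();
    input.put("a", Arrays.asList(1, 2));
    input.put("b", Arrays.asList(3));
    System.out.println(runTask(input));  // [setup, reduce(a), reduce(b), cleanup]
  }
}
```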

Example

Here is a simple example of a reducer class that may be chained with the FamilySizeGatherer described in the gatherer section:

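import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.kiji.mapreduce.KijiReducer;

/**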
 * Example of a reducer class that, for every zip code, calculates
 * the number of families per family size.
 *
 * For its input, this reducer expects the output from the FamilySizeGatherer
 * presented in the gatherer section of the user guide.
 * This gatherer emits a key-value pair for every household with:
 * <ul>
 *   <li> the household zip code as a key;
 *   <li> the number of members in the household as a value.
 * </ul>
 *
 * For its output, this reducer emits key-value pairs where:
 * <ul>
 *   <li> keys are strings encoded as "zip-code:family-size";
 *   <li> values are the number of families in the zip code with the size encoded in the key.
 * </ul>
 */
public class FamilySizeReducer
    extends KijiReducer<IntWritable, IntWritable, Text, IntWritable>
    implements Configurable {

  /** {@inheritDoc} */
  @Override
  public Class<?> getOutputKeyClass() {
    // Pair (zip code, family size) encoded as a Text formatted "zip-code:family-size":
    return Text.class;
  }

  /** {@inheritDoc} */
  @Override
  public Class<?> getOutputValueClass() {
    // Number of families whose size is the size encoded in the key:
    return IntWritable.class;
  }

  /** {@inheritDoc} */
  @Override
  protected void reduce(IntWritable zipCode, Iterable<IntWritable> familySizes, Context context)
      throws IOException, InterruptedException {
    // Map: family size -> number of families with this size
    final Map<Integer, AtomicInteger> sizeMap = Maps.newHashMap();

    // Count the number of families of each size:
    for (IntWritable familySize : familySizes) {
      AtomicInteger count = sizeMap.get(familySize.get());
      if (count == null) {
        count = new AtomicInteger(0);
        sizeMap.put(familySize.get(), count);
      }

      // Increment the counter for this family size:
      count.incrementAndGet();
    }

    // Emit the generated family size map for zipCode:
    for (Map.Entry<Integer, AtomicInteger> entry : sizeMap.entrySet()) {
      final int familySize = entry.getKey();
      final int familyCount = entry.getValue().get();
      context.write(
          new Text(String.format("%s:%s", zipCode, familySize)),
          new IntWritable(familyCount));
    }
  }
}

This reducer may be chained to the FamilySizeGatherer described in the gatherer section of this user guide as follows:

kiji gather \
    --gatherer=my.application.package.FamilySizeGatherer \
    --reducer=my.application.package.FamilySizeReducer \
    --input="format=kiji table=kiji://.env/default/household_table_name" \
    --output="format=seq file=hdfs://localhost:9000/reducer_output.seq nsplits=3"

This command line runs a gatherer with the reducer described above. The job writes its output as three Hadoop sequence files in the HDFS directory hdfs://localhost:9000/reducer_output.seq/. Each reducer task writes one sequence file named hdfs://localhost:9000/reducer_output.seq/part-r-<reducer #>. The number of gatherer tasks is currently equal to the number of regions in the input table. The number of reducer tasks, and hence the number of part files, is set by the nsplits job output parameter.
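To locate the part file that holds a given zip code, it helps to know how keys are assigned to reducer tasks. The sketch below assumes the job uses Hadoop's default hash partitioning and Hadoop's usual five-digit zero-padded part file names; this guide states neither, so treat both as assumptions to check against your job configuration. The class PartFileSketch and its methods are hypothetical helpers, not part of KijiMR.

```java
/** Sketch of how keys may map to part files, assuming default hash partitioning. */
final class PartFileSketch {
  /**
   * Mirrors the formula of Hadoop's default HashPartitioner:
   * (hashCode & Integer.MAX_VALUE) % numReduceTasks.
   * An integer key hashes to its own value.
   */
  static int partitionFor(int zipCode, int nsplits) {
    return (Integer.hashCode(zipCode) & Integer.MAX_VALUE) % nsplits;
  }

  /** Assumed naming scheme: part-r- followed by the zero-padded reducer number. */
  static String partFileName(int partition) {
    return String.format("part-r-%05d", partition);
  }

  public static void main(String[] args) {
    final int nsplits = 3;  // Same value as in the command line above.
    for (int zipCode : new int[] {94110, 10001}) {
      System.out.println(zipCode + " -> " + partFileName(partitionFor(zipCode, nsplits)));
    }
  }
}
```

Under these assumptions, all the records for one zip code land in a single part file, because every pair with the same key is routed to the same reducer task.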

Provided Library Classes

The org.kiji.mapreduce.lib.reduce package of the KijiMR Library comes with a number of reducers. The most immediately useful ones for an application developer include org.kiji.mapreduce.lib.reduce.IntSumReducer, org.kiji.mapreduce.lib.reduce.LongSumReducer and org.kiji.mapreduce.lib.reduce.DoubleSumReducer. These are KijiReducer versions of reducers that compute the sums of all the values associated with each key.

These reducers are examples of subclasses of the abstract class org.kiji.mapreduce.lib.reduce.KeyPassThroughReducer, which is a convenience superclass for KijiReducers that use the same keys as the map phase.
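The behaviour of these summing reducers can be approximated in a few lines of plain Java. This is a sketch of the logic only, not the library's source: SumReducerSketch is a hypothetical class, and a returned map entry stands in for the pair a real reducer would emit through its context.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Map;

/** Plain-Java sketch of a key-pass-through summing reducer; not library code. */
final class SumReducerSketch {
  /** Sums the values of one key and emits the result under the same, unchanged key. */
  static <K> Map.Entry<K, Integer> reduce(K key, Iterable<Integer> values) {
    int sum = 0;
    for (int value : values) {
      sum += value;
    }
    return new AbstractMap.SimpleEntry<>(key, sum);
  }

  public static void main(String[] args) {
    // Hypothetical input: a gatherer emitted the value 1 three times for this key.
    final Map.Entry<String, Integer> result = reduce("94110", Arrays.asList(1, 1, 1));
    System.out.println(result.getKey() + " -> " + result.getValue());  // 94110 -> 3
  }
}
```

Because the output key is the input key, only the value type distinguishes the integer, long, and double variants.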

See the KijiMR Library JavaDoc for more information on these classes.

Table reducers

org.kiji.mapreduce.KijiTableReducer is a specialization of KijiReducer that writes cells into an output Kiji table. A table reducer must implement the logic to reduce the input values for an input key in reduce(InputKey key, Iterable<InputValue> values, KijiTableContext context), using the table reducer context to write cells to the configured output table.

Below is a minimal example of a table reducer. This reducer is very similar to the FamilySizeReducer above, but it inherits from KijiTableReducer and is designed to output to HFiles or directly into a Kiji table.

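import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import org.apache.hadoop.io.IntWritable;

import org.kiji.mapreduce.KijiTableContext;
import org.kiji.mapreduce.KijiTableReducer;
import org.kiji.schema.EntityId;

/**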
 * Example of a table reducer class that, for every zip code, calculates
 * the number of families per family size.
 *
 * For its input, this reducer expects the output from the FamilySizeGatherer
 * presented in the gatherer section of the user guide.
 * This gatherer emits a key-value pair for every household with:
 * <ul>
 *   <li> the household zip code as a key;
 *   <li> the number of members in the household as a value.
 * </ul>
 *
 * The output of this reducer is a table whose row keys are zip codes.
 * The reducer writes cells to a map-type family named 'sizes'.
 * Each cell is written to the column 'sizes:<family-size>' and contains
 * the number of families with the size encoded in the column qualifier.
 */
public class FamilySizeTableReducer
    extends KijiTableReducer<IntWritable, IntWritable> {

  /** {@inheritDoc} */
  @Override
  protected void reduce(
      IntWritable zipCode, Iterable<IntWritable> familySizes, KijiTableContext context)
      throws IOException {
    // Map: family size -> number of families with this size
    final Map<Integer, AtomicInteger> sizeMap = Maps.newHashMap();

    // Count the number of families of each size:
    for (IntWritable familySize : familySizes) {
      AtomicInteger count = sizeMap.get(familySize.get());
      if (count == null) {
        count = new AtomicInteger(0);
        sizeMap.put(familySize.get(), count);
      }

      // Increment the counter for this family size:
      count.incrementAndGet();
    }

    // Emit the generated family size map in the entity whose ID is zipCode:
    final EntityId eid = context.getEntityId(zipCode.toString());

    for (Map.Entry<Integer, AtomicInteger> entry : sizeMap.entrySet()) {
      final Integer familySize = entry.getKey();
      final int familyCount = entry.getValue().get();
      context.put(eid, "sizes", familySize.toString(), familyCount);
    }
  }
}

This reducer may be chained to the FamilySizeGatherer described in the gatherer section of this user guide as follows:

kiji gather \
    --gatherer=my.application.package.FamilySizeGatherer \
    --reducer=my.application.package.FamilySizeTableReducer \
    --input="format=kiji table=kiji://.env/default/households_table_name" \
    --output="format=kiji table=kiji://.env/default/family_size_table_name nsplits=3"

This command line runs a gatherer chained with the table reducer presented above. The output of the job is written to the live Kiji table specified with kiji://.env/default/family_size_table_name. The number of gatherer tasks is currently equal to the number of regions in the input table. The number of reducer tasks is controlled by the nsplits job output parameter, 3 in this example.
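To make the resulting table layout concrete, the following plain-Java sketch models the cells that would be written to the row of one zip code. It does not touch a Kiji table: TableLayoutSketch is a hypothetical class, a sorted map stands in for the row, and the household sizes are made-up sample data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of one output row; a sorted map stands in for the Kiji table row. */
final class TableLayoutSketch {
  /** Returns column name ("sizes:<family-size>") -> family count for one zip code. */
  static Map<String, Integer> rowFor(List<Integer> familySizes) {
    final Map<String, Integer> row = new TreeMap<>();
    for (int size : familySizes) {
      // Count one more family under the column whose qualifier is the family size.
      row.merge("sizes:" + size, 1, Integer::sum);
    }
    return row;
  }

  public static void main(String[] args) {
    // Hypothetical household sizes gathered for a single zip code.
    System.out.println(rowFor(Arrays.asList(2, 4, 2, 3)));
    // {sizes:2=2, sizes:3=1, sizes:4=1}
  }
}
```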