Producers
Motivation
A KijiProducer executes a function over a subset of the columns in a table row and produces output to be injected back into a column of that row. Producers can be run in a MapReduce job that operates over a range of rows from a Kiji table. Common tasks for producers include parsing, profiling, recommending, predicting, and classifying. For example, you might run a LocationIPProducer to compute and store the location of each user into a new column, or a PersonalizationProfileProducer to compute a personalization profile.
Whereas gatherers generally run over the rows of a Kiji table to generate key-value pairs for the purposes of conversion, analysis, or filling in information in a different table, producers can only write back information directly into the row they’re presently processing. Producers are the most appropriate tool when you want to update a row with information calculated from the existing contents of that row and KeyValueStores.
Classes Overview
There are three important classes for an application that wants to use producers. All producers must extend the abstract class org.kiji.mapreduce.produce.KijiProducer and override its abstract methods as described below. Clients should be familiar with the org.kiji.mapreduce.produce.ProducerContext class, which is the interface producers use to output data. Finally, while produce jobs can be launched with kiji produce from the command line, org.kiji.mapreduce.produce.KijiProduceJobBuilder can be used to construct and launch produce jobs programmatically.
Using the API
Each producer must extend KijiProducer and must implement the following three methods:
- KijiDataRequest getDataRequest(). This method specifies the columns retrieved while scanning the input table. It should construct and return a KijiDataRequest.
- String getOutputColumn(). This method specifies the fully-qualified column or the column family being produced. It should return a string of the form “family:qualifier” or “family”. Family-only output columns are only valid for map-type families (see the KijiSchema user guide).
- void produce(KijiRowData input, ProducerContext context). This method contains the logic to produce the content for the output column for each input row. It will be called once per row processed by the task. The input argument contains the columns from the row as requested by the KijiDataRequest returned from getDataRequest(). The produce() method can use its context argument to write to the output column, as detailed below.
When producing new content for a row, the producer may combine the input row data with data from external stores through KeyValueStores by implementing Map<String, KeyValueStore<?, ?>> getRequiredStores(). This method should construct and return a map specifying all the KeyValueStores that the producer wants to access. The KeyValueStores may then be accessed from the produce() method through the ProducerContext. For more details, see the Key-Value Stores section of this guide.
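The join pattern can be sketched without Kiji as follows. This is a hypothetical model, not the KeyValueStore API: LookupStore stands in for a store reader, CityProducer and its "zipToCity" store name are invented for illustration, and an in-memory Map stands in for whatever backs a real store (for example, a file). The producer declares its stores by name, and produce() combines a column from the row with a lookup in that store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a KeyValueStore reader: a read-only key lookup.
interface LookupStore {
  String get(String key);
}

// Hypothetical producer that joins a row column with an external store.
final class CityProducer {
  static final String STORE_NAME = "zipToCity";

  // Stands in for the data behind the store (a file, another table, ...).
  private final Map<String, String> zipTable;

  CityProducer(Map<String, String> zipTable) {
    this.zipTable = zipTable;
  }

  // Mirrors getRequiredStores(): declares, by name, every store the producer needs.
  Map<String, LookupStore> getRequiredStores() {
    Map<String, LookupStore> stores = new HashMap<>();
    stores.put(STORE_NAME, zipTable::get);
    return stores;
  }

  // Mirrors produce(): combines the row's zip code with the store, or returns null
  // when the row has no zip code or the store has no entry for it.
  String produce(Map<String, String> input, Map<String, LookupStore> stores) {
    String zip = input.get("info:zip");
    if (zip == null) {
      return null;
    }
    return stores.get(STORE_NAME).get(zip);
  }
}
```

Looking stores up by name keeps the producer decoupled from how each store is configured, which is the same reason getRequiredStores() returns a map keyed by store name.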
Optionally, a producer may implement setup() and cleanup() to initialize and finalize resources that can be reused during the produce task. Each task calls these methods once: setup() before processing any rows, and cleanup() after it has finished processing. If you wish to use a KeyValueStore, it should be opened once with context.getStore(storeName) in setup(), saved in a member variable, and closed in cleanup().
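The lifecycle can be illustrated with a small self-contained model. ExpensiveResource, LifecycleProducer and ProduceTask are hypothetical names, and the sketch drops the context argument the real hooks receive; the resource merely stands in for something like a KeyValueStore reader. It shows the resource being opened once in setup(), kept in a member variable and reused by every produce() call, then closed in cleanup() even if a row fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an expensive, reusable resource such as a store reader.
final class ExpensiveResource implements AutoCloseable {
  private boolean open = true;

  String lookup(String key) {
    if (!open) {
      throw new IllegalStateException("resource already closed");
    }
    return key.toUpperCase();
  }

  @Override
  public void close() {
    open = false;
  }

  boolean isOpen() {
    return open;
  }
}

// Hypothetical producer following the setup()/produce()/cleanup() lifecycle.
final class LifecycleProducer {
  int setupCalls = 0;
  ExpensiveResource resource; // opened once in setup(), reused by every produce() call

  void setup() {
    setupCalls++;
    resource = new ExpensiveResource();
  }

  String produce(String input) {
    return resource.lookup(input);
  }

  void cleanup() {
    if (resource != null) {
      resource.close();
    }
  }
}

final class ProduceTask {
  // Models one task: setup() once, produce() once per row, cleanup() once at the end.
  static List<String> run(LifecycleProducer producer, List<String> rows) {
    List<String> outputs = new ArrayList<>();
    producer.setup();
    try {
      for (String row : rows) {
        outputs.add(producer.produce(row));
      }
    } finally {
      producer.cleanup();
    }
    return outputs;
  }
}
```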
As mentioned above, a producer’s produce() method has an org.kiji.mapreduce.produce.ProducerContext argument. This class has a number of methods which are important to a producer author:
- void put(T value), void put(long timestamp, T value), void put(String qualifier, T value), and void put(String qualifier, long timestamp, T value). Each of these calls writes data into the current row, in the column specified by the producer’s getOutputColumn(). The type of value must be compatible with the output column’s schema as declared in the table layout. The timestamp parameter is optional; if omitted, the current time is used. A qualifier argument should only be used if the producer’s getOutputColumn() specified a map-type family.
- void incrementCounter(Enum<?> counter). Increments a MapReduce job counter. This can be useful for calculating aggregate counts over the full MapReduce job (for example, you can use a counter to calculate the total number of input rows containing malformed data). This method is common to all KijiContexts.
Example
The following is a minimal example of a Kiji producer class. This producer is designed to calculate and save the zodiac signs of people in a user table, and could be part of something like a horoscope application. We assume the user table has a column info:birthday containing the user’s birthdate as a string, and a map-type family of strings, produced. Our example producer will output the zodiac sign as a string into produced:zodiac_sign.
For brevity, the calculation of a zodiac sign from a birthday string has been omitted.
```java
import java.io.IOException;

import org.kiji.mapreduce.produce.KijiProducer;
import org.kiji.mapreduce.produce.ProducerContext;
import org.kiji.schema.KijiDataRequest;
import org.kiji.schema.KijiRowData;

/**
 * Example of a producer class. Calculates zodiac signs.
 */
public class ZodiacProducer extends KijiProducer {
  /** Counters reported by this producer. */
  public static enum Counters {
    MISSING_BIRTHDAY,
  }

  /** {@inheritDoc} */
  @Override
  public KijiDataRequest getDataRequest() {
    // Fetch all columns in family 'info' from the input table:
    return KijiDataRequest.create("info");
  }

  /** {@inheritDoc} */
  @Override
  public String getOutputColumn() {
    // Configure the producer to emit to a single column 'produced:zodiac_sign':
    return "produced:zodiac_sign";
  }

  /** Compute the zodiac sign from a birthday. */
  private String zodiacSignFromBirthday(String birthday) {
    // Implementation left to the reader
    // …
  }

  /** {@inheritDoc} */
  @Override
  public void produce(KijiRowData input, ProducerContext context) throws IOException {
    // Extract the required data from the input row:
    final CharSequence birthday = input.getMostRecentValue("info", "birthday");
    if (birthday == null) {
      // Input row contains no birthday, report and move on:
      context.incrementCounter(Counters.MISSING_BIRTHDAY);
      return;
    }

    // Some computation:
    final String zodiacSign = zodiacSignFromBirthday(birthday.toString());

    // Emit the generated content to the configured output column:
    context.put(zodiacSign);
  }
}
```
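The example leaves zodiacSignFromBirthday() to the reader. One possible sketch follows, assuming that birthdays are stored as ISO-8601 dates such as 1990-03-25 (the table description above only says “a string”). The class name ZodiacSigns is hypothetical, and the cutoff dates are the commonly cited Western zodiac boundaries; sources disagree by a day on some of them.

```java
import java.time.LocalDate;

// Hypothetical helper; assumes birthdays are ISO-8601 strings like "1990-03-25".
final class ZodiacSigns {
  // First day (month, day) of each sign, in calendar order starting with Aquarius.
  private static final int[][] STARTS = {
    {1, 20}, {2, 19}, {3, 21}, {4, 20}, {5, 21}, {6, 21},
    {7, 23}, {8, 23}, {9, 23}, {10, 23}, {11, 22}, {12, 22},
  };
  private static final String[] SIGNS = {
    "Aquarius", "Pisces", "Aries", "Taurus", "Gemini", "Cancer",
    "Leo", "Virgo", "Libra", "Scorpio", "Sagittarius", "Capricorn",
  };

  static String zodiacSignFromBirthday(String birthday) {
    // Throws java.time.format.DateTimeParseException if the string is not yyyy-MM-dd.
    LocalDate date = LocalDate.parse(birthday);
    int month = date.getMonthValue();
    int day = date.getDayOfMonth();
    // Dates before January 20 fall in Capricorn, which started the previous December.
    String sign = "Capricorn";
    // STARTS is sorted, so the last start date on or before the birthday wins.
    for (int i = 0; i < STARTS.length; i++) {
      if (month > STARTS[i][0] || (month == STARTS[i][0] && day >= STARTS[i][1])) {
        sign = SIGNS[i];
      }
    }
    return sign;
  }
}
```

A malformed birthday makes LocalDate.parse throw, so a producer using this helper could catch that exception and report it with context.incrementCounter(...), just as the example does for missing birthdays.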
Our example producer may be used on the command line with something like the following. This assumes that our table is named user_table on the default Kiji instance:
```shell
kiji produce \
    --producer=my.application.package.ZodiacProducer \
    --input="format=kiji table=kiji://.env/default/user_table" \
    --output="format=kiji table=kiji://.env/default/user_table nsplits=1"
```
Note: the output table of a producer must match the input table. The nsplits parameter is ignored for producers.
Provided Library Classes
The org.kiji.mapreduce.lib.produce package of the KijiMR Library contains a number of producer implementations that might be of use to application developers:
- AllVersionsSingleInputProducer and SingleInputProducer are convenience classes. Subclasses of these abstract classes only have to implement String getInputColumn() instead of constructing an entire KijiDataRequest in getDataRequest(). The produce() method will receive all the versions of that column (if the parent class is AllVersionsSingleInputProducer) or only the most recent one (if the parent class is SingleInputProducer).
- RegexProducer is an abstract subclass of SingleInputProducer. Subclasses must implement String getInputColumn() to specify a column and String getRegex(), which should return a regular expression that matches the contents of the input column. The regex should have one capturing group. RegexProducer contains a produce() method which will write that group to the output column. Both input and output should be strings.
- ConfiguredRegexProducer is a concrete implementation of RegexProducer which uses Configuration keys to specify its input, output, and regular expression. You can use this class to copy substrings from one column to another without writing Java code.
- IdentityProducer is another concrete producer. It copies data directly from its input column to its output column, both of which may be specified using configuration keys on the command line. This can be useful for clients who want to copy one column’s data to another without writing code.
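The core step described for RegexProducer (match the whole input string, then keep the single capturing group) can be sketched with java.util.regex. RegexExtract is a hypothetical name and this is not the library's implementation; in particular, returning null on a non-match (so nothing would be written) and rejecting patterns that do not have exactly one group are assumptions made for the sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper modeling the RegexProducer description; not the library's code.
final class RegexExtract {
  private final Pattern pattern;

  RegexExtract(String regex) {
    this.pattern = Pattern.compile(regex);
    // The description calls for one capturing group; fail fast otherwise (an assumption).
    if (pattern.matcher("").groupCount() != 1) {
      throw new IllegalArgumentException("regex must have exactly one capturing group: " + regex);
    }
  }

  // Returns the captured group if the whole input matches, or null (nothing to write).
  String extract(String input) {
    Matcher m = pattern.matcher(input);
    return m.matches() ? m.group(1) : null;
  }
}
```

For instance, with the pattern [^@]+@(.+), an input column holding e-mail addresses would yield just the domain part, which is the kind of substring copy ConfiguredRegexProducer is meant to do without code.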
See the javadoc of these classes for more information about using them.