At this point, you should be worried about wearing out your keyboard by manually running an add command for each of your (incredibly large set of) friends. First, we will discuss and give examples of the naive way of extending AddEntry to load a table, and then present the recommended MapReduce approach.

An example of writing puts in order to load a table is given in StandalonePhonebookImporter. This approach is easier than running the add command for every user, but is still suboptimal because you are using one machine to perform every put.

Parsing the Delimited Phonebook Entries

First, let's take a closer look at the data we want to import. In $KIJI_HOME/examples/phonebook/input-data.txt you will see records like:

John|Doe||202-555-9876|{"addr1":"1600 Pennsylvania Ave","apt":null,"addr2":null,"city":"Washington","state":"DC","zip":99999}

The fields in each record are delimited by a | character. The last field is actually a complete JSON Avro record representing an address.

At the top of the importLine(...) method in StandalonePhonebookImporter, you'll see we're using a delimiter to split up the record into its specific fields:

String[] fields = value.toString().split("\\|");

String firstName = fields[0];
String lastName = fields[1];
String email = fields[2];
String telephone = fields[3];
String addressJson = fields[4];

Since our last field is a complex Avro type represented in JSON, we use a JSON decoder with an Avro schema (in this case Address), to decode the field into a complete Avro Address object.

// addressJson contains a JSON-encoded Avro "Address" record. Parse this into
// an object and write it to the person's record.
// The Address record type is generated from src/main/avro/Address.avsc as part
// of the build process (see avro-maven-plugin in pom.xml).
final SpecificDatumReader<Address> datumReader =
    new SpecificDatumReader<Address>(Address.SCHEMA$);
final JsonDecoder decoder =
    DecoderFactory.get().jsonDecoder(Address.SCHEMA$, addressJson);
final Address streetAddr =, decoder);

Next we create a unique EntityId that will be used to reference this row. As before, we will use the combination of first and last name as a unique reference to this row:

final EntityId user = table.getEntityId(firstName + "," + lastName);

Finally we just retrieve the current system timestamp and write these record fields.

final long timestamp = System.currentTimeMillis();
writer.put(user, Fields.INFO_FAMILY, Fields.FIRST_NAME, timestamp, firstName);
writer.put(user, Fields.INFO_FAMILY, Fields.LAST_NAME, timestamp, lastName);
writer.put(user, Fields.INFO_FAMILY, Fields.EMAIL, timestamp, email);
writer.put(user, Fields.INFO_FAMILY, Fields.TELEPHONE, timestamp, telephone);
writer.put(user, Fields.INFO_FAMILY, Fields.ADDRESS, timestamp, streetAddr);

Running the Example

You can execute StandalonePhonebookImporter just like you would the AddEntry example - using the kiji jar command.

$KIJI_HOME/bin/kiji jar \
    $KIJI_HOME/examples/phonebook/lib/kiji-phonebook-1.0.0.jar \
    org.kiji.examples.phonebook.StandalonePhonebookImporter \

You now have data in your phonebook table!


Verify that the user records were added properly by executing:

$KIJI_HOME/bin/kiji scan kiji://.env/default/phonebook

Importing Data using MapReduce

While the above example does import the requisite data, it doesn't scale to large data sets, because it doesn't take advantage of the cluster you are writing to. Instead of generating all puts locally like we did above, we will write a MapReduce job that reads from text file in HDFS and writes to your Kiji table by generating puts in a distributed fashion.

Our example of importing data into a table with MapReduce can be found in the class PhonebookImporter. PhonebookImporter defines a special type of MapReduce job called a Kiji bulk import job that reads each line of our input file, parses it, and writes it to a table. Kiji bulk import jobs are created by implementing a KijiBulkImporter, not a Mapper and Reducer. This API is provided by KijiMR. The Music recommendation tutorial covers KijiMR in much greater detail, but we will take a look at using the KijiBulkImporter API below.

Instead of a map() method, we provide a produce() method definition; this method processes an input record from a file like an ordinary mapper, except its context argument is specifically targeted to output to a row in a Kiji table.

At the top of the produce() method, you'll see that we extract the fields from the lines as in the above example. Then using the KijiTableContext context argument, we'll write all of the fields to the phonebook table:

public void produce(LongWritable byteOffset, Text line, KijiTableContext context)
    throws IOException {
  context.put(user, Fields.INFO_FAMILY, Fields.FIRST_NAME, firstName);
  context.put(user, Fields.INFO_FAMILY, Fields.LAST_NAME, lastName);
  context.put(user, Fields.INFO_FAMILY, Fields.EMAIL, email);
  context.put(user, Fields.INFO_FAMILY, Fields.TELEPHONE, telephone);
  context.put(user, Fields.INFO_FAMILY, Fields.ADDRESS, streetAddr);

The context.put() calls are identical in form to using a KijiTableWriter.

If you are writing a custom bulk importer and require specialized setup and teardown steps, these can be placed in setup() and cleanup() methods like in a Mapper. We don't need any in this example.

The outer PhonebookImporter class contains configureJob(...) and run(...) methods that handle the setup and execution of the MapReduce job. Instead of constructing a Hadoop Job object directly, we use a KijiBulkImportJobBuilder. This builder object lets us specify Kiji-specific arguments, and construct a KijiMapReduceJob (A Kiji-specific wrapper around Job):

KijiMapReduceJob configureJob(Path inputPath, KijiURI tableUri) throws IOException {
  return KijiBulkImportJobBuilder.create()
      .withInput(new TextMapReduceJobInput(inputPath))
      .withOutput(new DirectKijiTableMapReduceJobOutput(tableUri))

The HDFS file path to the sample input data is set to the first command line argument. A KijiURI is constructed that specifies the phonebook table as the target:

public int run(String[] args) throws Exception {
  final KijiURI tableUri =
      KijiURI.newBuilder(String.format("kiji://.env/default/%s", TABLE_NAME)).build();
  final KijiMapReduceJob job = configureJob(new Path(args[0]), tableUri);

  final boolean isSuccessful =;
  return isSuccessful ? 0 : 1;

The TextMapReduceJobInput and DirectKijiTableMapReduceJobOutput classes are abstractions that, under the hood, configure an InputFormat and OutputFormat for the MapReduce job. Different KijiMR job types (bulk importer, producer, or gatherer) support different subsets of available formats (files, tables, etc). These classes allow the system to ensure that the correct type is used. For example, bulk import jobs require that the target is a table. "Regular" MapReduce jobs configured through KijiMapReduceJobBuilder can use any MapReduceJobInput and MapReduceJobOutput that makes sense in the context of the application.

Running the Example

First you'll need to put the text file containing your friends' contact info into HDFS. You can do this by using the HDFS -copyFromLocal command:

$HADOOP_HOME/bin/hadoop fs -copyFromLocal \
    $KIJI_HOME/examples/phonebook/input-data.txt /tmp

You can then use the kiji jar command to execute the MapReduce job. To use the jar command, specify the jar file the PhonebookImporter class and the path to the input-data.txt file in HDFS.

$KIJI_HOME/bin/kiji jar \
    $KIJI_HOME/examples/phonebook/lib/kiji-phonebook-1.0.0.jar \
    org.kiji.examples.phonebook.PhonebookImporter \

You now have data in your phonebook table!

Running with the bulkimport Command

The BulkImportJobBuilder allows you to programmatically configure and launch a bulk import MapReduce job. KijiMR extends Kiji with a bulkimport command-line tool that can perform substantially the same steps. The following command would run the same bulk import job without requiring that you write a "main()" method:

$KIJI_HOME/bin/kiji bulk-import \
    --importer='org.kiji.examples.phonebook.PhonebookImporter$PhonebookBulkImporter' \
    --input="format=text file=/tmp/input-data.txt" \
    --output="format=kiji nsplits=1 table=kiji://.env/default/phonebook" \

The --input and --output arguments specify in text form the same MapReduceJobInput and MapReduceJobOutput objects as are created programmatically in this example.


Verify that the user records were added properly by executing:

$KIJI_HOME/bin/kiji scan kiji://.env/default/phonebook

Here's what one of the first entries should look like:

Scanning kiji table: kiji://localhost:2181/default/phonebook/
entity-id=hbase=hex:551e50c1f2632437ccbacb16100f11db [1363228117784] info:firstname
entity-id=hbase=hex:551e50c1f2632437ccbacb16100f11db [1363228117787] info:lastname
entity-id=hbase=hex:551e50c1f2632437ccbacb16100f11db [1363228117789] info:email
entity-id=hbase=hex:551e50c1f2632437ccbacb16100f11db [1363228117792] info:telephone
entity-id=hbase=hex:551e50c1f2632437ccbacb16100f11db [1363228117793] info:address
                                 {"addr1": "1600 Pennsylvania Ave", "apt": null, "addr2": null, "city": "Washington", "state": "DC", "zip": 99999}