Bulk-importers and producers write their outputs to Kiji tables. Gatherers and reducer may also direct their outputs to Kiji tables.
The most natural way to write to a Kiji table from a MapReduce job is through direct writes. This is what the
format=kiji job output specification achieves on the command line, eg. when running a job with
--output="format=kiji table=kiji://.env/default/table nsplits=1". Using the job builder API, this is what the job output class
DirectKijiTableMapReduceJobOutput implements, eg. when specifying the output of a job with
Under the hood, the output format used by each MapReduce tasks will open a
KijiTableWriter and issue
put() operations that will be sent to the live Kiji table as the MapReduce job progresses.
Following is a bulk-import command line example that uses direct writes to the output Kiji table:
kiji bulk-import \ --importer=my.application.package.BulkImporterExample \ --input="format=seq file=hdfs://cluster/path/to/input-seq-file/" \ --output="format=kiji table=kiji://.env/default/table nsplits=1" \
When using direct writes, the
nsplits parameter is ignored (although required). The number of bulk-import tasks depends on the input format used. In the previous example, the number of bulk-import tasks is the number of input sequence files in the directory
hdfs://cluster/path/to/input-seq-file/. Each bulk-import task will send writes directly to the output Kiji table
Another approach for writing to Kiji tables is for the MapReduce jobs to write HFiles. An HFile is the underlying file format used by HBase region servers to store the cells for a table. Cells written to an HFile must first be ordered by increasing row key, then by increasing HBase column names, and finally according to their timestamps. The shuffle-sort-reduce phase of a MapReduce job is used to order the cells properly before writing them to the HFiles. HFiles can then be efficiently loaded into a live Kiji table through bulk-loading operations. A bulk-load operation can be initiated with the
kiji bulk-load command line tool.
KijiMR jobs are configured to write HFiles through the
format=hfile job output specification on the command line, as in
--output="format=hfile table=kiji://.env/default/table file=hdfs://cluster/path/to/hfile/ nsplits=1". Using the job builder API, the HFile job output is implemented by the
HFileMapReduceJobOutput job output class, as in
jobBuilder.withOutput(new HFileMapReduceJobOutput(tableURI, hfilePath)).
Below is the same example of a bulk-import command line as in the Direct Writes section, with the job output specification updated to generate HFiles instead of writing directly to the output Kiji table.
# Run the bulk importer configured to write HFiles: kiji bulk-import \ --importer=my.application.package.BulkImporterExample \ --input="format=seq file=hdfs://cluster/path/to/input-seq-file/" \ --output="format=hfile table=kiji://.env/default/output_table file=hdfs://cluster/path/to/output-hfile/ nsplits=10"
The first command line runs the bulk-importer job and generates HFiles in HDFS in the directory
hdfs://cluster/path/to/output-hfile/. Because the bulk-importer job is configured to write HFiles, the job builder added an identity reducer phase to sort the cells. The above-mentioned command line will generate 10 sets of HFiles, one per reducer (
nsplits=10) in directories named
hdfs://cluster/path/to/output-hfile/part-r-<reducer #>. Each reducer writes separate HFiles for each column family in directories named after the HBase column names. Ideally, each generated HFile fits entirely in a region in the target Kiji table so that no additional processing is required to have the HFile loaded by a region server serving the Kiji table.
Once the HFiles are generated, we can then bulk-loaded via the
kiji bulk-load command.
# Bulk-load the generated HFiles into the live Kiji table: kiji bulk-load \ --table=kiji://.env/default/output_table \ --hfile=hdfs://cluster/path/to/output-hfile/
Writing HFiles relies on the MapReduce shuffle-sort phase to order the cells correctly in the generated HFiles. If you need to run a mapper or gatherer with a custom table reducer and want to generate HFiles, another extra MapReduce is required to sort the cells before actually writing the HFiles.
This scenario is not fully supported yet. The helper MapReduce job
org.kiji.mapreduce.testlib.HFileReduceJob is not included yet in the KijiMR distribution. See KIJIMR-100 for more details on this work.
Below is an example of the workflow required when writing HFiles from a table reducer:
# Run the bulk importer configured to write HFiles. # Because this uses a custom table reducer, this actually writes a sequence file # containing HBase cells instead of an HFile: kiji gather \ --gatherer=my.application.package.MyGatherer \ --reducer=my.application.package.MyTableReducer \ --input="format=kiji table=kiji://.env/default/input_table" \ --output="format=hfile table=kiji://.env/default/output_table file=hdfs://cluster/path/to/output-seq-file/ nsplits=10" # Sorts the HBase cells from the output sequence file and write HFiles: # Note: The following class is currently NOT available in the KijiMR release jar # However, you may find it in the KijiMR test jar. java -classpath $(kiji classpath) org.kiji.mapreduce.testlib.HFileReduceJob \ --input-path=hdfs://cluster/path/to/output-seq-file/ \ --output-path=hdfs://cluster/path/to/output-hfile/ \ --output-table=kiji://.env/default/output_table \ --nsplits=10 # Bulk-load the generated HFiles into the live Kiji table: kiji bulk-load \ --table=kiji://.env/default/output_table \ --hfile=hdfs://cluster/path/to/output-hfile/
Working with direct writes is generally simpler to set up, as this mode requires no additional MapReduce pass and no bulk-loading operation. Direct writes are usually fine with small amounts of data or for testing.
Writing HFiles adds some complexity to the MapReduce workflow with an extra bulk-loading operation and potentially an extra MapReduce job to sort the HBase cells.
One advantage of HFiles over direct writes is the level of atomicity it provides. A MapReduce job that fails while writing directly to a Kiji table will leave the output table in a disheveled state, with partial updates. Whereas a MapReduce job that fails while writing HFiles will leave the output table unmodified - the HFile output of the MapReduce becomes visible once the HFiles are bulk loaded.
Writing significant amounts of data to a Kiji table through direct writes requires careful tuning of the underlying HBase cluster to avoid performance issues. As the writes are accumulated in the table regions, they might trigger region compactions or splits. In some circumstances, this may affect the availability of the HBase cluster.
HFiles, on the other hand, can be loaded by region servers directly with very little overhead. However, this approach might also lead to region compaction issues, as these described in HBASE-3404, HBASE-3690.
For greater details on this topic, you might have a look at the HBase book section «Writing to HBase».