Use Counters
You have just run out of minutes on your phone plan and want to find the person responsible
for inconveniencing you this way. Luckily, we have provided you with a phone log with the
time spent per phone call at $KIJI_HOME/examples/phonebook/input-phone-log.txt. You want to
be able to get the total time spent talking to each of your contacts this month.
To support operations like this, Kiji, like HBase, provides atomic counter cells that
allow multiple sources to increment the same cell simultaneously without conflict. In
this example, we will use the stats:talktime column of counters to count the total
talk time for each of your contacts.
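To make "simultaneously incrementing the cell without conflict" concrete, here is a minimal in-memory sketch of the same idea in plain Java. It is an analogy only and does not use the Kiji or HBase API: the class name CounterSketch, its methods, and the row key "John,Doe" used in main are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory analogy of a column of counter cells (hypothetical; not the Kiji API). */
class CounterSketch {
  // One atomic counter per row key, standing in for a stats:talktime cell.
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  /** Atomically adds amount to the counter for rowKey and returns the new value. */
  long increment(String rowKey, long amount) {
    return counters.computeIfAbsent(rowKey, k -> new AtomicLong()).addAndGet(amount);
  }

  /** Returns the current value, or 0 if the counter was never incremented. */
  long get(String rowKey) {
    AtomicLong counter = counters.get(rowKey);
    return counter == null ? 0L : counter.get();
  }

  /** Starts several writer threads that all increment the same row, then returns the total. */
  static long runConcurrentWriters(int writers, int incrementsPerWriter) {
    CounterSketch sketch = new CounterSketch();
    Thread[] threads = new Thread[writers];
    for (int i = 0; i < writers; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < incrementsPerWriter; j++) {
          sketch.increment("John,Doe", 1L);
        }
      });
      threads[i].start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return sketch.get("John,Doe");
  }

  public static void main(String[] args) {
    // Four writers with 1000 increments each: no update is lost, so this prints 4000.
    System.out.println(runConcurrentWriters(4, 1000));
  }
}
```

With a plain read-modify-write on a non-atomic value, concurrent writers could overwrite each other's updates. An atomic increment avoids that, which is why counters suit a job where many map tasks may update the same contact.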
Running the describe phonebook; command within the kiji-schema-shell should show
stats:talktime as a column of counter cells:
...
Column family: stats
Description: Statistics about a contact.
Column stats:talktime (Time spent talking with this person)
Schema: (counter)
We'll show you a way to use counters to get the amount of time that each of your contacts has spent calling you with the IncrementTalkTime example.
IncrementTalkTime.java
Deprecation Warning
This section refers to classes in the org.kiji.schema.mapreduce package that may be removed in the future. You should use the KijiMR library to manage MapReduce jobs that interoperate with Kiji tables. In particular, the KijiMapReduceJobBuilder takes care of lots of things you'd have to do manually here (e.g., use GenericTableMapReduceUtil and DistributedCacheJars).
The stats:talktime column of the phonebook table is of the type COUNTER. You can see this by
looking at the layout files. Because Kiji tables are built on HBase, they can also
treat columns as counters.
IncrementTalkTime uses MapReduce to calculate the talk time per person in the call log by extending the Hadoop Mapper class. For more information, see the Hadoop MapReduce documentation.
The application starts by configuring a Hadoop job. Note that we need to ship certain jars that we depend on during the map task. Here's how we do this:
GenericTableMapReduceUtil.addAllDependencyJars(job);
DistributedCacheJars.addJarsToDistributedCache(job,
new File(System.getenv("KIJI_HOME"), "lib"));
job.setUserClassesTakesPrecedence(true);
The map function is run once for each line in the phone log file. The input to the function is a line of the form:
firstname | lastname | call_duration
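As an illustration of how a line in this format can be split into its fields, here is a minimal standalone sketch. It is not the code from IncrementTalkTime: the class PhoneLogLine and its parse method are hypothetical, and trimming whitespace around the | separator is an assumption that works whether or not the real log pads its fields.

```java
/** Hypothetical holder for one parsed phone-log line (not part of the Kiji examples). */
class PhoneLogLine {
  final String firstName;
  final String lastName;
  final long talkTime;

  private PhoneLogLine(String firstName, String lastName, long talkTime) {
    this.firstName = firstName;
    this.lastName = lastName;
    this.talkTime = talkTime;
  }

  /** Parses "firstname | lastname | call_duration"; rejects lines without exactly 3 fields. */
  static PhoneLogLine parse(String line) {
    // '|' is a regex metacharacter, so it is escaped; -1 keeps empty trailing fields.
    String[] fields = line.split("\\|", -1);
    if (fields.length != 3) {
      throw new IllegalArgumentException(
          "Expected 3 fields, got " + fields.length + ": " + line);
    }
    // Long.parseLong throws NumberFormatException if the duration is not a whole number.
    return new PhoneLogLine(
        fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
  }

  public static void main(String[] args) {
    PhoneLogLine call = parse("John | Doe | 15");
    // Prints: John,Doe -> 15
    System.out.println(call.firstName + "," + call.lastName + " -> " + call.talkTime);
  }
}
```

Failing fast on a malformed line is one possible design choice. A real job might prefer to skip and count bad records instead.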
The setup function is called once per mapper. It opens the phonebook Kiji table and creates
a context to be able to write to it. Specifically, the calculated total talk time per contact
will be written to the contact's record.
The map task breaks the input line up into its individual components. It then generates a row ID for this user in the Kiji table as follows:
final EntityId user = mKijiTable.getEntityId(firstName + "," + lastName);
The following code increments the existing value of the stats:talktime column by the call
duration in the call log.
mWriter.increment(user, "stats", "talktime", talkTime);
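To see the net effect of one increment per log line, the sketch below replays the same logic against an in-memory map instead of a Kiji table. Everything in it is an assumption for illustration: the class TalkTimeTotals, the sample log lines, and the use of a Map in place of the stats:talktime column. Only the firstName + "," + lastName row key format comes from the example above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for the increment step (hypothetical; does not touch a Kiji table). */
class TalkTimeTotals {
  /** Applies one increment per log line and returns the total talk time per row key. */
  static Map<String, Long> total(List<String> lines) {
    Map<String, Long> totals = new LinkedHashMap<>();
    for (String line : lines) {
      String[] fields = line.split("\\|");
      // Same key format as the entity ID above: firstName + "," + lastName.
      String rowKey = fields[0].trim() + "," + fields[1].trim();
      long talkTime = Long.parseLong(fields[2].trim());
      // merge adds talkTime to the existing total, or starts a new one at talkTime.
      totals.merge(rowKey, talkTime, Long::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    // Made-up sample lines, not taken from input-phone-log.txt.
    List<String> sample = Arrays.asList(
        "John | Doe | 10",
        "Jane | Roe | 7",
        "John | Doe | 5");
    // Prints: {John,Doe=15, Jane,Roe=7}
    System.out.println(total(sample));
  }
}
```

Because each line only adds to a running total, the order in which lines are processed does not change the result. This is what lets the work be spread across many map tasks.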
Running the Example
First you need to add the phone log to HDFS. You can do this with the hadoop fs -copyFromLocal
command.
$HADOOP_HOME/bin/hadoop fs -copyFromLocal \
$KIJI_HOME/examples/phonebook/input-phone-log.txt /tmp
You can then run the kiji jar command, much like the previous examples, providing the path
to the file in HDFS.
$KIJI_HOME/bin/kiji jar \
$KIJI_HOME/examples/phonebook/lib/kiji-phonebook-1.1.3.jar \
org.kiji.examples.phonebook.IncrementTalkTime /tmp/input-phone-log.txt
Verify
Now we can look up the derived talktime value from the stats column for the user John Doe using the kiji get command:
$KIJI_HOME/bin/kiji get kiji://.env/default/phonebook/stats \
--entity-id='"John,Doe"'
Looking up entity: ['John,Doe'] from kiji table: kiji://localhost:2181/default/phonebook/stats/
entity-id=['John,Doe'] [1363228284547] stats:talktime
15