You have just run out of minutes on your phone plan and want to find the person responsible for inconveniencing you this way. Luckily, we have provided you with a phone log with the time spent per phone call at
$KIJI_HOME/examples/phonebook/input-phone-log.txt. You want to be able to get the total time spent talking to each of your contacts this month.
To support operations like this Kiji provides atomic counter type cells like HBase that permit multiple sources to be simultaneously incrementing the cell without conflict. In this example, we will use the
stats:talktime column of counters to count the total talktime for each of your contacts.
describe phonebook; command within the kiji-schema-shell should print the
stats:talktime as a column of counter cells:
... Column family: stats Description: Statistics about a contact. Column stats:talktime (Time spent talking with this person) Schema: (counter)
We’ll show you a way to use counters to get the amount of time that each of your contacts has spent calling you with the IncrementTalkTime example.
stats:talktime column of the phonebook table is of the type COUNTER. You can see this by looking at the layout files. As Kiji tables are based on Hbase, they also provide the ability to treat columns as counters.
IncrementTalkTime uses MapReduce to calculate the talk time per person in the call log by extending the Hadoop Mapper class. You can find more information about Hadoop MapReduce here.
The application starts by configuring a Hadoop job. Note that we need to ship certain jars that we depend on during the map task. Here's how we do this:
GenericTableMapReduceUtil.addAllDependencyJars(job); DistributedCacheJars.addJarsToDistributedCache(job, new File(System.getenv("KIJI_HOME"), "lib")); job.setUserClassesTakesPrecedence(true);
The map function is run once per each line in the phone log file. The input to the function is a line of the form:
firstname | lastname | call_duration
setup function is called once per mapper. It opens the phonebook kiji table and creates a context to be able to write to it. Specifically, the calculated total talk time per contact will be written to the contact’s record.
The map task breaks the input line up into its individual components. It then generates a row ID for this user in the Kiji table as follows:
EntityId user = mKijiTable.getEntityId(firstName + "," + lastName);
The following code increments the existing value of the
stats:talktime column by the call duration in the call log.
mWriter.increment(user, "stats", "talktime", talkTime);
Running the Example
First you need to add the phone log to hdfs. You can do this by using the
hdfs -copyFromLocal command.
$HADOOP_HOME/bin/hadoop fs -copyFromLocal \ $KIJI_HOME/examples/phonebook/input-phone-log.txt /tmp
You can then run the
kiji jar command, much like the previous examples, providing the path to the file in hdfs.
$KIJI_HOME/bin/kiji jar \ $KIJI_HOME/examples/phonebook/lib/kiji-phonebook-1.0.0-rc3.jar \ org.kiji.examples.phonebook.IncrementTalkTime /tmp/input-phone-log.txt
Now we can look up the derived talktime value from the stats column for the user John Doe using the
kiji ls command:
$KIJI_HOME/bin/kiji ls --table=phonebook \ --entity-id="John,Doe" --columns="stats"
Looking up entity: U\x1EP\xC1\xF2c$7\xCC\xBA\xCB\x16\x10\x0F\x11\xDB from kiji table: kiji://localhost:2181/default/phonebook/ U\x1EP\xC1\xF2c$7\xCC\xBA\xCB\x16\x10\x0F\x11\xDB  stats:talktime 15