PlayCount
The 'Hello World!' of MapReduce
To quote the Scalding developers, "Hadoop is a distributed system for counting words." Unfortunately, we are fresh out of words, but we do have the play history of bajillions of users listening to bazillions of songs.
This MapReduce job uses the listening history of our users that we have stored in the "users" Kiji table to calculate the total number of times each song has been played. The result of this computation is written to a text file in HDFS.
Creating the Job
SongPlayCounter is an example of a KijiExpress job that reads from a Kiji table and writes to a file on HDFS. SongPlayCounter proceeds through these stages:
- Read the column "info:track_plays" from rows in a Kiji table.
- Break each user's track play history into individual songs.
- Count the number of times each song has been played.
- Write each song ID and play count to a file in HDFS.
The tutorial includes the job already written for you; see Running the Job below. These sections walk through how the job is constructed.
Read "info:track_plays" from a Kiji table
Data can be read from a Kiji table by using KijiInput. This factory method takes options specific to requesting slices of data from a Kiji table, such as:
- Which columns to retrieve from the table and the field names they should be given.
- Number of versions of each cell to return.
- Filters to apply to the requested data.
For the tutorial:
- Call KijiInput to read all versions of the column "info:track_plays" from the provided Kiji table and bind the resulting value to the field named 'playlist:

KijiInput(args("table-uri"))(Map(Column("info:track_plays", all) -> 'playlist))
Split each user's track play history into individual songs
Each cell in the "info:track_plays" column may contain multiple songs that the user represented by this row has listened to. This data is manifested as a KijiSlice. For our purposes, we can imagine a KijiSlice as a list of cells, each one a different version of the "info:track_plays" column.
- Unpack the data contained within each cell:
/**
* Gets the ids of songs a user has listened to.
*
* @param slice from the column `info:track_plays` that records all the songs a user has
* listened to.
* @return the song ids that a user has listened to.
*/
def songsListenedTo(slice: KijiSlice[String]): Seq[String] = {
slice.cells.map { cell => cell.datum }
}
After the cells have been unpacked, we then flatten the resulting list by calling flatMapTo. flatMapTo is one of a family of map operations that also includes flatMap, map, and mapTo.

The tuples resulting from a map or flatMap operation contain the same fields as the tuples the operation was run against, as well as the output fields generated by the operation. By contrast, with mapTo and flatMapTo, the tuples resulting from the operation contain only the output fields generated by the operation; the fields contained in the original input tuples are dropped. The output from flatMapTo is returned in a single, unnested "list."
- Run the following command to flatten the cell data:
.flatMapTo('playlist -> 'song) { songsListenedTo }
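The flattening behavior here can be pictured with plain Scala collections (a loose analogy only; flatMapTo itself operates on Scalding pipes, not lists, and the song ids below are made up for illustration):

```scala
// Each inner list stands for one user's unpacked "info:track_plays" cells.
val playlists = List(
  List("song-0", "song-1"),
  List("song-0"),
  List("song-2", "song-0")
)

// Flattening yields a single, unnested list of all plays,
// analogous to the tuples flatMapTo emits into the 'song field.
val allPlays = playlists.flatten
// List("song-0", "song-1", "song-0", "song-2", "song-0")
```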
After this operation, the virtual "list" being operated on now contains all of the songs listened to by users.
Count the occurrences of each song
Now that each played song has been separated from the user that listened to it, we can calculate the play count for each song. To achieve this, we will use the groupBy operation. groupBy takes two arguments:
- A field to group by.
- A function that aggregates the resulting tuples that share the same value bound to the provided field.
In our case, we want to group on the song name, which produces a group of tuples containing the same song. The size of each group is then computed and bound to the field named 'songCount.
- Run the following command to group songs and count each group:
.groupBy('song) { _.size('songCount) }
After this operation, the virtual "list" being operated on now contains a mapping between song names (stored in the 'song field) and their corresponding play counts (stored in the 'songCount field).
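The group-and-count step can likewise be sketched with plain Scala collections (again only an analogy for the pipe operation; the song ids are illustrative):

```scala
// All plays after flattening, one element per listen.
val allPlays = List("song-0", "song-1", "song-0", "song-2", "song-0")

// Group identical song ids together and take each group's size,
// analogous to .groupBy('song) { _.size('songCount) }.
val playCounts = allPlays
  .groupBy(identity)
  .map { case (song, plays) => (song, plays.size) }
// Map("song-0" -> 3, "song-1" -> 1, "song-2" -> 1)
```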
Write the results to a file
- Write the play counts to a TSV (tab-separated value) file on HDFS:
.write(Tsv(args("output")))
All Together Now
This code shows the entire pipeline put together:
SongPlayCounter.scala
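Assembling the snippets above, the job plausibly looks like the following sketch (exact import paths and class details may differ from the actual SongPlayCounter.scala shipped with the tutorial):

```scala
// Import paths below are approximate for KijiExpress 0.7.0 and may differ.
import com.twitter.scalding.{Args, Tsv}
import org.kiji.express.KijiSlice
import org.kiji.express.flow.{Column, KijiInput, KijiJob}

class SongPlayCounter(args: Args) extends KijiJob(args) {

  /**
   * Gets the ids of songs a user has listened to.
   *
   * @param slice from the column `info:track_plays` that records all the songs
   *     a user has listened to.
   * @return the song ids that a user has listened to.
   */
  def songsListenedTo(slice: KijiSlice[String]): Seq[String] = {
    slice.cells.map { cell => cell.datum }
  }

  // Read every version of "info:track_plays", unpack each slice into
  // individual song ids, count plays per song, and write the result as TSV.
  KijiInput(args("table-uri"))(Map(Column("info:track_plays", all) -> 'playlist))
      .flatMapTo('playlist -> 'song) { songsListenedTo }
      .groupBy('song) { _.size('songCount) }
      .write(Tsv(args("output")))
}
```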
Running the Job
- Run the SongPlayCounter job:
express job --libjars "${MUSIC_EXPRESS_HOME}/lib/*" \
${MUSIC_EXPRESS_HOME}/lib/kiji-express-music-0.7.0.jar \
org.kiji.express.music.SongPlayCounter \
--table-uri ${KIJI}/users \
--output express-tutorial/songcount-output \
--hdfs
Alternative: Running the Job as a Script
You can also run the SongPlayCounter job as a script:
express script --libjars "${MUSIC_EXPRESS_HOME}/lib/*" \
${MUSIC_EXPRESS_HOME}/scripts/SongPlayCounter.express --hdfs
Verify Output
- Run the following command to see the output of the job:
hadoop fs -tail express-tutorial/songcount-output/part-00000 | head -n 3
song-0 260
song-1 100
song-10 272