The 'Hello World!' of MapReduce

To quote the Scalding developers, "Hadoop is a distributed system for counting words." Unfortunately, we are fresh out of words, but we do have the play history of bajillions of users listening to bazillions of songs.

This MapReduce job uses the listening history of our users that we have stored in the "users" Kiji table to calculate the total number of times each song has been played. The result of this computation is written to a text file in HDFS.

Creating the Job

SongPlayCounter is an example of a KijiExpress job that reads from a Kiji table and writes to a file on HDFS. SongPlayCounter proceeds through these stages:

  • Read the column "info:track_plays" from rows in a Kiji table.
  • Break each user's track play history into individual songs.
  • Count the number of times each song has been played.
  • Write each song ID and play count to a file in HDFS.

The tutorial includes the job already written for you in Running the Job below; these sections walk through how the job is created.

Read "info:track_plays" from a Kiji table

Data can be read from a Kiji table by using KijiInput. This factory method takes options specific to requesting slices of data from a Kiji table, such as:

  • Which columns to retrieve from the table and the field names they should be given.
  • Number of versions of each cell to return.
  • Filters to apply to the requested data.

For the tutorial:

  • Call KijiInput to read all versions of the column "info:track_plays" from the provided Kiji table and bind the resulting value to the field named 'playlist:
KijiInput(args("table-uri"))(Map(Column("info:track_plays", all) -> 'playlist))

Split each user's track play history into individual songs

Each cell in the "info:track_plays" column may contain multiple songs that the user represented by this row has listened to. This data is manifested as a KijiSlice. For our purposes, we can imagine a KijiSlice as a list of cells, each one a different version of the "info:track_plays" column.

  • Unpack the data contained within each cell:
/**
 * Gets the ids of songs a user has listened to.
 *
 * @param slice from the column `info:track_plays` that records all the songs a user has
 *     listened to.
 * @return the song ids that a user has listened to.
 */
def songsListenedTo(slice: KijiSlice[String]): Seq[String] = {
  slice.cells.map { cell => cell.datum }
}

After the cells have been unpacked, we then flatten the resulting list by calling flatMapTo.

flatMapTo is one of a set of map operations that includes flatMap, map, and mapTo. The tuples resulting from a map or flatMap operation contain the same fields as the input tuples, plus the output fields generated by the operation. By contrast, the tuples resulting from a mapTo or flatMapTo operation contain only the output fields generated by the operation; the fields from the original input tuples are dropped.

The output from flatMapTo is returned in a single, unnested "list."

  • Run the following command to flatten the cell data:
.flatMapTo('playlist -> 'song) { songsListenedTo }

After this operation, the virtual "list" being operated on now contains all of the songs listened to by users.
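The field semantics of flatMap versus flatMapTo can be illustrated with plain Scala collections (a standalone analogy with made-up data; real Scalding operations run over pipes of named fields, not in-memory Seqs):

```scala
object FlatMapToAnalogy extends App {
  // Each pair stands in for a tuple with two fields: ('user, 'playlist).
  val histories = Seq(
    ("user-0", Seq("song-1", "song-2")),
    ("user-1", Seq("song-2"))
  )

  // flatMap-style: output fields are added alongside the input fields,
  // so each result still carries the user it came from.
  val keepUser = histories.flatMap { case (user, songs) =>
    songs.map(song => (user, song))
  }
  // → Seq(("user-0","song-1"), ("user-0","song-2"), ("user-1","song-2"))

  // flatMapTo-style: only the output field ('song) survives; the input
  // field ('user) is dropped, leaving one flat, unnested list of songs.
  val songsOnly = histories.flatMap { case (_, songs) => songs }
  // → Seq("song-1", "song-2", "song-2")

  println(keepUser)
  println(songsOnly)
}
```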

Count the occurrences of each song

Now that each played song has been separated from the user that listened to it, we can calculate the play count for each song. To achieve this, we will use the groupBy operation. groupBy takes two arguments:

  • A field to group by
  • A function that aggregates the tuples that share the same value bound to the provided field.

In our case, we group on the song name, which collects all tuples containing the same song into one group. We then compute the size of each group and bind it to the field named 'songCount.

  • Run the following command to group songs and count each group:
.groupBy('song) { _.size('songCount) }

After this operation, the virtual "list" being operated on contains a mapping between song names (stored in the 'song field) and their corresponding play counts (stored in the 'songCount field).
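The counting semantics of this step can be sketched with Scala's built-in groupBy over an in-memory collection (illustrative data only; Scalding's groupBy operates on grouped pipes rather than Seqs):

```scala
object GroupCountAnalogy extends App {
  // Flattened play events, one entry per listen (analogous to the 'song field).
  val plays = Seq("song-1", "song-2", "song-2", "song-10")

  // Group identical songs together, then replace each group with its size,
  // mirroring  .groupBy('song) { _.size('songCount) }.
  val counts: Map[String, Int] =
    plays.groupBy(identity).map { case (song, group) => (song, group.size) }

  println(counts)  // song-2 maps to 2; the others map to 1
}
```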

Write the results to a file

  • Write the play counts to a TSV (tab-separated values) file on HDFS:
.write(Tsv(args("output")))

All Together Now

This code shows the entire pipeline put together:

SongPlayCounter.scala
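As a rough sketch, the stages above assemble into a single job class along these lines (this assumes the KijiJob base class and imports from KijiExpress and Scalding; the SongPlayCounter.scala shipped with the tutorial may differ in detail):

```scala
// Hedged sketch only: a KijiExpress job chaining the tutorial's four stages.
// Not runnable without a Kiji cluster and the kiji-express dependencies.
import com.twitter.scalding._
import org.kiji.express._
import org.kiji.express.flow._

class SongPlayCounter(args: Args) extends KijiJob(args) {
  // Read all versions of "info:track_plays" into the 'playlist field,
  // flatten each slice into individual songs, count per song, and write TSV.
  KijiInput(args("table-uri"))(Map(Column("info:track_plays", all) -> 'playlist))
      .flatMapTo('playlist -> 'song) { songsListenedTo }
      .groupBy('song) { _.size('songCount) }
      .write(Tsv(args("output")))

  /** Gets the ids of songs a user has listened to, one per cell version. */
  def songsListenedTo(slice: KijiSlice[String]): Seq[String] =
    slice.cells.map { cell => cell.datum }
}
```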

Running the Job

  • Run the SongPlayCounter job:
express job --libjars "${MUSIC_EXPRESS_HOME}/lib/*" \
    ${MUSIC_EXPRESS_HOME}/lib/kiji-express-music-0.7.0.jar \
    org.kiji.express.music.SongPlayCounter \
    --table-uri ${KIJI}/users \
    --output express-tutorial/songcount-output \
    --hdfs

Alternative: Running the Job as a Script

You can also run the SongPlayCounter job as a script:

express script --libjars "${MUSIC_EXPRESS_HOME}/lib/*" \
    ${MUSIC_EXPRESS_HOME}/scripts/SongPlayCounter.express --hdfs

Verify Output

  • Run the following command to see the output of the job:
hadoop fs -tail express-tutorial/songcount-output/part-00000 | head -n 3
song-0  260
song-1  100
song-10 272