The 'Hello World!' of MapReduce

To quote the Scalding developers, Hadoop is a distributed system for counting words. Unfortunately, we are fresh out of words, but we do have the play history of bajillions of users listening to bazillions of songs.

This MapReduce job uses the listening history of our users that we have stored in the users Kiji table to calculate the total number of times each song has been played. We then write the result of this job to a text file in HDFS containing tab-separated values.

Creating the Job

SongPlayCounter is an example of a KijiExpress job that reads from a Kiji table and writes to a file on HDFS. SongPlayCounter proceeds through these stages:

  • Read the column "info:track_plays" from rows in a Kiji table.
  • Break each user's track play history into individual songs.
  • Count the number of times each song has been played.
  • Write each song ID and play count to a file in HDFS.

The tutorial includes the job already written for you in Running the Job below; these sections walk through how the job is created.

Read "info:track_plays" from a Kiji table

Data can be read from a Kiji table by using KijiInput. This factory method takes options specific to requesting slices of data from a Kiji table such as:

  • Which columns to retrieve from the table and the field names they should be given.
  • Number of versions of each cell to return.
  • Filters to apply to the requested data.

For the tutorial, we call KijiInput to read all versions of the column "info:track_plays" from the provided Kiji table and bind the resulting value to the field named 'playlist:

  KijiInput.builder
      .withTableURI(args("table-uri"))
      .withColumnSpecs(
          QualifiedColumnInputSpec.builder
              .withColumn("info", "track_plays")
              .withMaxVersions(all)
              .build -> 'playlist
          )
      .build

Note the reference here to args("table-uri"), which evaluates to the value of the job-specific command-line option --table-uri.

Split each user's track play history into individual songs

Each cell in the "info:track_plays" column for a given row may contain multiple cell versions, each with a different timestamp. The different versions of the "info:track_plays" column contain the songs that the corresponding user has listened to at different times. KijiExpress returns the multiple versions of a cell as a Scala Seq of cells.

We now unpack the data contained within each cell:

/**
  * Gets the ids of songs a user has listened to.
  *
  * @param slice from the column `info:track_plays` that records all the songs a user has
  *     listened to.
  * @return the song ids that a user has listened to.
  */
def songsListenedTo(slice: Seq[FlowCell[CharSequence]]): Seq[String] = {
  slice.map { cell => cell.datum.toString }
}

The function songsListenedTo simply extracts all of the data (i.e., the track that the user has listened to) from each cell.
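To try the unpacking step without a Kiji installation, the sketch below applies the same function body to a stand-in cell type. CellSketch is a hypothetical substitute for FlowCell, and the versions and song IDs are invented for illustration.

```scala
object SongsListenedToSketch {
  // Hypothetical stand-in for FlowCell: one timestamped version of a cell.
  final case class CellSketch[T](version: Long, datum: T)

  // Same body as the tutorial's songsListenedTo, applied to the stand-in type.
  def songsListenedTo(slice: Seq[CellSketch[CharSequence]]): Seq[String] =
    slice.map { cell => cell.datum.toString }

  // Three versions of info:track_plays for one user (invented values).
  val slice: Seq[CellSketch[CharSequence]] = Seq(
    CellSketch[CharSequence](3L, "song-10"),
    CellSketch[CharSequence](2L, "song-0"),
    CellSketch[CharSequence](1L, "song-10")
  )
}
```

Each cell version yields one song ID, so a song played several times appears several times in the result.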

We next invoke the Scalding function flatMapTo on the unpacked stream of cells. flatMapTo is one of the set of Scalding map-like functions, which also include flatMap, map, and mapTo. The tuples resulting from a map or flatMap operation contain the same fields as the tuples the operation was run against, as well as the output fields generated by the operation. The results of mapTo and flatMapTo functions, however, contain only the output fields generated by the operation; all fields from the input tuple are dropped.

Moreover, the Scalding flatMapTo function flattens the list that it produces. The output of the following flatMapTo command is therefore a pipe of tuples, each containing a single field, 'song.

The following command extracts the cell data using songsListenedTo and flattens the resulting lists of songs into a pipe of individual songs:

.flatMapTo('playlist -> 'song) { songsListenedTo }
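The difference between keeping and dropping the input field can be modelled with ordinary Scala collections. This is only an analogy under the assumption that a pipe behaves like a sequence of tuples; it does not use the Scalding API, and the playlists are invented.

```scala
object FlatMapToSketch {
  // One entry per user; each entry plays the role of the 'playlist field.
  val playlists: Seq[Seq[String]] = Seq(
    Seq("song-0", "song-1"),
    Seq("song-1")
  )

  // flatMap-like: every output tuple keeps the input field next to the new one.
  val withInputField: Seq[(Seq[String], String)] =
    playlists.flatMap { playlist => playlist.map(song => (playlist, song)) }

  // flatMapTo-like: only the new field survives, and the lists are flattened.
  val songsOnly: Seq[String] =
    playlists.flatMap { playlist => playlist }
}
```

Both results contain one element per played song; only the second drops the playlist, which is all the counting step needs.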

Count the occurrences of each song

Now that we have separated each played song from the user that listened to it, we can calculate the play count for each song. To do so, we will use the Scalding groupBy function. groupBy takes two arguments:

  • A field (or fields) to group by
  • A function that aggregates the tuples that share the same value in the provided field.

In our case, we want to group on the song ID and then count the number of tuples for each unique song ID. We then bind that count to the field 'songCount:

.groupBy('song) { _.size('songCount) }

After this operation, the output pipe contains tuples with fields 'song and 'songCount.
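As a rough in-memory analogy, the same grouping and counting can be written with the standard collections library. The helper name countPlays is hypothetical, and this sketch runs on a single machine rather than as a MapReduce job.

```scala
object SongCountSketch {
  // Group identical song IDs together, then count the members of each group.
  def countPlays(songs: Seq[String]): Map[String, Int] =
    songs.groupBy(identity).map { case (song, plays) => song -> plays.size }
}
```

For example, SongCountSketch.countPlays(Seq("song-0", "song-1", "song-0")) maps "song-0" to 2 and "song-1" to 1.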

Write the results to a file

Finally, we write the play counts to a TSV (tab-separated value) file on HDFS:

.write(Tsv(args("output")))

Note the reference here to args("output"), which evaluates to the value of the job-specific command-line option --output.
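The layout of each output line can be pictured with a small formatting helper. This is a sketch of the tab-separated shape only, assuming one line per tuple with the fields in the order 'song, 'songCount; toTsvLines is a hypothetical name and is not part of Scalding.

```scala
object TsvSketch {
  // Render each (song, songCount) pair as one tab-separated line.
  def toTsvLines(counts: Seq[(String, Long)]): Seq[String] =
    counts.map { case (song, songCount) => s"$song\t$songCount" }
}
```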

All Together Now

This code shows the entire pipeline put together:

SongPlayCounter.scala

Running the Job

  • Run the SongPlayCounter job:
express.py job --libjars="${MUSIC_EXPRESS_HOME}/lib/*" \
    --user-jar=${MUSIC_EXPRESS_HOME}/lib/kiji-express-music-2.0.1.jar \
    --job-name=org.kiji.express.music.SongPlayCounter --mode=hdfs \
    --table-uri ${KIJI}/users \
    --output express-tutorial/songcount-output

Verify Output

  • Run the following command to see the output of the job:
hadoop fs -tail express-tutorial/songcount-output/part-00000 | head -n 3
song-0  260
song-1  100
song-10 272