PlayCount
The 'Hello World!' of MapReduce
To quote the Scalding developers, Hadoop is a distributed system for counting words. Unfortunately, we are fresh out of words, but we do have the play history of bajillions of users listening to bazillions of songs.
This MapReduce job uses the listening history of our users that we have stored in the "users" Kiji table to calculate the total number of times each song has been played. We then write the result of this job to a text file in HDFS containing tab-separated values.
Creating the Job
SongPlayCounter is an example of a KijiExpress job that reads from a Kiji table and writes to a file on HDFS. SongPlayCounter proceeds through these stages:
- Read the column "info:track_plays" from rows in a Kiji table.
- Break each user's track play history into individual songs.
- Count the number of times each song has been played.
- Write each song ID and play count to a file in HDFS.
The tutorial includes the job already written for you; see Running the Job below to execute it. The following sections walk through how the job is created.
Read "info:track_plays" from a Kiji table
Data can be read from a Kiji table by using KijiInput. This factory method takes options specific to requesting slices of data from a Kiji table, such as:
- Which columns to retrieve from the table and the field names they should be given.
- Number of versions of each cell to return.
- Filters to apply to the requested data.
For the tutorial, we call KijiInput to read all versions of the column "info:track_plays" from the provided Kiji table and bind the resulting value to the field named 'playlist:
KijiInput.builder
    .withTableURI(args("table-uri"))
    .withColumnSpecs(
        QualifiedColumnInputSpec.builder
            .withColumn("info", "track_plays")
            .withMaxVersions(all)
            .build -> 'playlist
    )
    .build
Note the reference here to args("table-uri"), which evaluates to the value of the job-specific command-line option --table-uri.
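The same builder can describe more than one column. For instance, a hypothetical job that also reads the most recent version of an invented "info:name" column into a 'name field might look like the following sketch; this assumes withColumnSpecs accepts multiple spec-to-field pairs, which its single-pair usage above suggests:

KijiInput.builder
    .withTableURI(args("table-uri"))
    .withColumnSpecs(
        // All versions of the track plays, as in the tutorial job.
        QualifiedColumnInputSpec.builder
            .withColumn("info", "track_plays")
            .withMaxVersions(all)
            .build -> 'playlist,
        // Hypothetical second column: only the latest version is requested.
        QualifiedColumnInputSpec.builder
            .withColumn("info", "name")
            .withMaxVersions(1)
            .build -> 'name
    )
    .build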
Split each user's track play history into individual songs
Each cell in the "info:track_plays" column for a given row may contain multiple cell versions, each with a different timestamp. The different versions of the "info:track_plays" column contain the songs that the corresponding user has listened to at different times. KijiExpress returns the multiple versions of a cell as a Scala Seq of cells.
We now unpack the data contained within each cell:
/**
 * Gets the ids of songs a user has listened to.
 *
 * @param slice from the column `info:track_plays` that records all the songs a user has
 *     listened to.
 * @return the song ids that a user has listened to.
 */
def songsListenedTo(slice: Seq[FlowCell[CharSequence]]): Seq[String] = {
  slice.map { cell => cell.datum.toString }
}
The function songsListenedTo simply extracts all of the data (i.e., the tracks that the user has listened to) from each cell.
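As a quick illustration, here is what songsListenedTo does to a hypothetical slice. Note that the (family, qualifier, version, datum) argument order for FlowCell's constructor is our assumption, not something this tutorial states, so check the FlowCell scaladoc before copying this:

// Hypothetical input slice; the constructor argument order is assumed.
val slice = Seq(
  FlowCell[CharSequence]("info", "track_plays", 2L, "song-1"),
  FlowCell[CharSequence]("info", "track_plays", 1L, "song-0"))

songsListenedTo(slice)  // => Seq("song-1", "song-0")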
We next invoke the Scalding function flatMapTo on the unpacked stream of cells. flatMapTo is one of the set of Scalding map-like functions, which also include flatMap, map, and mapTo. The tuples resulting from a map or flatMap operation contain the same fields as the tuples the operation was run against, as well as the output fields generated by the operation. The results of the mapTo and flatMapTo functions, however, contain only the output fields generated by the operation; all fields from the input tuple are dropped. Moreover, the Scalding flatMapTo function flattens the list that it produces, hence the output of the following flatMapTo command is a pipe of tuples, each containing a single field, 'song.
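The distinction is easiest to see side by side. The following is a hypothetical, self-contained Scalding job (not part of the tutorial) that runs the same input through map, mapTo, and flatMapTo; the fields 'offset, 'line, 'length, and 'word and the option names are invented for illustration:

import com.twitter.scalding._

// Hypothetical job illustrating the field semantics of the map-like functions.
class FieldSemanticsExample(args: Args) extends Job(args) {
  // TextLine produces tuples with the fields ('offset, 'line).
  val lines = TextLine(args("input")).read

  // map keeps the input fields and appends the output field:
  // resulting tuples are ('offset, 'line, 'length).
  lines
      .map('line -> 'length) { line: String => line.length }
      .write(Tsv(args("map-output")))

  // mapTo keeps only the declared output field: resulting tuples are ('length).
  lines
      .mapTo('line -> 'length) { line: String => line.length }
      .write(Tsv(args("mapto-output")))

  // flatMapTo also flattens the returned collection, emitting one
  // single-field ('word) tuple per element.
  lines
      .flatMapTo('line -> 'word) { line: String => line.split("\\s+") }
      .write(Tsv(args("flatmapto-output")))
}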
The following command extracts the cell data using songsListenedTo and flattens the resulting lists of songs into a pipe of individual songs:
.flatMapTo('playlist -> 'song) { songsListenedTo }
Count the occurrences of each song
Now that we have separated each played song from the user that listened to it, we can calculate the
play count for each song. To do so, we will use the
Scalding groupBy function.
groupBy takes two arguments:
- A field (or fields) to group by
- A function that aggregates the resulting tuples that share the same value bound to the provided field.
In our case we want to group on the song name, and then count the number of tuples for each unique song name. We shall then bind the number of tuples to the field 'songCount:
.groupBy('song) { _.size('songCount) }
After this operation, the output pipe contains tuples with the fields 'song and 'songCount.
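For example, if three users played song-1 and one user played song-2, the tuples before and after the groupBy would look like this (schematic values, not real tutorial data):

// before groupBy('song):    after groupBy('song) { _.size('songCount) }:
// ('song)                   ('song, 'songCount)
// "song-1"                  ("song-1", 3)
// "song-1"                  ("song-2", 1)
// "song-1"
// "song-2"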
Write the results to a file
Finally, we write the play counts to a TSV (tab-separated value) file on HDFS:
.write(Tsv(args("output")))
Note the reference here to args("output"), which evaluates to the value of the job-specific command-line option --output.
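By default Tsv writes all fields of the pipe in their current order. If you want to pin the columns explicitly, Tsv also accepts a field list; a sketch, not what the tutorial job does:

// Hypothetical variant that names the output columns explicitly.
.write(Tsv(args("output"), ('song, 'songCount)))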
All Together Now
This code shows the entire pipeline put together:
SongPlayCounter.scala
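If you do not have the source file handy, here is a sketch of how the pieces above assemble into a complete job, assuming the standard KijiExpress imports and the KijiJob base class; the shipped SongPlayCounter.scala may differ in small details:

import com.twitter.scalding._
import org.kiji.express.flow._

// Sketch of the full pipeline: read track plays, split them into songs,
// count plays per song, and write the counts to a TSV file.
class SongPlayCounter(args: Args) extends KijiJob(args) {

  def songsListenedTo(slice: Seq[FlowCell[CharSequence]]): Seq[String] = {
    slice.map { cell => cell.datum.toString }
  }

  KijiInput.builder
      .withTableURI(args("table-uri"))
      .withColumnSpecs(
          QualifiedColumnInputSpec.builder
              .withColumn("info", "track_plays")
              .withMaxVersions(all)
              .build -> 'playlist)
      .build
      .flatMapTo('playlist -> 'song) { songsListenedTo }
      .groupBy('song) { _.size('songCount) }
      .write(Tsv(args("output")))
}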
Running the Job
- Run the SongPlayCounter job:
express.py job --libjars="${MUSIC_EXPRESS_HOME}/lib/*" \
    --user-jar=${MUSIC_EXPRESS_HOME}/lib/kiji-express-music-2.0.4.jar \
    --job-name=org.kiji.express.music.SongPlayCounter --mode=hdfs \
    --table-uri ${KIJI}/users \
    --output express-tutorial/songcount-output
Verify Output
- Run the following command to see the output of the job:
hadoop fs -tail express-tutorial/songcount-output/part-00000 | head -n 3
song-0 260
song-1 100
song-10 272