Recommendations Producer
Personalizing Recommendations
In the previous tutorial step, for each song, we computed the songs most frequently played next. We now use this data to make actual recommendations. We simply look at the last song each user has listened to and recommend the most popular song played after it.
Specifically, the songs-table has information about the top next songs in its column "info:top_next_songs". We have the latest song a user has played in the users-table in column "info:track_plays". We now have to join these to get our result.
The next sections walk through the pipeline line-by-line and describe the custom functions as they appear. The entire file is available at the end of the page.
Get the most popular song played next
We first describe a helper function which retrieves the song ID of the most popular song given the list of top next songs. getMostPopularSong takes as its input a Scala Seq of Kiji cells containing Avro records from the "info:top_next_songs" column of the songs table. The "info:top_next_songs" column contains a TopSongs Avro record. You can look in the ExpressMusic.avdl file for the definitions of these Avro records:
/** A count of the number of times a song has been played. */
record SongCount {
  string song_id;
  long count;
}
/** Container record for the top songs and their number of plays. */
record TopSongs {
  array<SongCount> top_songs;
}
In the Scala script for this step in the tutorial, we read out Avro data from our Kiji table as specific Avro records. We can access the fields within those records with getter functions, possibly followed by conversions to the appropriate primitive type. For example, to get the value of count within a SongCount Avro record, we would use the following:
val myCount: Long = songCount.getCount.toLong
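The same getter-then-convert pattern applies to string fields, which is why the tutorial code calls .toString on values such as getSongId. The sketch below illustrates the reason under one assumption: that the getter returns a CharSequence that is not a java.lang.String (the FlowCell[CharSequence] type used later in this page suggests as much). A StringBuilder is used purely as a stand-in for that value; the object name AvroStringSketch is hypothetical and not part of the tutorial code.

```scala
object AvroStringSketch {
  // A StringBuilder is used here only as a convenient non-String CharSequence.
  // It stands in for the value a getter such as getSongId might return (assumption).
  val rawSongId: CharSequence = new java.lang.StringBuilder("song-41")

  // Comparing the raw CharSequence with a String is false,
  // even though both hold the same characters.
  val rawMatches: Boolean = rawSongId == "song-41"

  // Converting to String first gives ordinary String equality.
  val convertedMatches: Boolean = rawSongId.toString == "song-41"
}
```

Converting once, at the point where the field is read, keeps later comparisons and joins on song IDs working with plain String values.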
In the getMostPopularSong function below, we retrieve the most popular song from a TopSongs record by getting the first song in the list of top_songs within the Avro record. Recall that we sorted this list in the previous step in the tutorial. The items within the top_songs array are instances of SongCount, which contains a song_id member, which is what we need to recommend the next song to play:
/**
 * This method retrieves the most popular song (at index 0) in the TopSongs record.
 *
 * @param songs from the TopSongs record.
 * @return the most popular song.
 */
def getMostPopularSong(songs: Seq[FlowCell[TopSongs]]): String = {
  songs.head.datum.getTopSongs.get(0).getSongId.toString
}
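To make the lookup easier to try outside a Kiji environment, here is a minimal sketch of the same logic on plain Scala case classes. SongCount, TopSongs, and Cell below are hypothetical stand-ins for the Avro-generated classes and for FlowCell, not the real Kiji types. Unlike the tutorial function, this variant returns an Option, so an empty cell sequence or an empty top_songs list yields None instead of throwing.

```scala
object MostPopularSongSketch {
  // Hypothetical stand-ins for the Avro records and for FlowCell (assumptions).
  final case class SongCount(songId: String, count: Long)
  final case class TopSongs(topSongs: List[SongCount])
  final case class Cell[T](datum: T)

  // Takes the first cell, then the first entry of its top-songs list.
  // The list is assumed to be sorted already, most played first.
  def mostPopularSong(cells: Seq[Cell[TopSongs]]): Option[String] =
    for {
      cell <- cells.headOption
      top  <- cell.datum.topSongs.headOption
    } yield top.songId
}
```

For a cell holding the list (song-2, 9 plays), (song-7, 4 plays), the function returns Some("song-2"). Whether a row with no top next songs can occur depends on the output of the previous tutorial step.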
Apply the above function on a Scalding Pipe
Next, we construct a pipe that does the following:
- Reads the column "info:top_next_songs" from the songs table as 'topNextSongs.
- Uses the first component of a given row's EntityId as the 'songId.
- Applies the getMostPopularSong function described above to all the songs.
- Outputs tuples of ('songId, 'nextSong).
The code follows:
val recommendedSong = KijiInput.builder
    .withTableURI(args("songs-table"))
    .withColumnSpecs(QualifiedColumnInputSpec.builder
        .withColumn("info", "top_next_songs")
        .withSchemaSpec(SchemaSpec.Specific(classOf[TopSongs]))
        .build -> 'topNextSongs)
    .build
    .map('entityId -> 'songId) { eId: EntityId => eId(0) }
    .map('topNextSongs -> 'nextSong) { getMostPopularSong }
    .project('songId, 'nextSong)
Putting it all together
Finally, we create a flow that does the following:
- Reads the column "info:track_plays" from the users table, which contains the listening history.
- Retrieves the song most recently played by a user into the field 'lastTrackPlayed.
- Generates the recommendation by joining the two pipes, one of which contains the 'user and 'songId fields (for the last song played), and the other of which contains the 'songId and 'nextSong fields for the most popular top songs.
KijiInput.builder
    .withTableURI(args("users-table"))
    .withColumns("info:track_plays" -> 'trackPlays)
    .build
    .map('trackPlays -> 'lastTrackPlayed) {
        slice: Seq[FlowCell[CharSequence]] => slice.head.datum.toString }
    .joinWithSmaller('lastTrackPlayed -> 'songId, recommendedSong)
    .write(KijiOutput.builder
        .withTableURI(args("users-table"))
        .withColumns('nextSong -> "info:next_song_rec")
        .build)
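The join step can be pictured with ordinary Scala collections. The sketch below is an illustration only, not the Scalding implementation: the two Map arguments are hypothetical stand-ins for the users pipe (user to last track played) and the recommendedSong pipe (song to most popular next song). It assumes inner-join behavior, so a user whose last track has no entry on the songs side gets no recommendation.

```scala
object JoinSketch {
  // lastPlayed: user ID -> ID of the song that user played most recently.
  // nextSongs:  song ID -> ID of the most popular song played after it.
  // Returns user ID -> recommended song ID, keeping only users whose
  // last played song has a matching entry in nextSongs.
  def recommend(
      lastPlayed: Map[String, String],
      nextSongs: Map[String, String]): Map[String, String] =
    lastPlayed.collect {
      case (user, song) if nextSongs.contains(song) => user -> nextSongs(song)
    }
}
```

With lastPlayed containing user-1 -> song-1 and user-2 -> song-9, and nextSongs containing only song-1 -> song-5, the result holds the single pair user-1 -> song-5.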
Running the Example
- To run the SongRecommender job:
express.py job \
--jars="${MUSIC_EXPRESS_HOME}/lib/*" \
--class=org.kiji.express.music.SongRecommender \
--mode=hdfs \
--songs-table ${KIJI}/songs \
--users-table ${KIJI}/users
Verify Output
You can verify the output in an HBase-backed Kiji instance by scanning the users-table.
kiji scan ${KIJI}/users --max-rows=2
You should see something like:
Scanning kiji table: kiji://localhost:2181/kiji_express_music/users/
entity-id=['user-41'] [1325762580000] info:track_plays
song-41
entity-id=['user-41'] [1379354039975] info:next_song_rec
song-41
entity-id=['user-3'] [1325751420000] info:track_plays
song-0
entity-id=['user-3'] [1379354034880] info:next_song_rec
song-0
And if using Cassandra:
kiji get ${KIJI}/users --entity-id="['user-41']"
And you should see:
Looking up entity: ['user-41'] from kiji table: kiji-cassandra://localhost:2181/localhost:9042/kiji_express_music/users/
entity-id=['user-41'] [1325762580000] info:track_plays
song-41
entity-id=['user-41'] [1409631207364] info:next_song_rec
song-41
Shut down the cluster
That's the end of the Express tutorial!
Now is a good time to shut down the BentoBox cluster. If you have been using HBase, run:
bento stop
And if you have been using Cassandra, run:
cassandra-bento stop
Song Recommender Job Content
Here's the entire SongRecommender job:
SongPlayCounter.express