SequentialPlayCount
Instead of recommending the most popular songs to everyone using Pandorify, we want to tailor our recommendations based on user's listening history. For every user, we will look up the most recent song they have listened to and then recommend the song most frequently played after it. In order to do that, we need to create an index so that for each song, we can quickly look up what the most popular songs to listen to afterwards are.
So, we need to count the number of times two songs have been played, one after another. The
SequentialPlayCounter
and SequentialPlayCountReducer
allow us to do that.
SequentialPlayCounter.java
SequentialPlayCountReducer.java
SequentialPlayCounter
SequentialPlayCounter
operates in much the same way that SongPlayCounter
does, but it
requires a more complex key structure to store both the song played and the song that followed.
The easiest way work with complex keys in Kiji is to use Avro.
We define a SongBiGram
, which will be our key, as a pair of songs played sequentially by a
single user.
/** Song play bigram. */
record SongBiGram {
/** The ID of the first song played in a sequence. */
string first_song_played;
/** The ID of the song played immediately after it. */
string second_song_played;
}
Whereas SongPlayCounter
's output value class was Text.class
, SequentialPlayCounter
uses AvroKey.class
which requires that we also implement AvroKeyWriter
and override getAvroKeyWriterSchema()
to
fully define the Avro key format.
SequentialPlayCounter
executes the same basic stages as SongPlayCounter
, but with a more complex
gather operation.
Read track play data and compose complex keys
SequentialPlayCounter
reads the same data as SongPlayCounter
, but maintains a "sliding window"
of the most recent two track ids. For each song after the first, gather()
emits a key-value pair
where the key is a SongBiGram
of the two most recently played songs, and the value is one (1) as a
tally.
/** {@inheritDoc} */
@Override
public void gather(KijiRowData input, GathererContext<AvroKey<SongBiGram>, LongWritable> context)
throws IOException {
// Here we use a sliding window to build bigrams that represent pairs of songs that have
// ever been played one following another.
// The variables firstSong and secondSong slide along as we iterate through the track plays.
CharSequence firstSong = null;
CharSequence nextSong = null;
NavigableMap<Long, CharSequence> trackPlays = input.getValues("info", "track_plays");
for (CharSequence trackId : trackPlays.values()) { // Iterate through this user's track plays.
// Slide the window one song over.
firstSong = nextSong;
nextSong = trackId;
// If firstSong is null, we are at the beginning of the list and our sliding window
// only contains one song, so don't output it. Otherwise...
if (null != firstSong) {
// Create the bigram of these two songs.
mBiGram.setFirstSongPlayed(firstSong);
mBiGram.setSecondSongPlayed(nextSong);
// Emit the bigram of these two songs.
context.write(new AvroKey<SongBiGram>(mBiGram), ONE);
}
}
}
SequentialPlayCountReducer
This reducer takes in pairs of songs that have been played sequentially and the number one.
It then computes the number of times those songs have been played together, and emits the ID of
the first song as the key, and a SongCount
record representing the song played after the first as
the value. A SongCount
record has a field containing the ID of the subsequent song and a field
for the number of times it has been played after the initial song.
This reducer takes AvroKey
as input, and writes AvroKey
and AvroValue
as output, so it must
implement AvroKeyReader
, AvroKeyWriter
, and AvroValueWriter
. The keys we are emitting are just strings
so we could use a Text key. Instead, we made the choice to use an AvroKey
so that we could use the Kiji defined AvroKeyValue output format, which
requires that you output AvroKey
and AvroValue
.
The schema for our Avro key is so simple that we don't have to add a record to our avdl file
in order to return the correct schema in getWriterSchema()
. Instead, we can use the static methods
avro provides for creating schemas of primitive types.
public Schema getAvroKeyWriterSchema() throws IOException {
// Since we are writing AvroKeys, we need to specify the schema.
return SongBiGram.SCHEMA$;
}
Sum Sequential Plays
SequentialPlayCountReducer
starts with the same reduction operation that LongSumReducer
used to
count track plays in the SongCount
example, but diverges when emitting key-value pairs. Instead
of passing the keys through the reducer, SequentialPlayCountReducer
creates new keys based on the
track IDs in the SongBiGram
keys. The new keys are simply the first track ID from each bi-gram,
while the second track ID becomes part of the SongCount
value.
protected void reduce(AvroKey<SongBiGram> key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
// Initialize sum to zero.
long sum = 0L;
// Add up all the values. When this reducer is used after SongPlayCounter, every value
// should be the number 1, so we are just counting the number of times the second song
// was played after the first (the key).
for (LongWritable value : values) {
sum += value.get();
}
// Set values for this count.
final SongBiGram songPair = key.datum();
final SongCount nextSongCount = SongCount.newBuilder()
.setCount(sum)
.setSongId(songPair.getSecondSongPlayed())
.build();
// Write out result for this song.
context.write(
new AvroKey<CharSequence>(songPair.getFirstSongPlayed().toString()),
new AvroValue<SongCount>(nextSongCount));
}
TestSequentialSongPlayCounter
To verify that SequentialPlayCounter
and SequentialPlayCountReducer
function as expected, their
test:
* Creates and populates an in-memory Kiji instance
* Runs a MapReduce job with SequentialPlayCounter
as the gatherer and SequentialPlayCountReducer
as the reducer
* Verifies that the output is as expected
TestSequentialSongPlayCounter.java
Create an in-memory Kiji instance
The InstanceBuilder class provides methods for populating a test Kiji instance. Once the test instance has been defined, its build method is called, creating the in-memory instance and table.
public final void setup() throws Exception {
final KijiTableLayout userLayout =
KijiTableLayout.createFromEffectiveJsonResource("/layout/users.json");
final String userTableName = userLayout.getName();
mUserTableURI = KijiURI.newBuilder(getKiji().getURI()).withTableName(userTableName).build();
new InstanceBuilder(getKiji())
.withTable(userTableName, userLayout)
.withRow("user-1").withFamily("info").withQualifier("track_plays")
.withValue(2L, "song-2")
.withValue(3L, "song-1")
.withRow("user-2").withFamily("info").withQualifier("track_plays")
.withValue(2L, "song-3")
.withValue(3L, "song-2")
.withValue(4L, "song-1")
.withRow("user-3").withFamily("info").withQualifier("track_plays")
.withValue(1L, "song-5")
.build();
}
Run and verify SequentialPlayCounter and SequentialPlayCountReducer
KijiGatherJobBuilder
is used to create a test MapReduce job. This job builder can be used outside
the context of a test to configure and run jobs programatically. The job is then run using Hadoop's
local job runner. The resulting output sequence file is then validated.
// Configure and run job.
final File outputDir = new File(getLocalTempDir(), "output.sequence_file");
final Path path = new Path("file://" + outputDir);
final KijiMapReduceJob mrjob = KijiGatherJobBuilder.create()
.withConf(getConf())
.withGatherer(SequentialPlayCounter.class)
.withReducer(SequentialPlayCountReducer.class)
.withInputTable(mUserTableURI)
// Note: the local map/reduce job runner does not allow more than one reducer:
.withOutput(new AvroKeyValueMapReduceJobOutput(new Path("file://" + outputDir), 1))
.build();
assertTrue(mrjob.run());
Reading back files is easy with normal file or table readers, currently avrokv files can be read in a limited way using a KeyValueStoreReader.
AvroKVRecordKeyValueStore.Builder kvStoreBuilder = AvroKVRecordKeyValueStore.builder()
.withInputPath(path).withConfiguration(getConf());
final AvroKVRecordKeyValueStore outputKeyValueStore = kvStoreBuilder.build();
KeyValueStoreReader reader = outputKeyValueStore.open();
// Check that our results are correct.
assertTrue(reader.containsKey("song-1"));
SongCount song1Result = (SongCount) reader.get("song-1");
assertEquals(2L, song1Result.getCount().longValue());
// Avro strings are deserialized to CharSequences in Java, .toString() allows junit to correctly
// compare the expected and actual values.
assertEquals("song-2", song1Result.getSongId().toString());
assertTrue(reader.containsKey("song-2"));
SongCount song2Result = (SongCount) reader.get("song-2");
assertEquals(1L, song2Result.getCount().longValue());
// Avro strings are deserialized to CharSequences in Java, .toString() allows junit to correctly
// compare the expected and actual values.
assertEquals("song-3", song2Result.getSongId().toString());
Running the Example
kiji gather \
--gatherer=org.kiji.examples.music.gather.SequentialPlayCounter \
--reducer=org.kiji.examples.music.reduce.SequentialPlayCountReducer \
--input="format=kiji table=${KIJI}/users" \
--output="format=avrokv file=output.sequentialPlayCount nsplits=2" \
--lib=${LIBS_DIR}
Verify
Because this job outputs Avro key-value files, which are binary and hard to read directly, we can use the
Hadoop job tracker to verify the success of the job. Using your favorite browser, navigate to
the JobTracker page (localhost:50030 by default). This is where you can
monitor all your Hadoop jobs. Locate the Kiji gather: SequentialPlayCounter
/
SequentialPlayCountReducer
job and navigate to the job page by clicking on the Job ID. On the
job page, check that Map output records number roughly 7000.
Music Recommendation Tutorial
- Overview
- Setup
- Bulk Importing
- PlayCount
- SequentialPlayCount
- NextTopSong
- Music Recommendation Producer