This section introduces Scala and KijiExpress. It does not contain any tutorial steps. If you are already familiar with Scala and the Scalding library for data processing, skip ahead to Setup; you can also refer back to the summary at the end of this page.

KijiExpress is built on top of Twitter's Scalding, a powerful Scala library for processing collections of data with MapReduce. Scalding in turn uses Cascading, a Java library that provides an abstraction over MapReduce and gives you control over data flows. (Scala + Cascading = "Scalding")

[Figure: the Scala / Scalding / Cascading / Hadoop stack]

Tuples and Pipelines

KijiExpress views a data set as a collection of named tuples. A named tuple can be thought of as an ordered list in which each element has a name. When using KijiExpress with data stored in a Kiji table, each row of the table corresponds to a single tuple, and the table's columns correspond to named fields in that tuple. In this way, KijiExpress presents an entire Kiji table as a collection of tuples.

By viewing a data set as a collection of named tuples, KijiExpress (through Scalding) allows you to transform your data using common functional operations.

With Scalding, data processing occurs in pipelines, where the input and output of each pipeline is a stream of named tuples represented by a pipe object. Each operation you describe in your KijiExpress program, or job, defines the input, output, and processing for a pipe.
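As a minimal sketch (not runnable on its own: it assumes a pipe named users that carries the fields shown), a pipeline chains such operations, each consuming one tuple stream and producing another:

// Hypothetical pipe of user tuples with the fields 'username and 'totalSpent.
// Each chained call consumes the tuple stream and produces a new one.
val bigSpenders = users
    .filter('totalSpent) { totalSpent: String => totalSpent.toDouble > 2.0 }
    .map('username -> 'greeting) { username: String => "Hello, " + username }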

Jobs

Scala - the language in which you'll write KijiExpress jobs - is object-oriented. In addition to the familiar function-call syntax function(arguments), many operations are methods defined on objects, which you invoke with the syntax object.method(arguments).

For example, the line

val userData = userDataInput.project('username, 'stateId, 'totalSpent)

is calling the project method on the object userDataInput, with the arguments 'username, 'stateId, and 'totalSpent. The result of this is another object, called userData.

KijiExpress lets you refer to tuple fields by preceding the field name with a ' (single quote). The call above operates on the username, stateId, and totalSpent fields by passing the symbols 'username, 'stateId, and 'totalSpent as its arguments.
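The leading quote is ordinary Scala syntax: 'username is a literal for a Scala Symbol, which Scalding uses as a field name. A quick illustration:

// 'username is a Scala Symbol literal, equivalent to Symbol("username").
val field = 'username
println(field.name)   // prints "username"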

When writing KijiExpress jobs, your methods will often take a first argument group in parentheses that specifies a mapping from input field names to output field names. You can then define a function in curly braces {} immediately following that defines how to map from the input fields to the output fields. The syntax looks like this:

input.method('input-field -> 'output-field) { x => function(x) }

For example, consider the line:

val userDataInput = input.map('line -> ('username, 'stateId, 'totalSpent)) { line: String =>
    (line.split(" ")(0), line.split(" ")(1), line.split(" ")(2)) }

Here we call the map method on input, mapping the input field line to the output fields username, stateId, and totalSpent (remember that fields are marked with a single quote). To specify how input maps to output, we pass the function { line: String => (line.split(" ")(0), line.split(" ")(1), line.split(" ")(2)) } as a second argument. This function is called on the line field of each tuple; it returns a 3-tuple whose elements populate the new username, stateId, and totalSpent fields. The result is the pipe we name userDataInput.

A Simple Example Job

For a demonstration of some common methods on pipes, consider this simple KijiExpress job. At each step, fields in the tuple can be created and operated on.

This script reads customer data from "user-file.txt" and cleans it up: it keeps only users who have spent more than $2, joins the result with side data in "state-names.txt" to attach state names, counts the number of such users per state, and writes that result to an output file.

This script expects two files in the directory you run it from:

  1. "user-file.txt," which contains user data in the form "username stateID totalSpent" on each line.
  2. "state-names.txt" which contains a mapping from state IDs to state names in the form "numericalID stateName" on each line, for example "1 California".

A detailed description of each part follows the script.

// Read data from a text file.
val input = TextLine("user-file.txt").read

// Split each line on spaces into the fields username, stateId, and totalSpent.
val userDataInput = input.map('line -> ('username, 'stateId, 'totalSpent)) { line: String =>
    // Split the line up on the spaces, and create a tuple with the first, second, and third words
    // in the line, in that order.
    (line.split(" ")(0), line.split(" ")(1), line.split(" ")(2)) }

// Keep only the username, stateId, and totalSpent fields.
val userData = userDataInput.project('username, 'stateId, 'totalSpent)

// Keep only the customers who spent more than $2.00.
val importantCustomerData = userData
    .filter('totalSpent) { totalSpent: String => totalSpent.toDouble > 2.0 }

// Create a new pipeline containing state ID to state name mappings.
val sideData = TextLine("state-names.txt").read
    .map('line -> ('numericalStateId, 'stateName)) { line: String =>
        // Split the line up on the spaces, and create a tuple with the first and second words in
        // the line.
        (line.split(" ")(0), line.split(" ")(1))
    }

// Join the pipelines on the field stateId from "importantCustomerData" and numericalStateId
// from "sideData".
val importantCustomerDataWithStateNames = importantCustomerData
    .joinWithSmaller('stateId -> 'numericalStateId, sideData)
    // Keep only the username and stateName fields.
    .project('username, 'stateName)

// Group by the states customers are from and compute the size of each group.
val importantCustomersPerState = importantCustomerDataWithStateNames
    .groupBy('stateName) { group => group.size('customersPerState) }

// Output to a file in tab-separated form.
importantCustomersPerState.write(Tsv("important-customers-by-state.txt"))

Input

// Read data from a text file.
val input = TextLine("user-file.txt").read

First, we read our input with TextLine, which is a predefined Scalding Source that reads lines of text from a file. TextLine views a file (in this case the file user-file.txt in HDFS) as a collection of tuples, one per line of text. Each tuple has a field named line, which contains the corresponding line of text read from the file, as well as a field named offset (not used here), which holds the byte offset of the line within the file.

Once we have a view of the data set as a collection of tuples, we can use different operations to derive results that we can store in new tuple fields.

Map

// Split each line on spaces into the fields username, stateId, and totalSpent.
val userDataInput = input.map('line -> ('username, 'stateId, 'totalSpent)) { line: String =>
    // Split the line up on the spaces, and create a tuple with the first, second, and third words
    // in the line, in that order.
    (line.split(" ")(0), line.split(" ")(1), line.split(" ")(2)) }

This statement creates userDataInput, which contains the fields line, offset, username, stateId, and totalSpent. Notice that doing a map operation on input keeps the fields line and offset around, and adds the username, stateId, and totalSpent fields.
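If you don't want to carry the input fields along, Scalding's mapTo method (listed in the quick reference at the end of this page) works like map but keeps only the output fields. A sketch of the same split using mapTo (userDataOnly is a hypothetical name):

// Split each line as before, but discard the line and offset fields.
val userDataOnly = input.mapTo('line -> ('username, 'stateId, 'totalSpent)) { line: String =>
    (line.split(" ")(0), line.split(" ")(1), line.split(" ")(2)) }

Had the script used mapTo here, the project step below would be unnecessary; it uses map and project to demonstrate both.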

Project

// Keep only the username, stateId, and totalSpent fields.
val userData = userDataInput.project('username, 'stateId, 'totalSpent)

We no longer need the line or offset fields. The project method projects the tuples onto the specified fields, discarding any unspecified fields. userData contains the same tuples as userDataInput, but without the line and offset fields that TextLine provided.

Filter

// Keep only the customers who spent more than $2.00.
val importantCustomerData = userData
    .filter('totalSpent) { totalSpent: String => totalSpent.toDouble > 2.0 }

This statement creates importantCustomerData, a stream of named tuples with the same three fields as userData: username, stateId, and totalSpent. importantCustomerData, however, contains only the tuples from userData for which the function we provide to the filter operation evaluates to true, i.e., users who have spent more than two dollars on our service.
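filter can also examine several fields at once, in which case the function receives them as a Scala tuple. A hypothetical sketch (the state ID chosen here is just for illustration):

// Keep customers from state 1 who spent more than $2.00; the two fields
// arrive together as a (String, String) pair.
val filtered = userData.filter('stateId, 'totalSpent) { fields: (String, String) =>
    val (stateId, totalSpent) = fields
    stateId == "1" && totalSpent.toDouble > 2.0
}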

Join

// Create a new pipeline containing state ID to state name mappings.
val sideData = TextLine("state-names.txt").read
    .map('line -> ('numericalStateId, 'stateName)) { line: String =>
        // Split the line up on the spaces, and create a tuple with the first and second words in
        // the line.
        (line.split(" ")(0), line.split(" ")(1))
    }

// Join the pipelines on the field 'stateId from "importantCustomerData" and 'numericalStateId
// from "sideData".
val importantCustomerDataWithStateNames = importantCustomerData
    .joinWithSmaller('stateId -> 'numericalStateId, sideData)
    // Keep only the username and stateName fields.
    .project('username, 'stateName)

In this step, we perform a join operation to add state names to importantCustomerData. First we define the pipe, sideData, with which to join importantCustomerData. sideData contains tuples with fields line, numericalStateId, and stateName (as well as offset). You've seen TextLine and .map before. Notice that we have chained calls such as .map on pipes, creating a pipeline that looks like TextLine(inputfile).read.map(...).

We now join our main pipeline with the sideData pipe, specifying the fields to join on (in this case, stateId from importantCustomerData and numericalStateId from sideData). The join operation adds all of the fields from sideData (line, offset, numericalStateId, and stateName) to every tuple in importantCustomerData whose stateId equals numericalStateId.

Since sideData is smaller than importantCustomerData (sideData contains at most 50 tuples, one for each state in the United States, while importantCustomerData could be very large), we call joinWithSmaller on importantCustomerData. Specifying which pipe we expect to be smaller enables Scalding to optimize the resulting MapReduce jobs.

Finally, we apply another projection to our pipe, retaining only the fields username and stateName (since our goal is to obtain per-state counts of customers who have spent more than two dollars).
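Scalding also provides joinWithLarger (for when the pipe you call the method on is the smaller one) and joinWithTiny, a map-side join for very small pipes. As a sketch, assuming the state mapping comfortably fits in memory, the join above could instead be written:

// sideData has at most 50 rows, so a map-side join is a reasonable choice.
val joined = importantCustomerData
    .joinWithTiny('stateId -> 'numericalStateId, sideData)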

Group by

// Group by the states customers are from and compute the size of each group.
val importantCustomersPerState = importantCustomerDataWithStateNames
    .groupBy('stateName) { group => group.size('customersPerState) }

This step groups the tuples from the previous step by their stateName, and for each group, puts the size of the group in a new field called customersPerState.
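The function passed to groupBy receives a group builder on which aggregations can be chained. As a hedged sketch, assuming a hypothetical pipe pipeWithSpending that still carries a totalSpent field (note that our script's totalSpent is a String, and depending on your Scalding version sum may need an explicit type parameter, e.g. sum[Double]):

// Hypothetical: compute both the size of each state's group and its total spending.
val spendingPerState = pipeWithSpending
    .groupBy('stateName) { group =>
        group.size('customersPerState)
             .sum('totalSpent -> 'totalSpentPerState)
    }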

Output

// Output to a file in tab-separated form.
importantCustomersPerState.write(Tsv("important-customers-by-state.txt"))

Tsv is a predefined Scalding source. It writes the tuples out to a file in tab-separated form. KijiExpress provides sources to read from and write to Kiji tables, which you will see later in the tutorial.
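Scalding includes other delimited text sources as well, for example Csv; a sketch of writing the same pipe as comma-separated values:

// Write the same tuples in comma-separated form instead.
importantCustomersPerState.write(Csv("important-customers-by-state.csv"))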

Results

If you run this script with the file "user-file.txt":

daisy 1 3
robert 4 0
kiyan 2 5
juliet 1 4
renuka 2 2

and "state-names.txt":

1 California
2 Washington

Then the output in the file important-customers-by-state.txt will be:

California  2
Washington  1

These results show that two customers in California and one in Washington spent more than two dollars.

Scala Quick Reference

Below we summarize the Scala commands we used in our example Scalding script.

Indicate Fields

Precede field names with a single quote:

<object>.map(('<input-field>, '<input-field> ...) -> ('<mapped-field>, '<mapped-field>, ..))

Input From File

val <variable-name> = TextLine("<filename>").read

Map

The output includes the input fields as well as the new output fields:

val <variable-name> = <object>.map('<input-field> -> ('<output-field1>, '<output-field2>, ...)) { <map function> }

The output includes only the new output fields:

val <variable-name> = <object>.mapTo('<input-field> -> ('<output-field1>, '<output-field2>, ...)) { <map function> }

Split Tuple at Blanks

{ <object>: String => (<object>.split(" ")(0), <object>.split(" ")(1)) }

Project

val <variable-name> = <object>.project('<field1>, '<field2>, ...)

Filter

val <variable-name> = <object>.filter('<field>, '<field>, ...) { function }

Join

In addition to joinWithSmaller, there are the methods joinWithLarger and joinWithTiny. See Scalding Join Operations.

val <variable-name> = <object>.joinWithSmaller('<field-from-this-data-set> -> '<field-from-other-data-set>, <other-data-set>)

Group By

val <variable-name> = <object>.groupBy('<field>) { <group function> }

Group By Value

val <variable-name> = <object>.groupBy('<field>) { x => x }

Calculate Size

val <variable-name> = <object>.groupBy('<field>) { <group> => <group>.size('<field>) }

Output TSV

For other sources in addition to Tsv, see Scalding Sources.

<object>.write(Tsv("<filename>"))

Scalding Resources

There are many resources available to learn more about the Scalding library.