Saturday, January 14, 2012

Writing to MySQL in Hadoop

Continuing from the previous post, I will now create a Reducer that writes the baseball results to a MySQL database. This approach is mainly useful for writing summary data rather than tracking ongoing data, since handling keys gets tricky; for that you can write a custom output format that uses its own writer to talk to the database of your choice. For now we will use a simple reducer to write to a simple table. The table I'm writing to looks like this:


mysql> desc records;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| team  | varchar(100) | YES  |     | NULL    |       |
| wins  | int(11)      | YES  |     | NULL    |       |
| loses | int(11)      | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.59 sec)


All we care about is the team name and the wins and losses from the source feed. To start the example, here is the configuration setup.


DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://127.0.0.1/baseball", "visitor", "vispass");

//job set up code here
...

//set up the output format with our table name "records" and our three column names
String[] fields = { "team", "wins", "loses" };
DBOutputFormat.setOutput(job, "records", fields);
//tell the job what reducer to use
job.setReducerClass(ScoreReducer.class);


The configuration line specifies our local MySQL URL and the name of the database, baseball, along with the user name and password. It is important that this configuration line be placed before the creation of the Job class, or the settings will not be carried through to the reducer and output format classes. The reducer class takes our Text team key and list of integer values and turns them into win and loss totals, placing them in a database record.


public static class ScoreReducer extends Reducer<Text, IntWritable, BaseballDBWritable, Text> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int wins = 0;
        int losses = 0;

        //loop through the values and tally up wins and losses
        Iterator<IntWritable> vals = values.iterator();
        while (vals.hasNext()) {
            int n = vals.next().get();

            if (n == 0) //0 indicates a loss
                losses++;
            else //1 indicates a win
                wins++;
        }

        //create our record object to persist the team's wins and losses
        BaseballDBWritable record = new BaseballDBWritable(key.toString(), wins, losses);
        context.write(record, key);
    }
}


To write out to a database, our key class needs to implement the DBWritable interface. In this case I've created a custom one called BaseballDBWritable, which holds the fields for each record. Putting the database record in the key position is necessary because DBOutputFormat writes the key out to the database and ignores the value, so the value class here is unimportant and not used. The custom writable class looks like this:


public static class BaseballDBWritable implements Writable, DBWritable {

    String team;
    int wins;
    int losses;

    public BaseballDBWritable(String team, int wins, int losses) {
        this.team = team;
        this.wins = wins;
        this.losses = losses;
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        //not needed here; this class is only used for writing output
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        //set the columns in the same order they were passed to DBOutputFormat.setOutput
        statement.setString(1, team);
        statement.setInt(2, wins);
        statement.setInt(3, losses);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        //not needed here; this class is only used for writing output
    }

    @Override
    public void write(DataOutput out) throws IOException {
        //not needed here; this class is only used for writing output
    }
}


All we do here is add our values to the prepared statement created behind the scenes. The default database output format class handles creating and executing the surrounding SQL. Once all this is done, your job is ready to go. Assuming a blank table to start with and a number of rows in our source data, the output records should look like this:


mysql> select * from records;
+-----------+------+-------+
| team      | wins | loses |
+-----------+------+-------+
| Astros    |    1 |     0 |
| Athletics |    2 |     0 |
| Dodgers   |    0 |     1 |
| Giants    |    1 |     0 |
| Marlins   |    1 |     0 |
| Mets      |    0 |     1 |
| Padres    |    0 |     1 |
| Phillies  |    1 |     0 |
| Rays      |    0 |     1 |
| Red Sox   |    0 |     1 |
| Reds      |    0 |     1 |
| Yankees   |    1 |     1 |
+-----------+------+-------+
12 rows in set (0.03 sec)
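
For completeness, here is a rough sketch of what the surrounding driver could look like. The BaseballDriver and GameMapper names are placeholders of mine rather than part of the original job (any mapper that emits a team name with a 1 for a win or a 0 for a loss will do); the key point is that DBConfiguration.configureDB is called before the Job is created.


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class BaseballDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        //configure the JDBC connection before the Job is created so the settings
        //are carried through to the reducer and the output format
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://127.0.0.1/baseball", "visitor", "vispass");

        Job job = Job.getInstance(conf, "BaseballWriter");
        job.setJarByClass(BaseballDriver.class);

        //GameMapper is a placeholder for a mapper that emits (team, 1) for a win
        //and (team, 0) for a loss
        job.setMapperClass(GameMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        //the reducer and DBWritable described in this post
        job.setReducerClass(ScoreReducer.class);
        job.setOutputKeyClass(BaseballDBWritable.class);
        job.setOutputValueClass(Text.class);

        //write to the "records" table using our three columns
        String[] fields = { "team", "wins", "loses" };
        DBOutputFormat.setOutput(job, "records", fields);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
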

Thursday, January 12, 2012

Reading from HBase in Hadoop

A useful application of a MapReduce job is to move data from a large data store and summarize it in a smaller store. I decided to try moving data from HBase into another store. Using the latest library it is pretty easy. I started with a simple table called baseballscores. The first row, with id 'game1', looks like this:

hbase(main):001:0> get 'baseballscores', 'game1'
COLUMN                        CELL
 loser:score                  timestamp=1325913333740, value=2
 loser:team                   timestamp=1325913325984, value=Rays
 winner:score                 timestamp=1325913306939, value=5
 winner:team                  timestamp=1325913295557, value=Athletics
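
For reference, a row like the one above could have been inserted with the HBase client API along these lines. This is just a sketch using the old HTable/Put calls; the scores are stored as string bytes, which matches how they display in the shell.


//open the table and add one row ("game1") with winner and loser column families
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "baseballscores");

Put put = new Put(Bytes.toBytes("game1"));
put.add(Bytes.toBytes("winner"), Bytes.toBytes("team"), Bytes.toBytes("Athletics"));
put.add(Bytes.toBytes("winner"), Bytes.toBytes("score"), Bytes.toBytes("5"));
put.add(Bytes.toBytes("loser"), Bytes.toBytes("team"), Bytes.toBytes("Rays"));
put.add(Bytes.toBytes("loser"), Bytes.toBytes("score"), Bytes.toBytes("2"));
table.put(put);

table.close();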


The table has two column families and two columns per family. To read from this table, we first need to define our configuration. We can use the TableMapReduceUtil class to help set up the mapper job. We need to pass it the name of our table, the mapper class, the mapper's output key class, and the mapper's output value class.


//create our configuration
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "HBaseReader");
job.setJarByClass(HBaseMapReduce.class);
Scan scan = new Scan();

//set up mapper job to use baseballscores table
TableMapReduceUtil.initTableMapperJob("baseballscores", scan, HBaseMapper.class, Text.class, Text.class, job);


Now we can create our mapper class to take the data from the HBase table. The values parameter contains our column data. To get a specific column, we can call values.getValue and pass in the name of the column family and then the column name. Here we extract the winning and losing team names and then associate them with a win or loss key so we can total them later.

static class HBaseMapper extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {

        //pull the team names out of the winner and loser column families
        Text winningTeam = new Text(Bytes.toString(values.getValue(Bytes.toBytes("winner"), Bytes.toBytes("team"))));
        Text losingTeam = new Text(Bytes.toString(values.getValue(Bytes.toBytes("loser"), Bytes.toBytes("team"))));

        try {
            //key each team name by whether this game was a win or a loss
            context.write(new Text("win"), winningTeam);
            context.write(new Text("loss"), losingTeam);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}



And that's it. You can write the reducer of your choice and put the data anywhere you like.
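
As a quick illustration, a reducer matching this mapper could tally how many wins and losses each team has. The TeamCountReducer below is a sketch of mine, not part of the original job.


public static class TeamCountReducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //the key is "win" or "loss" and the values are the team names from each game,
        //so count how many times each team appears under this key
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (Text team : values) {
            Integer current = counts.get(team.toString());
            counts.put(team.toString(), current == null ? 1 : current + 1);
        }

        //emit one record per team, e.g. ("win Athletics", 2)
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            context.write(new Text(key.toString() + " " + entry.getKey()), new IntWritable(entry.getValue()));
        }
    }
}


Hook it up with job.setReducerClass(TeamCountReducer.class), matching output key and value classes, and whatever output format and path you prefer, then call job.waitForCompletion(true).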

Thursday, January 5, 2012

Custom File Output in Hadoop

I had been curious about the best way to control text output from a job and decided it was probably best to create my own output class. By simply extending the default FileOutputFormat class you can control all aspects of the output.

The first step is to create the output class. You need to be aware of the types returned by the reduce function. In my case, it sends a Text key and a list of integers associated with it, so I need to define the generic type parameters accordingly. The simple class looks like this:


public class MyTextOutputFormat extends FileOutputFormat<Text, List<IntWritable>> {

    @Override
    public org.apache.hadoop.mapreduce.RecordWriter<Text, List<IntWritable>> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        //get the job's output directory
        Path path = FileOutputFormat.getOutputPath(context);
        //create the full path with the output directory plus our filename
        Path fullPath = new Path(path, "result.txt");

        //create the file in the file system
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        FSDataOutputStream fileOut = fs.create(fullPath, context);

        //create our record writer with the new file
        return new MyCustomRecordWriter(fileOut);
    }
}


After figuring out the full path and creating our output file, we then need to create an instance of the actual record writer to pass back to the reduce job. That class allows us to actually write to the file however we want. Again, the generic type parameters need to match the types coming from the reducer. My class looks like this:

public class MyCustomRecordWriter extends RecordWriter<Text, List<IntWritable>> {

    private DataOutputStream out;

    public MyCustomRecordWriter(DataOutputStream stream) {
        out = stream;
        try {
            //write a simple header at the top of the file
            out.writeBytes("results:\r\n");
        }
        catch (Exception ex) {
            //swallowing the exception keeps the example short; real code should handle it
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        //close our file
        out.close();
    }

    @Override
    public void write(Text key, List<IntWritable> values) throws IOException, InterruptedException {
        //write out our key
        out.writeBytes(key.toString() + ": ");
        //loop through all values associated with our key and write them with commas between
        for (int i = 0; i < values.size(); i++) {
            if (i > 0)
                out.writeBytes(",");
            out.writeBytes(String.valueOf(values.get(i)));
        }
        out.writeBytes("\r\n");
    }
}


Finally, we need to tell our job about our output format and the output path before running it.


job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ArrayList.class);
job.setOutputFormatClass(MyTextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/out"));



And that's it. Two simple classes allow us all the control we need.
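
As a final note, a reducer that feeds this output format just needs to emit a Text key and a List of IntWritables. A minimal sketch could look like this (GroupingReducer is my own name, not part of the original job); the values are copied into new objects because Hadoop reuses the IntWritable instances while iterating.


public static class GroupingReducer extends Reducer<Text, IntWritable, Text, List<IntWritable>> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        //copy each value into a fresh IntWritable so the list holds distinct objects
        List<IntWritable> list = new ArrayList<IntWritable>();
        for (IntWritable value : values) {
            list.add(new IntWritable(value.get()));
        }

        //write the key and its full list of values; MyCustomRecordWriter formats the line
        context.write(key, list);
    }
}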