mysql> desc records;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| team  | varchar(100) | YES  |     | NULL    |       |
| wins  | int(11)      | YES  |     | NULL    |       |
| loses | int(11)      | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.59 sec)
All we care about from the source feed is the team name and its wins and losses (note that the losses column is named loses). To start the example, here is the configuration setup:
// register the JDBC driver, connection URL, user name and password on the Configuration
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://127.0.0.1/baseball", "visitor", "vispass");
//job set up code here (create the Job from conf only after the call above)
...
// set up the output format with our table name "records" and our three column names
String[] fields = { "team", "wins", "loses" };
DBOutputFormat.setOutput(job, "records", fields);
// tell the job which reducer to use
job.setReducerClass(ScoreReducer.class);
The configuration line specifies our local MySQL URL, the name of the database (baseball), and the user name and password. This line must be placed before the Job is created: the Job takes a copy of the Configuration when it is constructed, so settings added to conf afterward never reach the reducer or the output format class. The reducer class takes our Text team key and its list of integer values, turns them into win and loss totals, and places them in a database record.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class ScoreReducer extends Reducer<Text, IntWritable, BaseballDBWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int wins = 0;
        int losses = 0;
        // each value is one game result for this team
        for (IntWritable value : values) {
            if (value.get() == 0) { // 0 indicates a loss
                losses++;
            } else {                // 1 indicates a win
                wins++;
            }
        }
        // create our record object to persist the team's wins and losses;
        // DBOutputFormat writes only the key, so the value (the team name) is ignored
        BaseballDBWritable record = new BaseballDBWritable(key.toString(), wins, losses);
        context.write(record, key);
    }
}
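The counting rule in ScoreReducer can be tried out without a cluster. The sketch below is plain Java, not Hadoop code: the hypothetical ScoreTally class stands in for Hadoop's shuffle (grouping map output by team) and for reduce(), and it assumes, as the reducer does, that each game arrives as a 0 (loss) or 1 (win) keyed by team name. The sample input in main is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for shuffle + reduce (hypothetical, not Hadoop's API),
// so the win/loss counting can be checked locally.
final class ScoreTally {

    // Same rule as ScoreReducer: 0 counts as a loss, anything else as a win.
    // Returns {wins, losses}.
    static int[] tally(Iterable<Integer> outcomes) {
        int wins = 0;
        int losses = 0;
        for (int n : outcomes) {
            if (n == 0) {
                losses++;
            } else {
                wins++;
            }
        }
        return new int[] {wins, losses};
    }

    // Groups (team, outcome) pairs by team, the way Hadoop groups map output
    // by key before calling reduce(), then tallies each group.
    // TreeMap keeps teams sorted, like the sorted key order a reducer sees.
    static Map<String, int[]> reduceAll(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapOutput) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        Map<String, int[]> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            totals.put(e.getKey(), tally(e.getValue()));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical map output: one (team, result) pair per game.
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("Yankees", 1),
                Map.entry("Red Sox", 0),
                Map.entry("Yankees", 0),
                Map.entry("Athletics", 1),
                Map.entry("Athletics", 1));
        for (Map.Entry<String, int[]> e : reduceAll(mapOutput).entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue()[0] + " " + e.getValue()[1]);
        }
    }
}
```

Run as-is, main prints "Athletics 2 0", "Red Sox 0 1" and "Yankees 1 1", one per line.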
To write out to a database, our output key class needs to implement the DBWritable interface. In this case I've created a custom class called BaseballDBWritable, which holds the fields for each record. The database record has to be the key because DBOutputFormat writes only the key to the table and ignores the value, so the Text value here is unimportant and unused. The custom writable class looks like this:
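import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public static class BaseballDBWritable implements Writable, DBWritable {
    String team;
    int wins;
    int losses;

    // Writable classes conventionally keep a no-arg constructor so Hadoop can create them by reflection
    public BaseballDBWritable() {
    }

    public BaseballDBWritable(String team, int wins, int losses) {
        this.team = team;
        this.wins = wins;
        this.losses = losses;
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        // not needed: this job only writes to the database
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        // parameter positions follow the field order passed to DBOutputFormat.setOutput
        statement.setString(1, team);
        statement.setInt(2, wins);
        statement.setInt(3, losses);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // not needed: the record goes straight from the reducer to the output format
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // not needed for the same reason
    }
}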
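The four methods above form two pairs. The DBWritable pair, readFields(ResultSet) and write(PreparedStatement), talks to the database; the Writable pair, readFields(DataInput) and write(DataOutput), is Hadoop's own serialization, used when a record travels between map and reduce tasks. Leaving the Writable pair empty works here only because the record is created in the reducer and handed straight to the output format. If the class were ever reused as a map output value, those two methods would need to write and read the same fields in the same order. The sketch below shows that round trip using only java.io; BaseballRecordSketch is a hypothetical stand-in that mirrors the fields but does not implement Hadoop's Writable, and it encodes the team name with writeUTF rather than Hadoop's Text encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Dependency-free sketch of the Writable half of BaseballDBWritable.
// Hypothetical class: same method shapes as Writable, but not Hadoop code.
final class BaseballRecordSketch {
    String team;
    int wins;
    int losses;

    // No-arg constructor, as a reflectively created Writable would need.
    BaseballRecordSketch() {}

    BaseballRecordSketch(String team, int wins, int losses) {
        this.team = team;
        this.wins = wins;
        this.losses = losses;
    }

    // Write the fields in a fixed order...
    void write(DataOutput out) throws IOException {
        out.writeUTF(team);
        out.writeInt(wins);
        out.writeInt(losses);
    }

    // ...and read them back in exactly the same order.
    void readFields(DataInput in) throws IOException {
        team = in.readUTF();
        wins = in.readInt();
        losses = in.readInt();
    }

    // Serialize to bytes, then deserialize into a fresh instance.
    static BaseballRecordSketch roundTrip(BaseballRecordSketch original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            original.write(out);
        }
        BaseballRecordSketch copy = new BaseballRecordSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        BaseballRecordSketch copy = roundTrip(new BaseballRecordSketch("Yankees", 1, 1));
        System.out.println(copy.team + " " + copy.wins + " " + copy.losses); // prints "Yankees 1 1"
    }
}
```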
In BaseballDBWritable, the only method doing real work is write(PreparedStatement), which binds our three values to the prepared statement that DBOutputFormat creates behind the scenes. DBOutputFormat handles building and executing the surrounding SQL. Once all this is done, the job is ready to run. Assuming the table starts out empty and the source database holds a number of game results, the output records should look like this:
mysql> select * from records;
+-----------+------+-------+
| team      | wins | loses |
+-----------+------+-------+
| Astros    |    1 |     0 |
| Athletics |    2 |     0 |
| Dodgers   |    0 |     1 |
| Giants    |    1 |     0 |
| Marlins   |    1 |     0 |
| Mets      |    0 |     1 |
| Padres    |    0 |     1 |
| Phillies  |    1 |     0 |
| Rays      |    0 |     1 |
| Red Sox   |    0 |     1 |
| Reds      |    0 |     1 |
| Yankees   |    1 |     1 |
+-----------+------+-------+
12 rows in set (0.03 sec)
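For reference, the SQL that DBOutputFormat prepares is a parameterized INSERT built from the table name and field names passed to setOutput. The sketch below rebuilds that string; it is modeled on DBOutputFormat's constructQuery method, InsertQuerySketch is a hypothetical name, and unlike the real method it simply rejects an empty field list. It makes clear why the indexes used in write(PreparedStatement) must follow the order of the fields array: each ? is bound by position.

```java
import java.util.Collections;
import java.util.StringJoiner;

// Rebuilds the parameterized INSERT that DBOutputFormat prepares,
// modeled on its constructQuery(table, fieldNames). A sketch only; the
// real method's exact formatting may differ between Hadoop versions.
final class InsertQuerySketch {

    static String constructQuery(String table, String... fieldNames) {
        if (fieldNames == null || fieldNames.length == 0) {
            throw new IllegalArgumentException("at least one field name is required");
        }
        // Column list, e.g. " (team,wins,loses)"
        StringJoiner columns = new StringJoiner(",", " (", ")");
        for (String field : fieldNames) {
            columns.add(field);
        }
        // One positional '?' per column; PreparedStatement indexes start at 1.
        String placeholders = String.join(",", Collections.nCopies(fieldNames.length, "?"));
        return "INSERT INTO " + table + columns + " VALUES (" + placeholders + ");";
    }

    public static void main(String[] args) {
        System.out.println(constructQuery("records", "team", "wins", "loses"));
        // prints: INSERT INTO records (team,wins,loses) VALUES (?,?,?);
    }
}
```

Because the statement is a plain INSERT, running the job again against a table that already holds results adds a second set of rows instead of updating the first, which is why the sample output above assumes an empty table.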