Friday, August 17, 2012

Parallel Programming with the C++ AMP library

Microsoft's C++ AMP library is a set of language extensions and library code that lets your C++ programs take advantage of the many cores on a GPU. The new code works seamlessly with existing code and makes it easy to increase the parallelism of your programs. The library provides features such as arrays, memory transfer, and mathematical functions. To use it, you will first need a copy of Visual Studio 2012, which at present can be downloaded for free as a release candidate. The libraries come installed with it automatically.

Once you have Visual Studio up and running you can begin. Simply create an empty Win32 console application under the Visual C++ project types, then create a .cpp file to hold your main function. To make use of the library you simply need to include the amp.h header file in your code. We'll start with just a simple loop that modifies an array in parallel.

Microsoft gives you two data structures for modifying data in parallel: array and array_view. Here we will copy elements into an array_view so we can act on them.
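A minimal sketch of that step, assuming a small vector of sample values (the names data and dataView are illustrative):

#include <amp.h>
#include <vector>
#include <iostream>
using namespace concurrency;

//three sample values to work on
std::vector<int> data(3);
data[0] = 1;
data[1] = 2;
data[2] = 3;

//wrap the vector in a one-dimensional array_view so the GPU can access it
array_view<int, 1> dataView(3, data);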

Now we will loop through all three members and add 10 to them.
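A sketch of that loop using parallel_for_each and a restrict(amp) lambda:

//run the lambda once per element of the view, in parallel on the accelerator
parallel_for_each(dataView.extent, [=](index<1> idx) restrict(amp) {
    dataView[idx] += 10;
});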

This code runs three times in parallel, adding 10 to each thread's element of the array. Now we can copy the data back to a vector and display the results.
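One way to sketch that step is to copy the view's contents into a results vector and print them:

//copy the data back from the accelerator and print it
std::vector<int> results(3);
copy(dataView, results.begin());
for (int i = 0; i < 3; i++)
    std::cout << results[i] << std::endl;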

The final code looks like this.
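Putting those pieces together, a complete sketch of the program (with the same illustrative names and sample values) might be:

#include <amp.h>
#include <vector>
#include <iostream>
using namespace concurrency;

int main() {
    //three sample values to work on
    std::vector<int> data(3);
    data[0] = 1;
    data[1] = 2;
    data[2] = 3;

    //wrap the vector in an array_view so the GPU can access it
    array_view<int, 1> dataView(3, data);

    //add 10 to each element in parallel
    parallel_for_each(dataView.extent, [=](index<1> idx) restrict(amp) {
        dataView[idx] += 10;
    });

    //copy the results back and display them
    std::vector<int> results(3);
    copy(dataView, results.begin());
    for (int i = 0; i < 3; i++)
        std::cout << results[i] << std::endl;

    return 0;
}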

And that's it for this simple intro. There's plenty more to explore though.

Thursday, March 8, 2012

Intro to Cuda C

This is a simple tutorial on creating an application with NVidia's Cuda toolkit. Cuda allows you to write applications that use the GPU for processing. It requires hardware that supports a version of Cuda; if your video card supports it, first make sure you have the latest drivers.

Cuda is a C-like language that produces code to run on a GPU. You can combine both regular C and Cuda C code in a single application and move between them easily. You'll need a regular C compiler and/or environment as well as NVidia's Cuda C compiler. This tutorial uses Visual Studio 2008, but the general ideas work with gcc as well. You will need to download the Cuda toolkit to get the compiler. The 4.1 version is here:
http://developer.nvidia.com/cuda-toolkit-41

I personally had trouble using this version and instead used this one:
http://developer.nvidia.com/cuda-toolkit-32-downloads

The two versions can co-exist without issues. You can also install the SDK to see code samples. Once everything is installed, launch Visual Studio and create a new project. Pick a Visual C++ project of type Win32 Console Application, call it whatever you like, and click OK.


At the next window, click Application Settings and check Empty Project. Then click Finish.


You now have a blank project. Right click on the Source Files folder in the Solution Explorer and choose Add > New Item. Add a new item, name it main.cu, and click OK.

We can now start adding code. The .cu file we created will be compiled by the NVidia compiler contained in the toolkit and then can be run; we will see how to set that up later. The NVidia compiler accepts standard C in addition to its own extensions, so it's easy to learn. For our sample we will write a function that adds two integers together and stores the result in a third. The function is designed to run on the GPU, using memory allocated there, and to be called from regular CPU code. It looks like this:
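A sketch of such a kernel (the name add and its parameter names are illustrative):

//runs on the GPU; stores the sum of a and b in the memory pointed to by c
__global__ void add(int a, int b, int *c) {
    *c = a + b;
}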


Here we have two integers and a pointer which will store our result to be passed back to the CPU. Note the use of the __global__ keyword, which marks our function as an entry point for GPU code. Any function called directly from regular C code must be marked with __global__.

To allow the GPU to write a value to our int, we need to allocate some memory for the c parameter. This is done with the cudaMalloc function which is similar to malloc. We just need to tell it that we want to allocate space for an int. The following is our initialization code:
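A sketch of that initialization, assuming a host-side int named answer for the result and a device pointer named dev_c:

int answer = 0;
int *dev_c;

//allocate space on the GPU to hold one int
cudaMalloc((void**)&dev_c, sizeof(int));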


Now we can call our add function. A special syntax is used for calling GPU code to pass in the number of blocks and threads we want our function to run on. It takes the form <<<#BLOCKS,#THREADS>>>. This gives us control over parallelism if we wish to take advantage of it. For now we are using only 1 block and 1 thread for this simple calculation. After the function call we copy the result into our int called answer using the cudaMemcpy function. The same function can be used to copy allocated memory from the CPU to the GPU before a kernel call, such as when we are passing a filled array to the GPU. The code looks like this:
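A sketch of the call and the copy back, using 5 and 7 as example inputs:

//launch the kernel on 1 block with 1 thread
add<<<1, 1>>>(5, 7, dev_c);

//copy the result from GPU memory into our host-side int
cudaMemcpy(&answer, dev_c, sizeof(int), cudaMemcpyDeviceToHost);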


Now we are ready to display our answer. The final code looks like this:
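A complete sketch of the program, keeping the same illustrative names and sample inputs:

#include <stdio.h>

//runs on the GPU; stores the sum of a and b in the memory pointed to by c
__global__ void add(int a, int b, int *c) {
    *c = a + b;
}

int main() {
    int answer = 0;
    int *dev_c;

    //allocate space on the GPU to hold one int
    cudaMalloc((void**)&dev_c, sizeof(int));

    //launch the kernel on 1 block with 1 thread
    add<<<1, 1>>>(5, 7, dev_c);

    //copy the result from GPU memory into our host-side int
    cudaMemcpy(&answer, dev_c, sizeof(int), cudaMemcpyDeviceToHost);

    printf("5 + 7 = %d\n", answer);

    //free the GPU memory
    cudaFree(dev_c);
    return 0;
}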



Now we are ready to test our code. First we need to add the proper build rule. Right click on our project name in the Solution Explorer and choose Custom Build Rules...


From this list we want to pick the version of Cuda we want to build against. I picked the 3.2 Runtime API rule. It might take some experimenting to find which version works for you if you installed multiple toolkits. Once you have picked one, click OK.


Now we need to add the proper library to our project. Right click on the Project in the Solution Explorer and choose Properties.


Go to the General item under Linker and add a path under Additional Library Directories. The path to add is the lib path under the location where you installed the toolkit, plus the subdirectory for a 32-bit or 64-bit build. For example, the path on my machine is C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v3.2\lib\Win32 because I am building a 32-bit app.


Next you need to reference a specific lib. In the Input section under Linker, add cudart.lib to the Additional Dependencies line at the top.


Click OK to exit the properties. You are now ready to run your application. Run without debugging and see the result.

Saturday, January 14, 2012

Writing to MySQL in Hadoop

Continuing from the previous post, I will now create a Reducer that writes the baseball results to a MySQL database. This example is mainly helpful for writing summary data and not for tracking ongoing data, as handling keys is tricky. For that you can write a custom output format that uses its own writer to communicate with the database of your choice. For now we will use a simple reducer to write to a simple table. The table I'm writing to looks like this:


mysql> desc records;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| team  | varchar(100) | YES  |     | NULL    |       |
| wins  | int(11)      | YES  |     | NULL    |       |
| loses | int(11)      | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.59 sec)


All we care about is the team name and the wins and losses from the source feed. To start out the example we will show the configuration setup.


DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://127.0.0.1/baseball", "visitor","vispass");
//job set up code here
...

//set up the output format with our table name "records" and our three column names
String [] fields = { "team", "wins", "loses"};
DBOutputFormat.setOutput(job, "records", fields);
//tell the job what reducer to use
job.setReducerClass(ScoreReducer.class);


The configuration line specifies our local MySQL URL and the name of the database, baseball, along with the user name and password. It is important that this configuration line be placed before the creation of the Job class, or the settings will not be persisted to the reducer and output format classes. The reducer class takes our Text team key and its list of integer values and turns them into win and loss totals, placing them in a database record.


public static class ScoreReducer extends Reducer<Text, IntWritable, BaseballDBWritable, Text> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int wins = 0;
        int losses = 0;

        //get an iterator so we can loop through wins and losses
        Iterator<IntWritable> vals = values.iterator();

        while (vals.hasNext()) {
            int n = vals.next().get();

            if (n == 0) //0 indicates a loss
                losses++;
            else //1 indicates a win
                wins++;
        }

        //create our record object to persist the team's wins and losses
        BaseballDBWritable record = new BaseballDBWritable(key.toString(), wins, losses);
        context.write(record, key);
    }

}


To write out to a database, our key class needs to implement the DBWritable interface. In this case I've created a custom one called BaseballDBWritable, which holds the fields for each record. Putting the database record in the key position is necessary because of the way Hadoop hands the information off to the output format. The value class here is unimportant and not used. The custom writable class looks like this:


public static class BaseballDBWritable implements Writable, DBWritable {

    String team;
    int wins;
    int losses;

    public BaseballDBWritable(String team, int wins, int losses) {
        this.team = team;
        this.wins = wins;
        this.losses = losses;
    }

    @Override
    public void readFields(ResultSet arg0) throws SQLException {
    }

    @Override
    public void write(PreparedStatement arg0) throws SQLException {
        arg0.setString(1, team);
        arg0.setInt(2, wins);
        arg0.setInt(3, losses);
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
    }
}


All we do here is add our values to the prepared statement created behind the scenes. The default database output format class handles creating and executing the surrounding SQL. Once all this is done, your job is ready to go. Assuming a blank table to start with and a number of rows in our source data, the output records should look like this:


mysql> select * from records;
+-----------+------+-------+
| team      | wins | loses |
+-----------+------+-------+
| Astros    |    1 |     0 |
| Athletics |    2 |     0 |
| Dodgers   |    0 |     1 |
| Giants    |    1 |     0 |
| Marlins   |    1 |     0 |
| Mets      |    0 |     1 |
| Padres    |    0 |     1 |
| Phillies  |    1 |     0 |
| Rays      |    0 |     1 |
| Red Sox   |    0 |     1 |
| Reds      |    0 |     1 |
| Yankees   |    1 |     1 |
+-----------+------+-------+
12 rows in set (0.03 sec)

Thursday, January 12, 2012

Reading from HBase in Hadoop

A useful application of a MapReduce job could be to move data from a large data store and summarize it in a smaller store. I decided to try moving data from HBase into another store. Using the latest library it is pretty easy. I started with a simple table called baseballscores. The first row, with id 'game1', looks like this:

hbase(main):001:0> get 'baseballscores', 'game1'
COLUMN                 CELL
 loser:score           timestamp=1325913333740, value=2
 loser:team            timestamp=1325913325984, value=Rays
 winner:score          timestamp=1325913306939, value=5
 winner:team           timestamp=1325913295557, value=Athletics


The table has two column families and two columns per family. To read from this table, we first need to define our configuration. We can use the TableMapReduceUtil class to help set up the mapper job. We need to pass it the name of our table, the scan, the mapper class, the mapper output key class, and the mapper output value class.


//create our configuration
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "HBaseReader");
job.setJarByClass(HBaseMapReduce.class);
Scan scan = new Scan();

//set up mapper job to use baseballscores table
TableMapReduceUtil.initTableMapperJob("baseballscores", scan, HBaseMapper.class, Text.class, Text.class, job);


Now we can create our mapper class to take the data from the HBase table. The values parameter contains our column data. To get a specific column, we call values.getValue and pass in the name of the column family and then the column name. Here we extract the winning and losing team names and then associate them with a win or loss key so we can total them later.

static class HBaseMapper extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {

        Text winningTeam = new Text(Bytes.toString(values.getValue(Bytes.toBytes("winner"), Bytes.toBytes("team"))));
        Text losingTeam = new Text(Bytes.toString(values.getValue(Bytes.toBytes("loser"), Bytes.toBytes("team"))));

        try {
            //emit each team name under a "win" or "loss" key so the totals can be computed later
            context.write(new Text("win"), winningTeam);
            context.write(new Text("loss"), losingTeam);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}



And that's it. You can write the reducer of your choice and put the data anywhere you like.
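For example, a minimal reducer sketch (purely illustrative) could simply count how many team names arrive under each "win" or "loss" key:

static class CountReducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //count the team names emitted under this key
        int count = 0;
        for (Text team : values)
            count++;

        context.write(key, new IntWritable(count));
    }
}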

Thursday, January 5, 2012

Custom File Output in Hadoop

I had been curious about the best way to control text output from a job and decided it was probably best to create my own output class. By simply extending the default FileOutputFormat class you can control all aspects of the output.

The first step is to create the output class. You need to be aware of the types returned by the reduce function. In my case, it sends a Text key and a list of integers associated with it, so I need to define the generic type parameters accordingly. The simple class looks like this:


public class MyTextOutputFormat extends FileOutputFormat<Text, List<IntWritable>> {

    @Override
    public org.apache.hadoop.mapreduce.RecordWriter<Text, List<IntWritable>> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
        //get the current output path
        Path path = FileOutputFormat.getOutputPath(arg0);
        //create the full path with the output directory plus our filename
        Path fullPath = new Path(path, "result.txt");

        //create the file in the file system
        FileSystem fs = path.getFileSystem(arg0.getConfiguration());
        FSDataOutputStream fileOut = fs.create(fullPath, arg0);

        //create our record writer with the new file
        return new MyCustomRecordWriter(fileOut);
    }
}


After figuring out the full path and creating our output file, we then need to create an instance of the actual record writer to pass back to the reduce job. That class allows us to actually write to the file however we want. Again, the generic type parameters need to match the types coming from the reducer. My class looks like this:

public class MyCustomRecordWriter extends RecordWriter<Text, List<IntWritable>> {

    private DataOutputStream out;

    public MyCustomRecordWriter(DataOutputStream stream) {
        out = stream;
        try {
            //write a simple header at the top of the file
            out.writeBytes("results:\r\n");
        }
        catch (Exception ex) {
        }
    }

    @Override
    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
        //close our file
        out.close();
    }

    @Override
    public void write(Text arg0, List<IntWritable> arg1) throws IOException, InterruptedException {
        //write out our key
        out.writeBytes(arg0.toString() + ": ");
        //loop through all values associated with our key and write them with commas between
        for (int i = 0; i < arg1.size(); i++) {
            if (i > 0)
                out.writeBytes(",");
            out.writeBytes(String.valueOf(arg1.get(i)));
        }
        out.writeBytes("\r\n");
    }
}


Finally we need to tell our job about our output format and the output path before running it.


job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ArrayList.class);
job.setOutputFormatClass(MyTextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/out"));



And that's it. Two simple classes allow us all the control we need.