Thursday, January 10, 2013

Communicating With Hadoop Through RabbitMQ

If you need to start Hadoop jobs from other systems, RabbitMQ is an easy option. RabbitMQ is an open source, freely available messaging system with client libraries for many languages such as Java, Ruby, Python and C#. It's simple to install, and it makes it easy to connect machines across a network. We'll start by setting up RabbitMQ on the Linux machine that will run our Hadoop job. Install guides can be found here: http://www.rabbitmq.com/download.html. For Ubuntu Linux, I simply used the following command:

sudo apt-get install rabbitmq-server

You can check that the broker is installed and running using this command:

rabbitmqctl status

The broker's job is to listen for messages coming from this or other machines and route them to the proper queues. If the broker is not running, other machines will not be able to deliver messages to your queues. Once you have determined the service is running, you can create your listener program. The listener's job is to wait for messages sent to our queue and then start the appropriate job upon receipt. Your listening program can be as fancy as you like; for testing purposes you can just create a basic Java application with a main entry point. Before writing the program you will need to download the Java client library from here:

http://www.rabbitmq.com/java-client.html

When creating your program you will need to add references to the jar files rabbitmq-client.jar, commons-cli-1.1.jar and commons-io-1.2.jar. Listening to a message queue doesn't take much code. First we set up our connection to the local broker:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
Next we specify the queue to listen to, which will be created if it doesn't already exist.

    //create the queue if it doesn't already exist
    channel.queueDeclare(queueName, false, false, false, null);
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //listen to the queue (the second argument enables auto-acknowledgement)
    channel.basicConsume(queueName, true, consumer);
Finally we can retrieve the message like this:

    //nextDelivery() blocks until a message arrives
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());

Once we retrieve the message we can determine a course of action. In this case, we want to start our job using Hadoop's ToolRunner:

ToolRunner.run(new Configuration(), new MyMapReduceJob(), args);
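
Here MyMapReduceJob stands in for whatever job class you want to launch; it just needs to implement Hadoop's Tool interface. As a rough sketch only (the job name, mapper, reducer and path arguments below are placeholders, not part of the original example), such a class might look like this:

    //a hypothetical MyMapReduceJob - plug in your own mapper, reducer and paths
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;

    public class MyMapReduceJob extends Configured implements Tool {
        public int run(String[] args) throws Exception {
            Job job = new Job(getConf(), "rabbitmq-triggered-job");
            job.setJarByClass(MyMapReduceJob.class);
            //job.setMapperClass(MyMapper.class);    //your mapper class here
            //job.setReducerClass(MyReducer.class);  //your reducer class here
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

ToolRunner parses the standard Hadoop options out of args and passes the remaining arguments to run().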

The full client code looks like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    //the class and file name are arbitrary
    public class JobListener {
        public static void main(String[] args) {
            try
            {
                String queueName = "TESTQUEUE";
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();

                //create the queue if it doesn't already exist
                channel.queueDeclare(queueName, false, false, false, null);

                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(queueName, true, consumer);

                while (true) {
                    //blocks until a message arrives
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    if (message.equalsIgnoreCase("runjob")) {
                        //pass the listener's command line arguments on to the job
                        int res = ToolRunner.run(new Configuration(), new MyMapReduceJob(), args);
                        break;
                    }
                }

                channel.close();
                connection.close();
            }
            catch (Exception ex) {
                System.out.println(ex.getMessage());
            }
        }
    }
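
Before moving to .NET, you can sanity-check the listener by publishing a test message with the same Java client library on the Linux machine. This is just a quick sketch; it publishes to the default exchange using the queue name as the routing key, which is an alternative to the exchange-and-binding approach used by the .NET sender below.

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class TestSender {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            //publish to the default exchange; the routing key is the queue name
            channel.basicPublish("", "TESTQUEUE", null, "runjob".getBytes());

            channel.close();
            connection.close();
        }
    }

If the listener is running, it should pick up the "runjob" message and kick off the job.
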
Publishing Messages to Hadoop

Now with the listener set up and waiting, we can create a sender. For this article we'll assume the message is being sent from .NET on a Windows server. Because the broker is already running on the Ubuntu machine, the Windows side only needs the RabbitMQ client library for .NET, which can be found here:

http://www.rabbitmq.com/dotnet.html

(If you also want to run a broker locally on Windows, installation instructions are here: http://www.rabbitmq.com/install-windows.html. Note that this currently requires setting up Erlang first.)

Once the client library is referenced, you can create a new C# console application to send messages. The .NET code is similarly only a few lines. We create a connection using the connection factory, setting HostName to the machine running the broker.

    ConnectionFactory factory = new ConnectionFactory();
    factory.Protocol = Protocols.FromEnvironment();
    factory.HostName = "UbuntuMachine";
    IConnection conn = factory.CreateConnection();

Next we create a model, declare a direct exchange, and bind our queue to it.

    using (IModel model = conn.CreateModel())
    {
        model.ExchangeDeclare("exch", ExchangeType.Direct);
        model.QueueBind("TESTQUEUE", "exch", "key");
        //...the publish in the next step also goes inside this using block
    }

Finally we send our message as a byte array.

    byte[] messageBody = Encoding.UTF8.GetBytes("runjob");
    model.BasicPublish("exch", "key", null, messageBody);

The final completed code looks like this:

    using System;
    using System.Text;
    using RabbitMQ.Client;

    //the class and file name are arbitrary
    class Sender
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.Protocol = Protocols.FromEnvironment();
            factory.HostName = "UbuntuMachine";
            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel model = conn.CreateModel())
                {
                    //declare the exchange and bind the listener's queue to it
                    model.ExchangeDeclare("exch", ExchangeType.Direct);
                    model.QueueBind("TESTQUEUE", "exch", "key");

                    Console.WriteLine("Sending message.");
                    byte[] messageBody = Encoding.UTF8.GetBytes("runjob");
                    model.BasicPublish("exch", "key", null, messageBody);
                }
            }
        }
    }

And that is it. Now we can send messages to Hadoop from different environments using a simple message queue. This can be especially useful for distributed ETL systems built with tools such as SSIS or Pentaho.