Wednesday, April 9, 2014

Apache Camel - Request / Reply Pattern with Java DSL

Apache Camel is a powerful yet lightweight integration framework. It implements the Enterprise Integration Patterns (EIPs), so you can easily integrate different applications using the patterns you need. You can write routes in Java, Spring XML, Scala or Groovy. Almost every technology you can imagine is available, for example HTTP, FTP, JMS, EJB, JPA, RMI, JMX, LDAP, Netty, and many more (of course most ESBs also offer support for them).

You can get more information on Apache Camel by following this link - Apache Camel

There are many ways to use message routing, and this article illustrates one of them: the Request / Reply pattern with the Java DSL. Here I'm using ActiveMQ as my message broker.

Apache ActiveMQ is one of the most popular and powerful open source messaging and Integration Patterns servers. Apache ActiveMQ is fast, supports many cross-language clients and protocols, and comes with easy-to-use Enterprise Integration Patterns. I like it because it has a lot of features and useful tools. Also, the latest ActiveMQ (5.9.0) is bundled with the hawtio web console, in which you can monitor all your queues.

With hawtio itself you can monitor all your Camel contexts, routes, ActiveMQ queues, etc.

You can get more information on Apache ActiveMQ by following this link - Apache ActiveMQ

Now let's check the sample code.

Producer Sample

 import javax.jms.ConnectionFactory;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.jms.JmsComponent;
 import org.apache.camel.impl.DefaultCamelContext;

 public void myMethod() throws Exception {

  CamelContext context = new DefaultCamelContext();
  context.getProperties().put(Exchange.LOG_DEBUG_BODY_STREAMS, "true");

  // Connect to the ActiveMQ broker(s) through the failover transport
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true"
      + "&priorityBackup=true&timeout=40000");
  context.addComponent("jms",
    JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

  // Start the context before sending any messages
  context.start();

  // Create the template and send the request; the call blocks until the
  // reply arrives or requestTimeout (40 seconds) expires
  ProducerTemplate camelTemplate = context.createProducerTemplate();
  Object response = camelTemplate
    .requestBodyAndHeader(
      "jms:myQueue.queue?exchangePattern=InOut&requestTimeout=40000&timeToLive=40000"
        + "&asyncConsumer=true&asyncStartListener=true&concurrentConsumers=10"
        + "&useMessageIDAsCorrelationID=true",
        "mBodyMsg", "HeaderString", "HeaderValue");
  System.out.println("Response: " + response);

  // Release resources
  camelTemplate.stop();
  context.stop();

 }
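
The endpoint URI in the producer sample packs several options into one long string. As a rough illustration only, the sketch below assembles a subset of those options from a map so that each one can carry a comment. The class `JmsEndpointUri` and its methods are hypothetical helpers written for this post, not part of Camel, and the option descriptions reflect my understanding of the Camel JMS component.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not a Camel class): builds a "jms:" endpoint URI string.
public class JmsEndpointUri {

    // Joins the options as key=value pairs separated by '&'.
    static String build(String destination, Map<String, Object> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return query.isEmpty()
                ? "jms:" + destination
                : "jms:" + destination + "?" + query;
    }

    // A subset of the options used in the producer sample.
    static String requestReplyUri() {
        Map<String, Object> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("exchangePattern", "InOut");          // request/reply exchange
        options.put("requestTimeout", 40000);             // ms to wait for the reply
        options.put("timeToLive", 40000);                 // ms before the request expires
        options.put("useMessageIDAsCorrelationID", true); // correlate on JMSMessageID
        return build("myQueue.queue", options);
    }

    public static void main(String[] args) {
        System.out.println(requestReplyUri());
    }
}
```

The resulting string can be passed to `requestBodyAndHeader` in place of the concatenated literal.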

If you are implementing the InOut pattern, you need to ensure that the processed data is sent back as the reply to the request.
To achieve this, there must be a RouteBuilder implementation that listens to the
above queue, processes the data, and sets the response as the reply message.
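
To make the mechanics of a request/reply exchange more concrete, here is a broker-free sketch in plain Java. It is not how Camel or ActiveMQ implement the pattern; it only mimics the idea with in-memory queues: the request carries a correlation ID and a reply destination, the consumer answers on that destination, and the producer waits for the reply with a timeout. All names (`RequestReplySketch`, `Message`, `request`, `startConsumer`) are made up for this illustration.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequestReplySketch {

    // Minimal stand-in for a JMS message: correlation ID, reply destination, body.
    static final class Message {
        final String correlationId;
        final BlockingQueue<Message> replyTo;
        final String body;

        Message(String correlationId, BlockingQueue<Message> replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Plays the role of the request queue.
    static final BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();

    // Consumer side: take a request, process it, reply on the request's replyTo queue.
    static Thread startConsumer() {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Message request = requestQueue.take();
                    String result = request.body + " response";
                    request.replyTo.put(new Message(request.correlationId, null, result));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        consumer.setDaemon(true); // do not keep the JVM alive
        consumer.start();
        return consumer;
    }

    // Producer side: send a request and block until the matching reply or a timeout.
    static String request(String body, long timeoutMillis) {
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        String id = UUID.randomUUID().toString();
        try {
            requestQueue.put(new Message(id, replyQueue, body));
            Message reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                throw new IllegalStateException("No reply within " + timeoutMillis + " ms");
            }
            if (!id.equals(reply.correlationId)) {
                throw new IllegalStateException("Reply does not match the request");
            }
            return reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        startConsumer();
        System.out.println(request("mBodyMsg", 1000)); // prints "mBodyMsg response"
    }
}
```

The timeout in `request` plays the same role as `requestTimeout` in the producer sample: if no reply arrives in time, the caller gets an error instead of waiting forever.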

The RouteBuilder configure method implementation is given below.

 @Override
 public void configure() throws Exception {

  try {
   // Create the CamelContext and register the JMS component
   CamelContext context = new DefaultCamelContext();
   ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
     "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&priorityBackup=true");

   context.addComponent("jms",
     JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

   context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() {
     // Failed messages are moved to the "dead" queue
     errorHandler(deadLetterChannel("jms:queue:dead"));
     // GenerateRandomNumber is a helper class of this project (not shown).
     // Note: num is generated once, when the route is built, so the
     // constant() header below has the same value for every message.
     GenerateRandomNumber.getInstance();
     int num = GenerateRandomNumber.randInt(1, 100000);
     // Consume requests from the queue and hand them to RequestProcess
     from("jms:myQueue.queue")
       .setHeader("JMSMessageID", constant("ID : " + num))
       .setHeader("JMSReplyTo",
         constant("myQueue.queue"))
       .process(new RequestProcess());
    }
   });
   context.start();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

Data processing happens in the RequestProcess class, which should implement the Apache Camel Processor interface. The process method implementation is given below.

 @Override
 public void process(Exchange exchange) throws Exception {
  String body = exchange.getIn().getBody(String.class);
  /*
   * Process the data, build the response and set the response as the
   * Exchange out body.
   */
  exchange.getOut().setBody(body + 
    "response; ID : " + exchange.getExchangeId());
 }