Saturday, December 12, 2009

JMS Request-Response with Spring-Integration

A colleague of mine asked me “why would Spring need to write a framework to send / receive JMS messages”.

So here is why:

In short, avoiding frameworks usually leads to breaking the DRY principle. 

In order to send messages you need to
  1. Fetch the connection factory from JNDI.
  2. Create a JMS connection.
  3. Create a JMS session.
  4. Fetch / create the JMS destination.
  5. Create and configure a Producer.
  6. Create your JMS message.
  7. Remember to start() the connection.
  8. Remember to close / cache the JMS resources, and handle errors properly.
In order to receive messages you need to
  1. Fetch the connection factory from JNDI.
  2. Create a JMS connection.
  3. Create a JMS session.
  4. Fetch / create the JMS destination.
  5. Create and configure a consumer - either add a MessageListener, or receive() messages in a loop.
  6. Remember to close / cache the JMS resources, and handle errors properly.
If you’d also like to get a response you need to
  1. Set the message reply to channel. This can be a temp destination, or an existing one. If this is an existing destination, you will also need a message selector and use the JMSCorrelationID.
  2. Create a consumer for the reply message and receive() the message.
  3. On the receiver side you need to create a response message, set the JMSCorrelationID, and send back to the JMSReplyTo destination that was specified on the incoming message.
Writing all this code is naturally prone to errors. Now throw in handling of connections errors (reconnects) as well, and what you get is a whole lot of messy code :P

When using Spring JMS integration framework you can avoid most of the boiler plate code.  When using spring-integration on top of that you can actually decouple your (Java) code from JMS and Spring all together. That is you can avoid having any compile time dependency on JMS or Spring!

The client code:
  <integration:channel id="publishChannel"/>
  <integration:channel id="jmsReplyToChannel"/>
 
  <integration:gateway service-interface="MyPublisher" id="publisher" default-request-channel="publishChannel" default-reply-channel="jmsReplyToChannel"/>

  <jms:outbound-gateway id="jmsOutGateway"
                       request-destination="topic"
                       request-channel="publishChannel"
                       reply-channel="jmsReplyToChannel"
                       connection-factory="jmsConnectionFactory"/>

The publisher is merely an interface :D

public interface MyPublisher {
    public long publish(String msg);
}

The server side code:

<jms:inbound-gateway id="jmsInGateway"
                      request-destination="topic"
                      request-channel="serviceInboundChannel"
                      connection-factory="jmsConnectionFactory"/>
 
  <integration:service-activator input-channel="serviceInboundChannel" ref="myService" method="print"/>

  <integration:channel id="serviceInboundChannel"/>

  <bean id="myService" class="MyServiceImpl"/>

The called service implementation can be anything you like. I only print the message to stdout and return the reception time – this can be later used to test that we are actually performing a synchronised request response style messaging, which is not the case when no JMS response is involved.
public class MyServiceImpl implements MyService {
    public long print(String msg) {
        long now = System.currentTimeMillis();
        System.out.format("%d print: %s\n", now, msg);
        return now;
    }
}

We also need some boring JMS Spring beans definitions to get this working. The code below assumes you have a running ActiveMQ server, but you may replace the provider url to use an embeded ActiveMQ broker (see the commented-out line).
<bean id="jmsJndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <constructor-arg>
      <props>
        <prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
        <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
        <!-- prop key="java.naming.provider.url">vm://localhost?broker.persistent=false</prop -->
      </props>
    </constructor-arg>
  </bean>

  <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg>
      <bean class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="ConnectionFactory"/>
            <property name="jndiTemplate" ref="jmsJndiTemplate"/>
          </bean>
    </constructor-arg>
    <property name="reconnectOnException" value="true"/>
    <property name="cacheProducers" value="true"/>
  </bean>

  <bean id="topic" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="dynamicTopics/eran-test"/>
        <property name="jndiTemplate" ref="jmsJndiTemplate"/> 
  </bean>

The diagram below depicts our creation:

Notes:

  • I used a topic here, but it would have been more appropriate to use a queue in most cases…
  • Spring-Integration has it’s own cost. Although the code could be slightly improved performance wise, the internal message format, and the temp queue / channels creation impact performance. So don’t say I didn’t warn you… a plain async message passing style application should probably be written with plain Spring JMSteamplate, and message containers.
  • The code above as is causes Spring to create a lot of temporary resources like queues, consumers, producers, channels. I left it this way for brevity, but most of the temp resources creation can be avoided, thus reducing the load on the message broker.

9 comments:

  1. What about configuring the topic rules?
    Where do we configure the persistent / ttl / message priority...?

    ReplyDelete
  2. The outbound-gateway has all the QoS attributes such as delivery-mode, ttl, priority, etc.

    ReplyDelete
  3. Hi

    A small question what tool did you use to create the diagram ?

    Eyal

    ReplyDelete
  4. I created the diagram in Visio using the Enterprise Integration Patterns stencil.

    As far as I can recall I downloaded it from here: http://www.eaipatterns.com/downloads.html

    It is easy to find if not anyway - there are several of those stencils our there.

    ReplyDelete
  5. Hi,
    If we configure the outbound-gateway with a reply-destination, Spring will not create a temp queue for the request/reply.

    ReplyDelete
  6. Good point 'Anonymous'.
    I actually ended up doing just that.
    Never got to make that note in the blog though.

    ReplyDelete
  7. Thanks for the post..simple and nice.
    Can we get away with temp channels between
    1) the inbound-gateway and the serviceActivator
    2) the serviceActivator and the service bean..
    I have tried different things..no results

    ReplyDelete
  8. Can you please tell us, what are all the jars used with all specific versions . as we are facing issue while configuring because of incorrect version of jars.

    ReplyDelete
  9. Sorry Prbha,
    I posted this 2 years ago, and I can't really remember.
    I think it was Spring 2.5.x, and spring-integration was version ~1.x

    The current spring-integration is 2.0.5.RELEASE.

    I suggest you consult the docs for the deps: www.springsource.org/spring-integration#documentation

    I can try and help out if you specify your classpath, but I can't promise anything.

    ReplyDelete