8. Messaging – EJB 3 Developer Guide

Chapter 8. Messaging

In this chapter we discuss the Java Message Service API before moving on to Message-driven beans. In particular we will cover:

  • Java Message Service (JMS) API

  • Point-to-point and publish/subscribe messaging models

  • Queue producer and consumer examples

  • Topic producer and consumer examples

  • Message-driven beans (MDBs)

  • MDB activation configuration properties

  • MDB lifecycle

  • Sending message confirmation to a client

  • MDBs and transactions

Introduction

So far our examples have all been synchronous. For example, a client invokes a session bean which performs some operation while the client waits. When the session bean completes its operation (or operations), control is passed back to the client and the client continues with the remaining operations. For some applications an asynchronous mode of operation is more appropriate, particularly in business to business applications. For example, a client may send a message requesting that an order be processed. The order processing may take some time. Once the message has been sent, the client continues with its operations. The client does not need to wait until the order has been processed. Of course the client will need to be notified in some way that the message has been received and in due course that the order has been processed. Even if the order processing is near instantaneous we may still prefer the asynchronous messaging model because we don't want to couple the client and server systems.

The client is referred to as the message producer. The recipient of the message is referred to as a message consumer. In certain messaging systems, as we shall see, there may be more than one consumer for a given message.

Message-driven beans are asynchronous message consumers. Typically a message-driven bean, or MDB, is based on the Java Message Service (JMS) API. Since EJB 2.1 MDBs may implement a messaging system other than JMS. However, we will only cover JMS-based MDBs in this book. All EJB-compliant application servers must support both MDBs and the JMS API.

The JMS API is not only implemented by EJB-compliant application servers, but by many standalone message-oriented-middleware (MOM) products such as IBM MQ. These products provide services such as load balancing and guaranteed message delivery.

Before looking at MDB examples we first take a brief look at the JMS API.

Java Message Service (JMS) API

The JMS API is a set of interfaces and classes that support two basic messaging modes: point-to-point and publish/subscribe.

In the point-to-point mode, the message producer sends a message to a queue. The message is read just once by a consumer. After the message is read it is deleted from the queue. There may be more than one potential consumer; however, once a message has been read by one consumer it cannot be read by any of the remaining consumers. A consumer does not need to be registered with the queue at the time the message is sent. If no other consumer has read the message, then the consumer can read the message as soon as it (the consumer) accesses the queue.

In the publish/subscribe mode, the message producer sends a message to a topic. One or more consumers register with, or subscribe to, a topic. The message remains in the topic until all consumers have read the message. If a consumer has not registered with a topic at the time the message is sent then it will be unable to subsequently read the message.
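The difference between the two modes can be sketched without a JMS server. The following is a toy in-memory model, not the JMS API: the class names ToyQueue and ToyTopic are our own inventions, and the topic simply copies each message to the subscribers registered at the moment of publication.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of the two messaging modes; this is NOT the JMS API.
class MessagingModes {

    // Point-to-point: a message is held until exactly one consumer takes it.
    static class ToyQueue {
        private final Deque<String> messages = new ArrayDeque<>();

        void send(String msg) {
            messages.addLast(msg);
        }

        // Removes and returns the oldest message, or null if the queue is empty.
        String receive() {
            return messages.pollFirst();
        }
    }

    // Publish/subscribe: a copy goes to every subscriber registered at send time.
    static class ToyTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        // Registers a new subscriber and returns its private inbox.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String msg) {
            for (List<String> inbox : inboxes) {
                inbox.add(msg);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("Open bank account 1234");
        System.out.println("first consumer:  " + queue.receive());
        System.out.println("second consumer: " + queue.receive());

        ToyTopic topic = new ToyTopic();
        List<String> early = topic.subscribe();
        topic.publish("Savings account interest rate cut by 0.5 %");
        List<String> late = topic.subscribe();
        System.out.println("early subscriber: " + early);
        System.out.println("late subscriber:  " + late);
    }
}
```

In this model the second queue consumer finds nothing to read, and the subscriber that registers after publication ends up with an empty inbox, which mirrors the behavior described for queues and topics.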

A queue or topic consumer can be either synchronous or asynchronous. If a consumer is synchronous and there is no message in the topic or queue, the consumer will hang until the message arrives. An asynchronous consumer is registered as a listener to the queue or topic; when a message arrives the listener is notified and the asynchronous consumer then reads the message.
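As a rough analogy in plain Java, a synchronous consumer resembles a blocking poll on a java.util.concurrent.BlockingQueue, while an asynchronous consumer resembles a callback invoked from another thread. The sketch below is only an analogy under that assumption; the class and method names are hypothetical and none of this is JMS code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java analogy for the two consumer styles; a BlockingQueue stands in
// for a JMS destination.
class ConsumerStyles {

    // Synchronous ("pull"): block until a message arrives or the timeout
    // expires. Returns null on timeout.
    static String receiveSync(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Asynchronous ("push"): a background thread waits for one message and
    // hands it to the listener, so the caller never blocks.
    static void receiveAsync(BlockingQueue<String> queue, Consumer<String> listener) {
        Thread worker = new Thread(() -> {
            try {
                listener.accept(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive waiting for a message
        worker.start();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CompletableFuture<String> delivered = new CompletableFuture<>();
        receiveAsync(queue, delivered::complete);
        System.out.println("listener registered; main thread carries on");
        queue.offer("Open bank account 1234");
        System.out.println("listener got: " + delivered.join());
        System.out.println("sync receive on empty queue: " + receiveSync(queue, 100));
    }
}
```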

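JMS gives only limited guarantees about delivery order. Messages sent by a single session to a single destination, with the same priority and delivery mode, are delivered in the order in which they were sent, but no order is defined between messages sent by different sessions or producers. So if a part A message and a part B message are sent by different producers, or with different priorities, you cannot assume the consumer will read part A before part B.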
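When a consumer cannot rely on arrival order, one common remedy is for the producer to number the parts and for the consumer to buffer them until the next expected number turns up. The sketch below assumes such a numbering scheme (in JMS the number could travel in a message property, for example one set with Message.setIntProperty() under a name of our choosing); the Resequencer class itself is hypothetical and uses no JMS types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Buffers out-of-order parts and releases them in sequence-number order.
// Sequence numbers are assumed to start at 1 and to have no gaps.
class Resequencer {
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextExpected = 1;

    // Accepts one part and returns every part that can now be released in
    // order; the list is empty while an earlier part is still missing.
    List<String> accept(int sequence, String body) {
        pending.put(sequence, body);
        List<String> ready = new ArrayList<>();
        while (pending.containsKey(nextExpected)) {
            ready.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return ready;
    }

    public static void main(String[] args) {
        Resequencer r = new Resequencer();
        System.out.println(r.accept(2, "part B")); // part A has not arrived yet
        System.out.println(r.accept(1, "part A")); // both parts are released
    }
}
```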

There are a number of steps a queue or topic producer must undertake and a number of steps depending on whether the consumer is synchronous or asynchronous. These are best described by examples.

Queue Producer and Consumer Examples

We will start with a Java application, QueueProducer.java, acting as a JMS queue producer. Recall how we use a queue in point-to-point messaging. Point-to-point is used when you want a particular recipient, or consumer, to process a given message just once. An example would be a message to process an order: we would want only one recipient to process this order, and that recipient should process the order only once. Another example is an instruction to open a specific bank account. QueueProducer.java just sends the message "Open bank account 1234" to the queue. The code is shown below:

package ejb30.client;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.*;
public class QueueProducer {
    public static void main(String[] args) {
        ConnectionFactory cf = null;
        Connection conn = null;
        Queue queue = null;
        try {
            // Look up the connection factory and queue by JNDI name
            InitialContext ctx = new InitialContext();
            cf = (ConnectionFactory)
                ctx.lookup("BankServiceConnectionFactory");
            queue = (Queue) ctx.lookup("BankServiceJMSQueue");
        } catch (NamingException e) {
            e.printStackTrace();
            return; // nothing can be sent without the factory and queue
        }
        try {
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer mp = sess.createProducer(queue);
            TextMessage msg = sess.createTextMessage(
                "Open bank account 1234");
            mp.send(msg);
            System.out.println("JMS Message sent");
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            try {
                // conn is still null if createConnection() failed
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {}
        }
    }
}

The first step is to look up a connection factory. In our example BankServiceConnectionFactory is the connection factory, and because this is a standalone Java application we need to use JNDI to look it up. It is possible for a session bean to act as a queue producer, in which case we can use dependency injection to obtain a connection factory. We shall see an example of this later in this chapter. We will also describe how we create a connection factory later in the section "Running the Queue Producer and Synchronous Queue Consumer Examples".

Next we use JNDI to look up the queue. In our example BankServiceJMSQueue is the queue. Again we shall shortly see how this is created.

Next we create a Connection object using the connection factory createConnection() method:

conn = cf.createConnection();

The next step is to create a Session object using the Connection.createSession() method:

Session sess = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);

The first parameter, false, indicates that we do not want to use transactions. Session.AUTO_ACKNOWLEDGE indicates that the consumer should send an acknowledgment to the JMS server immediately on receipt. However, as the current program is a producer and not a consumer, this parameter has no effect here.

We now create a MessageProducer object using the Session.createProducer() method:

MessageProducer mp = sess.createProducer(queue);

We create a TextMessage object which holds a message in the form of a String object:

TextMessage msg = sess.createTextMessage(
"Open bank account 1234");

TextMessage is one of the message types that JMS supports. Alternatively, we could use one of the following JMS Message types:

  • MapMessage contains a Java Map

  • ObjectMessage contains a serializable Java object

  • BytesMessage contains an array of bytes

  • StreamMessage contains a stream of primitive Java types

We would use one of these message types when we want to send data, and not just a message, for the recipient to process.

Finally we use the MessageProducer.send() method to send the message to the queue:

mp.send(msg);

We have chosen to send a single message to the queue. We can of course send multiple messages by repeatedly invoking the send() method.

Finally, we release resources by closing the Connection object after we have finished.

Synchronous Queue Consumer Example

In this example a Java application, SynchQueueConsumer, simply synchronously reads a message from the same queue used in the previous example. Because a queue is being used, SynchQueueConsumer can read this message only once. Once the message has been read, it is removed from the queue and no other consumer can read the message. Because SynchQueueConsumer is a synchronous consumer, if there is no message in the queue SynchQueueConsumer will hang until the message arrives. A synchronous consumer operates in a pull manner; it proactively polls the queue for a message.

package ejb30.client;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.*;
public class SynchQueueConsumer{
    public static void main(String[] args) {
        ConnectionFactory cf = null;
        Connection conn = null;
        Queue queue = null;
        try {
            InitialContext ctx = new InitialContext();
            cf = (ConnectionFactory)
                ctx.lookup("BankServiceConnectionFactory");
            queue = (Queue) ctx.lookup("BankServiceJMSQueue");
        } catch (NamingException e) {
            e.printStackTrace();
            return; // nothing can be read without the factory and queue
        }
        try {
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageConsumer mcon = sess.createConsumer(queue);
            // Delivery of messages begins only once the connection is started
            conn.start();
            Message message = mcon.receive(0);
            TextMessage msg = (TextMessage) message;
            System.out.println(
                "JMS Message:" + msg.getText() );
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            try {
                // conn is still null if createConnection() failed
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {}
        }
    }
}

As with our queue producer program, we use JNDI to look up the BankServiceConnectionFactory connection factory and the BankServiceJMSQueue queue. Again, as before, we create a Connection and a Session object.

We next create a MessageConsumer object using the Session.createConsumer() method:

MessageConsumer mcon = sess.createConsumer(queue);

We start the connection, and then use the MessageConsumer.receive() method to receive any message from the queue:

conn.start();
Message message = mcon.receive(0);

Recall that a synchronous consumer will hang if there is no message in the queue. The receive() method argument indicates the timeout in milliseconds; if the timeout expires before a message arrives, receive() returns null. If the value is 0 the call blocks indefinitely until a message arrives.

Next we cast the message to a TextMessage type and print out the result:

TextMessage msg = (TextMessage) message;
System.out.println( "JMS Message:" + msg.getText() );

We have read a single message from the queue and assumed it is a TextMessage type. In a production version of the program we would read all messages from the queue and have some mechanism to indicate the last message, possibly a null message or End of Message message. We would filter out any messages which are not of type TextMessage or handle these types separately.

Finally, as before, we close the Connection object.
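The production-style loop mentioned above (read every message, stop at an end marker, skip anything that is not text) might be structured as follows. This is a sketch in plain Java rather than JMS: a BlockingQueue of Object stands in for the queue, a String stands in for a TextMessage, and the "END" marker is a convention we have made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Reads messages until an end marker or a timeout, keeping only text messages.
class DrainLoop {
    // Hypothetical end-of-stream marker agreed between producer and consumer.
    static final String END_OF_MESSAGES = "END";

    static List<String> drain(BlockingQueue<Object> queue, long timeoutMillis) {
        List<String> texts = new ArrayList<>();
        try {
            while (true) {
                Object message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null || END_OF_MESSAGES.equals(message)) {
                    break; // timed out, or the producer signalled the end
                }
                if (message instanceof String) {
                    texts.add((String) message);
                } else {
                    // Analogous to receiving a non-TextMessage type
                    System.out.println("Skipping non-text message: " + message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return texts;
    }

    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        queue.offer("Open bank account 1234");
        queue.offer(Integer.valueOf(42));
        queue.offer("Open bank account 5678");
        queue.offer(END_OF_MESSAGES);
        System.out.println(drain(queue, 100));
    }
}
```

In a JMS version the same shape would apply, with receive(timeout) in place of poll() and an instanceof TextMessage test in place of the String test.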

Running the Queue Producer and Synchronous Queue Consumer Examples

Before we can run the examples, we need to create the connection factory and queue. This process is application server-dependent. In the case of GlassFish there are two ways we can do this. First, we can log on to the Administration Console by entering http://localhost:4848/ in the browser, and then use the Resources option to manually set up the connection factory and queue.

Alternatively we can create an Ant script which uses the GlassFish asadmin utility to perform the setup from the command line:

<target name="create-connection-factory">
<exec executable="${glassfish.home}/bin/asadmin"
failonerror="true"
vmlauncher="false">
<arg line="create-jms-resource --user admin
--passwordfile adminpassword
--restype javax.jms.ConnectionFactory --enabled=true
BankServiceConnectionFactory"/>
</exec>
</target>
<target name="create-queue">
<exec executable="${glassfish.home}/bin/asadmin"
failonerror="true"
vmlauncher="false">
<arg line="create-jms-resource --user admin
--passwordfile adminpassword
--restype javax.jms.Queue --enabled=true
--property Name=BankServiceJMSQueue BankServiceJMSQueue"/>
</exec>
</target>

We would also have corresponding targets for deleting the connection factory and queue.

Note in the statement,

<arg line="create-jms-resource --user admin
--passwordfile adminpassword
--restype javax.jms.Queue --enabled=true
--property Name=BankServiceJMSQueue BankServiceJMSQueue"/>

the clause Name=BankServiceJMSQueue refers to the value of the queue, that is, the name of the physical destination on the JMS server, whereas the last item in the line is the JNDI name of the queue. Normally we don't need to distinguish between the two items and both can be set to be the same. If the queue had a JNDI name of BankServiceJMSQueue and a value of BankServiceJMSQueueValue, then the statement would read:

<arg line="create-jms-resource --user admin
--passwordfile adminpassword
--restype javax.jms.Queue --enabled=true
--property Name=BankServiceJMSQueueValue
BankServiceJMSQueue"/>

Now run the QueueProducer program. After it completes, run SynchQueueConsumer. The program will read the message. Recall that in point-to-point mode a consumer does not need to be registered with the queue at the time the message is sent. If we run SynchQueueConsumer again, the program will hang until QueueProducer is run again. This is normal behavior for a synchronous queue consumer.

An Asynchronous Queue Consumer Example

In this example a Java application, AsynchQueueConsumer, simply asynchronously reads a message from the same queue used in the previous examples. This program differs from the synchronous consumer example in that it registers with the JMS server a listener object that implements the MessageListener interface. The MessageListener interface has an onMessage() method which the listener object must implement. When a message arrives on the queue, the JMS server will invoke the onMessage() method. So, in contrast to the synchronous case where a message is pulled from the queue, in the asynchronous case the message is pushed from the queue. The code for AsynchQueueConsumer is listed below:

public class AsynchQueueConsumer{
    public static void main(String[] args) {
        ConnectionFactory cf = null;
        Connection conn = null;
        Queue queue = null;
        MessageListener listener = null;
        try {
            listener = new TextMessageListener();
            InitialContext ctx = new InitialContext();
            cf = (ConnectionFactory)
                ctx.lookup("BankServiceConnectionFactory");
            queue = (Queue) ctx.lookup("BankServiceJMSQueue");
        } catch (NamingException e) {
            e.printStackTrace();
            return; // nothing can be read without the factory and queue
        }
        try {
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageConsumer mcon = sess.createConsumer(queue);
            // Register the listener before starting delivery
            mcon.setMessageListener(listener);
            conn.start();
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            try {
                // conn is still null if createConnection() failed
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {}
        }
    }
}

The program differs from the synchronous consumer in three places: the declaration of the listener, its creation, and the call to setMessageListener() that takes the place of receive().

As with our queue producer program, we use JNDI to look up the BankServiceConnectionFactory connection factory and the BankServiceJMSQueue queue. Again, as before, we create Connection, Session and MessageConsumer objects. Our listener object is named TextMessageListener. We shall see the code for TextMessageListener shortly. We create an instance of the TextMessageListener class and register it as a listener using the MessageConsumer.setMessageListener() method:

listener = new TextMessageListener();
mcon.setMessageListener(listener);

Next we start the connection. From this point messages are delivered to the listener. Finally we close the connection.

We need to create the TextMessageListener class which must implement the MessageListener interface. This interface has just one method, onMessage(), which TextMessageListener must implement. When a message arrives on the queue, the JMS server will invoke the onMessage() method. The code for TextMessageListener is shown below:

public class TextMessageListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            TextMessage msg = (TextMessage) message;
            System.out.println(
                "JMS Message:" + msg.getText() );
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}

The onMessage() method has an argument of type Message, namely the incoming message. We cast the message to a TextMessage type before printing it out. Again in a production version, we would be more defensive and either cater for, or filter out, other JMS Message types.

Running the Asynchronous Queue Consumer Example

First run the QueueProducer program. After it completes, run AsynchQueueConsumer. The program will read the message. Run AsynchQueueConsumer again. Unlike the synchronous version, the program will not hang. It will terminate with no message being printed out.

Topic Producer and Consumer Examples

We will start with a Java application, TopicProducer.java, acting as a JMS topic producer. Recall we use a topic in publish/subscribe messaging. Publish/subscribe is used for one-to-many delivery of messages. Typically this mode is used for broadcasting information to a large number of consumers who may not even be known to the topic producer. An example in the banking industry would be the notification of an imminent interest rate cut. TopicProducer.java just sends the message "Savings account interest rate cut by 0.5 %" to the topic.

The code is very similar to QueueProducer.

public class TopicProducer {
    public static void main(String[] args) {
        ConnectionFactory cf = null;
        Connection conn = null;
        Topic topic = null;
        try {
            InitialContext ctx = new InitialContext();
            cf = (ConnectionFactory)
                ctx.lookup("BankServiceConnectionFactory");
            topic = (Topic) ctx.lookup("BankServiceJMSTopic");
        } catch (NamingException e) {
            e.printStackTrace();
            return; // nothing can be sent without the factory and topic
        }
        try {
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer mp = sess.createProducer(topic);
            TextMessage msg = sess.createTextMessage(
                "Savings account interest rate cut by 0.5 %");
            mp.send(msg);
            System.out.println("JMS Message sent");
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            try {
                // conn is still null if createConnection() failed
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {}
        }
    }
}

We look up the same connection factory, BankServiceConnectionFactory, that we used in the QueueProducer example. However, we could create a separate connection factory dedicated to topics if we so wished. The advantage of having separate connection factories is that the application server administrator can individually configure each connection factory. Instead of a queue we look up a topic; in our example the topic is BankServiceJMSTopic:

topic = (Topic) ctx.lookup("BankServiceJMSTopic");

We create Connection and Session objects as before. We create a MessageProducer object using the Session object. However, this time we supply the topic as an argument:

MessageProducer mp = sess.createProducer(topic);

As before, we use the send() method of the MessageProducer object to send the message to the topic. Finally we close the connection.

Synchronous Topic Consumer Example

In this example a Java application, SynchTopicConsumer, simply synchronously reads a message from the same topic used in the previous example.

public class SynchTopicConsumer{
    public static void main(String[] args) {
        ConnectionFactory cf = null;
        Connection conn = null;
        Topic topic = null;
        try {
            InitialContext ctx = new InitialContext();
            cf = (ConnectionFactory)
                ctx.lookup("BankServiceConnectionFactory");
            topic = (Topic) ctx.lookup("BankServiceJMSTopic");
        } catch (NamingException e) {
            e.printStackTrace();
            return; // nothing can be read without the factory and topic
        }
        try {
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageConsumer mcon = sess.createConsumer(topic);
            conn.start();
            Message message = mcon.receive(0);
            TextMessage msg = (TextMessage) message;
            System.out.println(
                "JMS Message:" + msg.getText() );
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            try {
                // conn is still null if createConnection() failed
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {}
        }
    }
}

The code for this program is almost the same as for SynchQueueConsumer described earlier. One difference is that we perform a JNDI lookup of BankServiceJMSTopic rather than a queue:

topic = (Topic) ctx.lookup("BankServiceJMSTopic");

The second difference is that we use the topic as an argument when creating the MessageConsumer object:

MessageConsumer mcon = sess.createConsumer(topic);

Running the Topic Producer and Synchronous Topic Consumer Examples

Before we can run the examples we need to create the topic. As with connection factories and queues, this process is application server-dependent. In the case of GlassFish we can do this manually by means of the Administration Console, or by creating an Ant script which uses the GlassFish asadmin utility to perform the setup from the command line:

<target name="create-topic">
<exec executable="${glassfish.home}/bin/asadmin"
failonerror="true"
vmlauncher="false">
<arg line="create-jms-resource --user admin
--passwordfile adminpassword --restype javax.jms.Topic
--enabled=true --property Name=BankServiceJMSTopic
BankServiceJMSTopic"/>
</exec>
</target>

Now run the TopicProducer program. After it completes, run SynchTopicConsumer. This program will hang, unlike the synchronous queue consumer example shown earlier. This is because SynchTopicConsumer was not registered with the topic at the time that TopicProducer sent the message. If we run TopicProducer again, then the hanging SynchTopicConsumer will read the message and terminate.

An Asynchronous Topic Consumer Example

In this example a Java application, AsynchTopicConsumer, simply asynchronously reads a message from the same topic used in the previous example. The code for this program is very similar to the asynchronous queue consumer program described earlier. We also need to create an instance of a MessageListener class and register it as a listener. The code is shown below:

public class AsynchTopicConsumer{
    public static void main(String[] args) {
        ConnectionFactory cf = null;
        Connection conn = null;
        Topic topic = null;
        MessageListener listener = null;
        try {
            listener = new TextMessageListener();
            InitialContext ctx = new InitialContext();
            cf = (ConnectionFactory)
                ctx.lookup("BankServiceConnectionFactory");
            topic = (Topic) ctx.lookup("BankServiceJMSTopic");
        } catch (NamingException e) {
            e.printStackTrace();
            return; // nothing can be read without the factory and topic
        }
        try {
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageConsumer mcon = sess.createConsumer(topic);
            mcon.setMessageListener(listener);
            conn.start();
            try {
                // Stay subscribed for 30 seconds so the listener can be called
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            try {
                // conn is still null if createConnection() failed
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException ex) {}
        }
    }
}

We use JNDI to look up the BankServiceConnectionFactory connection factory and the BankServiceJMSTopic topic. As before, we create Connection, Session, and MessageConsumer objects. We also create an instance of the TextMessageListener class and register it as a listener. However the program differs from the queue example in that after starting the connection we let the program sleep for 30 seconds.

The reason for this is, you may recall, that a consumer must be registered with the topic at the time the message is sent. In this way, providing we run the topic producer within 30 seconds of the consumer starting, we will be able to read any message in the topic. Alternatively we could have created an infinite loop which would require some sort of end of message from the producer to break from the loop.
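The alternative to a fixed sleep, waiting until the producer signals that it has finished, could be built around a java.util.concurrent.CountDownLatch. The sketch below is plain Java with no JMS types: the onText() method plays the role of onMessage(), and the "END" marker and the class name are conventions we have assumed for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Listener that records messages and releases a waiting main thread when the
// producer's end marker arrives.
class EndMarkerListener {
    // Hypothetical end-of-stream marker agreed with the producer.
    static final String END_OF_MESSAGES = "END";

    private final List<String> received =
        Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch finished = new CountDownLatch(1);

    // Plays the role of onMessage(): called on the delivery thread.
    void onText(String text) {
        if (END_OF_MESSAGES.equals(text)) {
            finished.countDown();
        } else {
            received.add(text);
        }
    }

    // Called by the main thread instead of Thread.sleep(); returns true if
    // the end marker arrived within the timeout.
    boolean awaitEnd(long timeoutMillis) {
        try {
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> received() {
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        EndMarkerListener listener = new EndMarkerListener();
        // A separate thread stands in for the JMS delivery thread
        new Thread(() -> {
            listener.onText("Savings account interest rate cut by 0.5 %");
            listener.onText(END_OF_MESSAGES);
        }).start();
        System.out.println("finished: " + listener.awaitEnd(5000));
        System.out.println("received: " + listener.received());
    }
}
```

A timeout is still passed to awaitEnd() so that the consumer does not wait forever if the producer never sends the marker.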

Running the Asynchronous Topic Consumer Example

First run the AsynchTopicConsumer program. In another window immediately afterwards, run a second copy of AsynchTopicConsumer. Finally, in a third window run the TopicProducer program. All three programs should be started within 30 seconds of each other. Because this is a topic and a topic can have any number of subscribers, both copies of AsynchTopicConsumer will receive the message sent by TopicProducer.

Motivation for Message-Driven Beans

In enterprise scale environments we would like our messaging systems to take advantage of the services offered by an EJB container. These include security, transactions, concurrency, and the scalability of the EJB model. A session bean cannot be a message consumer since its business methods must be synchronously invoked by a client. However, a session bean can act as a message producer as we shall see. To meet these requirements, EJB provides message-driven beans (MDBs). An added bonus of MDBs is that much of the boilerplate code of creating and starting connections and creating sessions and listeners is dispensed with.

MDBs are EJB components which act as asynchronous message consumers. Since EJB 2.1, MDBs can support non-JMS messaging systems as well as JMS messaging systems, but we will describe only JMS-based MDBs in this book.

MDBs do not have remote or local business interfaces. JMS-based MDBs must implement the MessageListener interface. As we have seen, this interface has just one method, onMessage(). This method, as well as receiving the message, also performs any business logic, much as a session bean's business methods do. MDBs can use point-to-point (queue) or publish/subscribe (topic) modes. MDBs are stateless. Like session beans, MDBs can invoke other session beans and can interact with entities by means of the EntityManager.

A Simple Message-Driven Bean Example

For this example we revert to the banking scenario. Now we will get a session bean to send a message to a JMS queue requesting that a new customer be added to the database. The message will be a Customer object with id, firstName, and lastName attributes. On receipt of the message, the MDB simply adds the customer to the database.

A Session Bean Queue Producer

We have modified the BankServiceBean.addCustomer() method so that it acts as a queue producer. The addCustomer() method is invoked synchronously by a client in the usual manner. The code for addCustomer() is shown below:

@Stateless
public class BankServiceBean implements BankService {
    @Resource(mappedName="BankServiceConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="BankServiceJMSQueue")
    private Queue queue;
    ...
    public void addCustomer(int custId, String firstName,
            String lastName) {
        Connection conn = null;
        try {
            Customer cust = new Customer();
            cust.setId(custId);
            cust.setFirstName(firstName);
            cust.setLastName(lastName);
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer mp = sess.createProducer(queue);
            ObjectMessage objmsg = sess.createObjectMessage();
            objmsg.setObject(cust);
            mp.send(objmsg);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Close the connection even if sending failed
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    ...

As we have a session bean running within an EJB container, we can dispense with JNDI lookups and use the @Resource annotation to inject the connection factory and queue into corresponding variables. We assume that a connection factory with JNDI name BankServiceConnectionFactory has already been created. We also assume that a queue with JNDI name BankServiceJMSQueue has already been created. We saw in the section "Running the Queue Producer and Synchronous Queue Consumer Examples" how to create the BankServiceConnectionFactory and BankServiceJMSQueue in the case of the GlassFish application server. Here we choose to use the same connection factory and JMS queue, but we could create a connection factory and JMS queue specifically for MDBs.

We create a customer object and set its attributes.

We create Connection, Session, and MessageProducer objects in the same way as in the Java application client queue producer described earlier. However, instead of creating a TextMessage object, we create an ObjectMessage object using the Session.createObjectMessage() method:

ObjectMessage objmsg = sess.createObjectMessage();

We use the ObjectMessage.setObject() method to assign the customer object to the message:

objmsg.setObject(cust);

Finally we use the MessageProducer.send() method to send the message to the queue.

A Message-Driven Bean Queue Consumer

We will create an MDB named ProcessCustomerBean which reads a customer object from the message queue and persists the customer to the database. The code is shown below:

package ejb30.MDB;
import javax.persistence.EntityManager;
import ejb30.entity.Customer;
import javax.persistence.PersistenceContext;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.ejb.ActivationConfigProperty;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.ObjectMessage;
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType",
        propertyValue = "javax.jms.Queue")
}, mappedName="BankServiceJMSQueue" )
public class ProcessCustomerBean implements MessageListener {
    @PersistenceContext(unitName="BankService")
    private EntityManager em;
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objmsg = (ObjectMessage)
                    message;
                Customer cust = (Customer) objmsg.getObject();
                Customer mergedCust = em.merge(cust);
                System.out.println("MDB: Customer persisted");
            } else {
                System.out.println("Wrong type of message");
            }
        } catch(Exception e) {
            throw new EJBException(e);
        }
    }
}

Recall there is no remote or local business interface for MDBs. We use the @MessageDriven annotation to indicate this is an MDB. We also use the @ActivationConfigProperty annotation to specify a configuration property, namely destinationType. Configuration properties can be application server-dependent; however, a number of them, such as destinationType, are common to all EJB 3 compliant application servers. We will see a few more of these configuration properties later in this chapter. We use the mappedName element to specify the JNDI name of the queue being read from.

We only describe the use of annotations with MDBs in this book. However, as with the rest of EJB 3, it is possible to use XML deployment descriptors with MDBs either as an alternative to, or in conjunction with, annotations.

The MDB must implement the MessageListener interface. As with asynchronous Java application message consumers, we must implement an onMessage() method. This method has no return value. Within the method, we ignore any messages which are not of ObjectMessage type. We cast the message to ObjectMessage type, then obtain the customer object using the ObjectMessage.getObject() method. Finally we persist the customer object using the EntityManager.merge() method.

MDB Activation Configuration Properties

We have already seen the destinationType activation configuration property. In this section we will describe a few more configuration properties which are common to all EJB 3 compliant application servers.

acknowledgeMode

We have already come across message acknowledgment in the context of JMS Java applications. In the context of MDBs, without message acknowledgment from the EJB container to the JMS server, the server may re-send messages to the container. There are two values for acknowledgeMode:

  • Auto-acknowledge: the EJB container sends an acknowledgment as soon as the message is received.

  • Dups-ok-acknowledge: the EJB container sends an acknowledgment any time after it has received the message. Consequently duplicate messages may be sent by the JMS server. The MDB must be programmed to handle duplicates.

An example of the syntax is:

@ActivationConfigProperty(propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge")

The acknowledgeMode property is only needed if the MDB is using bean-managed transactions (BMT).

There is no need for this property if the MDB is using Container-Managed Transactions (CMT). With CMT if the transaction succeeds, the EJB container will send a message acknowledgment to the JMS server in any case. If the transaction fails, the message is not acknowledged and the message is put back in the queue and re-sent.

We have more to say about MDBs and transactions later in this chapter.
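Whenever duplicates are possible, whether because of Dups-ok-acknowledge or because a message is re-sent after a failed transaction, the consumer has to recognize a message it has already processed. The following is a minimal sketch of one way to do this, assuming each message carries a unique identifier such as the value returned by Message.getJMSMessageID(). The DuplicateFilter class and its capacity parameter are hypothetical helpers written in plain Java, not part of the JMS or EJB APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of JMS or EJB: remembers recently seen
// message IDs so that a re-sent message can be recognized and skipped.
class DuplicateFilter {
    private final Map<String, Boolean> seen;

    DuplicateFilter(final int capacity) {
        // An insertion-ordered LinkedHashMap that evicts its eldest entry
        // keeps the memory used by the filter bounded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is offered, false for a repeat.
    synchronized boolean firstDelivery(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

An onMessage() method could call firstDelivery() with the message ID and return early when the result is false. Because the container may pool several MDB instances, an in-memory filter like this only helps if it is shared between them; in practice a check against the database, such as a lookup on the customer's primary key, is usually the more robust choice.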

subscriptionDurability

You may recall in the publish/subscribe mode, one or more consumers register with a topic. The message remains in the topic until all consumers have read the message. Normally if a consumer has not connected to and registered with a topic at the time the message is sent then it will be unable to subsequently read the message. This default case is described as non-durable. In the case of MDBs, topic connection and registration usually occurs at deployment time, so if a message has already been sent to a topic, the MDB will not be able to read it. If, once we have deployed the MDB, the EJB container crashes after the message has been sent, the connection between the EJB container and JMS server will be lost. Consequently the MDB will not be able to read the message after the EJB container is restarted. However, with a durable subscription, the JMS server will hold the message until the EJB container restarts in the event of a crash.

A non-durable subscription improves system performance, but at the cost of reliability.

An example of the syntax is:

@ActivationConfigProperty(propertyName =
"subscriptionDurability",
propertyValue = "NonDurable")

or

@ActivationConfigProperty(propertyName =
"subscriptionDurability",
propertyValue = "Durable")
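To make the difference between the two settings concrete, the sketch below models a topic in plain Java. It is a toy, not the JMS API: ToyTopic and all of its methods are hypothetical names, and disconnect() and reconnect() simply stand in for an EJB container crashing and restarting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory topic, NOT the JMS API: it only models which messages a
// subscriber sees after a period offline.
class ToyTopic {
    private static final class Subscription {
        final boolean durable;
        boolean connected = true;
        final List<String> delivered = new ArrayList<>();
        final List<String> held = new ArrayList<>(); // used by durable subscriptions only

        Subscription(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Subscription> subs = new HashMap<>();

    void subscribe(String name, boolean durable) {
        subs.put(name, new Subscription(durable));
    }

    // Models the container crashing or shutting down.
    void disconnect(String name) {
        subs.get(name).connected = false;
    }

    // Models the container restarting: held messages are delivered first.
    void reconnect(String name) {
        Subscription s = subs.get(name);
        s.connected = true;
        s.delivered.addAll(s.held);
        s.held.clear();
    }

    void publish(String message) {
        for (Subscription s : subs.values()) {
            if (s.connected) {
                s.delivered.add(message);
            } else if (s.durable) {
                s.held.add(message); // the server holds the message for later
            }
            // non-durable and offline: the message is missed
        }
    }

    List<String> received(String name) {
        return subs.get(name).delivered;
    }
}
```

If both a durable and a non-durable subscriber are disconnected while a message is published, only the durable one receives that message after reconnecting. The extra bookkeeping in the held list hints at why durability carries a performance cost.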

messageSelector

A message selector allows the MDB to filter out certain messages. For example, we would like to modify the MDB described in the previous section to only process messages which relate to priority customers.

We do this as follows:

@ActivationConfigProperty(propertyName = "messageSelector",
propertyValue = "Priority = 'HIGH'")

The producer would need to set the priority using the ObjectMessage.setStringProperty() method as follows:

objmsg.setStringProperty("Priority", "HIGH" );

The propertyValue clause can be an expression similar to one in an SQL WHERE clause.
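A selector is evaluated against a message's header fields and properties, never against its body. As a rough mental model only, the sketch below treats a selector such as Priority = 'HIGH' AND OrderTotal > 1000 as a predicate over a map of properties. Everything here is plain Java rather than JMS: SelectorSketch, its methods, and the OrderTotal property are hypothetical, and a real producer would set such a numeric property with Message.setIntProperty().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java stand-in for selector filtering; none of these types are JMS types.
class SelectorSketch {
    // A message reduced to the part a selector can see: its properties.
    static Map<String, Object> message(String priority, int orderTotal) {
        Map<String, Object> props = new HashMap<>();
        props.put("Priority", priority);
        props.put("OrderTotal", orderTotal); // hypothetical numeric property
        return props;
    }

    // Same intent as the selector: Priority = 'HIGH' AND OrderTotal > 1000
    static boolean highValue(Map<String, Object> props) {
        Object total = props.get("OrderTotal");
        return "HIGH".equals(props.get("Priority"))
                && total instanceof Integer
                && ((Integer) total) > 1000;
    }

    // Only messages that satisfy the selector are handed to the consumer.
    static List<Map<String, Object>> deliver(List<Map<String, Object>> sent,
                                             Predicate<Map<String, Object>> selector) {
        List<Map<String, Object>> accepted = new ArrayList<>();
        for (Map<String, Object> m : sent) {
            if (selector.test(m)) {
                accepted.add(m);
            }
        }
        return accepted;
    }
}
```

The point of the model is that filtering happens before the consumer is involved at all: an MDB with a selector never sees the messages that fail the test, so it needs no filtering logic of its own in onMessage().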

MessageDrivenContext

An MDB may make use of the MessageDrivenContext interface. This interface is similar to the SessionContext interface provided for session beans, and provides information about an MDB's environment. The MDB does not implement the interface itself; the container supplies the context object, and we use the @Resource annotation to inject it into an MDB:

@Resource private MessageDrivenContext ctx;

There are a number of methods such as setRollbackOnly() and getRollbackOnly() which are identical to the corresponding SessionContext methods. These are used in the context of transactions, which we discuss later in this chapter.

Remaining MessageDrivenContext methods deal with Timer or Security aspects, which we cover in later chapters.

MDB LifeCycle

The MDB lifecycle is similar to the stateless session bean lifecycle. The following state diagram shows the MDB's lifecycle:

There are just two MDB lifecycle states: does-not-exist and method-ready pool. The initial state of an MDB is the does-not-exist state. This would be the case before a container starts up. The next state is the method-ready pool. When the container starts up it typically creates a number of MDB instances in the method-ready pool. However, the container can decide at any time to create such instances. In creating an instance in the method-ready pool, the container performs the following steps:

  1. The MDB is instantiated.

  2. The container injects the bean's MessageDrivenContext, if applicable.

  3. The container performs any other dependency injection that is specified in the bean's metadata.

  4. The container invokes a PostConstruct callback method if one is present in the bean. The PostConstruct method would be used for initializing any resources used by the bean. A PostConstruct method is called only once in the life of an instance, when it transitions from the does-not-exist state to the method-ready pool. Note that there can be at most one PostConstruct method. The method can take any name but must be void with no arguments.

When a message arrives, the bean's onMessage() method is invoked. The container may have created multiple instances of MDBs which handle the same JMS destination, so any bean instance may be chosen by the container to execute the onMessage() method.

At some stage the container may decide to destroy the bean instance, usually when the container shuts down or when the container decides there are too many instances in the method-ready pool. In this case the bean transitions from the method-ready pool to the does-not-exist state.

The container will first invoke a PreDestroy callback method if one is present in the bean. The PreDestroy method is used for tidying-up activities. A PreDestroy method is called only once in the life of an instance, when it is about to transition to the does-not-exist state. As with the PostConstruct method, there can be at most one PreDestroy method. The method can take any name but must be void with no arguments.
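The order of these callbacks can be traced with a small simulation. The sketch below is plain Java and makes no use of the EJB API: ToyBean, LoggingBean, and ToyPool are hypothetical names, the injection steps are left out, and the round-robin choice of instance is only one of many policies a real container might apply.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the callbacks a container would invoke.
interface ToyBean {
    void postConstruct();
    void onMessage(String message);
    void preDestroy();
}

// Records every lifecycle event so that the order can be inspected.
class LoggingBean implements ToyBean {
    private final int id;
    private final List<String> log;

    LoggingBean(int id, List<String> log) {
        this.id = id;
        this.log = log;
    }

    public void postConstruct() { log.add("bean" + id + ":postConstruct"); }
    public void onMessage(String m) { log.add("bean" + id + ":onMessage(" + m + ")"); }
    public void preDestroy() { log.add("bean" + id + ":preDestroy"); }
}

// Toy model of the method-ready pool; a real container decides the pool size
// and the choice of instance itself.
class ToyPool {
    private final List<ToyBean> ready = new ArrayList<>();
    private int next = 0;

    ToyPool(int size, List<String> log) {
        for (int i = 0; i < size; i++) {
            ToyBean bean = new LoggingBean(i, log); // step 1: instantiate
            // steps 2 and 3 (context and dependency injection) are omitted
            bean.postConstruct();                   // step 4: once per instance
            ready.add(bean);
        }
    }

    // Any pooled instance may be chosen; round-robin is just one possibility.
    void dispatch(String message) {
        ready.get(next).onMessage(message);
        next = (next + 1) % ready.size();
    }

    // Models container shutdown: every instance returns to does-not-exist.
    void shutdown() {
        for (ToyBean bean : ready) {
            bean.preDestroy();
        }
        ready.clear();
    }
}
```

With a pool of two instances and three messages, the log shows each instance being constructed once, the messages being spread across both instances, and each instance being destroyed once at shutdown. Since successive messages may land on different instances, an MDB should not rely on instance fields to carry state from one message to the next.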

MDB Example Revisited

We will go back to our earlier example and add in a few more features that we covered in the last few sections. To recap, we have a session bean, BankServiceBean, which sends a message to a JMS queue requesting that a new customer be added to the database. The message itself is a customer object. On receipt of the message, the MDB adds the customer to the database. This time we will use a topic instead of a queue.

In this example we will also assume that customers can be either high or low priority. To illustrate the message filtering capabilities of MDBs, the MDB we describe in this section will only process high priority customers. We can assume that a separate MDB will process low priority customers.

The only modification we make to BankServiceBean is to indicate that we are using a JMS topic and that we have a high priority customer. We do this using the ObjectMessage.setStringProperty() method. The MDB will subsequently filter out any low priority customers. The code below highlights the modifications to BankServiceBean:

@Stateless
public class BankServiceBean implements BankService {
    @Resource(mappedName="BankServiceConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="BankServiceJMSTopic")
    private Topic topic;
    @PersistenceContext(unitName="BankService")
    private EntityManager em;

    public void addCustomer(int custId, String firstName,
                            String lastName) {
        Connection conn = null;
        try {
            Customer cust = new Customer();
            cust.setId(custId);
            cust.setFirstName(firstName);
            cust.setLastName(lastName);
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer mp = sess.createProducer(topic);
            ObjectMessage objmsg = sess.createObjectMessage();
            objmsg.setObject(cust);
            // message property used by the MDB's message selector
            objmsg.setStringProperty("Priority", "HIGH" );
            mp.send(objmsg);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // close the connection even if the send fails
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    ...

The revised code for the MDB ProcessCustomerBean is shown below:

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType",
        propertyValue = "javax.jms.Topic"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode",
        propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName =
        "subscriptionDurability",
        propertyValue = "NonDurable"),
    @ActivationConfigProperty(propertyName = "messageSelector",
        propertyValue = "Priority = 'HIGH'")
}, mappedName="BankServiceJMSTopic" )
public class ProcessCustomerBean implements MessageListener {
    @PersistenceContext(unitName="BankService")
    private EntityManager em;

    @PostConstruct
    public void init() {
        System.out.println(
            "Post Constructor Method init() Invoked");
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objmsg = (ObjectMessage)
                    message;
                Customer cust = (Customer) objmsg.getObject();
                Customer mergedCust = em.merge(cust);
                System.out.println("MDB: Customer persisted");
            } else {
                System.out.println("Wrong type of message");
            }
        } catch(Exception e) {
            throw new EJBException(e);
        }
    }

    @PreDestroy
    public void tidyUp() {
        System.out.println(
            "Pre Destruction Method tidyUp() Invoked");
    }
}

We indicate that we are using a topic and not a queue. We have added three more activation configuration properties: acknowledgeMode, subscriptionDurability, and messageSelector. We have set acknowledgeMode to Auto-acknowledge; however, as we are using CMT by default, this will have no effect. We have added the property just as an example of the syntax.

The subscriptionDurability property has been set to NonDurable, and the messageSelector property indicates we should only read messages with priority set to HIGH.

We have added a PostConstruct method init() and a PreDestroy method tidyUp().

Sending Message Confirmation to a Client

We have already seen how an EJB container acknowledges message receipt with the JMS server. In this section we look at how a message producer client can be informed that its message has been received by an MDB.

This is done by the client setting the JMSReplyTo header on the message. JMSReplyTo contains the destination, queue or topic, to which the MDB should send its confirmation message. The MDB uses the ObjectMessage.getJMSReplyTo() method to obtain the confirmation destination. The MDB then acts as a message producer to send a message to the confirmation destination.

We will look at the example we have used throughout this chapter. Instead of using a session bean to send a JMS message to an MDB, we will use a Java application program. Recall that a session bean cannot act as a JMS message consumer. The Java application program, TopicProducer, will act both as a topic producer and a synchronous queue consumer. The code for TopicProducer is shown below:

public class TopicProducer {
    @Resource(mappedName="BankServiceConnectionFactory")
    private static ConnectionFactory cf;
    @Resource(mappedName="BankServiceJMSTopic")
    private static Topic topic;
    @Resource(mappedName="BankServiceJMSQueue")
    private static Queue queue;

    public static void main(String[] args) {
        Connection conn = null;
        Connection conn2 = null;
        try {
            Customer cust = new Customer();
            cust.setId(10);
            cust.setFirstName("LEONARDO");
            cust.setLastName("DAVINCI");
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer mp = sess.createProducer(topic);
            ObjectMessage objmsg = sess.createObjectMessage();
            objmsg.setObject(cust);
            objmsg.setStringProperty("Priority", "HIGH" );
            objmsg.setJMSReplyTo(queue);
            mp.send(objmsg);
            System.out.println("JMS Message sent");
            // now program acts as synchronous consumer
            // to get confirmation
            conn2 = cf.createConnection();
            Session sess2 = conn2.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            // the consumer belongs to the second connection's session
            MessageConsumer mcon = sess2.createConsumer(queue);
            conn2.start();
            Message message = mcon.receive(0);
            TextMessage msg = (TextMessage) message;
            System.out.println(
                "JMS Message:" + msg.getText() );
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            // close both connections, skipping any that were never opened
            try {
                if (conn != null) conn.close();
            } catch (JMSException ex) {}
            try {
                if (conn2 != null) conn2.close();
            } catch (JMSException ex) {}
        }
    }
}

This time we choose to deploy TopicProducer in the application client container. This means we can use the @Resource annotation to inject BankServiceJMSTopic and BankServiceJMSQueue into corresponding variables rather than using JNDI lookups. BankServiceJMSTopic is used for sending the message and BankServiceJMSQueue for receiving confirmation. The program also creates a second Connection object conn2 which is subsequently used to obtain the message confirmation. The program creates a Customer object and sends a message in the usual way. The only addition is the line:

objmsg.setJMSReplyTo(queue);

This specifies the destination, BankServiceJMSQueue in our case, to which the MDB should reply. After sending the message, the program now acts as a synchronous queue consumer and reads the confirmation message. The code is the standard synchronous consumer code that we have already seen. Note the program assumes there will be just one confirmation message of type TextMessage. Of course, in the real world, we would not want to assume this. Note the program hangs until a message arrives in the confirmation queue. The program will hang indefinitely if no confirmation is sent.
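One way to avoid waiting forever is to bound the wait. In JMS, MessageConsumer.receive(long timeout) returns null if no message arrives within the given number of milliseconds, whereas a timeout of 0 blocks indefinitely. The sketch below illustrates the same reply-to pattern with a bounded wait using in-memory queues from java.util.concurrent. It is not JMS code: ReplyToSketch, Request, consume(), and awaitConfirmation() are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the reply-to pattern; these are not JMS types.
class ReplyToSketch {
    // A request that carries the destination for its confirmation,
    // playing the role of the JMSReplyTo header.
    static final class Request {
        final String payload;
        final BlockingQueue<String> replyTo;

        Request(String payload, BlockingQueue<String> replyTo) {
            this.payload = payload;
            this.replyTo = replyTo;
        }
    }

    // Plays the MDB: processes the request, then confirms on the reply queue.
    static void consume(Request request) {
        request.replyTo.add("Customer added: " + request.payload);
    }

    // Plays the client: waits for a confirmation for at most timeoutMillis.
    // Returns null if nothing arrives in time or the wait is interrupted.
    static String awaitConfirmation(BlockingQueue<String> replyQueue,
                                    long timeoutMillis) {
        try {
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
        Request request = new Request("DAVINCI", replyQueue);
        Thread consumer = new Thread(() -> consume(request));
        consumer.start();
        String confirmation = awaitConfirmation(replyQueue, 2000);
        System.out.println(confirmation == null
                ? "No confirmation received" : confirmation);
        consumer.join();
    }
}
```

A null result tells the client only that no confirmation arrived in time, not that the request failed, so the client still has to decide whether to retry, report an error, or carry on.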

We will now take a look at the modifications required for the ProcessCustomerBean MDB to send a confirmation.

@MessageDriven(mappedName="BankServiceJMSTopic" )
public class ProcessCustomerBean implements MessageListener {
    @Resource(mappedName="BankServiceConnectionFactory")
    private ConnectionFactory cf;
    ....
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objmsg = (ObjectMessage)
                    message;
                Customer cust = (Customer) objmsg.getObject();
                Customer mergedCust = em.merge(cust);
                System.out.println("MDB: Customer persisted");
                sendConfirmation(objmsg, cf);
            } else {
                System.out.println("Wrong type of message");
            }
        } catch(Exception e) {
            throw new EJBException(e);
        }
    }

    private void sendConfirmation(ObjectMessage objmsg,
                                  ConnectionFactory cf) {
        Connection conn = null;
        try {
            // destination the producer asked us to reply to
            Queue queue = (Queue) objmsg.getJMSReplyTo();
            conn = cf.createConnection();
            Session sess = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageProducer mp = sess.createProducer(queue);
            TextMessage msg = sess.createTextMessage(
                "Customer added to database");
            mp.send(msg);
        } catch (JMSException ex) {
            System.out.println(
                "JMS Exception:" + ex.getMessage() );
        } finally {
            // conn is still null if createConnection() failed
            try {
                if (conn != null) conn.close();
            } catch (JMSException ex) {}
        }
    }
}

First note that we inject an instance of the BankServiceConnectionFactory into the cf variable. We need this later to send the confirmation message. There is no change to the onMessage() method other than a call to a new method, sendConfirmation(), after the customer is persisted.

The sendConfirmation() method obtains the destination by using the ObjectMessage.getJMSReplyTo() method:

Queue queue = (Queue) objmsg.getJMSReplyTo();

In our example the destination is a queue, but you can have a topic as a destination. We then use the usual queue producer code to send the confirmation as a TextMessage.

MDBs and Transactions

Because JMS decouples the message producer from the consumer, it is not possible for a transaction to span the sending and receipt of a message. For example, you cannot have a transaction starting when a client sends a message to a recipient and committing after the client has received confirmation. In many cases a client does not know who the recipient is. Even if such transactions were possible, they would be very long running. Good transaction design encourages keeping transactions short.

Consequently the only transaction attributes available when using Container-Managed Transactions (CMT) with MDBs are REQUIRED and NOT_SUPPORTED. The remaining transaction attributes deal with client-initiated transactions, so are not applicable to MDBs. An MDB with the REQUIRED attribute will always execute in a new transaction. As with session beans, we would use the NOT_SUPPORTED attribute if the MDB writes to a legacy database or simple file system which does not support transactions.

As an alternative to CMT, MDBs can use Bean-Managed Transactions (BMT).

We need to be careful with exception handling when using CMT with MDBs. You may recall a system exception is defined as a RuntimeException, RemoteException, or EJBException. If a system exception is thrown, the container will roll back the transaction and discard the MDB instance. The message will not be acknowledged and the JMS server may resend the message. A new MDB instance will read the message. It is quite possible that a second attempt to read the message will still cause the same system exception to be thrown. Consequently we have the possibility of an infinite loop of message rereads occurring. In practice it is possible to configure the JMS server to attempt a fixed number of message redeliveries before giving up.

A similar scenario occurs when the transaction is rolled back, either by calling the MessageDrivenContext.setRollbackOnly() method or when the MDB throws an application exception whose class is annotated with @ApplicationException(rollback=true). The container will not discard the instance, but the message will not be acknowledged and the possibly infinite message retry scenario will occur.

For these reasons the MDB onMessage() method should not throw system exceptions or roll back transactions.
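The retry behaviour, and the effect of a redelivery limit, can be illustrated with a toy model. The sketch below is plain Java and does not represent any real JMS server: RedeliverySketch, maxDeliveries, and the deadLetters list are hypothetical names, and how a limit is actually configured, and what happens to a message once it is reached, depends on the JMS server in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of server redelivery, NOT a real JMS server: a message whose
// handler throws is retried up to maxDeliveries times, then set aside.
class RedeliverySketch {
    final List<String> deadLetters = new ArrayList<>();
    private final int maxDeliveries;

    RedeliverySketch(int maxDeliveries) {
        this.maxDeliveries = maxDeliveries;
    }

    // Returns the number of delivery attempts made for this message.
    int deliver(String message, Consumer<String> onMessage) {
        for (int attempt = 1; attempt <= maxDeliveries; attempt++) {
            try {
                onMessage.accept(message);
                return attempt; // success: the message counts as acknowledged
            } catch (RuntimeException e) {
                // models a rollback: the message is not acknowledged and is retried
            }
        }
        deadLetters.add(message); // give up instead of looping forever
        return maxDeliveries;
    }
}
```

A handler that always throws is invoked exactly maxDeliveries times before the message is set aside, whereas a handler that returns normally is invoked once. Without the limit, the loop for the failing handler would never end, which is the situation described above.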

Summary

We first described the Java Message Service (JMS) API. We described two messaging modes that JMS supports: point-to-point and publish/subscribe. In point-to-point mode a client, or message producer, sends a message to a queue. In publish/subscribe mode a message producer sends a message to a topic. A message recipient is referred to as a message consumer. A queue or topic consumer can be either synchronous or asynchronous.

We looked at a number of examples of Java applications running outside an EJB container using JMS. We looked at queue and topic producers and both synchronous and asynchronous queue and topic consumers.

Message Driven Beans (MDBs) are provided where our messaging systems need the services offered by an EJB container. MDBs are EJB components which act as asynchronous message consumers. Although MDBs may implement non JMS messaging systems, all EJB compliant servers must support JMS-based MDBs.

MDBs can use point-to-point (queue) or publish/subscribe (topic) modes.

JMS-based MDBs must implement the onMessage() method of the MessageListener interface. This method receives the message and performs any business logic.

We looked at MDB activation configuration properties and described the MDB lifecycle. We gave an example of an MDB sending message confirmation to a client.

Finally we covered MDBs and transactions.