1.MessageListener
MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
TextMessage textMsg = (TextMessage) message;
System.out.println("接收到一个纯文本消息。");
try {
System.out.println("消息内容是:" + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.SessionAwareMessageListener
SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。假如我们在接收到消息后发送一个回复的消息,这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是方便了我们回复一个消息,它同样提供了onMessage方法,但这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。
package com.tiantian.springintejms.listener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ConsumerSessionAwareMessageListener implements
SessionAwareMessageListener<TextMessage> {
private Destination destination;
public void onMessage(TextMessage message, Session session) throws JMSException {
System.out.println("收到一条消息");
System.out.println("消息内容是:" + message.getText());
MessageProducer producer = session.createProducer(destination);
Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");
producer.send(textMessage);
}
public Destination getDestination() {
returndestination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
该MessageListener中通过set方法注入其属性destination的值为queueDestination。这样当我们的SessionAwareMessageListener接收到消息之后就会往queueDestination发送一个消息。destination也可以通过message.getJMSReplyTo()来获得。在发送消息时可通过message.setJMSReplyTo(replyQueue)来设置这个回复队列地址。
3.MessageListenerAdapter
它类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。MessageListenerAdapter会把接收到的消息做如下转换:
- TextMessage转换为String对象;
- BytesMessage转换为byte数组;
- MapMessage转换为Map对象;
- ObjectMessage转换为对应的Serializable对象。
通过MessageListenerAdapter的构造方法参数指定一个普通的Java类(比如一个MessageListener或者是一个SessionAwareMessageListener):
<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate">
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
</bean>
<!-- 消息监听适配器对应的监听容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="adapterQueue"/>
<!-- 使用MessageListenerAdapter来作为消息监听器 -->
<property name="messageListener" ref="messageListenerAdapter"/>
</bean>
如果我们指定的这个目标处理器是一个MessageListener或者是一个SessionAwareMessageListener的时候Spring将直接利用接收到的Message对象作为方法参数调用它们的onMessage方法。如果指定的目标处理器是一个普通的Java类时,调用defaultListenerMethod指定的方法,当我们没有指定defaultListenerMethod属性时,会默认调用目标监听器的handleMessage方法。
public class ConsumerListener {
public void handleMessage(String message) {
System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);
}
public void receiveMessage(String message) {
System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);
}
}
MessageListenerAdapter另外一个主要的功能是可以通过MessageListenerAdapter注入的handleMessage方法自动的发送返回消息。当我们用于处理接收到的消息的方法(默认是handleMessage)的返回值不为空(null或者void)的时候,Spring会自动将它封装为一个JMS Message,然后自动进行回复。那么这个时候这个回复消息将发送到哪里呢?这主要有两种方式可以指定。
第一,可以通过发送的Message的setJMSReplyTo方法指定该消息对应的回复消息的目的地。
第二,通过MessageListenerAdapter的defaultResponseDestination属性来指定。
@Service
/**
* POJO class, have handleMessage(...) method.
* The return of handleMessage(...) will be
* automatically send back to message.getJMSReplyTo().
*/
public class JmsMessageListener {
public String handleMessage(String text) {
System.out.println("Received: " + text);
return "ACK from handleMessage";
}
}
在Spring配置文件中
<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<!-- <constructor-arg>
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</constructor-arg> -->
<property name="delegate">
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
<property name="defaultResponseDestination" ref="defaultResponseQueue"/>
</bean>
<!-- 消息监听适配器对应的监听容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="adapterQueue"/>
<property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->
</bean>
!-- 默认的消息回复队列 -->
<bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>defaultResponseQueue</value>
</constructor-arg>
</bean>
<!-- defaultResponseQueue对应的监听器 -->
<bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>
<!-- defaultResponseQueue对应的监听容器 -->
<bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="defaultResponseQueue"/>
<property name="messageListener" ref="defaultResponseQueueListener"/>
</bean>
参考:三种消息监听器