Spring集成ActiveMQ

ActiveMQ基础

JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

基本概念

JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

消息模型

○ Point-to-Point(P2P)

○ Publish/Subscribe(Pub/Sub)

即点对点和发布订阅模型

JMS编程模型

(1) ConnectionFactory

创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

(2) Destination

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

(3) Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

(4) Session

Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

(5) 消息的生产者

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

(6) 消息消费者

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

(7) MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

ActiveMQ下载及安装

这个网上例子比较多,不同操作系统方式不同,这里不做说明。
Mac可以使用brew命令来进行安装,也比较简单

brew install activemq
image.png

下载安装成功以后,启动activemq

activemq start

然后本地浏览器中输入http://localhost:8161/查看是否启动成功,登录用户及密码为admin/admin

image.png

Spring集成

1.使用maven引入activemq包

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.2</version>
</dependency>

2.配置spring文件

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
    </bean>
<!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="10"></property>
    </bean>

消息提供者/发布者配置

jmsTemplate配置

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <constructor-arg ref="cachingConnectionFactory"/>
    </bean>

消息模型配置

   <!-- 点对点队列 -->   
    <bean id="mqQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="MessageQueue" />
    </bean>
   <!-- 发布/消费 -->
    <bean id="mqTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="MessageTopic" />
    </bean>

Java代码

消息提供者/发布者

package com.permission.core.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.*;

/**
 * @Description: 消息提供者/发布者
 * @Author: 
 * @Date: Created in 下午5:44 2018/1/11
 */
@Component
public class MessageQueueSender {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination mqQueue;

    @Autowired
    private Destination mqTopic;


    public void sendMessage(String message) {
        jmsTemplate.send(mqTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
    }
}

如果是点对点模式,jmsTemplate.send(mqQueue,****)
如果是发布/订阅模式,jmsTemplate.send(mqTopic,****)

消息接收者/消费者配置

如果跟消息提供者不在一个工程下,也需要配置connectionFactory

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
    </bean>

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="10"></property>
    </bean>

配置消息监听

    <!-- 消息监听器 -->
    <bean id="consumerMessageListener" class="com.bigdata.core.activemq.ConsumerMessageListener" />

    <jms:listener-container container-type="default" destination-type="topic" connection-factory="cachingConnectionFactory" acknowledge="auto">
        <jms:listener destination="MessageTopic" ref="consumerMessageListener"/>
    </jms:listener-container>

这里需要注意,如果是点对点模式 jms:listener-container标签属性destination-type需要配置为queue,默认情况为queue;如果是发布/订阅模式jms:listener-container标签属性destination-type需要配置为topic

消息接收者/订阅者 java代码

package com.bigdata.core.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @Description: TODO
 * @Author: 
 * @Date: Created in 下午4:28 2018/1/11
 */
public class ConsumerMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try{
            System.out.println("接收到的消息内容是:" + textMessage.getText());
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

java代码监听需要实现接口MessageListener

此外可以在amqConnectionFactory中配置消息传输监听器,用以处理网络异常及服务器异常,配置信息如下:

   <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
   <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
       <property name="brokerURL" value="tcp://localhost:61616"></property>
       <!-- 消息传输监听器 处理网络及服务器异常 -->
       <property name="transportListener">
           <bean class="com.permission.core.activemq.ActiveMQTransportListener" />
       </property>
   </bean>

ActiveMQTransportListener代码

package com.permission.core.activemq;

import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Description: TODO
 * @Author: 
 * @Date: Created in 下午4:31 2018/1/11
 */
public class ActiveMQTransportListener implements TransportListener {

    private final static Logger log = LoggerFactory.getLogger(ActiveMQTransportListener.class);

    @Override
    public void onCommand(Object o) {
        log.info("onCommand -> 对消息传输命令进行监控  ...");
    }

    @Override
    public void onException(IOException e) {
        log.error("onException -> 消息服务器连接错误......", e);
    }

    @Override
    public void transportInterupted() {
        log.warn("transportInterupted -> 消息服务器连接发生中断...");
    }

    @Override
    public void transportResumed() {
        log.info("transportResumed -> 消息服务器连接已恢复...");
    }
}

订阅着/接收者消息传送监听器会对所有数据进行监控,如下:

接收到的消息内容是:发布消息:0
[2018-01-16 15:06:35.202] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:1
[2018-01-16 15:06:35.206] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:2
[2018-01-16 15:06:35.209] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:3
[2018-01-16 15:06:35.214] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:4
[2018-01-16 15:06:35.216] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:5
[2018-01-16 15:06:35.220] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:6
[2018-01-16 15:06:35.222] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:7
[2018-01-16 15:06:35.223] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:8
[2018-01-16 15:06:35.225] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...
接收到的消息内容是:发布消息:9
[2018-01-16 15:06:35.226] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 对消息传输命令进行监控  ...

如果activemq服务器down掉,消息传输会输出

2018-01-16 15:09:26,467 WARN [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] org.springframework.jms.connection.CachingConnectionFactory#onException[SingleConnectionFactory.java:322] Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: java.io.EOFException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1952)
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1971)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
    at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    ... 1 common frames omitted
2018-01-16 15:09:26,479 ERROR [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] com.permission.core.activemq.ActiveMQTransportListener#onException[ActiveMQTransportListener.java:26] onException -> 消息服务器连接错误......
java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342