ActiveMQ Getting Started Tutorial
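Overview and Introduction
ActiveMQ is an Apache project, billed as the most popular and powerful open source messaging and Integration Patterns server.
ActiveMQ is a JMS provider that fully supports the JMS 1.1 and J2EE 1.4 specifications.
It supports clients in many languages and over many protocols, ships with easy-to-use Enterprise Integration Patterns, and offers many advanced features on top of its JMS 1.1 and J2EE 1.4 support.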
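Features
- Clients can be written in many languages and use many protocols. Languages: Java, C, C++, C#, Ruby, Perl, Python, PHP. Application protocols: OpenWire, Stomp, REST, WS Notification, XMPP, AMQP
- Full support for the JMS 1.1 and J2EE 1.4 specifications (persistence, XA messages, transactions)
- Spring support: ActiveMQ can easily be embedded in Spring-based systems and also supports Spring 2.0 features
- Tested against common J2EE servers (such as Geronimo, JBoss 4, GlassFish, WebLogic); through its JCA 1.5 resource adaptors, ActiveMQ can be deployed automatically to any commercial server compatible with J2EE 1.4
- Multiple transport protocols: in-VM, TCP, SSL, NIO, UDP, JGroups, JXTA
- Fast message persistence through JDBC and a journal
- Designed for high-performance clustering, client-server, and peer-to-peer communication
- Ajax support
- Integration with Axis
- The embedded JMS provider is easy to invoke, which makes testing straightforward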
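This article is based on ActiveMQ 5.15.0, the latest version at the time of writing. The problems described here were encountered with that version, and their solutions are recorded along the way.
1. Download
2. Installation
2.1 Unpack apache-activemq-5.15.0-bin.zip
In this article the archive is unpacked to the root of drive D.
2.2 Start ActiveMQ
2.2.1 Normal startup command
Press Win+R, type cmd, and open a command prompt:
REM The three commands below do the following:
REM 1. set the working directory on drive D to ActiveMQ's bin directory
REM 2. switch to drive D
REM 3. start ActiveMQ
cd D:\apache-activemq-5.15.0\bin
d:
activemq start
If you see this exception on startup: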
Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: **JVM_Bind**
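The cause:
The port is already in use. Find the program that holds the port and stop that process or service, or change the port ActiveMQ listens on (the default is 61616). In many cases the program holding port 61616 is the Windows service Internet Connection Sharing (ICS); stopping it is enough to let ActiveMQ start.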
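Before starting the broker, it can help to check whether the port is actually free. The following is a minimal sketch that uses only the JDK; the class name PortCheck and the method isPortFree are hypothetical names chosen for this illustration and are not part of ActiveMQ. It tries to bind a server socket to the port and reports whether that succeeded.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class PortCheck {
    /** Returns true if a server socket can currently be bound to the port on all local interfaces. */
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(new InetSocketAddress(port));
            return true;
        } catch (IOException e) {
            // Binding failed, typically because another process already listens on the port
            return false;
        }
    }

    public static void main(String[] args) {
        int port = 61616; // ActiveMQ's default port, as stated above
        System.out.println("Port " + port + (isPortFree(port) ? " is free" : " is already in use"));
    }
}
```

If the check reports the port as in use while ActiveMQ is not running, some other program holds it, which matches the JVM_Bind error shown above.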
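After a successful start:
1. Open http://127.0.0.1:8161/ and click Manage ActiveMQ broker.
This opens the admin pages; the username and password are both admin. You can also go straight to http://127.0.0.1:8161/admin/ and log in with admin / admin.
In the navigation menu:
- Queues lists the queue destinations and their messages
- Topics lists the topic destinations and their messages
- Subscribers lets you monitor and query message subscriptions
- Connections shows the number of connections, broken down by xmpp, ssl, stomp, openwire, ws, and network connections
- Network monitors the number of network connections
- Send lets you send a message.
To change the username or password, edit conf/jetty-realm.properties: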
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: user, user
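Each entry in this file follows the pattern username: password [,rolename ...]. The sketch below only illustrates that line format with the JDK's java.util.Properties class; it is not how Jetty reads the file, and the names RealmEntries, load, and parseValue are hypothetical helpers invented for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class RealmEntries {
    /** Splits a value such as "admin, admin" into the password (index 0) and the roles (the rest). */
    static List<String> parseValue(String value) {
        List<String> parts = new ArrayList<>();
        for (String part : value.split(",")) {
            parts.add(part.trim());
        }
        return parts;
    }

    /** Loads realm-style text; Properties accepts ':' as the key/value separator. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load("# username: password [,rolename ...]\nadmin: admin, admin\nuser: user, user\n");
        for (String user : props.stringPropertyNames()) {
            List<String> parts = parseValue(props.getProperty(user));
            System.out.println(user + " -> password=" + parts.get(0) + ", roles=" + parts.subList(1, parts.size()));
        }
    }
}
```

Read this way, the line admin: admin, admin defines the user admin with the password admin and the single role admin, so changing the password means editing the first value after the colon.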
2.2.2 Running the demo
Problem encountered:
Starting ActiveMQ v5.15 with the command given in the official documentation,
bin/activemq console xbean:examples/conf/activemq-demo.xml
did not work: this activemq script has no console argument.
Solution:
The startup command differs between versions.
1. For v5.8, use:
D:\apache-activemq-5.8.0\bin>activemq xbean:../conf/activemq-demo.xml
2. For v5.9, use:
D:\apache-activemq-5.9.1\bin>activemq xbean:../examples/conf/activemq-demo.xml
3. For v5.15.0, use:
D:\apache-activemq-5.15.0\bin>activemq start xbean:file:../examples/conf/activemq-demo.xml
You can also give an absolute file path: activemq start xbean:file:D:/apache-activemq-5.15.0/examples/conf/activemq-demo.xml
Run the command:
activemq start xbean:file:../examples/conf/activemq-demo.xml
Then open http://localhost:8161/demo/ and click Websockets.
Further down, the demo page also offers links for sending a message and for receiving messages.
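3. Usage
3.1 Message examples
3.1.1 JMS interfaces: the common API and the two messaging domains
Common JMS interface | Point-to-point domain | Publish/subscribe domain |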
---|---|---|
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
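- Point-to-point
Point-to-point messaging is built on a message queue, a sender, and a receiver: the queue stores messages, the sender sends them, and the receiver receives them. More precisely, the sender client sends a message to the queue, and the receiver client takes the message from the queue and sends an acknowledgement back to the queue to confirm receipt. Sender and receiver have no timing dependency: the sender can send to the queue at any time without knowing whether a receiver is running.
- Publish/subscribe
Publish/subscribe is used when there are multiple receiving clients. Several subscribers may exist, and there is a timing dependency between subscribers and the publisher: a subscriber only receives messages that were published after it was created. A subscriber can receive messages in two ways: by calling the consumer's receive method, or by implementing the onMessage method of the MessageListener interface.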
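The difference between the two domains can be illustrated without a broker. The sketch below is a plain in-memory model, not JMS code: SimpleQueue and SimpleTopic are hypothetical classes written only for this illustration, and the topic models the non-durable subscription behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DomainSketch {
    /** Point-to-point: messages wait in the queue and each one is taken by exactly one receiver. */
    static class SimpleQueue {
        private final Queue<String> messages = new ArrayDeque<>();
        void send(String message) { messages.add(message); }
        String receive() { return messages.poll(); } // null when the queue is empty
    }

    /** Publish/subscribe: each message is copied to every subscriber registered at publish time. */
    static class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }
        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("q1");                    // sent before any receiver asks for it
        System.out.println(queue.receive()); // prints q1
        System.out.println(queue.receive()); // prints null: the message was consumed once

        SimpleTopic topic = new SimpleTopic();
        topic.publish("t1");                 // nobody is subscribed yet, so this message is lost
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("t2");
        System.out.println(first);           // prints [t2]
        System.out.println(second);          // prints [t2]
    }
}
```

The queue keeps q1 until a receiver takes it, while the topic drops t1 because no subscriber existed when it was published, and both subscribers get their own copy of t2.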
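3.1.2 Basic flow for sending and receiving messages with ActiveMQ
Steps to send a message:
(1) Create the connection factory, a JMS ConnectionFactory
(2) Use the ConnectionFactory (an administered object) to create a Connection, and start it
(3) Use the Connection to create a Session
(4) Use the Session and the Destination (an administered object) to create a MessageProducer
(5) Use the MessageProducer to send messages
Steps for a receiver to get messages from JMS:
(1) Create the connection factory, a JMS ConnectionFactory
(2) Use the ConnectionFactory (an administered object) to create a Connection, and start it
(3) Use the Connection to create a Session
(4) Use the Session and the Destination (an administered object) to create a MessageConsumer
(5) Use the MessageConsumer to receive messages. To receive asynchronously, bind a MessageListener to the consumer with setMessageListener; the listener must implement the MessageListener interface and define its onMessage method.
3.1.3 Sending and receiving messages in Java
1. Sending and receiving with the generic JMS API
Sender code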
package com.dragon.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * @author Dragon
 * @version V0.1
 * @Title: com.dragon.activemq.demo.MessageSender
 * @project ActiveMQ-5.15.0
 * @Description: Sending and receiving with the JMS API - message sender
 * @date 2017/10/01 21:16
 */
public class MessageSender {
    // Number of messages to send
    public static final int SEND_NUM = 5;
    // Broker TCP address
    public static final String BROKER_URL = "tcp://localhost:61616";
    // Destination name; it can be created in the admin console at http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "sagedragon.mq.queue";
    /**
     * Sends SEND_NUM text messages.
     *
     * @param session  the session used to create the messages
     * @param producer the producer used to send them
     * @throws Exception if creating or sending a message fails
     */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            // The payload reads "sending message number N"
            String message = "发送消息第" + (i + 1) + "条";
            TextMessage text = session.createTextMessage(message);
            System.out.println(message);
            producer.send(text);
        }
    }
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory
            connection = factory.createConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; the acknowledge mode argument is ignored when transacted is true
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Create the queue destination
            Destination destination = session.createQueue(DESTINATION);
            // Create the message producer
            MessageProducer producer = session.createProducer(destination);
            // Use non-persistent delivery
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            // Commit the transaction so the messages are actually delivered
            session.commit();
        } finally {
            // Close and release resources
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        MessageSender.run();
    }
}
Output
发送消息第1条
21:20:28.651 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:xxx-49962-1506864009010-1:1:1
发送消息第2条
发送消息第3条
发送消息第4条
发送消息第5条
Receiver
package com.dragon.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * @author Dragon
 * @version V0.1
 * @Title: com.dragon.activemq.demo.MessageReceiver
 * @project ActiveMQ-5.15.0
 * @Description: Sending and receiving with the JMS API - message receiver
 * @date 2017/10/01 21:16
 */
public class MessageReceiver {
    // Broker TCP address
    public static final String BROKER_URL = "tcp://localhost:61616";
    // Destination name; it can be created in the admin console at http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "sagedragon.mq.queue";
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory
            connection = factory.createConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; the acknowledge mode argument is ignored when transacted is true
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Create the queue destination
            Destination destination = session.createQueue(DESTINATION);
            // Create the message consumer
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100 seconds (1000 * 100 ms) for the next message
                Message message = consumer.receive(1000 * 100);
                TextMessage text = (TextMessage) message;
                if (text != null) {
                    // The prefix reads "received:"
                    System.out.println("接收:" + text.getText());
                } else {
                    // Nothing arrived within the timeout: stop polling
                    break;
                }
            }
            // Commit the transaction to acknowledge the received messages
            session.commit();
        } finally {
            // Close and release resources
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        MessageReceiver.run();
    }
}
Output
21:25:08.718 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:xxx-49992-1506864308500-1:1:1
接收:发送消息第1条
接收:发送消息第2条
接收:发送消息第3条
接收:发送消息第4条
接收:发送消息第5条
2. Sending point-to-point messages through a Queue
Sender
package com.dragon.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
/**
 * @author Dragon
 * @version V0.1
 * @Title: com.dragon.activemq.demo.QueueSender
 * @project ActiveMQ-5.15.0
 * @Description: Point-to-point messaging through a Queue - message sender
 * @date 2017/10/01 21:16
 */
public class QueueSender {
    // Number of messages to send
    public static final int SEND_NUM = 5;
    // Broker TCP address
    public static final String BROKER_URL = "tcp://localhost:61616";
    // Destination name; it can be created in the admin console at http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "sagedragon.mq.queue";
    /**
     * Sends SEND_NUM map messages.
     *
     * @param session the queue session used to create the messages
     * @param sender  the queue sender used to send them
     * @throws Exception if creating or sending a message fails
     */
    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            // The payload reads "sending message number N"
            String message = "发送消息第" + (i + 1) + "条";
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);
            sender.send(map);
        }
    }
    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // Create the connection factory
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory
            connection = factory.createQueueConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; the acknowledge mode argument is ignored when transacted is true
            session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            // Create the queue
            Queue queue = session.createQueue(DESTINATION);
            // Create the message sender (fully qualified because this class is also named QueueSender)
            javax.jms.QueueSender sender = session.createSender(queue);
            // Use non-persistent delivery
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // Commit the transaction so the messages are actually delivered
            session.commit();
        } finally {
            // Close and release resources
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        QueueSender.run();
    }
}
Output
21:27:31.827 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, Host=localhost, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]}
21:27:31.834 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@50006] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]}
21:27:31.835 [main] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, Host=localhost, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_77, 25.77-b03, Oracle Corporation, OS: Windows 10, 10.0, amd64, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.15.0}, magic=[A,c,t,i,v,e,M,Q]}
21:27:31.836 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@50006] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@50006 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
21:27:31.841 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@50006] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@50006 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第1条, time=1506864451941} }
21:27:31.966 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:Mr-Dragon-50004-1506864451704-1:1:1
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第2条, time=1506864451971} }
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第3条, time=1506864451973} }
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第4条, time=1506864451974} }
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第5条, time=1506864451975} }
21:27:31.976 [main] DEBUG org.apache.activemq.ActiveMQSession - ID:Mr-Dragon-50004-1506864451704-1:1:1 Transaction Commit :TX:ID:Mr-Dragon-50004-1506864451704-1:1:1
Receiver
package com.dragon.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
/**
 * @author Dragon
 * @version V0.1
 * @Title: com.dragon.activemq.demo.QueueReceiver
 * @project ActiveMQ-5.15.0
 * @Description: Point-to-point messaging through a Queue - message receiver
 * @date 2017/10/01 21:16
 */
public class QueueReceiver {
    // Broker TCP address
    public static final String BROKER_URL = "tcp://localhost:61616";
    // Destination name; it can be created in the admin console at http://localhost:8161/admin/queues.jsp
    public static final String TARGET = "sagedragon.mq.queue";
    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // Create the connection factory
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory
            connection = factory.createQueueConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; the acknowledge mode argument is ignored when transacted is true
            session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
            // Create the queue
            Queue queue = session.createQueue(TARGET);
            // Create the message receiver (fully qualified because this class is also named QueueReceiver)
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            receiver.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    // The queue may still hold TextMessages from example 1, so check the type before casting
                    if (!(msg instanceof MapMessage)) {
                        System.out.println("Skipping non-map message: " + msg);
                        return;
                    }
                    MapMessage map = (MapMessage) msg;
                    try {
                        // The separator reads "received#"
                        System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Keep listening for 100 seconds (1000 * 100 ms) before shutting down
            Thread.sleep(1000 * 100);
            // Commit the transaction to acknowledge the received messages
            session.commit();
        } finally {
            // Close and release resources
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        QueueReceiver.run();
    }
}
Output. When the listener casts every message to MapMessage without checking its type, as this example originally did, a ClassCastException is logged: the message named in the error carries the producer ID of the MessageSender run from example 1, so one of its TextMessages was still on the shared queue sagedragon.mq.queue.
21:31:36.409 [ActiveMQ Session Task-1] ERROR org.apache.activemq.ActiveMQMessageConsumer - ID:Mr-Dragon-50055-1506864696129-1:1:1:1 Exception while processing message: ID:Mr-Dragon-49962-1506864009010-1:1:1:1:5
java.lang.ClassCastException: org.apache.activemq.command.ActiveMQTextMessage cannot be cast to javax.jms.MapMessage
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
1506864451941接收#发送消息第1条
1506864451971接收#发送消息第2条
1506864451973接收#发送消息第3条
1506864451974接收#发送消息第4条
1506864451975接收#发送消息第5条
3. Publishing and subscribing through a Topic
Sender
package com.dragon.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
/**
 * @author Dragon
 * @version V0.1
 * @Title: com.dragon.activemq.demo.TopicSender
 * @project ActiveMQ-5.15.0
 * @Description: Publishing and subscribing through a Topic - message sender
 * @date 2017/10/01 21:16
 */
public class TopicSender {
    // Number of messages to send
    public static final int SEND_NUM = 5;
    // Broker TCP address
    public static final String BROKER_URL = "tcp://localhost:61616";
    // Topic name (it reuses the queue examples' name); topics are listed at http://localhost:8161/admin/topics.jsp
    public static final String DESTINATION = "sagedragon.mq.queue";
    /**
     * Publishes SEND_NUM map messages.
     *
     * @param session   the topic session used to create the messages
     * @param publisher the publisher used to send them
     * @throws Exception if creating or publishing a message fails
     */
    public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            // The payload reads "sending message number N"
            String message = "发送消息第" + (i + 1) + "条";
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);
            publisher.send(map);
        }
    }
    public static void run() throws Exception {
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // Create the connection factory
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory
            connection = factory.createTopicConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; the acknowledge mode argument is ignored when transacted is true
            session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
            // Create the topic
            Topic topic = session.createTopic(DESTINATION);
            // Create the publisher
            TopicPublisher publisher = session.createPublisher(topic);
            // Use non-persistent delivery
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, publisher);
            // Commit the transaction so the messages are actually published
            session.commit();
        } finally {
            // Close and release resources
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        TopicSender.run();
    }
}
Output
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第1条, time=1506865178417} }
21:39:38.425 [main] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:Mr-Dragon-50129-1506865178168-1:1:1
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第2条, time=1506865178429} }
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第3条, time=1506865178430} }
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第4条, time=1506865178432} }
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {text=发送消息第5条, time=1506865178433} }
Receiver
package com.dragon.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
/**
 * @author Dragon
 * @version V0.1
 * @Title: com.dragon.activemq.demo.TopicReceiver
 * @project ActiveMQ-5.15.0
 * @Description: Publishing and subscribing through a Topic - message receiver; depends on hawtbuf-1.9.jar
 * @date 2017/10/01 21:16
 */
public class TopicReceiver {
    // Broker TCP address
    public static final String BROKER_URL = "tcp://localhost:61616";
    // Topic name (it reuses the queue examples' name); topics are listed at http://localhost:8161/admin/topics.jsp
    public static final String TARGET = "sagedragon.mq.queue";
    public static void run() throws Exception {
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // Create the connection factory
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // Create a connection from the factory
            connection = factory.createTopicConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; the acknowledge mode argument is ignored when transacted is true
            session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
            // Create the topic
            Topic topic = session.createTopic(TARGET);
            // Create the subscriber; start it before the sender, since it only sees messages published afterwards
            TopicSubscriber subscriber = session.createSubscriber(topic);
            subscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    // Only map messages are expected on this topic; ignore anything else
                    if (!(msg instanceof MapMessage)) {
                        return;
                    }
                    MapMessage map = (MapMessage) msg;
                    try {
                        // The separator reads "received#"
                        System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // Keep listening for 100 seconds (1000 * 100 ms) before shutting down
            Thread.sleep(1000 * 100);
            // Commit the transaction to acknowledge the received messages
            session.commit();
        } finally {
            // Close and release resources
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        TopicReceiver.run();
    }
}
Output
1506865454228接收#发送消息第1条
21:44:36.318 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10881ms elapsed since last write check.
21:44:36.373 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
1506865454236接收#发送消息第2条
1506865454237接收#发送消息第3条
1506865454237接收#发送消息第4条
1506865454238接收#发送消息第5条
4. Using ActiveMQ with Spring
XML configuration: applicationContext-beans.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<!-- Connection pool (defined here, but not referenced by jmsTemplate below) -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
</property>
</bean>
<!-- Connection factory -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- Message destination -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- Destination name; it can be created in the admin console at http://localhost:8161/admin/queues.jsp -->
<constructor-arg index="0" value="sagedragon.mq.queue"/>
</bean>
<!-- JMS template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="activeMQConnectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
pom dependencies
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.15.0</version>
</dependency>
发送方
package com.dragon.activemq.demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import java.util.Date;
/**
* @author Dragon
* @version V0.1
* @Title: com.dragon.activemq.demo.SpringSender
* @Description: Spring JMSTemplate 消息发送者
* @project ActiveMQ-5.15.0
* @date 2017/10/01 21:47
*/
public class SpringSender {
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("message", "current system time: " + new Date().getTime());
return message;
}
});
}
}
Output
21:59:36.021 [main] DEBUG org.springframework.jms.core.JmsTemplate - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {message=current system time: 1506866376016} }
Receiver
package com.dragon.activemq.demo;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import java.util.Map;
/**
* @author Dragon
* @version V0.1
* @Title: com.dragon.activemq.demo.SpringReceiver
* @Description: Spring JmsTemplate message receiver
* @project ActiveMQ-5.15.0
* @date 2017/10/01 21:47
*/
public class SpringReceiver {
@SuppressWarnings("unchecked")
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
while (true) {
Map<String, Object> map = (Map<String, Object>) jmsTemplate.receiveAndConvert();
// The literal printed below means "Received message:"
System.out.println("收到消息:" + map.get("message"));
}
}
}
Output
收到消息:current system time: 1506866100187
5. Using ActiveMQ in Spring Boot
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
application.yml
spring:
  application:
    name: activemq-demo
server:
  port: 9090
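Sender
package com.dragon.activemq.demo.springboot.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
/**
* @author Dragon
* @version V0.1
* @Title: com.dragon.activemq.demo.springboot.demo.MessageProduction
* @Description: Message producer
* @date 2017/10/01 23:14
*/
// Register as a bean
@Component
// Enable scheduling
@EnableScheduling
public class MessageProduction {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate; // Used to put messages on the queue
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 3000) // Runs again 3 s after the previous run finishes and puts a message on the queue
public void send() {
// The payload is "测试消息队列" ("test message queue") followed by the current time in seconds
this.jmsMessagingTemplate.convertAndSend(this.queue,
"测试消息队列" + System.currentTimeMillis() / 1000);
}
}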
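The fixedDelay value used by the sender is measured from the end of one run to the start of the next. The JDK's ScheduledExecutorService.scheduleWithFixedDelay has the same end-to-start semantics, so the timing can be tried out without Spring or a broker. The following is only a minimal sketch: the class name FixedDelayDemo and the method runWithFixedDelay are hypothetical, and printing a line stands in for sending a message to the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it is not part of the tutorial's project.
public class FixedDelayDemo {

    /**
     * Runs the task the requested number of times, waiting delayMillis between
     * the end of one run and the start of the next, and returns the run count.
     */
    public static int runWithFixedDelay(Runnable task, long delayMillis, int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(runs);
        AtomicInteger executed = new AtomicInteger();
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return; // enough runs already; ignore any tick before shutdown
            }
            task.run();
            executed.incrementAndGet();
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until the requested number of runs has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // Stand-in for convertAndSend: print a payload every 3 seconds, three times.
        int sent = runWithFixedDelay(
                () -> System.out.println("test message " + System.currentTimeMillis() / 1000),
                3000, 3);
        System.out.println("runs completed: " + sent);
    }
}
```

Because the delay starts counting only after a run has finished, a slow task pushes later runs back instead of letting them overlap.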
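Message queue
package com.dragon.activemq.demo.springboot.demo;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
/**
* @author Dragon
* @version V0.1
* @Title: com.dragon.activemq.demo.springboot.demo.MessageQueue
* @Description: Queue definition
* @date 2017/10/01 23:16
*/
@Component
public class MessageQueue {
// Returns a queue named my-message and registers it as a bean
@Bean
public Queue queue() {
return new ActiveMQQueue("my-message");
}
}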
Receiver
package com.dragon.activemq.demo.springboot.demo;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* @author Dragon
* @version V0.1
* @Title: com.dragon.activemq.demo.springboot.demo.MessageListener
* @Description: Message listener
* @date 2017/10/01 23:16
*/
@Component
public class MessageListener {
/**
* The @JmsListener annotation watches the given queue; whenever a new message arrives,
* it is taken off the queue and passed to this method for processing.
**/
@JmsListener(destination = "my-message")
public void removeMessage(String msg) {
//public void removeMessage(Email email) {
System.out.println("监听接收到的消息是:" + msg); // Print the message taken from the queue (the literal means "Message received by the listener:")
}
/*@JmsListener(destination = "mailbox", containerFactory = "myFactory")
public void receiveMessage(Email email) {
System.out.println("Received <" + email + ">");
}*/
}
Output after starting ActiveMQApplication
监听接收到的消息是:测试消息队列1506875404
监听接收到的消息是:测试消息队列1506875407
监听接收到的消息是:测试消息队列1506875410
监听接收到的消息是:测试消息队列1506875413
ActiveMQApplication.java
package com.dragon.activemq.demo;
import com.dragon.activemq.demo.springboot.demo.Email;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.jms.ConnectionFactory;
/**
* @author Dragon
* @version V0.1
* @Title: com.dragon.activemq.demo.ActiveMQApplication
* @Description: ActiveMQ application entry point
* @date 2017/10/01 22:20
*/
@SpringBootApplication(scanBasePackages = {"com.dragon"}) // Scan the com.dragon package for annotated classes and register them as beans
@EnableJms // Enable JMS
public class ActiveMQApplication {
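/**
* Builds a JMS listener container factory that carries Spring Boot's defaults.
* @param connectionFactory the JMS connection factory
* @param configurer Spring Boot's configurer for listener container factories
* @return the configured listener container factory
*/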
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
return factory;
}
/**
* Converts the message format. Without this bean a type conversion error is thrown:
* Caused by: org.springframework.messaging.converter.MessageConversionException:
* Cannot convert from [org.apache.activemq.command.ActiveMQTextMessage]
* to
* [com.dragon.activemq.demo.springboot.demo.Email] / [java.lang.String]
* for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@3189b1ff
* @return a converter that serializes message content to JSON carried in a TextMessage
*/
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
public static void main(String[] args) {
// Launch the application
ConfigurableApplicationContext context = SpringApplication.run(ActiveMQApplication.class, args);
//JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
// Send a message with a POJO - the template reuse the message converter
//System.out.println("Sending an email message.");
//jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
}
}
4. Pitfalls
4.1 Message type conversion error
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.activemq.command.ActiveMQTextMessage] to [com.dragon.activemq.demo.springboot.demo.Email] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@3189b1ff
Solution:
Add the following bean to the application class.
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
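The converter bean stores a type id in a message property named "_type", which the receiving side reads to decide which class the JSON body should be turned back into. The sketch below imitates only that idea with plain JDK classes. It is not Spring's implementation: FakeTextMessage, toMessage and resolveTargetType are hypothetical names, the body is a plain string rather than JSON, and the type id is assumed to be the raw Java class name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a type-id message property; not Spring's code.
public class TypeIdDemo {

    /** Name of the property that carries the type id, as configured in the tutorial. */
    public static final String TYPE_ID_PROPERTY = "_type";

    /** Stand-in for a JMS text message: a text body plus string properties. */
    public static final class FakeTextMessage {
        public final String body;
        public final Map<String, String> properties = new HashMap<>();

        public FakeTextMessage(String body) {
            this.body = body;
        }
    }

    /** Sender side: store the payload's class name next to the body. */
    public static FakeTextMessage toMessage(Object payload) {
        FakeTextMessage message = new FakeTextMessage(String.valueOf(payload));
        message.properties.put(TYPE_ID_PROPERTY, payload.getClass().getName());
        return message;
    }

    /** Receiver side: read the property to decide which class to convert to. */
    public static Class<?> resolveTargetType(FakeTextMessage message) {
        String typeId = message.properties.get(TYPE_ID_PROPERTY);
        if (typeId == null) {
            throw new IllegalStateException("Message has no " + TYPE_ID_PROPERTY + " property");
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown type id: " + typeId, e);
        }
    }

    public static void main(String[] args) {
        FakeTextMessage message = toMessage("hello");
        System.out.println(message.properties.get(TYPE_ID_PROPERTY)); // prints java.lang.String
        System.out.println(resolveTargetType(message).getSimpleName()); // prints String
    }
}
```

A message that arrives without the property cannot be mapped to a class in this sketch, so both sender and receiver need to agree on the property name.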
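4.2 Messages are sent but cannot be consumed
Solution:
Register a JMS listener container factory that is configured with Spring Boot's defaults.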
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
return factory;
}
4.3 Error after serializing an entity class
Caused by: org.springframework.jms.support.converter.MessageConversionException: Could not convert JMS message; nested exception is javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.dragon.activemq.demo.springboot.demo.Email! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
A detailed explanation of this error:
http://activemq.apache.org/objectmessage.html
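Solution:
Set the list of packages that are allowed to be serialized and pass it as a JVM startup argument (a -D system property):
# All packages
#-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=*
-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=com.dragon.activemq,com.dragon.demo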
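As a rough sketch of how the package list might be assembled, the helper below builds the JVM argument string and, alternatively, sets the same system property from code. The class SerializablePackagesDemo and its method names are hypothetical. It is also an assumption that setting the property from code only helps if it happens before any ActiveMQ class reads it, so passing it on the command line is the safer route.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; only the property name comes from the tutorial.
public class SerializablePackagesDemo {

    public static final String PROPERTY = "org.apache.activemq.SERIALIZABLE_PACKAGES";

    /** Builds the -D argument to pass to the java command. */
    public static String toJvmArgument(List<String> packages) {
        return "-D" + PROPERTY + "=" + String.join(",", packages);
    }

    /**
     * Sets the system property from code. Assumption: this must run before
     * ActiveMQ reads the property, for example first thing in main.
     */
    public static void applyProgrammatically(List<String> packages) {
        System.setProperty(PROPERTY, String.join(",", packages));
    }

    public static void main(String[] args) {
        List<String> trusted = Arrays.asList("com.dragon.activemq", "com.dragon.demo");
        // prints -Dorg.apache.activemq.SERIALIZABLE_PACKAGES=com.dragon.activemq,com.dragon.demo
        System.out.println(toJvmArgument(trusted));
        applyProgrammatically(trusted);
        // prints com.dragon.activemq,com.dragon.demo
        System.out.println(System.getProperty(PROPERTY));
    }
}
```

Listing specific packages is generally preferable to the wildcard, since the restriction exists to limit which classes can be deserialized from an ObjectMessage.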
5. References
ActiveMQ 4.x / 5.x Getting Started
spring-boot集成activeMQ(一)-使用默认的ActiveMQ (Integrating ActiveMQ with spring-boot, part 1: using the default ActiveMQ)
http://activemq.apache.org/objectmessage.html
Thanks for reading this tutorial!
@ Author: 龙圣贤
@ Written: 2017-10-02 01:05:14
@ To repost, please contact the author by direct message and cite the source.