准备
ActiveMQ下载 ActiveMQ 5.14.5 Release 下载Windows Distribution
注:运行环境需要装jdk
代码
运行
解压下载文件,根据本地系统选择运行win32/win64文件夹的 activemq.bat
双击运行如下
浏览器访问 http://localhost:8161/ ,用户名和密码均为 admin
Hello World (点对点的消息模型)
- 使用maven来构建项目,pom配置
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.5</version>
</dependency>
- 项目目录
- 编写消息生产者
package com.sima.queues;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Maple on 2017-05-28.
*/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
// private static final String BROKEURL= "tcp://localhost:8161"; // 默认的连接地址
private static final int SENDNUM = 10; // 发送的消息数量
public static void main(String[] args){
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,
JMSProducer.PASSWORD, JMSProducer.BROKEURL);
try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
destination = session.createQueue("TestQueueFirst"); // 创建消息队列
messageProducer = session.createProducer(destination); // 创建消息生产者
//设置不持久化
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
private static void sendMessage(Session session, MessageProducer messageProducer) {
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message= null;
try {
message = session.createTextMessage("ActiveMQ 发送的消息-"+i);
System.out.println("发送消息:"+"ActiveMQ 发送的消息-"+i);
messageProducer.send(message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
- 编写消息消费者
package com.sima.queues;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Maple on 2017-05-28.
*/
public class JMSConsumerFirst {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
// private static final String BROKEURL= "tcp://localhost:8161"; // 默认的连接地址
public static void main(String[] args){
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSConsumerFirst.USERNAME, JMSConsumerFirst.PASSWORD, JMSConsumerFirst.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
destination=session.createQueue("TestQueueFirst"); // 创建连接的消息队列
messageConsumer=session.createConsumer(destination); // 创建消息消费者
while(true){
TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
if(textMessage!=null){
System.out.println("收到的消息:"+textMessage.getText());
}else{
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 生成消息
运行 JMSProducer,查看 http://localhost:8161/admin/queues.jsp
- 消费消息
运行 JMSConsumerFirst
查看 http://localhost:8161/admin/queues.jsp
- 此时,再运行 JMSProducer生产消息,会被消费者消费
监听方式
- 创建监听类
package com.sima.queues;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by Maple on 2017-05-28.
*/
public class MyListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("通过MyListener收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 修改消息消费者
该行代码
messageConsumer.setMessageListener(new MyListener());// 注册消息监听
替换
while(true){
TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
if(textMessage!=null){
System.out.println("收到的消息:"+textMessage.getText());
}else{
break;
}
}
运行结果一致,这种方式有利于代码的管理,建议采用该方式。