<blockquote><h4>认识消息队列</h4></blockquote>
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含对象……
消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
<blockquote><h4>JMS消息队列</h4></blockquote>
Jms即[Java消息服务](http://baike.baidu.com/view/3292569.htm)(Java Message Service)[应用程序](http://baike.baidu.com/view/330120.htm)接口是一个[Java平台](http://baike.baidu.com/view/209634.htm)中关于面向[消息中间件](http://baike.baidu.com/view/3118541.htm)(MOM)的API,用于在两个应用程序之间,或[分布式系统](http://baike.baidu.com/view/991489.htm)中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供[商都](http://baike.baidu.com/view/19763.htm)对JMS提供支持。
JMS(Java Messaging Service)是[Java](http://baike.baidu.com/view/29.htm)平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java[应用程序](http://baike.baidu.com/view/330120.htm)进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化[企业](http://baike.baidu.com/view/38340.htm)应用的开发,翻译为[Java](http://baike.baidu.com/view/29.htm)消息[服务](http://baike.baidu.com/view/133203.htm)。
<blockquote><h4>JMS对象模型</h4></blockquote>
1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。
5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
6)JMS消息通常有两种类型: ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。 ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
<blockquote><h4>ActiveMQ代码实现</h4></blockquote>
1)下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2)解压运行AciveMQ
解压apache-activemq-5.13.3-bin.zip文件,运行apache-activemq-5.13.3\bin\win64\activemq.bat,启动ActiveMQ,登录http://localhost:8161/admin/,创建一个Queues,命名为my-activemq
3)创建Maven项目,添加依赖POM.xml
dependencies>
<!-- activemq 相关maven依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
<!-- 日志相关依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
4)创建消息发送者(生产者)Sender.java
package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 发送者
* 2016年5月6日 下午3:00:57
* @author zhangxiaoping
*/
public class Sender {
private static final Logger LOGGER=LoggerFactory.getLogger(Sender.class);
//默认代理地址 "failover://tcp://localhost:61616" 服务器地址不同IP修改不同的IP
private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
//消息队列名称
private static final String SUBJECT="my-activemq";
private static int i=1;
public static void main(String[] args) throws JMSException, InterruptedException {
//初始化连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//建立连接
Connection conn= connectionFactory.createConnection();
//启动连接
conn.start();
//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
Session session= conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建目标队列
Destination dest = session.createQueue(SUBJECT);
//通过session创建消息的发送者
MessageProducer producer=session.createProducer(dest);
while(true){
//定义要发送的消息
TextMessage message= session.createTextMessage("======ActiveMQ发送消息===="+i+"===");
LOGGER.debug(message.getText());
//发送消息
producer.send(message);
//休眠2秒
Thread.sleep(2000);
i++;
}
// conn.close();
}
}
5)创建消息的接收者(消费者)Receiver.java
package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 接收者
* 2016年5月6日 下午3:03:16
* @author zhangxiaoping
*/
public class Receiver implements MessageListener{
private static final Logger LOGGER=LoggerFactory.getLogger(Receiver.class);
//默认代理地址 "failover://tcp://localhost:61616" 服务器地址不同IP修改不同的IP
private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
//消息队列名称
private static final String SUBJECT="my-activemq";
public static void main(String[] args) throws JMSException {
//初始化连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//建立连接
Connection conn= connectionFactory.createConnection();
//启动连接
conn.start();
//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
Session session= conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目标队列
Destination dest=session.createQueue(SUBJECT);
//通过session创建消息的接收者
MessageConsumer consumer= session.createConsumer(dest);
//初始化监听
Receiver receiver=new Receiver();
//给接收者添加监听对象
consumer.setMessageListener(receiver);
}
public void onMessage(Message arg0) {
TextMessage message=(TextMessage) arg0;
try {
LOGGER.debug("接收到消息"+message.getText());
Thread.sleep(4000);
} catch (Exception e) {
LOGGER.error("error"+e.getMessage());
}
}
}
6)运行Sender.java、Receiver.java登录http://localhost:8161/admin/查看队列信息。