- ActiveMQ安装
1.先从官网下载ActiveMQ工程
http://activemq.apache.org/
下载地址:
http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.2/apache-activemq-5.15.2-bin.zip&action=download
2.解压缩下载到的压缩包
3.进入解压后的文件夹中, 进入bin目录中, 根据自己的系统版本, 找到对应的文件启动服务
例如, 我的是windows 64 位系统, 所以, 我进入的是bin\win64, 双击activemq.bat即可启动服务了!
点对点模式
- 发送消息
//1.创建一个连接工厂对象,指定服务的IP和端口号
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.使用工厂对象创建一个连接对象
Connection connection = factory.createConnection();
//3.开启连接
connection.start();
//4.使用连接对象创建一个Session对象
//第一个参数表示是否开启事务,一般不开启:false
//当第一个参数为false时,第二个参数才会有意义
//第二个参数表示应答模式:可以是手动应答或者自动应答, 一般是自动应答:Session.AUTO_ACKNOWLEDGE
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个queue对象
Queue queue = session.createQueue("text-queue");
//6.使用Session对象创建一个生产者对象
MessageProducer producer = session.createProducer(queue);
//7.创建一条消息
TextMessage message = new ActiveMQTextMessage();
message.setText("text message");
//8.使用生产者对象发送消息
producer.send(message);
//9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送完成!");
- 接收消息
第一种方式:
//创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//使用工厂产生连接对象
Connection connection = factory.createConnection();
//开启连接
connection.start();
//通过连接对象创建一个session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个queue对象
Queue queue = session.createQueue("text-queue");
//通过session创建一个消费者
MessageConsumer consumer = session.createConsumer(queue);
//使用消费者接收消息
while(true) {
Message message = consumer.receive(2000);//2秒后没接收到消息返回null
if(message==null) {
break;
}
TextMessage textMessage = (TextMessage) message;
//打印接收到的消息
String text = textMessage.getText();
System.out.println(text);
}
//关闭资源
consumer.close();
session.close();
connection.close();
第二种方式:
//创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//使用工厂产生连接对象
Connection connection = factory.createConnection();
//开启连接
connection.start();
//通过连接对象创建一个session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个queue对象
Queue queue = session.createQueue("text-queue");
//通过session创建一个消费者
MessageConsumer consumer = session.createConsumer(queue);
//使用消费者接收消息
/*while(true) {
Message message = consumer.receive(2000);//2秒后没接收到消息返回null
if(message==null) {
break;
}
TextMessage textMessage = (TextMessage) message;
//打印接收到的消息
String text = textMessage.getText();
System.out.println(text);
}*/
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
//打印接收到的消息
String text="";
try {
text = textMessage.getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println(text);
}
});
System.in.read();//让程序等待, 等待消息被读取出来, 测试时用的
//关闭资源
consumer.close();
session.close();
connection.close();
订阅模式
- 发送消息
//创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//使用工厂创建连接对象
Connection connection = factory.createConnection();
//使用开启连接
connection.start();
//使用连接对象创建Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session对象创建Topic对象
Topic topic = session.createTopic("text-topic");
//使用Session创建生产者对象
MessageProducer producer = session.createProducer(topic);
//创建一个消息对象
TextMessage textMessage = session.createTextMessage("这是一个topic消息");
//使用生产者发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送完成!");
- 消息接收
//创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//创建连接对象
Connection connection = factory.createConnection();
//开启连接
connection.start();
//创建session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建topic对象
Topic topic = session.createTopic("text-topic");
//创建消费者对象
MessageConsumer consumer = session.createConsumer(topic);
System.out.println("客户端3启动了...");
//接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//程序等待
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
Spring 整合 ActiveMQ
-
加入jar包:
- spring-jms.jar
- spring-context-support.jar
发送消息
在spring配置文件中加入以下内容:
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.168:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic" />
</bean>
- 发送消息代码示例:
//初始化spring容器: 测试用
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:/spring/applicationContext-activemq.xml");
//从容器中获得JMSTemplate对象
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//从容器在获得Destination对象
Queue queue = applicationContext.getBean(Queue.class);
//第一个参数:指定发送的目的地
//第二个参数:消息的构造器对象
jmsTemplate.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage("使用spring和active整合发送queue消息aaaaaa");
return textMessage;
}
});
- 接收消息
在spring配置文件中配置:
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.168:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic" />
</bean>
<!-- 配置消息监听器1 -->
<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>
<!-- 消息监听容器1 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<!-- 配置消息监听器2 -->
<bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
<!-- 消息监听容器2 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="itemAddListener" />
</bean>
接收消息示例代码
/**
* 接收activemq队列消息的监听器
* <p>Title: MyMessageListener</p>
* <p>Description: </p>
* <p>Company: www.itcast.cn</p>
* @version 1.0
*/
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//取消息的内容
try {
TextMessage textMessage = (TextMessage) message;
//取内容
String text = textMessage.getText();
System.out.println(text);
//其他业务逻辑
} catch (Exception e) {
e.printStackTrace();
}
}
}