简书:亚武de小文 【原创:转载请注明出处】
RabbitMQ集成
一、相关配置
-
导入maven依赖
<!-- Spring Boot starter that brings in Spring AMQP and the RabbitMQ client -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>
或者
新建项目的时候直接选中
-
在application.yml文件当中引入RabbitMQ基本的配置信息
spring:
  # Basic RabbitMQ connection settings
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
二、代码编写
定义RabbitConfig类,配置Exchange、Queue,并将队列绑定到交换机
-
RabbitConfig.java
package com.yawu.xiaowen.springboot.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; /** * 此处以topic交换机为例 * 交换机配置 * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 * * @return the exchange */ @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM() { // durable(true)持久化,消息队列重启后交换机仍然存在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS() { Queue queue = new Queue(QUEUE_INFORM_SMS); return queue; } //声明队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL() { Queue queue = new Queue(QUEUE_INFORM_EMAIL); return queue; } /** * channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); * 绑定队列到交换机 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); } @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); } }
生产者
- 使用RabbitTemplate发送消息
-
SpProducer.java
package com.yawu.xiaowen.springboot.pcdemo;

import com.yawu.xiaowen.springboot.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Spring Boot message producer for the demo.
 *
 * @author yawu
 * @date 2019.07.02
 */
@Component
public class SpProducer {

    /** Number of messages published by one call to {@link #sendByTopics()}. */
    private static final int MESSAGE_COUNT = 5;

    /** Routing key attached to every published message. */
    private static final String ROUTING_KEY = "inform.sms.email";

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes five numbered text messages to the topic exchange named by
     * {@link RabbitConfig#EXCHANGE_TOPICS_INFORM} and echoes each one to
     * standard output after it has been handed to the template.
     */
    public void sendByTopics() {
        for (int index = 0; index < MESSAGE_COUNT; index++) {
            publish(index);
        }
    }

    /** Sends the message for the given sequence number, then prints it. */
    private void publish(int index) {
        final String message = "sms email inform to user" + index;
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM, ROUTING_KEY, message);
        System.out.println("发送的消息:'" + message + "'");
    }
}
消费者
-
SpConsumer.java
package com.yawu.xiaowen.springboot.pcdemo; import com.rabbitmq.client.Channel; import com.yawu.xiaowen.springboot.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Springboot的消费者 * * @author yawu * @date 2019.07.02 */ @Component public class SpConsumer { // 监听email队列 @RabbitListener(queues = {RabbitConfig.QUEUE_INFORM_EMAIL}) public void receive_email(String msg, Message message, Channel channel) { System.out.println("接收到的email;" + msg); } // 监听sms队列 @RabbitListener(queues = {RabbitConfig.QUEUE_INFORM_SMS}) public void receive_sms(String msg, Message message, Channel channel) { System.out.println("接收到的sms;" + msg); } }
测试运行
-
XiaowenApplicationTests.java
package com.yawu.xiaowen;

import com.yawu.xiaowen.springboot.pcdemo.SpConsumer;
import com.yawu.xiaowen.springboot.pcdemo.SpProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Boot start-up test for the RabbitMQ demo.
 *
 * @author yawu
 * @date 2019.07.02
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = XiaowenApplication.class, value = "spring.profiles.active=boot")
public class XiaowenApplicationTests {

    /** How long the test waits after publishing before it finishes. */
    private static final long WAIT_AFTER_SEND_MILLIS = 10 * 1000L;

    @Autowired
    SpProducer spProducer;

    // Not called directly by the test; presumably injected so the listener bean
    // is verified to be present in the context — TODO confirm.
    @Autowired
    SpConsumer spConsumer;

    /**
     * Publishes the demo messages and then keeps the test alive for ten
     * seconds so that asynchronous listeners have time to print what they
     * receive before the test ends.
     */
    @Test
    public void testSendMsg() {
        spProducer.sendByTopics();
        try {
            Thread.sleep(WAIT_AFTER_SEND_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, so the test
            // runner can still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
        }
    }
}