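Recently a requirement came up at work:
add a debugging page to a project that uses RabbitMQ.
The left half of the page sends messages: you fill in the exchange, the routing key, and the message body.
The right half of the page takes the details needed to create an exchange, a queue, and a routing key, and then starts a listener on that queue.
Two things have to be done:
1. Create the exchange, the queue, and the routing key by hand and bind them;
2. Add the listener by hand.
Enough talk, here is the demo code!
I. Configuration
pom.xml: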
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- hutool -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.3.7</version>
</dependency>
<!-- lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
application.yml:
spring:
  jmx:
    enabled: false
  profiles:
    # dev: local development, test: test server, prod: cloud platform server
    active: xf
  rabbitmq:
    # Address of the management HTTP API
    managementurl: 127.0.0.1:15672
    addresses: 127.0.0.1:5672
    username: guest
    password: guest
    virtual-host: /
    # Consumer settings
    listener:
      simple:
        concurrency: 5
        # manual: messages are acknowledged explicitly
        acknowledge-mode: manual
        max-concurrency: 10
        prefetch: 1
II. Creating the exchange, queue, and binding by hand
1. First, read the settings from the yml file so that they can be used to build the ConnectionFactory bean;
Reading the configuration:
// Spring Boot loads application.yml on its own, so @Value can read these keys directly
@Component
public class RabbitmqConstants {
    // Address used for HTTP requests to the management API
    @Value("${spring.rabbitmq.managementurl}")
    public String managementurl;
    @Value("${spring.rabbitmq.addresses}")
    public String addresses;
    @Value("${spring.rabbitmq.username}")
    public String username;
    @Value("${spring.rabbitmq.password}")
    public String password;
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;

    public String getManagementurl() {
        return managementurl;
    }

    public void setManagementurl(String managementurl) {
        this.managementurl = managementurl;
    }

    public String getAddresses() {
        return addresses;
    }

    public void setAddresses(String addresses) {
        this.addresses = addresses;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getVirtualHost() {
        return virtualHost;
    }

    public void setVirtualHost(String virtualHost) {
        this.virtualHost = virtualHost;
    }
}
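The managementurl setting is described as the address for HTTP requests, but the post does not show such a request. As a rough sketch of how a request URL could be assembled, assuming plain http and the management plugin's /api/queues/{vhost}/{name} path, the helper below percent-encodes each path segment so that the default virtual host "/" becomes %2F. ManagementUrls, encodeSegment, and queueUrl are hypothetical names and are not part of the demo project.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of the demo project. */
final class ManagementUrls {
    private ManagementUrls() {
    }

    /**
     * Percent-encodes a single path segment.
     * URLEncoder targets form data and turns spaces into "+", so those are mapped to "%20".
     */
    static String encodeSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds the URL of one queue; assumes plain http and the /api/queues/{vhost}/{name} path. */
    static String queueUrl(String managementUrl, String virtualHost, String queueName) {
        return "http://" + managementUrl
                + "/api/queues/" + encodeSegment(virtualHost)
                + "/" + encodeSegment(queueName);
    }

    public static void main(String[] args) {
        // Values taken from the application.yml shown earlier
        System.out.println(queueUrl("127.0.0.1:15672", "/", "testqueue"));
        // prints http://127.0.0.1:15672/api/queues/%2F/testqueue
    }
}
```

Encoding the virtual host matters because an unencoded "/" would be read as an extra path separator.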
2. Next, create the ConnectionFactory and RabbitAdmin objects;
The RabbitAdmin object can be used to create exchanges, queues, and bindings.
@Configuration
public class RabbitmqConfig {
    @Resource
    private RabbitmqConstants rabbitmqConstants;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses(rabbitmqConstants.addresses);
        cachingConnectionFactory.setUsername(rabbitmqConstants.username);
        cachingConnectionFactory.setPassword(rabbitmqConstants.password);
        cachingConnectionFactory.setVirtualHost(rabbitmqConstants.virtualHost);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // autoStartup (true by default) lets RabbitAdmin declare the exchange, queue,
        // and binding beans found in the context once a connection is opened
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
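3. Finally, use the RabbitAdmin object to create the exchange, the queue, and the binding.
Here the operations are wrapped in a service class.
public interface RabbitmqApi {
    /**
     * Creates an exchange.
     * Note: the name must not start with "amq.", a prefix reserved by the broker.
     *
     * @param exchangeName name of the exchange
     * @param durable      durable: stored on disk, so it survives a broker restart
     * @param autoDelete   auto-delete: the exchange is deleted once the last queue or exchange bound to it is unbound
     */
    void createExchange(String exchangeName, boolean durable, boolean autoDelete);

    /**
     * Deletes an exchange.
     *
     * @param exchangeName name of the exchange
     */
    void deleteExchange(String exchangeName);

    /**
     * Creates a queue.
     *
     * @param queueName  name of the queue
     * @param durable    whether the queue is durable
     * @param exclusive  whether the queue is exclusive to the declaring connection
     * @param autoDelete whether the queue is deleted automatically
     */
    void createQueue(String queueName, boolean durable, boolean exclusive, boolean autoDelete);

    /**
     * Deletes a queue.
     *
     * @param queueName name of the queue
     */
    void deleteQueue(String queueName);

    /**
     * Binds a queue to an exchange.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @param routingKey   routing key
     */
    void bind(String queueName, String exchangeName, String routingKey);
}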
@Service
public class RabbitmqApiImpl implements RabbitmqApi {
    @Resource
    private RabbitAdmin rabbitAdmin;

    @Override
    public void createExchange(String exchangeName, boolean durable, boolean autoDelete) {
        TopicExchange topicExchange = new TopicExchange(exchangeName, durable, autoDelete);
        rabbitAdmin.declareExchange(topicExchange);
    }

    @Override
    public void deleteExchange(String exchangeName) {
        rabbitAdmin.deleteExchange(exchangeName);
    }

    @Override
    public void createQueue(String queueName, boolean durable, boolean exclusive, boolean autoDelete) {
        Queue queue = new Queue(queueName, durable, exclusive, autoDelete);
        rabbitAdmin.declareQueue(queue);
    }

    @Override
    public void deleteQueue(String queueName) {
        rabbitAdmin.deleteQueue(queueName);
    }

    @Override
    public void bind(String queueName, String exchangeName, String routingKey) {
        rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null));
    }
}
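Since createExchange always declares a TopicExchange, the routing key passed to bind is treated as a pattern: words are separated by dots, "*" stands for exactly one word, and "#" stands for zero or more words. The broker does this matching itself; the plain-Java sketch below only illustrates the rule so that it is easier to predict which messages a binding will receive. TopicMatcher is a hypothetical name and is not part of the demo project or of Spring AMQP.

```java
/** Illustrative re-implementation of topic matching; the real matching happens inside the broker. */
final class TopicMatcher {
    private TopicMatcher() {
    }

    /** Returns true if a message with routingKey would be routed through a binding with bindingKey. */
    static boolean matches(String bindingKey, String routingKey) {
        // The -1 limit keeps trailing empty words, so "a." is two words rather than one
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // The pattern is used up: it matches only if the words are used up too
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may absorb zero or more words; try every possible length
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern words remain, but the routing key has none left
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // "*" or a literal word consumes exactly one word
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("test.routingKey", "test.routingKey")); // prints true
        System.out.println(matches("test.*", "test.a.b"));                 // prints false
        System.out.println(matches("test.#", "test.a.b"));                 // prints true
    }
}
```

A binding key without wildcards, such as the "test.routingKey" used later in the test, behaves like an exact match.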
III. Creating a custom listener
1. First, create a listener container;
The listener container is built from the ConnectionFactory object.
@Configuration
public class CustomListenerConfig {
    @Resource
    private CustomListenerHandler customListenerHandler;

    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(ConnectionFactory connectionFactory) {
        // Build the listener container; it starts out with no queues
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Timeout for starting the consumers, in milliseconds
        container.setConsumerStartTimeout(3000L);
        container.setExposeListenerChannel(true);
        // Acknowledge mode: AUTO keeps the demo simple
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Handler that processes the received messages
        container.setMessageListener(customListenerHandler);
        // The container is a lifecycle bean, so Spring starts it together with the
        // application context; a separate void @Bean method calling start() is not needed
        return container;
    }
}
2. Then operate on the container directly to add or remove queue listeners.
As before, this is wrapped in a service class.
public interface CustomListenerService {
    /**
     * Adds a listener.
     *
     * @param queueName name of the queue to listen on
     */
    void addNewListener(String queueName);

    /**
     * Removes a listener.
     *
     * @param queueName name of the queue to stop listening on
     * @return true if the listener was removed
     */
    boolean removeListener(String queueName);
}
@Service
@Slf4j
// Scans hutool's package so that its SpringUtil helper is registered as a bean
@ComponentScan(basePackages = {"cn.hutool.extra.spring"})
public class CustomListenerServiceImpl implements CustomListenerService {
    @Override
    public void addNewListener(String queueName) {
        if (StrUtil.isEmpty(queueName)) {
            log.warn("------------------Add failed: invalid queue!------------------");
            // Stop here so that an empty name is never passed to the container
            return;
        }
        SimpleMessageListenerContainer container = SpringUtil.getBean(SimpleMessageListenerContainer.class);
        String[] queues = container.getQueueNames();
        // Skip the queue if the container is already listening on it
        if (queues != null && Arrays.asList(queues).contains(queueName)) {
            log.warn("------------------Add failed: a listener for queue {} already exists!------------------", queueName);
            return;
        }
        container.addQueueNames(queueName);
        log.info("------------------Queue listener added------------------");
    }

    @Override
    public boolean removeListener(String queueName) {
        if (StrUtil.isEmpty(queueName)) {
            return false;
        }
        SimpleMessageListenerContainer container = SpringUtil.getBean(SimpleMessageListenerContainer.class);
        return container.removeQueueNames(queueName);
    }
}
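The add and remove rules of the service (reject an empty name, reject a queue that already has a listener, report whether a removal actually happened) can be tried out without a broker. The sketch below is a simplified stand-in for the container's queue list, written in plain Java under the assumption that returning a boolean from both operations is acceptable for a debugging page. QueueListenerRegistry and its methods are hypothetical and are not part of the demo project.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the listener container's queue list; for illustration only. */
final class QueueListenerRegistry {
    // Thread-safe set, since a web page and a scheduled task may call in at the same time
    private final Set<String> queueNames = ConcurrentHashMap.newKeySet();

    /** Returns true only if the name is non-blank and was not registered before. */
    boolean add(String queueName) {
        if (queueName == null || queueName.trim().isEmpty()) {
            return false;
        }
        // Set.add reports false when the element is already present
        return queueNames.add(queueName);
    }

    /** Returns true only if the name was registered and has now been removed. */
    boolean remove(String queueName) {
        return queueName != null && queueNames.remove(queueName);
    }

    /** Returns true if the name is currently registered. */
    boolean isListening(String queueName) {
        return queueName != null && queueNames.contains(queueName);
    }

    public static void main(String[] args) {
        QueueListenerRegistry registry = new QueueListenerRegistry();
        System.out.println(registry.add("testqueue"));    // prints true
        System.out.println(registry.add("testqueue"));    // prints false
        System.out.println(registry.remove("testqueue")); // prints true
    }
}
```

Returning a boolean from the add operation as well would let the debugging page show why a request was rejected instead of relying on the log.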
IV. Simulated test
A simple scheduled task is enough to try everything out.
Scheduled task code:
@Component
@EnableScheduling
@Slf4j
public class TestTask {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private RabbitmqApi rabbitmqApi;
    @Resource
    private CustomListenerService customListenerService;

    /**
     * Custom listener test: creates the exchange, queue, and binding, then starts listening.
     */
    @Scheduled(initialDelay = 2000, fixedDelay = 500000)
    public void testAddMyListener() {
        // Create the exchange and the queue, then bind them with the routing key
        rabbitmqApi.createExchange("testexchange", false, false);
        rabbitmqApi.createQueue("testqueue", false, true, false);
        rabbitmqApi.bind("testqueue", "testexchange", "test.routingKey");
        // Start listening on the queue
        customListenerService.addNewListener("testqueue");
    }

    /**
     * Message sending test.
     */
    @Scheduled(initialDelay = 5000, fixedDelay = 5000)
    public void testlisten() {
        rabbitTemplate.convertAndSend("testexchange", "test.routingKey", "sendto:myTestListener");
    }

    /**
     * Listener removal test: stops listening, then deletes the queue and the exchange.
     */
    @Scheduled(initialDelay = 20000, fixedDelay = 500000)
    public void testRemoveListener() {
        // Remove the queue listener
        boolean result = customListenerService.removeListener("testqueue");
        if (result) {
            log.info("------------------Queue listener removed------------------");
        } else {
            log.info("------------------Failed to remove queue listener------------------");
        }
        // Delete the queue and the exchange
        rabbitmqApi.deleteQueue("testqueue");
        rabbitmqApi.deleteExchange("testexchange");
        log.info("------------------Queue and exchange deleted------------------");
    }
}
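Run result
Going by the schedule in the task class, the listener is attached about 2 seconds after startup, a message is sent every 5 seconds starting at the 5-second mark, and at about 20 seconds the listener, the queue, and the exchange are removed.
I am new to this, so if you spot a mistake or something that could be improved, please leave a comment!
Demo code: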
https://github.com/sushizhendeqiang/springboot-rabbitmq-customerlistener