Spring 整合RabbitMQ
配置maven : pom.xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
1.配置连接信息 : rabbit.properties 文件
//连接信息
rabbit.vhost=/test //虚拟机
rabbit.addresses=10.166.27.51:5672//服务器 端口
rabbit.username=test//用户名
rabbit.password=test//密码
channel.cache.size=50
//exchange 交换机配置名称
exchange.direct=test//交换机名
//配置队列名
queue.sync_nc=newretail.queue.sync_nc
queue.sync_nc_error=newretail.queue.sync_nc_error
queue.reply=queue.reply
//队列交换机的路由键
route.sync_nc=route.queue.sync_nc
route.reply=route.reply
route.sync_nc_error=route.queue.sync_nc_error
2. 依赖注入 : spring-rabbit.xml
rabbit 命名空间包含了多个元素,帮助我们声明队列、Exchange 以及将它们结合在一起的 binding
元素 |
作用 |
<queue> |
创建一个队列 |
<fanout-exchange> |
创建一个 fanout 类型的 Exchange |
<header-exchange> |
创建一个 header 类型的 Exchange |
<topic-exchange> |
创建一个 topic 类型的 Exchange |
<direct-exchange> |
创建一个 direct 类型的 Exchange |
<bindings><binding></bindings> |
元素定义一个或多个元素的集合。元素创建 Exchange 和队列之间的 binding |
这些配置元素要与 <admin> 元素一起使用。
<admin> 元素会创建一个 RabbitMQ 管理组件(administrative component),
它会自动创建 (如果它们在 RabbitMQ 代理中尚未存在的话)上述这些元素所声明的队列、Exchange 以及 binding。
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--引用ExecutorService或ThreadPoolTaskExecutor(由<task:executor />元素定义)。创建连接时传递给Rabbit库。如果未提供,Rabbit库当前使用具有5个线程的固定线程池ExecutorService-->
<task:executor id="poolTaskExecutor"
pool-size="10"
queue-capacity="10"
keep-alive="20"
rejection-policy="DISCARD_OLDEST"/>
<!-- RabbitMQ 连接工厂公共配置 连接服务-->
<rabbit:connection-factory id="rabbitConnectionFactory"
addresses="${rabbit.addresses}"
virtual-host="${rabbit.vhost}"
username="${rabbit.username}"
password="${rabbit.password}"
<!--上面配置的线程池 一般不需配置默认 即可-->
executor="poolTaskExecutor"
<!-- channel-cache-size,channel的缓存数量,默认值为25-->
channel-cache-size="${channel.cache.size}"
<!-- 设置此属性配置可以确保消息成功发送到交换器-->
publisher-confirms="true"
<!-- 可以确保消息在未被队列接收时返回-->
publisher-returns="true"
<!-- cache-mode 一般不需要配置,缓存连接模式,
默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
cache-mode="CHANNEL" />
<!-- <admin> 元素会创建一个 RabbitMQ 管理组件(administrative component)-->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!-- 队列配置 -->
<!--定义消息队列,durable:是否持久化,
如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,
需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,
而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<rabbit:queue name="${queue.sync_nc}" durable="true" auto-delete="false" exclusive="false">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value>
<!-- 设定队列支持的最大优先级:rabbit3.5以上支持,3.5以下 需要安装插件 -->
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="${queue.reply}" durable="true" auto-delete="false" exclusive="false" />
<!-- 交换机配置 -->
<!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,
我们用direct模式,即rabbit:direct-exchange标签,
Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,
auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
<rabbit:direct-exchange name="${exchange.direct}" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="${queue.sync_nc}" key="${route.sync_nc}" />
<rabbit:binding queue="${queue.reply}" key="${route.reply}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 生产者配置 -->
<!--实例化两个ben处理消息到达交换机,和消息进入队列的节点-->
<bean id="messageConfirm" class="com.sjky.platform.common.rabbit.MessageConfirm" />
<bean id="messageReturn" class="com.sjky.platform.common.rabbit.MessageReturn" />
<!--spring 为amqp默认的是jackson的一个插件生产者生产的数据转换为json存入消息队列-->
<bean id="messageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
</bean>
<rabbit:template id="rabbitTemplate"
connection-factory="rabbitConnectionFactory" //连接工厂
message-converter="messageConverter"
exchange="${exchange.direct}"
reply-timeout="2000" //发送和接收操作的超时时间(以毫秒为单位)。默认值为5000(5秒)
retry-template="retryTemplate"
mandatory="true"
<!--引用上面实体,消息到达交换机时被调用-->
confirm-callback="messageConfirm"
<!--应用上面定义的实体,消息无法到达队列是使用-->
return-callback="messageReturn"
/>
<!-- retryTemplate为连接失败时的重试模板 -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="5000" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3"/>
</bean>
</property>
</bean>
<!--在任何Spring管理的对象上启用对@RabbitListener批注的检测-->
<rabbit:annotation-driven />
<!-- 消费者配置AUTO方式 -->
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="messageConverter" ref="messageConverter" />
<property name="connectionFactory" ref="rabbitConnectionFactory" />
<property name="concurrentConsumers" value="9" />
<property name="maxConcurrentConsumers" value="50" />
<property name="taskExecutor" ref="taskExecutor" />
<property name="prefetchCount" value="3" /> <!-- 每次1个 -->
<!--选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息 -->
<property name="acknowledgeMode" value="AUTO" />
<property name="errorHandler" ref="mqErrorHandler" /><!-- 引用注册的实体,处理未捕获异常 -->
</bean>
<!--处理未捕获异常-->
<bean id="mqErrorHandler" class="com.sjky.platform.common.rabbit.MQErrorHandler" />
<!-- 消费者配置MANUAL方式 -->
<!-- <bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="messageConverter" ref="messageConverter" />
<property name="connectionFactory" ref="rabbitConnectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="100" />
<property name="taskExecutor" ref="taskExecutor" />
<property name="prefetchCount" value="1" />
选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息
<property name="acknowledgeMode" value="MANUAL" />
<property name="errorHandler" ref="mqErrorHandler" /> 处理未捕获异常
<property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
</bean>
<bean id="mqMessageRecover" class="com.sjky.platform.common.rabbit.MQMsgRecover"/>
-->
<!-- 实现异常事件处理逻辑 -->
<!-- <bean id="retryOperationsInterceptorFactoryBean"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="mqMessageRecover" />配置消息恢复者
<property name="retryOperations" ref="retryTemplate" />配置重试模板
</bean> -->
</beans>
3. 注入时需要的Rabbit工具类
/**
* 消息到达交换机时被调用
* ack=true 成功 ,ack=false 失败
*/
public class MessageConfirm implements ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(MessageConfirm.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
logger.error("消息无法到达交换机[ack: " + ack + ",correlationData: " + correlationData + ",cause : " + cause+"].");
}else {
logger.info("消息到达交换机[ack: " + ack + ",correlationData: " + correlationData + ",cause : " + cause+"].");
}
}
}
/**
* Exchange无法将消息路由到任何队列时会被调用
*/
public class MessageReturn implements ReturnCallback {
private static final Logger logger = LoggerFactory.getLogger(MessageReturn.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//将消息发送到错误队列
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate,RabbitConfig.getDirectExchange(), RabbitConfig.getErrorRoute());
Exception cause = new Exception(new Exception("route fail and republish"));
recoverer.recover(message,cause);
logger.error("消息无法到达队列[Returned Message: " + replyText + ",code: " + replyCode + ",exchange: " + exchange + ",routingKey :" + routingKey+"].");
}
}
/**
* 消费时 消费失败 抛出异常时 调用
*/
public class MQErrorHandler implements ErrorHandler {
private static final Logger logger = LoggerFactory.getLogger(MQErrorHandler.class);
@Override
public void handleError(Throwable cause) {
logger.error("一个错误发生了:", cause);
}
}
/**
* acknowledgeMode=MANUL时的队列监听出现异常时消息恢复到队列
* 注:必须调用channel.basicAck()确认回执
* <!-- 消费者配置示例 -->
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="messageConverter" ref="messageConverter" />
<property name="connectionFactory" ref="rabbitConnectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="100" />
<property name="taskExecutor" ref="taskExecutor" />
<property name="prefetchCount" value="1" />
<!--选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息 -->
<property name="acknowledgeMode" value="MANUAL" />
<property name="errorHandler" ref="mqErrorHandler" /> <!-- 处理未捕获异常 -->
<property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
</bean>
<bean id="mqErrorHandler" class="com.sjky.platform.common.rabbit.MQErrorHandler" />
<bean id="mqMessageRecover" class="com.sjky.platform.common.rabbit.MQMsgRecover"/>
<!-- 实现异常事件处理逻辑 -->
<bean id="retryOperationsInterceptorFactoryBean"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer" ref="mqMessageRecover" />
<property name="retryOperations" ref="retryTemplate" />
</bean>
*
*/
public class MQMessageRecover implements MessageRecoverer {
private static final Logger logger = LoggerFactory.getLogger(MQMessageRecover.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Jackson2JsonMessageConverter msgConverter;
@Override
public void recover(Message message, Throwable cause) {
String data=msgConverter.fromMessage(message).toString();
MessageProperties messageProperties=message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
messageProperties.setReceivedDeliveryMode(MessageDeliveryMode.PERSISTENT);
//重新将数据放回队列中
rabbitTemplate.send(messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), message);
logger.error("处理消息(" + data + ") 错误, 重新发布去队列.", cause);
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
4. 将消息添加到队列
/**
* @description: 添加请求到队列
* @author: xhh
* @create: 2020-10-22 14:24
**/
@Controller
@RequestMapping("/nc/***/")
public class RabbirController extends BaseController {
@PostMapping("addOrderToQueue")
@ResponseBody
public ResultData addOrderToQueue(@RequestParam String orderCode, HttpServletRequest request){
ResultData resultData = new ResultData();
resultData.setResult(true);
resultData.setMessage("加入队列成功!");
logger.info("=================系统请求将订单code:{}放入队列=====================",orderCode);
try {
//判断订单是否加入过队列
String userCode = WebUtils.getLoginUserCode(request);
HashMap<String, String> stringStringHashMap = new HashMap<>(2);
stringStringHashMap.put("code",orderCode);
stringStringHashMap.put("createUserCode",userCode);
//将订单号发送到指定交换机的队列 (交换机,陆游键,消息内容)
rabbitTemplate.convertAndSend(RabbitConfig.getDirectExchange(),RabbitConfig.getSyncNcRoute(),stringStringHashMap);
} catch (Exception e) {
resultData.setResult(false);
resultData.setMessage(e.getMessage());
e.printStackTrace();
}
return resultData;
}
}
5. 监听 消费队列
/**
* @description: 监听同步nc的队列
* @author: xhh
* @create: 2020-10-23 16:36
**/
@Service
public class ToNcQueueListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/*****
* @Description: 监听同步NC的队列
* @Param: [message]
* @return: void
* @Author: xh
* @Date: 2020/10/27 16:32
*/
@RabbitListener(queues = "${queue.sync_nc}")//此注解 监听的队列名称
public void processMsg(Message message) throws Exception {
String data = new String(message.getBody());
JSONObject jsonObject = JSONObject.parseObject(data);
//订单号
String orderCode = jsonObject.getString("code");
//登录员工code
String userCode = jsonObject.getString("createUserCode");
logger.info("======================监听到订单信息:{}=============================", data);
}
}