Spring 整合RabbitMQ实例

了解RabbitMQ

RabbitMQ使用场景

Spring 整合RabbitMQ

配置maven : pom.xml
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.5.RELEASE</version>
    </dependency>
1.配置连接信息 : rabbit.properties 文件
//连接信息
rabbit.vhost=/test  //虚拟机
rabbit.addresses=10.166.27.51:5672//服务器 端口
rabbit.username=test//用户名
rabbit.password=test//密码
channel.cache.size=50
//exchange 交换机配置名称
exchange.direct=test//交换机名
//配置队列名
queue.sync_nc=newretail.queue.sync_nc
queue.sync_nc_error=newretail.queue.sync_nc_error
queue.reply=queue.reply
//队列交换机的路由键
route.sync_nc=route.queue.sync_nc
route.reply=route.reply
route.sync_nc_error=route.queue.sync_nc_error
2. 依赖注入 : spring-rabbit.xml
rabbit 命名空间包含了多个元素,帮助我们声明队列、Exchange 以及将它们结合在一起的 binding
元素 作用
<queue> 创建一个队列
<fanout-exchange> 创建一个 fanout 类型的 Exchange
<header-exchange> 创建一个 header 类型的 Exchange
<topic-exchange> 创建一个 topic 类型的 Exchange
<direct-exchange> 创建一个 direct 类型的 Exchange
<bindings><binding></bindings> 元素定义一个或多个元素的集合。元素创建 Exchange 和队列之间的 binding
这些配置元素要与 <admin> 元素一起使用。
<admin> 元素会创建一个 RabbitMQ 管理组件(administrative component),
它会自动创建 (如果它们在 RabbitMQ 代理中尚未存在的话)上述这些元素所声明的队列、Exchange 以及 binding。
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

      <!--引用ExecutorService或ThreadPoolTask​​Executor(由<task:executor />元素定义)。创建连接时传递给Rabbit库。如果未提供,Rabbit库当前使用具有5个线程的固定线程池ExecutorService-->
        <task:executor id="poolTaskExecutor"
                   pool-size="10"
                   queue-capacity="10"
                   keep-alive="20"
                   rejection-policy="DISCARD_OLDEST"/>

    <!-- RabbitMQ 连接工厂公共配置 连接服务-->
    <rabbit:connection-factory id="rabbitConnectionFactory"
        addresses="${rabbit.addresses}" 
        virtual-host="${rabbit.vhost}"
        username="${rabbit.username}" 
        password="${rabbit.password}"
<!--上面配置的线程池 一般不需配置默认 即可-->
                executor="poolTaskExecutor"
<!-- channel-cache-size,channel的缓存数量,默认值为25-->
        channel-cache-size="${channel.cache.size}" 
<!-- 设置此属性配置可以确保消息成功发送到交换器-->
        publisher-confirms="true"
<!-- 可以确保消息在未被队列接收时返回-->
        publisher-returns="true"    
                <!-- cache-mode 一般不需要配置,缓存连接模式,
默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
                cache-mode="CHANNEL" />
<!-- <admin> 元素会创建一个 RabbitMQ 管理组件(administrative component)-->
    <rabbit:admin connection-factory="rabbitConnectionFactory" />
    <!-- 队列配置 -->
<!--定义消息队列,durable:是否持久化,
如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,
需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,
而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
    <rabbit:queue name="${queue.sync_nc}" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <value type="java.lang.Integer">10</value> 
<!-- 设定队列支持的最大优先级:rabbit3.5以上支持,3.5以下 需要安装插件 -->
            </entry>  
        </rabbit:queue-arguments>
    </rabbit:queue>
<rabbit:queue name="${queue.reply}" durable="true" auto-delete="false" exclusive="false" />
    <!-- 交换机配置 -->
<!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,
我们用direct模式,即rabbit:direct-exchange标签,
Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,
auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
    <rabbit:direct-exchange name="${exchange.direct}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${queue.sync_nc}" key="${route.sync_nc}" />
                <rabbit:binding queue="${queue.reply}" key="${route.reply}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 生产者配置 -->
      <!--实例化两个ben处理消息到达交换机,和消息进入队列的节点-->
    <bean id="messageConfirm" class="com.sjky.platform.common.rabbit.MessageConfirm" />
    <bean id="messageReturn" class="com.sjky.platform.common.rabbit.MessageReturn" />
        <!--spring 为amqp默认的是jackson的一个插件生产者生产的数据转换为json存入消息队列-->
    <bean id="messageConverter"
           class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
    </bean>
    <rabbit:template id="rabbitTemplate"
        connection-factory="rabbitConnectionFactory" //连接工厂
        message-converter="messageConverter"
        exchange="${exchange.direct}" 
        reply-timeout="2000"   //发送和接收操作的超时时间(以毫秒为单位)。默认值为5000(5秒)
        retry-template="retryTemplate" 
        mandatory="true"
<!--引用上面实体,消息到达交换机时被调用-->
        confirm-callback="messageConfirm"
<!--应用上面定义的实体,消息无法到达队列是使用-->
        return-callback="messageReturn"
    />
    <!-- retryTemplate为连接失败时的重试模板 -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="5000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="10000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3"/>
            </bean>
        </property>
    </bean>
    <!--在任何Spring管理的对象上启用对@RabbitListener批注的检测-->
    <rabbit:annotation-driven />
<!-- 消费者配置AUTO方式 -->
    <bean id="rabbitListenerContainerFactory"
        class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="messageConverter" ref="messageConverter" />
        <property name="connectionFactory" ref="rabbitConnectionFactory" />
        <property name="concurrentConsumers" value="9" />
        <property name="maxConcurrentConsumers" value="50" />
        <property name="taskExecutor" ref="taskExecutor" />
        <property name="prefetchCount" value="3" /> <!-- 每次1个 -->
    <!--选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息 -->
        <property name="acknowledgeMode" value="AUTO" />
        <property name="errorHandler" ref="mqErrorHandler" /><!-- 引用注册的实体,处理未捕获异常 -->
    </bean>
      <!--处理未捕获异常-->
    <bean id="mqErrorHandler" class="com.sjky.platform.common.rabbit.MQErrorHandler" />
    <!-- 消费者配置MANUAL方式 -->
    <!-- <bean id="rabbitListenerContainerFactory"
        class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="messageConverter" ref="messageConverter" />
        <property name="connectionFactory" ref="rabbitConnectionFactory" />
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="100" />
        <property name="taskExecutor" ref="taskExecutor" />
        <property name="prefetchCount" value="1" /> 
        选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息
        <property name="acknowledgeMode" value="MANUAL" />
        <property name="errorHandler" ref="mqErrorHandler" /> 处理未捕获异常
        <property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
    </bean> 
    <bean id="mqMessageRecover" class="com.sjky.platform.common.rabbit.MQMsgRecover"/>
    -->
    <!-- 实现异常事件处理逻辑 -->
    <!-- <bean id="retryOperationsInterceptorFactoryBean"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="mqMessageRecover" />配置消息恢复者
        <property name="retryOperations" ref="retryTemplate" />配置重试模板
    </bean> -->
</beans>
3. 注入时需要的Rabbit工具类
/**
 * 消息到达交换机时被调用
 * ack=true 成功 ,ack=false 失败
*/

public class MessageConfirm implements ConfirmCallback {

    private static final Logger logger = LoggerFactory.getLogger(MessageConfirm.class);
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            logger.error("消息无法到达交换机[ack: " + ack + ",correlationData: " + correlationData + ",cause : " + cause+"].");
        }else {
            logger.info("消息到达交换机[ack: " + ack + ",correlationData: " + correlationData + ",cause : " + cause+"].");
        }
    }

}

/**
 * Exchange无法将消息路由到任何队列时会被调用
*/
public class MessageReturn implements ReturnCallback {
    
    private static final Logger logger = LoggerFactory.getLogger(MessageReturn.class);
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        //将消息发送到错误队列
        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate,RabbitConfig.getDirectExchange(), RabbitConfig.getErrorRoute());
        Exception cause = new Exception(new Exception("route fail and republish"));
        recoverer.recover(message,cause);
        logger.error("消息无法到达队列[Returned Message: " + replyText + ",code: " + replyCode + ",exchange: " + exchange + ",routingKey :" + routingKey+"].");
    }

}
/**
 * 消费时  消费失败 抛出异常时 调用
*/
public class MQErrorHandler implements ErrorHandler {
    
    private static final Logger logger = LoggerFactory.getLogger(MQErrorHandler.class);

    @Override
    public void handleError(Throwable cause) {
        logger.error("一个错误发生了:", cause);
    }
    
}
/**
 * acknowledgeMode=MANUL时的队列监听出现异常时消息恢复到队列
 * 注:必须调用channel.basicAck()确认回执
 *  <!-- 消费者配置示例 -->
    <bean id="rabbitListenerContainerFactory"
        class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="messageConverter" ref="messageConverter" />
        <property name="connectionFactory" ref="rabbitConnectionFactory" />
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="100" />
        <property name="taskExecutor" ref="taskExecutor" />
        <property name="prefetchCount" value="1" /> 
        <!--选项: NONE,MANUAL,AUTO 默认:AUTO,当为MANUAL时必须调用Channel.basicAck()来手动应答所有消息 -->
        <property name="acknowledgeMode" value="MANUAL" />
        <property name="errorHandler" ref="mqErrorHandler" /> <!-- 处理未捕获异常 -->
        <property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
    </bean>
    <bean id="mqErrorHandler" class="com.sjky.platform.common.rabbit.MQErrorHandler" />
    <bean id="mqMessageRecover" class="com.sjky.platform.common.rabbit.MQMsgRecover"/>
    <!-- 实现异常事件处理逻辑 -->
    <bean id="retryOperationsInterceptorFactoryBean"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="mqMessageRecover" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>
 *
 */
public class MQMessageRecover implements MessageRecoverer {

    private static final Logger logger = LoggerFactory.getLogger(MQMessageRecover.class);
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private Jackson2JsonMessageConverter msgConverter;
    
    @Override
    public void recover(Message message, Throwable cause) {
        String data=msgConverter.fromMessage(message).toString();
        MessageProperties messageProperties=message.getMessageProperties();
        Map<String, Object> headers = messageProperties.getHeaders();
        headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
        headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
        headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
        messageProperties.setReceivedDeliveryMode(MessageDeliveryMode.PERSISTENT);
        //重新将数据放回队列中
        rabbitTemplate.send(messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), message);
        logger.error("处理消息(" + data + ") 错误, 重新发布去队列.", cause);
    }
    
    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }
4. 将消息添加到队列
/**
 * @description: 添加请求到队列
 * @author: xhh
 * @create: 2020-10-22 14:24
 **/
@Controller
@RequestMapping("/nc/***/")
public class RabbirController extends BaseController {

    @PostMapping("addOrderToQueue")
    @ResponseBody
    public ResultData addOrderToQueue(@RequestParam String orderCode, HttpServletRequest request){
        ResultData resultData = new ResultData();
        resultData.setResult(true);
        resultData.setMessage("加入队列成功!");
        logger.info("=================系统请求将订单code:{}放入队列=====================",orderCode);
        try {
            //判断订单是否加入过队列
            String userCode = WebUtils.getLoginUserCode(request);
            HashMap<String, String> stringStringHashMap = new HashMap<>(2);
            stringStringHashMap.put("code",orderCode);
            stringStringHashMap.put("createUserCode",userCode);
            //将订单号发送到指定交换机的队列 (交换机,陆游键,消息内容)
rabbitTemplate.convertAndSend(RabbitConfig.getDirectExchange(),RabbitConfig.getSyncNcRoute(),stringStringHashMap);
          } catch (Exception e) {
            resultData.setResult(false);
            resultData.setMessage(e.getMessage());
            e.printStackTrace();
        }
        return resultData;
    }
}
5. 监听 消费队列
/**
 * @description: 监听同步nc的队列
 * @author: xhh
 * @create: 2020-10-23 16:36
 **/
@Service
public class ToNcQueueListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());


    /*****
     * @Description: 监听同步NC的队列
     * @Param: [message]
     * @return: void
     * @Author: xh
     * @Date: 2020/10/27 16:32
     */
    @RabbitListener(queues = "${queue.sync_nc}")//此注解 监听的队列名称
    public void processMsg(Message message) throws Exception {
        String data = new String(message.getBody());
        JSONObject jsonObject = JSONObject.parseObject(data);
        //订单号
        String orderCode = jsonObject.getString("code");
        //登录员工code
        String userCode = jsonObject.getString("createUserCode");
        logger.info("======================监听到订单信息:{}=============================", data);
    }
}

浪客行1213的简书


xhh
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容