消息消费失败处理方式:
一 进入死信队列(进入死信的三种方式)
1.消息被拒绝(basic.reject or basic.nack)并且requeue=false
2.消息TTL过期过期时间
3.队列达到最大长度
DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理, 这个特性可以弥补R abbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。
rabbitmq的三种模式:
一. Fanout Exchange 广播
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。
二. Direct Exchange 点对点
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
三. Topic Exchange 模糊匹配
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。
mq也支持重发机制:
rabbitmq的消息确认机制分两部分一部分是生产端,一部分是消费端生产端有两种选择,transaction 和 confirm。confirm 的性能要好于transaction
//transaction 机制
channel.txSelect();
String msg ="msg test !!!";
for(inti=0;i<10000;i++){
msg = i+" : msg test !!!";
channel.basicPublish(EXCHAGE, QUEUE_NAME,null,msg.getBytes());
System.out.println("publish msg "+msg);
if(i>0&&i%100==0){
//批量提交
channel.txCommit();
}
}// 若出现异常 进行 channel.txRollback(),对相应批次的msg进行重发或记录
channel.txCommit();
生产者配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factoryid="connectionFactory"
host="10.153.25.15"username="insurance"password="insurance"port="5672"/>
<rabbit:adminconnection-factory="connectionFactory"/>
<rabbit:queueid="queue_insurance"durable="true"auto-delete="false"
exclusive="false"name="queue_insurance">正常队列当中指向死信
<rabbit:queue-arguments>
<entrykey="x-message-ttl">设置超时
<valuetype="java.lang.Long">30000
</entry>
<entrykey="x-dead-letter-exchange">指定交换机
<valuetype="java.lang.String">alter</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queueid="alter_queue"durable="true"auto-delete="false"exclusive="false"name="alter_queue"/>死信队列
<rabbit:direct-exchangename="alter"
durable="true"auto-delete="false"id="alter">死信交换机
<rabbit:bindings>
<rabbit:bindingqueue="alter_queue"key="queue_key_insurance"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:direct-exchangename="exchange_insurance"
durable="true"auto-delete="false"id="exchange_insurance">正常交换机
<rabbit:bindings>
<rabbit:bindingqueue="queue_insurance"key="queue_key_insurance"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- (5)客户端投递消息到exchange。 -->
<rabbit:templateid="amqpTemplate"exchange="exchange_insurance"
connection-factory="connectionFactory"/>
</beans>
消费者配置:
<?xml version="1.0"encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 连接服务配置 -->
<rabbit:connection-factoryid="connectionFactory2"host="10.153.25.15"
username="insurance"password="insurance"port="5672"/>
<rabbit:adminconnection-factory="connectionFactory2"/>
<!-- queue 队列声明 -->
<!-- queue 队列声明 name 队里的额name 是关联生产表和消费表的为唯一线索 -->
<rabbit:queueid="queue_insurance"name="queue_insurance">
<rabbit:queue-argumentsvalue-type="java.lang.Long">
<entrykey="x-message-ttl"value="30000"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 定义消费者监听器 -->
<!-- 创建一个bean实例,bean实例中声明处理请求的类 -->
<beanid="consumerLitener2"class="com.insurance.mq.CommissionController"></bean>
<rabbit:listener-container connection-factory="connectionFactory2"acknowledge="auto"concurrency="8">
<!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
<rabbit:listenerqueues="queue_insurance"ref="consumerLitener2"/>
</rabbit:listener-container>
</beans>