Spring Boot RabbitMQ快速入门 (2)

Prefetch设置

当我们进入RabbitMQ的GUI管理界面, 点入某个队列查看消费者的属性时, 有记录如下

Channel Consumer tag Ack required Exclusive Prefetch count Arguments
172.22.0.1:57382 amq.ctag-Gsix2DEjaFI9zVlsJJZp3Q 1
172.22.0.1:57378 amq.ctag-_FIcIOpflMXXaBQN7xLYcA 1

上面的表格说明消息的消费需要手工ack, 且是公平分发的. 设置prefetch的方式有两种

  • 全局式设定

    在application.yml文件中设定spring.rabbitmq.listener.prefetch即可, 这会影响到本Spring Boot应用中所有使用默认SimpleRabbitListenerContainerFactory的消费者

    spring:
      rabbitmq:
        host: localhost
        username: chris
        password: 123123
        virtual-host: prontera
        listener:
          prefetch: 100
    
  • 特定消费者设置

    在消费者的配置中自定义一个SimpleRabbitListenerContainerFactory

    @Bean
    public SimpleRabbitListenerContainerFactory myContainerFactory(
      SimpleRabbitListenerContainerFactoryConfigurer configurer, 
      ConnectionFactory connectionFactory) {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setPrefetchCount(100);
      configurer.configure(factory, connectionFactory);
      return factory;
    }
    
    

    然后在消费者上声明使用该ContainerFactory即可达到对特定消费者配置prefetch的作用

    @RabbitListener(queues = "#{rabbitConfiguration.TOPIC_QUEUE}", containerFactory = "myContainerFactory")
    public void processBootTask2(WorkUnit content) {
      System.out.println(content);
    }
    
    

Ack机制

Spring Boot Rabbit使用手工应答机制, 当@RabbitListener修饰的方法被调用且没有抛出异常时, Spring Boot会为我们自动应答. 否则会根据设定的重试机制而作出nack或reject等行为.

重试机制

重试分两种, template的重试与listener的重试, 分别代表生产者与消费者

生产者端的重试

spring:
  rabbitmq:
    template:
      retry:
        enabled: true

通过以上配置可以启动AmqpTemplate的重试机制, 例如与RabbitMQ连接丢失的时候将会自动重试事件的发布, 这个特性默认是关闭的

消费者端的重试

消费者一端, 即@RabbitListener也有像AmqpTemplate一样的重试机制, 当重试次数(默认是3)耗尽的时候, 该特性同样也是默认关闭的, 可以通过以下配置打开

spring:
  rabbitmq:
    host: localhost
    username: chris
    password: 123123
    virtual-host: prontera
    listener:
      retry:
        enabled: true

如果消费一端的重试机制没有被启动, 而且Listener抛出异常的话, 那么该消息就会无限地被重试(刚开始我也晕, retry都关了居然会被无限地重试, 这个不是bug, 官方文档就是这么写的, 实测结果也是一样). 通常我们的做法是抛出AmqpRejectAndDontRequeueException以reject该消息, 同时如果有dead-letter queue被设置的话该消息就会被置入, 否则被丢弃.

如果启动消费端的重试机制, 我们可以设置其最大的尝试次数(默认为3次)

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        max-attempts: 5

死信队列

@Bean
public DirectExchange directExchange() {
  return new DirectExchange(DEFAULT_DIRECT_EXCHANGE, true, true);
}

@Bean
public Queue tradeQueue() {
  final ImmutableMap<String, Object> args = 
    ImmutableMap.of("x-dead-letter-exchange", DEFAULT_DIRECT_EXCHANGE,
                    "x-dead-letter-routing-key", TRADE_DEAD_ROUTE_KEY);
  return new Queue(TRADE_QUEUE, true, false, true, args);
}

@Bean
public Binding tradeBinding() {
  return BindingBuilder.bind(tradeQueue()).to(directExchange()).with(TRADE_ROUTE_KEY);
}

@Bean
public Queue deadLetterQueue() {
  return new Queue(TRADE_DEAD_QUEUE, true, false, true);
}

@Bean
public Binding deadLetterBinding() {
  return BindingBuilder.bind(deadLetterQueue()).to(directExchange()).with(TRADE_DEAD_ROUTE_KEY);
}

队列定义不一致

对于已经存在的Queue配置将不会被后来的覆盖, 且会在Spring Boot控制台抛出一条WARN日志

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'boot_task' in vhost 'prontera': received 'false' but current is 'true', class-id=50, method-id=10)

Spring Boot RabbitMQ Properties

# RABBIT (RabbitProperties)
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.
spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached.
spring.rabbitmq.cache.channel.size= # Number of channels to retain in the cache.
spring.rabbitmq.cache.connection.mode=CHANNEL # Connection factory cache mode.
spring.rabbitmq.cache.connection.size= # Number of connections to cache.
spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite.
spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean.
spring.rabbitmq.host=localhost # RabbitMQ host.
spring.rabbitmq.listener.acknowledge-mode= # Acknowledge mode of container.
spring.rabbitmq.listener.auto-startup=true # Start the container automatically on startup.
spring.rabbitmq.listener.concurrency= # Minimum number of consumers.
spring.rabbitmq.listener.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`.
spring.rabbitmq.listener.max-concurrency= # Maximum number of consumers.
spring.rabbitmq.listener.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
spring.rabbitmq.listener.retry.enabled=false # Whether or not publishing retries are enabled.
spring.rabbitmq.listener.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
spring.rabbitmq.listener.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
spring.rabbitmq.listener.retry.max-interval=10000 # Maximum interval between attempts.
spring.rabbitmq.listener.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
spring.rabbitmq.listener.retry.stateless=true # Whether or not retry is stateless or stateful.
spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
spring.rabbitmq.password= # Login to authenticate against the broker.
spring.rabbitmq.port=5672 # RabbitMQ port.
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
spring.rabbitmq.publisher-returns=false # Enable publisher returns.
spring.rabbitmq.requested-heartbeat= # Requested heartbeat timeout, in seconds; zero for none.
spring.rabbitmq.ssl.enabled=false # Enable SSL support.
spring.rabbitmq.ssl.key-store= # Path to the key store that holds the SSL certificate.
spring.rabbitmq.ssl.key-store-password= # Password used to access the key store.
spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates.
spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store.
spring.rabbitmq.ssl.algorithm= # SSL algorithm to use. By default configure by the rabbit client library.
spring.rabbitmq.template.mandatory=false # Enable mandatory messages.
spring.rabbitmq.template.receive-timeout=0 # Timeout for `receive()` methods.
spring.rabbitmq.template.reply-timeout=5000 # Timeout for `sendAndReceive()` methods.
spring.rabbitmq.template.retry.enabled=false # Set to true to enable retries in the `RabbitTemplate`.
spring.rabbitmq.template.retry.initial-interval=1000 # Interval between the first and second attempt to publish a message.
spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.max-interval=10000 # Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.multiplier=1.0 # A multiplier to apply to the previous publishing retry interval.
spring.rabbitmq.username= # Login user to authenticate to the broker.
spring.rabbitmq.virtual-host= # Virtual host to use when connecting to the broker.

作者:Chris
原博客:http://blog.chriscs.com

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容