RabbitMQ默认的调度机制是均匀的发送给消费者,公平调度。
但是实际情况下可能会出现消费者消息能力不同的情况,这样的话整体的消费能力会降低。
RabbitMQ提供了prefetchcount=1来设置调度策略, 在消费者处理并确认前一条消息之前,不要向其发送新消息。相反,它将把它发送给下一个还不忙的消费者。
如果是用的spring-boot-starter-amqp框架,可以通过在配置文件设置"spring.rabbitmq.listener.simple.prefetch=1"
消费者实现代码:
@RabbitListener(queues = "q.test.direct")
public void exec1(Message message, @Headers Map<String, Object> headers, Channel channel) throws InterruptedException {
logger.info("exec1 message {}", new String(message.getBody()));
Thread.sleep(3000L);
}
@RabbitListener(queues = "q.test.direct")
public void exec2(Message message, @Headers Map<String, Object> headers, Channel channel) {
logger.info("exec2 message {}", new String(message.getBody()));
}
修改完成的执行结果:
2019-09-19 17:06:17.347 INFO 79404 --- [ntContainer#0-1] com.jiang.rabbitmqdemo.Consumer : exec1 message test
2019-09-19 17:06:17.347 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.351 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.354 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.357 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.360 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.362 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.365 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.369 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.376 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test