RabbitMQ-消费消息
Address[] addresses = new Address[] {new Address(IP_ADDRESS, PORT)};
/**
* 1.建立连接工厂
*/
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASSWORD);
/**
* 网络故障自动连接恢复
*/
connectionFactory.setAutomaticRecoveryEnabled(true);
/**
* 2.创建连接 和生产者有一点不同
*/
Connection connection = connectionFactory.newConnection(addresses);
/**
* 3.创建信道
*/
final Channel channel = connection.createChannel();
/**
* 4.设置客户端最多接收示被ack的消息个数
*/
channel.basicQos(64);
Consumer consumer =new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("接收消息 : "+new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//消息确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* 回调
*/
channel.basicConsume(QUEUR_NAME, consumer);
/**
* 关闭资源
*/
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
basicConsume方法
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
- queue 队列名
- autoAck 是否自动确认消息,true自动确认,false 不自动要手动调用,建立设置为false
//消息确认
channel.basicAck(envelope.getDeliveryTag(), false);
- consumerTag 消费者标签,用来区分多个消费者
- noLocal 设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
- exclusive 是否排他
- arguments 消费者的参数
- callback 消费者 DefaultConsumer建立使用,重写其中的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
- 重写的方法
@Override
public void handleConsumeOk(String consumerTag) {
this._consumerTag = consumerTag;
}
@Override
public void handleCancelOk(String consumerTag) {
// no work to do
}
@Override
public void handleCancel(String consumerTag) throws IOException {
// no work to do
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// no work to do
}
@Override
public void handleRecoverOk(String consumerTag) {
// no work to do
}
- handleShutdownSignal方法 当Channel与Conenction关闭的时候会调用,
- handleCancelOk方法会在其它方法之前调用,返回消费者标签
*handleCancelOk与handleCancel消费者可以显式或者隐式的取水订单的时候调用,也可以通过
channel.basicCancel方法来显式的取消一个消费者订阅
会首先触发handleConsumeOk方法,之后触发handleDelivery方法,最后才触发handleCancelOk方法
channel.basicAck();确认消息
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。