一、启动RabbitMQ本地服务
1、本地安装RabbitMQ服务
1. brew update // Homebrew更新
2. brew install rabbitmq // 安装
2、配置环境变量
在用户根目录下配置环境变量,我用的是是nano 编辑器
3、启动节点(服务)
rabbitmq-server或者rabbitmq-server start
该命令会同时启动应用程序和 Erlang 节点。
附:停止节点
rabbitmqctl stop
该命令会同时关闭应用程序和 Erlang 节点。
二、生产者
// 官方示例是回调函数,我这儿用的是ES6的写法
const amqp = require('amqplib');
async function f1() {
const con = await amqp.connect('amqp://localhost');
const channel = await con.createChannel(); // 创建通道(后续所有操作都在该通道中完成)
const queue = 'task_queue';
const msg = process.argv.slice(2).join(' ') || 'hello rabbitmq 222222!';
// 创建连接队列
channel.assertQueue(queue, {
durable: true,
});
// 将消息送入队列
const result = await channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true,
});
console.log(" [x] Sent '%s'", msg);
setTimeout(function(){
con.close();
process.exit(0);
}, 500);
}
f1();
三、消费者
// 官方示例是回调函数,我这儿用的是ES6的写法
const amqp = require('amqplib');
async function f1() {
// 1、连接RabbitMQ服务器(我在本地安装并启动了RabbitMQ服务)
const con = await amqp.connect('amqp://localhost');
// 2、创建通道(所有后续的操作都在通道里完成)
const channel = await con.createChannel();
const queue = 'task_queue';
// 3、创建并连接到队列
channel.assertQueue(queue, {
durable: true, // 持久化消息(即使服务器重启,依然不会丢失消息)
});
// 4、设置每次队列送到消费者的数量为2(通道里每次只保留2个消息待处理,这2个处理完了在接受下2个)
channel.prefetch(2);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
// 5、从队列里取出消息
const message = await channel.consume(queue, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
channel.ack(msg);
}, secs * 1000);
}, {
// 服务器期望消费者消费完后发送回执(ACK),如果服务器端超时未收到回执(ACK),
// 会重新分配该消息(如果有其他消费者,就会分配给其他消费者了)
noAck: false
});
}
f1();
四、启动测试
五、参考链接
环境部署参考(官方)
https://www.rabbitmq.com/install-homebrew.html
生产者消费者参考(官方)
https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
RabbitMQ服务命令参考
https://blog.csdn.net/wohu1104/article/details/91469537
RabbitMQ理解参考
https://www.sojson.com/blog/48.html
https://www.cnblogs.com/wutianqi/p/10043011.html