最近开发的需求涉及到了MQ的一些知识,现在做一个简单的总结。
项目中为什么会用到MQ
我们的系统要把每次用户登陆或者修改的用户信息同步到用户管理系统那边(2个系统不是用的同一个数据库)。这就导致我们需要每次通过ETL来将用户信息同步过去,但是etl同步数据有延迟,现在我们想要的就是将修改后的信息实时传过去。传统的方式可以通过远程RPC调用将数据进行传输,但是缺点就是系统间的耦合性太高了。通过使用MQ消息订阅及消费模式,实时的高效同步,我们只需要将我们的数据发送到MQ上,然后对方再从MQ上将数据取下来就行了。我们发送数据的这一方就是一个生产者,对方从MQ上取下我们发送的数据就是消费者。可以将MQ当作一个传递消息的邮局,发件人就是生产者,收件人就是消费者。
MQ的优点
- 解耦:降低系统之间的联系,我们系统每次操作用户信息后,直接将信息放到MQ中就行了,对方系统只管监听MQ并将数据取出来就行了。
- 流量削峰:可以将我们系统的请求缓存到MQ中,对方系统再慢慢的从MQ中拉取数据并将数据同步到数据库中。
- 数据分发:哪个系统需要数据只要直接从MQ中将数据取出来就行了而不必系统与系统之间联系。
RocketMQ的结构
- NameServer:主要用来记录维护Topic、Broker的信息,及监控Broker的运行状态。
- Broker:是RocketMQ的核心,提供了消息的接收,存储,拉取等功能,为了保证Broker的高可用,Broker分为Master和Slave,一个Master可以对应多个Slave,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。
- Producer生产者:需要与NameServer建立连接,从NameServer获取Topic路由信息,并向提供Topic服务的Broker Master建立连接。
- Customer消费者:同样与NameServer建立连接,从NameServer获取Topic路由信息,并向提供Topic服务的Broker Master,Slave建立连接。
RocketMQ简单使用
准备:Linux系统、JDK1.8、RocketMQ安装压缩包
进入官网下载压缩包
上传到服务器上并解压压缩文件
进入解压后文件夹中的bin目录下,输入sh mqnamesrv
,启动NameServer发现报错
原因:RocketMQ默认的虚拟机内存较大,启动的时候因为内存不足而失败,需要修改runserver.sh和runbroker.sh这2个文件,将内存调小一点。
继续输入sh mqnamesrv
启动NameServer
新开终端,继续输入sh mqbroker -n localhost:9876
启动Broker
再新开一个终端,输入jps
,可以看到NameServer和Broker都启动了
测试运行代码
pom文件中引入
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
生产者
public class Producer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer地址,多个地址之间用;分隔
producer.setNamesrvAddr("mainnode:9876");
//启动一个producer实例
producer.start();
for (int i = 0; i < 5; i++) {
//定义发送的消息
Message msg = new Message("TopicTest" /* 定义发送的主题 */,
"Tag" /* 发送的标记 */,
"key"+i/* Key */,
("hello"+i).getBytes());/* 发送的内容 */
//调用producer的send()方法发送消息并获取响应结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//关闭生产者
producer.shutdown();
}
}
消费者
public class Customer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
// 设置NameServer地址,多个地址之间用;分隔
consumer.setNamesrvAddr("mainnode:9876");
// 设置消费者订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
// 获取信息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt ms : list) {
System.out.println(new String(ms.getBody()));
}
//返回消费成功状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调用start()方法启动consumer
consumer.start();
}
}
分别运行消费者和生产者测试,可以看到生产者发送到MQ的信息已经被消费者打印输出了。