消息中间件提供了异步通信、缓冲和解耦等特性,由于使系统解耦,也提高了系统的可扩展性。但是也需要考虑由此可能带来的一些问题。
1、消息丢失
1.1 发生的场景
topic特性使得不在线的消费者上线后收不到之前的消息了,如果一段时间消费者都不在线,那消息就始终处于未被处理的状态。
服务器异常重启,导致内存中的数据丢失
消息设置了失效时间,过了时间就进入死信队列了
消费者自动回复ack,但是ack之后处理消息的过程中出现故障或异常,导致消息被消费者接收单未处理
1.2 可能导致的问题
消息丢失,导致的问题就是数据丢失,具体要看是什么业务场景的数据
1.3 解决思路
可追溯消息,默认topic数据是不进行持久化的,可以设置可追溯策略,最近多少条、最近多长时间、最近多少字节。
持久化消息
幂等化业务操作,使得消息丢失后可以重试,但不影响结果
做高可用,保证消息不因节点故障或网络故障丢失
死信队列,对死信队列的数据进行重新推送或重新消费
ack,消息被消费处理完成后再回复broker可以去清除该消息了
消息重投、重试机制
记录日志,将处理异常的或者可能丢失的筛选出来重新处理
2、重复消费
2.1 发生的场景
网络环境差等因素导致重复发送多条相同业务的消息、自动重试等。
2.2 可能导致的问题
导致业务数据错乱。
2.3 解决思路
设计幂等,使得消息重试多少次得到的结果都是一样的。
3、消息堆积
3.1 发生的场景
分组或者独占消费者(exclusive和selector),即设定消息只能有特定的消费者接收消费,或者消费者设定了只接收特定属性值的消息。
消费者的消费能力远小于生产者的生产能力(也许是存在短时间消息生产高峰或者消费者节点出现故障)
3.2 可能导致的问题
内存占用越来越多,内存可能溢出;
如果设置了消息持久化,磁盘空间占用越来越大;
主要还是影响业务响应时间;
内存溢出或者磁盘占满还可能导致消息丢失。
3.3 解决思路
对应场景和原因提出解决思路。
分组或独占消费者:定期检查发现追踪。
消费者能力不足:增加节点,多线程提升消费效率,优化IO模型与IO传递速度。
broker容量有限:增加容量,JVM参数配置,是否有不自动清除的垃圾数据、死信未处理、做高可用集群。
生产者短时间产生大量消息:检查产生的数据是否正常、程序是否有问题;事前做好压测和测试,预留处理能力。
一、什么是activemq
activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
二、activemq的作用以及原理
Activemq 的作用就是系统之间进行通信。 当然可以使用其他方式进行系统间通信, 如果使用 Activemq 的话可以对系统之间的调用进行解耦, 实现系统间的异步通信。 原理就是生产者生产消息, 把消息发送给activemq。 Activemq 接收到消息, 然后查看有多少个消费者, 然后把消息转发给消费者, 此过程中生产者无需参与。 消费者接收到消息后做相应的处理和生产者没有任何关系
三、activemq的几种通信方式
3.1publish(发布)-subscribe(订阅)(发布-订阅方式)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法
3.2 p2p(point-to-point)(点对点)
p2p的过程则理解起来比较简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路
相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
3.3publish(发布)-subscribe(订阅)方式的处理
发布订阅模式的通信方式, 默认情况下只通知一次, 如果接收不到此消息就没有了。 这种场景只适用于对消息送达率要求不高的情况。 如果要求消息必须送达不可以丢失的话, 需要配置持久订阅。 每个订阅端定义一个 id,
<property name="clientId" 在订阅是向 activemq 注册。 发布消息 <property name="subscriptionDurable" value="true"/>和接收消息时需要配置发送模式为持久化template.setDeliveryMode(DeliveryMode.PERSISTENT);。 此时如果客户端接收不到消息, 消息会持久化到服务端(就是硬盘上), 直到客户端正常接收后为止。
3.4p - p(点对点)方式的处理
点对点模式的话, 如果消息发送不成功此消息默认会保存到 activemq 服务端直到有消费者将其消费, 所以此时消息是不会丢失的。
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
Kafka
单机吞吐量:十万级,最大的优点,就是吞吐量高。
topic数量都吞吐量的影响:topic从几十个到几百个的时候,吞吐量会大幅度下降。所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
时效性:ms级
可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性:经过参数优化配置,消息可以做到0丢失
功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
ActiveMQ
单机吞吐量:万级
topic数量都吞吐量的影响:
时效性:ms级
可用性:高,基于主从架构实现高可用性
消息可靠性:有较低的概率丢失数据
功能支持:MQ领域的功能极其完备
总结:
非常成熟,功能强大,在早些年业内大量的公司以及项目中都有应用
偶尔会有较低概率丢失消息
现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本
主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用
RabbitMQ
单机吞吐量:万级
topic数量都吞吐量的影响:
时效性:微秒级,延时低是一大特点。
可用性:高,基于主从架构实现高可用性
消息可靠性:
功能支持:基于erlang开发,所以并发能力很强,性能极其好,延时很低
总结:
erlang语言开发,性能极其好,延时很低;
吞吐量到万级,MQ功能比较完备
开源提供的管理界面非常棒,用起来很好用
社区相对比较活跃,几乎每个月都发布几个版本分
在国内一些互联网公司近几年用rabbitmq也比较多一些 但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug。
rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。
RocketMQ
单机吞吐量:十万级
topic数量都吞吐量的影响:topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降。可支持大量topic是一大优势。
时效性:ms级
可用性:非常高,分布式架构
消息可靠性:经过参数优化配置,消息可以做到0丢失
功能支持:MQ功能较为完善,还是分布式的,扩展性好
总结:
接口简单易用,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景
而且一个很大的优势在于,源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码
总结:
kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展
同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量
kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略
最后
一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃后来大家开始用RabbitMQ,但是确实erlang语言阻止了大量的java工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;不过现在确实越来越多的公司,会去用RocketMQ,确实很不错,但是要想好社区万一突然黄掉的风险所以中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用RocketMQ是很好的选择
如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范