一、限时订单实战
1.1、什么是限时订单
在各种电商网站下订单后会保留一个时间段,时间段内未支付则自动将订单状态设置为已过期,这种订单称之为限时订单。
1.2、如何实现限时订单
1.2.1、限时订单的流程
电商平台都会包含以下 5 种状态。
待付款:代表买家下单了但是还没有付款。
待发货:代表买家付款了卖家还没有发货。
已发货:代表卖家已经发货并寄出商品了。
已完成:代表买家已经确认收到货了。
已关闭:代表订单过期了买家也没付款、或者卖家关闭了订单。
1.2.2、限时订单实现的关键
我们可以看到,订单中的很多状态都是可以用户触发的,唯独订单过期了买家也没付款我们需要自动的把订单给关闭,这个操作是没有用户或者是 人工干预的,所以限时订单的关键就是如何检查订单状态,如果订单过期了则把该订单设置为关闭状态。
1.2.3、轮询数据库?
轮询数据库在实现限时订单上是可行的,而且实现起来很简单。写个定时器去每隔一段时间扫描数据库,检查到订单过期了,做适当的业务处理。 但是轮询会带来什么问题?
1、轮询大部分时间其实是在做无用功,我们假设一张订单是 45 分钟过期,每 1 分钟我们扫描一次,对这张订单来说,要扫描 45 次以后,才会检查 到这张订单过期,这就意味着数据库的资源(连接,IO)被白白浪费了;
2、处理上的不及时,一个待支付的电影票订单我们假设是 12:00:35 过期,但是上次扫描的时间是 12:00:30,那么这个订单实际的过期时间是什么时 候?12:01:30,和我本来的过期时间差了 55 秒钟。放在业务上,会带来什么问题?这张电影票,假设是最后一张,有个人 12:00:55 来买票,买得到吗? 当然买不到了。那么这张电影票很有可能就浪费了。如果缩短扫描的时间间隔,第一只能改善不能解决,第二,又会对数据库造成更大的压力。
那么我们能否有种机制,不用定时扫描,当订单到期了,自然通知我们的应用去处理这些到期的订单呢?
1.2.4、Java 本身的提供的解决方案
java 其实已经为我们提供了问题的方法。我们想,要处理限时支付的问题,肯定是要有个地方保存这些限时订单的信息的,意味着我们需要一个容器, 于是我们在 Java 容器中去寻找。Map? List? Queue?
看看 java 为我们提供的容器,我们是个多线程下的应用,会有多个用户同时下订单,所以所有并发不安全的容器首先被排除,并发安全的容器有哪 些?一一排除,很巧,java 在阻塞队列里为我们提供了一种叫延迟队列 delayQueue 的容器,刚好可以为我们解决问题。
DelayQueue: 阻塞队列(先进先出)
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
延迟期满时才能从中提取元素(光队列里有元素还不行)。
Delayed 接口使对象成为延迟对象,它使存放在 DelayQueue 类中的对象具有了激活日期。该接口强制实现下列两个方法。
- CompareTo(Delayed o):Delayed 接口继承了 Comparable 接口,因此有了这个方法。让元素按激活日期排队
- getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
阻塞队列更多详情,参考 VIP 课程《并发编程》
1.2.5、架构师应该多考虑一点!
架构师在设计和实现系统时需要考虑些什么?
功能,这个没什么好说,实现一个应用,连基本的功能都没实现,要这个应用有何用?简直就是“一顿操作猛如虎,一看战绩零比五”
高性能,能不能尽快的为用户提供服务和能为多少用户同时提供服务,性能这个东西是个很综合性的东西,从前端到后端,从架构(缓存机制、异 步机制)到 web 容器、数据库本身再到虚拟机到算法、java 代码、sql 语句的编写,全部都对性能有影响。如何提升性能,要建立在充分的性能测试的基 础上,然后一个个的去解决性能瓶颈。对我们今天的应用来讲,我们不想去轮询数据库,其实跟性能有非常大的关系。
高可用,应用正确处理业务,服务用户的时间,这个时间当然是越长越好,希望可以 7*24 小时。而且哪怕服务器出现了升级,宕机等等情况下,能 够以最短的时间恢复,为用户继续服务,但是实际过程中没有哪个网站可以说做到 100%,不管是 Google,FaceBook,阿里,腾讯,一般来说可以做到 99.99% 的可用性,已经是相当厉害了,这个水平大概就是一个服务在一年可以做到只有 50 分钟不可用。这个需要技术、资金、技术人员的水平和责任心,还要 运气。
高伸缩,伸缩性是指通过不断向集群中加入服务器的手段来缓解不断上升的用户并发访问压力和不断增长的数据存储需求。就像弹簧一样挂东西一 样,用户多,伸一点,用户少,缩一点。衡量架构是否高伸缩性的主要标准就是是否可用多台服务器构建集群,是否容易向集群中添加新的服务器。加 入新的服务器后是否可以提供和原来服务器无差别的服务。集群中可容纳的总的服务器数量是否有限制。
高扩展,的主要标准就是在网站增加新的业务产品时,是否可以实现对现有产品透明无影响,不需要任何改动或者很少改动既有业务功能就可以上 线新产品。比如购买电影票的应用,用户购买电影票,现在我们要增加一个功能,用户买了票后,随机抽取用户送限量周边。怎么做到不改动用户下订 单功能的基础上增加这个功能。熟悉设计模式的同学,应该很眼熟,这是设计模式中的开闭原则(对扩展开放,对修改关闭)在架构层面的一个原则。
普利兹克奖(建筑界的诺贝尔奖之称)2016 年该奖得主:48 岁的智利建筑师亚历杭德罗·阿拉维纳。他设计的房子对扩展开放,对修改关闭。
1.2.6、从系统可用性角度考虑
应用重启带来的问题:
保存在 Queue 中的订单会丢失,这些丢失的订单会在什么时候过期,因为队列里已经没有这个订单了,无法检查了,这些订单就得不到处理了。
已过期的订单不会被处理,在应用的重启阶段,可能会有一部分订单过期,这部分过期未支付的订单同样也得不到处理,会一直放在数据库里,过 期未支付订单所对应的资源比如电影票所对应的座位,就不能被释放出来,让别的用户来购买。
解决之道 :在系统启动时另行处理
1.2.7、从系统伸缩性角度考虑
集群化了会带来什么问题?应用之间会相互抢夺订单,特别是在应用重启的时候,重新启动的那个应用会把不属于自己的订单,也全部加载到自己 的队列里去,一是造成内存的浪费,二来会造成订单的重复处理,而且加大了数据库的压力。
解决方案
让应用分区处理
1、 给每台服务器编号,然后在订单表里登记每条订单的服务器编号;
2,更简单的,在订单表里登记每台服务器的 IP 地址,修改相应的 sql 语句即 可。
几个问题:如果有一台服务器挂了怎么办?运维吃干饭的吗?服务器挂了赶紧启动啊。如果是某台服务器下线或者宕机,起不来怎么搞?这个还是 还是稍微有点麻烦,需要人工干预一下,手动把库里的每条订单数据的服务器编号改为目前正常的服务器的编号,不过也就是一条 sql 语句的事,然后想 办法让正常的服务器进行处理(重启正常的服务器)。
1.3、用 RocketMQ 实现限时订单
引入 RocketMQ 呢,使用延时消息,一举解决我们限时订单的伸缩性和扩展性问题
1.3.1、延时消息
概念介绍
延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。
适用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以 后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
1.3.2、核心的代码部分
整个代码见 delayOrder 包
1、 保存订单 SaveOrder.java 的时候,作为生产者往消息队列里推入订单,核心 RocketMQProducer,这个类当然是要继承 IDelayOrder,同时也是 RocketMQ 的生产者。
2、 消息队列会把延时的订单发给消费者 MessageListenerImpl,它是一个 RocketMQ 的消费者监听,它来负责检查订单是否过期,有消息过来,证明 消息订单过期了,则把订单状态修改为过期订单。
RocketMQ 本身又如何保证可用性和伸缩性?这个就需要 RocketMQ 的主从同步(HA 机制)。
二、RocketMQ 源码分析
使用的 RocketMQ 版本是 4.4.0,鉴于 RocketMQ 通信模块的底层源码是 Netty 实现的,在学习 RocketMQ 的源码之前,建议读者先对 Netty 的多线程模 型、JAVA NIO 模型均有一定的了解,以便快速理解 RocketMQ 源码。
RocketMQ 源码安装和调试见《RocketMQ 源码安装和调试.docx》
2.1、RocketMQ 整体架构
RocketMQ 主要的功能集中在 NameServer、rocketmq-broker、rocketmq-remoting、rocketmq-store 4 个模块
图中名字对应每一个工程中的 artifactId。
整体模块如下:
- rocketmq-namesrv: 命名服务,更新和路由发现 broker 服务。 NameServer 要作用是为消息生产者、消息消费者提供关于主题 Topic 的路由信息,NameServer 除了要存储路由的基础信息,还要能够管理 Broker 节点,包括路由注册、路由删除等功能
- rocketmq-broker: mq 的核心。 它能接收 producer 和 consumer 的请求,并调用 store 层服务对消息进行处理。 HA 服务的基本单元,支持同步双写,异步双写等模式。
- rocketmq-store: 存储层实现,同时包括了索引服务,高可用 HA 服务实现。
- rocketmq-remoting: 基于 netty 的底层通信实现,所有服务间的交互都基于此模块。
- rocketmq-common: 一些模块间通用的功能类,比如一些配置文件、常量。
- rocketmq-client: java 版本的 mq 客户端实现
- rocketmq-filter: 消息过滤服务,相当于在 broker 和 consumer 中间加入了一个 filter 代理。
- rocketmq-srvutil: 解析命令行的工具类 ServerUtil。
- rocketmq-tools: mq 集群管理工具,提供了消息查询等功能
2.2 RocketMQ 服务启动
这个类是服务启动时执行,初始化了发送消息、消费消息、清理过期请求等各种线程池和监听事件。
了解了 mq 服务启动的过程,接下来,我们按照一条消息从客户端发出,最终到服务端的存储层并如何落盘,这一条调用链来分析源码,了解一条消 息是怎么处理的。
2.3、源码分析之消息的来龙去脉
RocketMQ 是一个消息中间件,消息中间件最大的功能就是处理消息,所以我们从消息的角度来做一次源码分析,分析消息的来龙去脉。因为 RocketMQ 本身解耦的,我们从两个独立的部分,消息的生产和消息的消费两大部分入手。
2.3.1、消息的生产
2.3.1.1、Client 中的消息发送
源码跟踪
前面讲过生产者发送有,单向发送,可靠同步发送和可靠异步发送,我们分析消息可靠同步发送的接口代码。
DefaultMQProducer.send() -> DefaultMQProducerImpl.send() -> DefaultMQProducerImpl.sendDefaultImpl()
核心分析 sendDefaultImpl 方法:
- 获取主题路由相关信息
- for 循环发送(发送次数由 retryTimesWhenSendFailed+1 来决定)
- 调用 sendKernelImpl 方法,下面详细分析 sendKernelImpl 方法
3.1 获取路由表信息(如果没有则会从 NameServer 中获取)
3.2 通过判断发送类型设置不同的入参,但是最终都调用了 MQClientAPIImpl 类的 sendMessage 方法。下面详细分析同步调用的 sendMessage 方法
3.2.1 sendMessageSync –>NettyRemotingClient.invokeSync()方法完成发送。
在 NettyRemotingAbstract 中的 invokeSyncImpl 里面会大量使用 Netty 进行调用(Netty 的版本是 4.0.42.Final) - 不同发送方式的 sendResult 处理不同
核心关键点
消息重试:为什么 RocketMQ 中的消息重试是 2?
就是消息一般情况下发送三次。(King 老师个人认为跟中国文化有关,事不过三)
Netty: 这块 Netty 发送的 详情见《网络协议和 Netty》专题
为何要使用 Netty 作为高性能的通信库?
(1)Netty 的编程 API 使用简单,开发门槛低,无需编程者去关注和了解太多的 NIO 编程模型和概念;
(2)对于编程者来说,可根据业务的要求进行定制化地开发,通过 Netty 的 ChannelHandler 对通信框架进行灵活的定制化扩展;
(3)Netty 框架本身支持拆包/解包,异常检测等机制,让编程者可以从 JAVA NIO 的繁琐细节中解脱,而只需要关注业务处理逻辑;
(4)Netty 解决了(准确地说应该是采用了另一种方式完美规避了)JDK NIO 的 Bug(Epoll bug,会导致 Selector 空轮询,最终导致 CPU 100%);
(5)Netty 框架内部对线程,selector 做了一些细节的优化,精心设计的 reactor 多线程模型,可以实现非常高效地并发处理;
(6)Netty 已经在多个开源项目(Hadoop 的 RPC 框架 avro 使用 Netty 作为通信框架)中都得到了充分验证,健壮性/可靠性比较好。
总结
客户端发送消息流程比较简单,首先封装消息,然后根据 NameServer 返回的路由信息,然后把这些组成一个整体,最后调用 Remoting 模块使用 Netty 把消息发送给 Broker。在里面包含了多种发送方式,同时也有消息重新发送机制。
2.3.1.2、Broker 中消息的生产
因为在 Broker 启动流程中涉及到了非常复杂的封装,这里没有必要进行讲解,我们简单想一想,Broker 最核心的功能就是接收到消息然后把消息进 行存储,那么我们就 Broker 中对于消息的处理流程进行分析。
源码跟踪
发送的消息到达 broker,调用 org.apache.rocketmq.broker.processor.SendMessageProcessor 类的 processRequest()方法,processRequest()调用 sendMessag()
1.非批次发送消息 sendMessag()
2.消息存储(注意,这里都只存储 commitlog),调用 DefaultMessageStore.putMessage()方法
2.1 这里进行 commitlog 的提交,调用 CommitLog.putMessage()
2.1.1 在 MappedFile 类中,处理存储都是使用 MappedFile 这个类进行处理的,最终调用 appendMessage 方法。 appendMessagesInner 方法中,这里进行文件的追加(AppendMessageCallback 接口的实现 DefaultAppendMessageCallback 在 CommitLog 类中,是一个内部类)
2.2 在 commitlog 类中 doAppend 方法中进行 commitlog 的处理,还是基于 byteBuffer 的操作
2.3 在 commitlog 类中 doAppend 方法中进行返回,将消息写入的内存的位置信息、写入耗时封装为 AppendMessageResult 对象返回
核心关键点
Commitlog,RocketMQ 接收到消息后,首先是写入 Commitlog 文件,按照顺序进行写入,使用 NIO 技术。
在 Commitlog 中 putMessage 最后通过判断配置文件的主从同步类型和刷盘类型,进行刷盘。
总结
借助 java NIO 的力量,使得 I/O 性能十分高。当消息来的时候,顺序写入 CommitLog。
RocketMQ 下主要有三类大文件:commitlog 文件、Index 文件,consumequeue 文件,对于三类大文件,使用的就是 NIO 的 MappedByteBuffer 类来提 高读写性能(主要是刷盘方法中)。这个类是文件内存映射的相关类,支持随机读和顺序写。在 RocketMQ 中,被封装成了 MappedFile 类
2.3.1.3、Broker 中更新消息队列和索引文件
消息进入 Commitlog 文件还不够,因为对于消费者来说,他们必须要看到 ConsumeQueue 和 IndexFile(ConsumeQueue 是因为消费要根据队列进行消费, 另外没有索引文件 IndexFile,消息的查找会出现很大的延迟)。
所以 RocketMQ 通过开启一个线程 ReputMessageService 来监听 CommitLog 文件更新事件,如果有新的消息,则及时更新 ComsumeQueue、IndexFile 文 件。
源码跟踪
- DefaultMessageStore 类中的内部类 ReputMessageService 专门处理此项任务
- ReputMessageService 类的 run(),默认 1 毫秒处理一次(文件从 CommitLog 到 ComsumeQueue 和 Index)
- ReputMessageService 类的 doReput()方法。
- ReputMessageService 类的 doReput()方法中, doDispatch,最终会构建(构建消息消费队 )和(构建索引文件)
核心关键点
定时任务来处理的消息存储转换。处理核心类是 DefaultMessageStore 类中的内部类 ReputMessageService
总结
定时任务来处理的消息存储转换。处理核心类是 DefaultMessageStore 类中的内部类 ReputMessageService
2.3.2、消息的消费
消息消费分为推和拉两种模式。这里重点分析推模式。
Client 中的消费者启动流程
DefaultMQPushConsumerImpl#start()方法,其中重点就是 consumer.start() DefaultMQPushConsumer.start() ->DefaultMQPushConsumerImpl.start()
源码跟踪
1.检查配置信息
2.加工订阅信息(同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。)
3.创建 MQClientInstance 实例,
4.负载均衡
5.队列默认分配算法
6.pullAPIWrapper 拉取消息
7.消费进度存储
8.加载消息进度(
9.判断是顺序消息还是并发消息
10.消息消费服务并启动
11.注册消费者
12.MQClientInstance 启动
12.1 定时任务 startScheduledTask()
12.1.1 每隔 2 分钟尝试获取一次 NameServer 地址
12.1.2 每隔 30S 尝试更新主题路由信息
12.1.3 每隔 30S 进行 Broker 心跳检测
12.1.4 默认每隔 5 秒持久化 ConsumeOffset
12.1.5 默认每隔 1S 检查线程池适配
12.2 开启拉消息服务(线程)
12.3 负载均衡服务(线程)
13.更新 TopicRouteData
14.检测 broker 状态
15.发送心跳
16.重新负载
核心关键点
在 RocketMQ 中,推模式还是使用拉模式进行消息的处理的。在 MQClientInstance 启动过程中启动了哪些定时任务
定时任务中 12 步中,包括了消费过程中的各种信息,这些信息都是定时去处理的。
总结
在 RocketMQ 中,推模式对比拉模式封装了非常多的功能,比如负载均衡、队列分配、消费进度存储、顺序消息、心跳检测等。
消息的拉取
分析一下 PUSH 模式下的集群模式消息拉取代码。
同一个消费组内有多个消费者,一个 topic 主题下又有多个消费队列,那么消费者是怎么分配这些消费队列的呢,从上面的启动的代码中是不是还记 得在 org.apache.rocketmq.client.impl.factory.MQClientInstance#start 中,启动了 pullMessageService 服务线程,这个服务线程的作用就是拉取消息,我们去 看下它的 run 方法:
源码跟踪
- MQClientInstance. start() -> PullMessageService. start()
- PullMessageService. pullMessage() -> DefaultMQPushConsumerImpl. pullMessage()方法
- pullMessage 方法中包含了消息拉取的核心部分,包括处理暂停、流量控制、方法回调等。 核心关键点 流量控制 1.当 processQueue 没有消费的消息的数量达到(默认 1000 个)会触发流量控制 2.当 processQueue 中没有消费的消息体总大小 大于(默认 100m)时,触发流控 3.消息的最大位置和最小位置的差值如果大于默认值 2000,那么触发流控
总结
消息进行拉取时的核心是流量控制,这个也是解决客户端与服务端消费能力不对等的一种方案。
消息的消费 消息拉取到了之后,消费者要进行消息的消费,消息的消费主要是 consumeMessageService 线程做的,我们先看下 consumeMessageService 的构造函 数
源码跟踪
1.ConsumeMessageConcurrentlyService 构造函数,在这个构造函数中,new 了一个名字叫 consumeExecutor 的线程池,在并发消费的模式下,这个线 程池也就是消费消息的方式
- 通过回调方式的模式,提交到 consumeMessageService 中(ConsumeMessageConcurrentlyService 实现类),进入 submitConsumeRequest 方法,这个 就是提交偏移量的处理。
- 再直接进入 ConsumeMessageConcurrentlyService 中的内部类 ConsumeRequest 中的 run 方法
3.1 判断 processQueue 的 dropped 属性
3.2:拿到业务系统定义的消息监听 listener
3.3 判断是否有钩子函数,执行 before 方法
3.4 调用 resetRetryTopic 方法设置消息的重试主题
3.5 执行 listener.consumeMessage,业务系统具体去消费消息,
3.6 对消费结果的处理,进入 processConsumeResult 方法
3.6.1 集群模式下
3.6.2 失败的消息进入一个 List
3.6.3 消费失败的数据会重新建立一个数据,使用一个定时任务,再次到 Client 中的消费者启动流程 . 源码跟踪 step 6(重试消息的 时候会创建一个条新的消息,而不是用老的消息)
核心关键点
消费失败的数据会重新建立一个数据,使用一个定时任务,重试消息的时候会创建一个条新的消息,而不是用老的消息。
总结
不管是消费成功还是消费失败的消息,都会更新消费进度,首先从 processQueue 中移除所有消费成功的消息并返回 offset,这里要注意一点,就是这 个 offset 是 processQueue 中的 msgTreeMap 的最小的 key,为什么要这样做呢?因为消费进度的推进是 offset 决定的,因为是线程池消费,不能保证先消 费的是 offset 大的那条消息,所以推进消费进度只能取最小的那条消息的 offset,这样在消费端重启的时候就可能会导致消息重复消费。