DDD领域驱动/CQRS读写分离/ES事件溯源 这些前沿的时髦的技术理念汇聚在一次,落地到一套完整实现方案。这就是Axon
我们从ES事件溯源开始说
- 传统的数据库设计只记录数据的当前状态,对于输入如何到达当前状态的过程信息则并不重视。
- 而ES采用了完全不同的记录方式,它记录所有改变当前状态的事件。并从这些事件累加出当前状态。它于传统方式相比有如下特性:
- 更好的追溯问题的起因。(传统方式只能到log中找)
- 更大的数据存储(是缺点也是优点,有利于数据挖掘)
- 对于事件只插入不更新。没有锁,吞吐量更高
- 更复杂的实现
- ES还有一种辅助机制来减少溯源的计算量,定期做一个当前状态的快照。事件在最近的快照后进行累加
领域驱动中提到过充血/失血模型
- 传统实现是失血模型,数据和行为是分离的。数据存储在数据库中,使用时加载到应用服务器,然后处理之后再存入数据库。
- 而真正面相对象的设计中,数据和行为是在一起的(比如Actor模型),即服务层即执行逻辑也持有数据。
- 当然传统的失血模型有它的好处。即服务是无状态的,可以方便的做负载扩展。Axon的充血模型是怎么处理这个问题的,之后再深入。
- CQRS读写分离。如果实施了事件溯源,那么数据库里就只保存事件了,正常的业务查询是不能直接使用这些数据的。这个时候就需要视图表来展示数据,视图表都是从事件表转换而来的。
当然DDD领域驱动/CQRS读写分离/ES事件溯源这些不是强制绑定的,可以随意一选择,下面是Axon的架构图:
下面我们用axon来实践开发一个游戏(代码使用的是Axon4.0)
首选我们创建一个领域对象——玩家Player
@Aggregate
public class Player{
@AggregateIdentifier
private String id;
private String name;//名字
private PlayerStatus status;//状态
@CommandHandler
public Player(CreatePlayerCmd cmd) {
AggregateLifecycle.apply(new PlayerCreatedEvent(
IdentifierFactory.getInstance().generateIdentifier()
, cmd.getPlayerName(), new Date()));
}
@EventSourcingHandler
private void on(PlayerCreatedEvent event) {
this.id = event.getId();
this.name = event.getPlayerName();
this.status = PlayerStatus.ACTIVE;
}
}
- @Aggregate会被Spring加载并自动赋予axon相关的功能
- @AggregateIdentifier 指定聚合根的ID,必须要有
- @CommandHandler 用来处理外部触发的行为和计算
- @EventSourcingHandler 用作状态更改和事件回放(通过事件重放回复当前状态)
接着我们实现一个随着时间自然消耗的功能
EventSourcingHandler
private Date lastSustainLivingDate;//上次维持生存的计算时间节点
@CommandHandler
public void handle(SustainLivingCmd cmd){
AggregateLifecycle.apply(new SustainLivingEvent(cmd.getTiggerDate(), lastSustainLivingDate));
}
@EventSourcingHandler
protected void on(SustainLivingEvent event) {
this.lastSustainLivingDate = event.getTriggerDate();
}
- 写到这里发现每实现一个功能是比较繁琐的。要创建两个方法,一个命令对象,一个事件对象。中间是一些繁琐的值传递。
- AggregateLifecycle.apply()做了几件事情
- 调用实例自己的@EventSourcingHandler
- 通过EventBus存储和传播事件
- 外部事件监听器监听,主要是CQRS中的查询视图,更新试图数据。
PlayerEntry保存了所有玩家的列表幷提供查询,它监听PlayerCreatedEvent
@Component
public class PlayerEntryListener {
@Autowired
private PlayerEntryRepository playerEntryRepository;
@EventHandler
public void on(PlayerCreatedEvent event) {
PlayerEntry playerEntry = new PlayerEntry();
playerEntry.setId(event.getId());
playerEntry.setName(event.getPlayerName());
playerEntryRepository.save(playerEntry);
}
}
Command执行过程:
- Command通过org.axonframework.commandhandling.CommandBus分发,其中DistributedCommandBus实现了分布式的派发,它可以适配SpringCloud. 主要逻辑是通过
AggregateIdentifier做一致性HASH。确保次对同一个聚合根发到同一台机器,这样保证了缓存的命中。Aggregate本质上在应用内缓存了当前状态,如果该服务宕机。另一台机器会通过事件溯源重新构造出当前状态。
- Command的事务处理:Axon通过UnitofWork控制一致性,其中嵌入了JDBC事务。并且在回滚时会清除Aggregate缓存,下次会重新加载。
- 当然对于远程的子事务无法保证一致性。这是个大的隐患
Saga最终一致性,补偿机制。
- Saga的理念没问题,当是实现起来问题很大。
- 一个是每个command都要创建event非常繁琐,并且saga来来回回非常多
- 另一个是没有考虑到幂等,本地宕机,状态,分布事务等细节的处理。自己实现也不灵活
- 确保分布式事务一致性是个非常繁琐的事情,请参考转账交易这件小事,是如何被程序员玩坏的.
说说其他实现困难的情况
- 如果Aggregate存在继承关系,或者实现一些通用的行为接口。框架并不支持
- 无法实现延迟触发的功能
- 对大量聚合实现批量操作不太容易
- 几乎所有的操作都要靠command触发,然后转成event,非常繁琐。
取其思想,寻找替代
在交易中的订单模型,本质上就是ES中的Event. 账户余额是用订单累加出来的。只不过订单不止记录事件。还用来记录更新审核状态,系统状态,事务状态等,不那么纯粹。通过订单回溯账户状态是个人工过程而不是自动的。
- 在这方面,Axon对事件的存储采用统一的表,事件序列化到表中,并不有利于数据库的查看。需要额外的记录一个查询视图
- 快照模型也在交易对账中有所体现。每日对账后生成当日的一个快照,第二天的交易从快照开始累计
Command模型配合只insert没有update的操作,可以在高并发下实现无锁处理。这个可以通过Kafka。或者数据库先插入,后异步读取处理的方式来实现。
充血模型是Axon的一个特色。不过并不难实现。通过自定义一个SpringCloud LoadBalancer Rule 实现哈希一致也可以实现。另外传统的失血模型使用无状态服务更简单,它可以在数据库层面通过一致性hash来存储数据,比如mongodb。对于订单这样的单一记录低频操作完全可以处理。对于同一记录的高频处理Axon也是有其优势的,不过Akka可能也是不错的选择
Saga异常难写,在网络异常,幂等,重试方面并不友好
总结:DDD领域驱动/CQRS读写分离/ES事件溯源,这些思想都很棒,但是并不用拘泥于特定的实现。Axon对于学习这些概念具有非常大的帮助,但是在实践的过程中幷不高效、灵活