MASA Framework - EventBus设计

概述

利用发布订阅模式来解耦不同架构层级,亦可用于解决隔离业务之间的交互

优点:

  • 松耦合
  • 横切关注点
  • 可测试性
  • 事件驱动

发布订阅模式

发布者通过调度中心将消息发送给订阅者。调度中心解决发布与订阅者之间的关系,保证消息可以送达订阅者手中。

发布者与订阅者互不相识,发布者只管向调度中心发布消息,而订阅者只关心自己订阅的消息类型

event bus design.png

多订阅者保序执行

在常见的发布订阅模式中,的确很少见到类似的说法。但在实际业务中我们会有类似的需求,一个消息由调度中心协调多个订阅者按照顺序执行消息,同时还可以将上一个订阅者处理过的消息传递给下一个订阅者。这样既可以保留发布订阅模式的特性,又有了顺序执行逻辑的特性。

一个小思考:如果 EventBus 的配置支持动态调整的话,是否业务的执行顺序也可以被动态排列组合?

换句话说它或许可以为进程内工作流提供了一个可能性

event bus design - keep order.png

Event Sourcing(事件溯源)

一种事件驱动的架构模式,可以用于审计和溯源

  • 基于事件驱动架构
  • 以事件为事实
  • 业务数据由事件计算产生的视图,可以持久化也可以不持久化

CQRS(命令查询的责任分离)

CQRS 是一种架构模式,能够使改变模型与查询模型的实现分离

cqrs.png

Event Sourcing & CQRS

事件溯源可以与 CQRS 很好的配合

  • 在 Command Handler 中持久化事件到 Event Store 的同时实时计算一个最终视图给 View DB 用于查询展示
  • 在 Query 中既可以通过 View DB 获取最新状态,也可以通过 Event Store 来重放事件来校验 View 或用于更严谨的业务
event sourcing cqrs.png

Saga

Saga 是一个长活事务被分解成可以交错运行的子事务集合。其中每个子事务都是一个保持数据库一致性的真实事务

  • 每个 Saga 由一系列 sub-transaction Ti 组成
  • 每个 Ti 都有对应的补偿动作 Ci,补偿动作用于撤销 Ti 造成的结果

两种执行顺序

  • T1, T2, T3...[Tx retry]...,Tn
  • T1, T2, ..., Tj, Cj,..., C2, C1

两种恢复策略

  • backward recovery,向后恢复,补偿所有已完成的事务,如果任一子事务失败。即上面提到的第二种执行顺序,其中 j 是发生错误的 sub-transaction,这种做法的效果是撤销掉之前所有成功的 sub-transation,使得整个 Saga 的执行结果撤销
  • forward recovery,向前恢复,重试失败的事务,假设每个子事务最终都会成功。适用于必须要成功的场景,执行顺序是类似于这样的:T1, T2, ..., Tj(失败), Tj(重试),..., Tn,其中 j 是发生错误的 sub-transaction。该情况下不需要 Ci

BuildingBlocks 的类视图

作为接口标准,BuildingBlocks 中并没有过多的干涉实现方式,它只保留了最基础的功能流程限制,以达到最小 EventBus 的功能集合。至于最终是基于接口还是特性来实现订阅关系的,交还给 Contrib 自行决定。

事件

用于本地事件的发布/订阅

  • IEvent:事件接口,IEvent<TResult>为带返回值的基础事件接口
  • IEventHanldler<TEvent>:事件处理器接口,ISagaEventHandler<TEvent>为 Saga 的实现提供了基础接口要求
  • IMiddleware<TEvent>:中间件接口,允许在事件执行前挂载预处理动作和时间执行后的收尾动作
  • IEventBus:事件总线接口,用于发送事件,并提供订阅关系维护和附加功能执行

[图片上传失败...(image-c47e65-1642142057856)]

集成事件

用于跨进程事件的发布/订阅

  • IntegrationEventLog:集成事件日志,用于实现本地消息表的消息模型
  • IIntegrationEventLogService:集成事件日志服务接口
  • ITopic:发布/订阅的主题
  • IIntegrationEvent:集成事件接口
  • IIntegrationEventBus:集成事件总线,用于跨进程调用的事件总线

[图片上传失败...(image-e512f9-1642142057856)]

CQRS

用于使改变模型与查询模型的实现分离

  • IQuery<TResult>:查询的接口
  • IQueryHandler<TCommand,TResult>:查询处理器接口
  • ICommand:可用于增删改等指令的接口
  • ICommandHandler<TCommand>:指令处理器接口
cqrs code map.png

Event Bus

要完成上述的这些功能,我们需要借助于 EventBus,它需要有以下基础功能

  • 接收事件
  • 维护订阅关系
  • 转发事件

接收与转发事件

这两个功能其实可以合并为一个接口,由发布者调用 Publish,再由 Event Bus 根据订阅关系转发即可

维护订阅关系

在.Net 项目中,我们常见的用于扫描自动注册的方式是接口特性

MediatR 支持接口的方式去扫描事件订阅关系,举个例子:IRequestHandler<,>

public class PingHandler : IRequestHandler<Ping, string>
{
    public Task<string> Handle(Ping request, CancellationToken cancellationToken)
    {
        return Task.FromResult("Pong");
    }
}

如果你的代码洁癖程度没有高的离谱,或许你希望是这样

public class NetHandler : IRequestHandler<Ping, string>, IRequestHandler<Telnet, string>
{
    public Task<string> Handle(Ping request, CancellationToken cancellationToken)
    {
        return Task.FromResult("Pong");
    }

    public Task<string> Handle(Telnet request, CancellationToken cancellationToken)
    {
        return Task.FromResult("Success");
    }
}

看着好像还行?如果很多呢?

那有没有办法解决这个问题?

特性!我们来看个例子

public class NetHandler
{
    [EventHandler]
    public Task PingAsync(PingEvent @event)
    {
        //TODO
    }

    [EventHandler]
    public Task TelnetAsync(TelnetEvent @event)
    {
        //TODO
    }
}

似乎我们找到了一个出路

多订阅者保序执行

通过事件层层推进确实可以满足顺序执行的场景,但如果你被大量无限套娃的事件包围的时候或许你需要另外一个出路,看下例子:

public class NetHandler
{
    [EventHandler(0)]
    public Task PingAsync(PingEvent @event)
    {
        //TODO
    }

    [EventHandler(1)]
    public Task LogAsync(PingEvent @event)
    {
        //TODO
    }
}

只要参数是同一个 Event 就会按照 EventHandler 的 Order 顺序执行。

Saga

那执行失败了怎么办,如果两个方法因为其中一个需要调用远程服务而无法跟本地事务结合,能帮我回滚吗?

来吧,SAGA 走起,帮你再做个取消动作,同时还支持重试机制,以及是否忽略当前步骤的取消动作。

我们先来预设一下场景:

  1. 调用 CheckBalanceAsync 来检查余额
  2. 调用 WithdrawAsync, 抛出 exception
  3. 重试 WithdrawAsync 3 次
  4. 调用 CancelWithdrawAsync

代码如下:

public class TransferHandler
{
    [EventHandler(1)]
    public Task CheckBalanceAsync(TransferEvent @event)
    {
        //TODO
    }

    [EventHandler(2, FailureLevels.ThrowAndCancel, enableRetry: true, retryTimes: 3)]
    public Task WithdrawAsync(TransferEvent @event)
    {
        //TODO
        throw new Exception();
    }

    [EventHandler(2, FailureLevels.Ignore, enableRetry: false, isCancel: true)]
    public Task CancelWithdrawAsync(TransferEvent @event)
    {
        //TODO
    }
}

AOP

举个业务场景,给所有 Command 在执行前增加一个参数验证

我们提供了 Middleware,允许像俄罗斯套娃一样(.Net Middleware)做横切关注点的相关的事情

public class LoggingMiddleware<TEvent>
    : IMiddleware<TEvent> where TEvent : notnull, IEvent
{
    private readonly ILogger<LoggingMiddleware<TEvent>> _logger;

    public LoggingMiddleware(ILogger<LoggingMiddleware<TEvent>> logger) => _logger = logger;

    public async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
    {
        _logger.LogInformation("----- Handling command {EventName} ({@Event})", typeof(TEvent).FullName, @event);
         await next();
    }
}

注册 DI

builder.Services.AddTransient(typeof(IMiddleware<>), typeof(LoggingMiddleware<>))

MASA EventBus 完整功能列表

  • 接收事件
  • 维护订阅关系 - 接口
  • 维护订阅关系 - 特性
  • 多订阅者顺序执行
  • 转发事件
  • Saga
  • AOP
  • UoW
  • 自动开启和关闭事务

Integration Event Bus

用于跨服务的 Event Bus,支持最终一致性,本地消息表

Pub/Sub

提供了 Pub Sub 接口,并基于 Dapr Pub/Sub 提供默认实现

本地消息表

提供了本地消息保存和 UoW 联动接口,并基于 EF Core 提供默认实现

使用方法

启用 Dapr Event Bus

builder.Services
    .AddDaprEventBus<IntegrationEventLogService>(options=>
    {
        options.UseUoW<CatalogDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=Password;database=test"))
               .UseEventLog<CatalogDbContext>();
        )
    });

定义 Integration Event

public class DemoIntegrationEvent : IntegrationEvent
{
    public override string Topic { get; set; } = nameof(DemoIntegrationEvent);//dapr topic name

    //todo other properties
}

定义 DbContext(非必须,定义 DbContext 可以将本地消息表与业务事务联动)

public class CustomDbContext : IntegrationEventLogContext
{
    public DbSet<User> Users { get; set; } = null!;

    public CustomDbContext(MasaDbContextOptions<CustomDbContext> options) : base(options)
    {

    }
}

发送 Event

IIntegrationEventBus eventBus; // from DI
await eventBus.PublishAsync(new DemoIntegrationEvent());

订阅 Event(基于 Dapr Pub/Sub 的版本)

[Topic("pubsub", nameof(DomeIntegrationEvent))]
public async Task DomeIntegrationEventHandleAsync(DomeIntegrationEvent @event)
{
    //todo
}

Domain Event Bus

在领域中同时提供 Event Bus 和 Integration Event Bus 的能力,允许实时发送事件或在 Save 时一次性触发

Domain Event Bus 是最完整的能力,所以使用 Domain Event Bus 相当于已经开启了 Event Bus 和 Integration Event Bus,在 Domain Event Bus 内部会自动协调事件分类往 Event Bus 和 Integration Event Bus 分流

启用 Domain Event Bus

builder.Services
.AddDomainEventBus(options =>
{
    options.UseEventBus()//Use in-process events
        .UseUoW<CustomDbContext>(dbOptions => dbOptions.UseSqlServer("server=localhost;uid=sa;pwd=P@ssw0rd;database=idientity"))
        .UseDaprEventBus<IntegrationEventLogService>()///Use cross-process events
        .UseEventLog<LocalMessageDbContext>()
        .UseRepository<CustomDbContext>();
})

添加 DomainCommand

Domain Event 是进程内事件,IntegrationDomainEvent 是跨进程事件

public class RegisterUserSucceededIntegrationEvent : IntegrationDomainEvent
{
    public override string Topic { get; set; } = nameof(RegisterUserSucceededIntegrationEvent);

    public string Account { get; set; } = default!;
}

public class RegisterUserSucceededEvent : DomainEvent
{
    public string Account { get; set; } = default!;
}

进程内事件订阅

[EventHandler]
public Task RegisterUserHandlerAsync(RegisterUserDomainCommand command)
{
    //TODO
}

跨进程事件订阅

[Topic("pubsub", nameof(RegisterUserSucceededIntegrationEvent))]
public async Task RegisterUserSucceededHandlerAsync(RegisterUserSucceededIntegrationEvent @event)
{
    //todo
}

发送 DomainCommand

IDomainEventBus eventBus;//from DI
await eventBus.PublishAsync(new RegisterUserDomainCommand());

使用场景

  • 兼顾遗留系统对接
  • 游走在云与非云中
  • 流计算
  • 微服务解耦和跨集群通信(需要将 Dapr Pub/Sub 改为 Dapr Binding,不难)
  • 部分 AOP 类场景

总结

事件驱动可以解决一些特定场景的问题,凡事都有两面性,在本来就很简单的业务场景中使用如此复杂的模式会带来不小的负担。

学以致用,学无止境。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容