Event Publishing & Processing
应用程序生成的事件需要分发给更新查询数据库,搜索引擎或任何其他需要它们的资源的组件:事件处理程序。事件总线负责将事件消息发送给所有感兴趣的组件。在接收端,事件处理器(Event Processors )负责处理这些事件,包括调用相应的事件处理程序。
发布事件
在绝大多数情况下,Aggregates将通过applying 这些事件来发布事件。但是,偶尔我们需要将事件(可能来自另一个组件内)直接发布到事件总线。要发布事件,只需在EventMessage中包装描述事件的payload即可。 GenericEventMessage.asEventMessage(Object)方法允许您将任何对象包装到EventMessage中。如果传递的对象已经是一个EventMessage,它会直接返回。
事件总线
EventBus是一种将事件分发给订阅事件处理程序的机制。 Axon提供了两个Event Bus实现:SimpleEventBus和EmbeddedEventStore。虽然这两个实现都支持订阅和跟踪处理器(请参阅事件处理器),但EmbeddedEventStore会保留事件,以便在稍后阶段重演它们。 SimpleEventBus而不会存储他们,并且一旦将它们发布到订阅的组件,就会“忘记”事件。
使用配置API时,默认使用SimpleEventBus。要配置EmbeddedEventStore,您需要提供一个StorageEngine的实现,它可以实际存储Events。
Configurer configurer = DefaultConfigurer.defaultConfiguration();
configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine());
Event Processors
Event Handlers主要是指收到事件时要执行的业务逻辑。Event Processors是指处理这些过程的技术方面组件。他们开启一个工作单元,或一个事务,还要确保相关数据可以正确地附加到在事件处理期间创建的所有消息。
Event Processors大致有两种形式:订阅和跟踪。订阅Event Processors订阅自己的事件来源,并由发布机制管理的线程调用。跟踪Event Processors,用它自己管理的线程从源头拉取消息。
Assigning handlers to processors
所有processor 都有一个名称,用于标识跨JVM实例的processor 实例。两个相同名称的processor ,可以看作是同一个processor 的两个实例。
所有 Event Handlers都附加到名称是“ Event Handler”类的包名称的Processor 。
例如下面的类:
● org.axonframework.example.eventhandling.MyHandler,
● org.axonframework.example.eventhandling.MyOtherHandler, and
● org.axonframework.example.eventhandling.module.MyHandler
将创建两个Processors:
● org.axonframework.example.eventhandling with 2 handlers, and
● org.axonframework.example.eventhandling.module with a single handler
Configuration API允许您配置其他策略,以将类分配给processors,甚至可以将特定的实例分配给特定的processors。
配置processors
Processors 关心的是处理事件的技术方面,而不管每个事件触发的业务逻辑如何。然而,“常规”(单例,无状态)事件处理程序的配置方式与Sagas略有不同,因为这些不同对于这两种类型的处理程序都很重要。
Event Handlers
默认情况下,Axon将使用订阅Event Processors。可以使用配置API的EventHandlingConfiguration类来更改处理程序的分配方式以及处理器的配置方式。
EventHandlingConfiguration 类定义了许多可用于定义处理器及如何配置的方法。
registerEventProcessorFactory允许你定义一个默认的工厂方法,创建没有明确定义工厂的事件处理器。
registerEventProcessor(String name, EventProcessorBuilder builder)定义了用于创建一个带有给定名称的处理器的工厂方法。注意,此种处理器只有当名称被选择作为任何可用的事件处理程序bean的处理器时才会创建。
registerTrackingProcessor(String name)定义具有给定名称的processor 应使用默认设置配置为Tracking Event Processor。它默认使用的是TransactionManager和TokenStore.
registerTrackingProcessor(String name,Function <Configuration,TrackingEventProcessorConfiguration> processorConfiguration,Function <Configuration,SequencingPolicy <?super EventMessage <?>>> sequencingPolicy)定义具有给定名称的处理器应配置为Tracking Processor,并使用给定的TrackingEventProcessorConfiguration阅读多线程的配置设置。 SequencingPolicy定义了处理器对事件顺序处理的期望。请参阅并行处理了解更多详情。
usingTrackingProcessors()设置默认Tracking Event Processor,而不是订阅Event Processor。
Sagas
Sagas的配置使用SagaConfiguration类。它提供静态方法来初始化Tracking Processing或Subscribing Processing 的实例。
要配置Saga 在订阅模式下运行,只需:
SagaConfiguration<MySaga> sagaConfig = SagaConfiguration.subscribingSagaManager(MySaga.class);
如果你不想使用默认的EventBus / Store作为这个saga的来源来获取它的消息,你也可以定义另一个消息源:
SagaConfiguration.subscribingSagaManager(MySaga.class, c -> /* define source here */);
subscribingSagaManager()方法的另一种变体允许您传递(构建器)EventProcessingStrategy。默认情况下,Sagas是同步调用的。也可以使用这种方法进行异步。但是,使用跟踪处理程序是异步调用的首选方式。
要配置saga使用跟踪处理器(Tracking Processor),只需:
SagaConfiguration.trackingSagaManager(MySaga.class);
这将是默认属性,这意味着使用单个线程来处理事件。你可以用这种方式来改变:
SagaConfiguration.trackingSagaManager(MySaga.class)
// configure 4 threads
.configureTrackingProcessor(c -> TrackingProcessingConfiguration.forParallelProcessing(4))
TrackingProcessingConfiguration有几个方法允许您指定将创建多少个segments 以及应该使用哪个ThreadFactory来创建处理器(Processor )线程。请参阅并行处理了解更多详情。
查看SagaConfiguration类的API文档(JavaDoc),了解如何配置saga事件处理的完整细节。
Token Store
跟订阅处理器不同,跟踪处理器需要一个令牌存储来存储他们的进度。跟踪处理器通过其事件流接收的每个消息都伴随有一个令牌。这个令牌允许处理器在稍后的时间点重新打开流,从最后一个事件中取出它离开的位置。
配置API使用令牌存储,以及处理器从全局配置实例中需要的大多数其他组件。如果没有显式定义TokenStore,则使用InMemoryTokenStore,这在生产环境中不推荐使用。
要配置不同的令牌存储,请使用Configurer.registerComponent(TokenStore.class,conf - > ... create token store ...)
请注意,您可以重写TokenStore以在各自的EventHandlingConfiguration或定义该处理器的SagaConfiguration中与跟踪处理器一起使用。在通常情况下,建议使用令牌存储,将令牌存储在与事件处理程序更新视图模型相同的数据库中。这样的话在更新视图模型和令牌的存储是原子性的。
Parallel Processing(并行处理)
从Axon Framework 3.1开始,跟踪处理器可以使用多个线程来处理事件流。他们是这样的做的,他们定义了一个叫’segment’的东西,并以数字作为唯一标识符。通常,单个线程将处理单个segment。
你可以定义要使用的Segments 的数量。当Processor 第一次启动时,它可以初始化多个segments。这个数字定义了可以同时处理事件的最大线程数。 TrackingProcessor的每个节点都将尝试启动配置的线程数量,以开始处理这些线程。
Event Handlers可能对事件的排序有特定的期望。如果是这种情况,处理器必须确保这些事件以特定的顺序发送给这些处理程序。 Axon为此使用SequencingPolicy。 SequencingPolicy本质上是一个函数,它为任何给定的消息返回一个值。如果SequencingPolicy函数的返回值对于两个不同的事件消息是相等的,则意味着这些消息必须按顺序处理。默认情况下,Axon组件将使用SequentialPerAggregatePolicy,这使得由相同的聚合实例发布的事件将被顺序处理。
一个Saga实例永远不会被多个线程同时调用。因此,saga的排序政策是无关紧要的。 Axon将确保每个Saga实例都按它们在Event Bus上发布的顺序接收它需要处理的事件。
注意:
请注意, Subscribing Processors 不管理自己的线程。因此,不可能配置他们应该如何接收他们的事件。实际上,它们将始终按顺序进行工作,因为这通常是Command Handling组件中的并发级别。
Multi-node processing
对于跟踪处理器来说,处理事件的线程是否都运行在同一个节点上,或者在托管相同(逻辑)TrackingProcessor的不同节点上运行并不重要。当具有相同名称的两个TrackingProcessor实例在不同的计算机上处于激活状态时,它们被视为同一个逻辑处理器的两个实例。他们将“竞争”事件流的各个部分。每个实例将“声明”一个segment,防止分配给该segment的事件在其他节点上处理。
TokenStore实例将使用JVM的名称(通常是主机名和进程ID的组合)作为缺省nodeId。你可以重写TokenStore的实现来支持多节点处理。
分发事件
在某些情况下,你须要将事件发布到外部系统,比如消息代理。
Spring AMQP
Axon提供了开箱即用的东西,以便将事件传送到AMQP消息代理(如Rabbit MQ)。
Forwarding events to an AMQP Exchange
SpringAMQPPublisher将事件转发给AMQP Exchange。他通常是用EventBus或EventStore的SubscribableMessageSource来进行初始化。
要配置SpringAMQPPublisher,只需将一个实例定义为一个Spring Bean即可。有许多setter方法帮助你完成您期望的效果,如事务支持,发布者确认(如果代理支持)以及exchange 名称。
默认exchange 名称是“Axon.EventBus”。
注意:请注意,exchanges 不会自动创建。您必须声明您希望使用的队列,exchanges 和Bindings 。你可以查看Spring文档以获取更多信息。
Reading Events from an AMQP Queue
Spring对从AMQP Queue读取消息提供了广泛的支持。然而,这需要被“桥接”到Axon上,以便这些消息可以从Axon处理,就好像它们是常规的事件消息一样。
SpringAMQPMessageSource允许事件处理器从队列中读取消息,而不是从事件存储或事件总线。它充当Spring AMQP和这些处理器所需的SubscribableMessageSource之间的适配器。
配置SpringAMQPMessageSource最简单的方法是定义一个重写默认的onMessage方法的bean,并用@RabbitListener对它进行注解,如下所示:
@Beanpublic SpringAMQPMessageSource myMessageSource(Serializer serializer) {
return new SpringAMQPMessageSource(serializer) {
@RabbitListener(queues = "myQueue")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
super.onMessage(message, channel);
}
};
}
Spring的@RabbitListener注解告诉Spring需要为给定Queue上的每个消息(例子中的'myQueue')调用这个方法。该方法只是调用super.onMessage()方法,该方法将事件的实际发布执行给所有已订阅的处理器。
如果要将这个MessageSource注册为subscribe Processors,须要将正确的SpringAMQPMessageSource实例传递 Subscribing Processor的构造函数:
// in an @Configuration file:@Autowiredpublic void configure(EventHandlingConfiguration ehConfig, SpringAmqpMessageSource myMessageSource) {
ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}
请注意,Tracking Processors与SpringAMQPMessageSource不兼容。
Asynchronous Event Processing
异步处理事件的推荐方法是使用跟踪事件处理器(Tracking Event Processor)。即使在系统发生故障的情况下(假设事件已经被持久化了),他也能保证处理完所有的事件。
但是,也可以在订阅处理器(SubscribingProcessor)中异步处理事件。为了达到这个目的,SubscribingProcessor必须配置一个EventProcessingStrategy。这个策略可以用来改变如何管理事件监听器的调用。
默认策略(DirectEventProcessingStrategy)在传递事件的线程中调用这些处理程序。这允许处理器使用现有的事务。
另一个Axon提供的策略是AsynchronousEventProcessingStrategy。它使用Executor来异步调用事件监听器(Event Listeners)。
即使AsynchronousEventProcessingStrategy是异步执行,仍然需要按顺序处理某些事件。 SequencingPolicy定义事件是否必须按顺序,并行或两者兼而有之。策略返回给定事件的序列标识符。如果策略为两个事件返回一个相同的标识符,则意味着它们必须由事件处理程序按顺序处理。空序列标识符表示该事件可以与任何其他事件并行处理。
Axon提供了许多可以使用的通用策略:
FullConcurrencyPolicy将告诉Axon这个事件处理程序可以同时处理所有的事件。这意味着事件之间没有任何关系需要按照特定的顺序进行处理。
SequentialPolicy告诉Axon必须按顺序处理所有的事件。事件的处理将在前一个事件的处理完成时开始。
SequentialPerAggregatePolicy将强制从同一个聚合中引发的域事件按顺序处理。但是,来自不同聚合的事件可以同时处理。对于更新数据库表中聚合的细节的事件监听器( event listeners)来说,这通常是一个合适的策略。
除了这些提供的政策,您可以自定义实现。所有策略都必须实现SequencingPolicy接口。该接口定义了一个方法getSequenceIdentifierFor,它返回给定事件的序列标识符。返回相同序列标识符的事件那么表示必须按顺序处理。产生不同序列标识符的事件可以被同时处理。出于性能原因,如果事件可能与任何其他事件并行处理,则策略实现应返回null。这是更快的,因为Axon不必检查事件处理的任何限制。
建议在使用AsynchronousEventProcessingStrategy时显示定义一个ErrorHandler。默认的ErrorHandler传播异常,但是在异步执行中,除Executor之外没有任何东西传播。这可能会导致事件没有被处理。相反,建议使用ErrorHandler报告错误并允许处理继续。 ErrorHandler在SubscribingEventProcessor的构造函数上进行配置,其中还提供了EventProcessingStrategy。