3 使用Graph DSL实现扇入扇出
到目前为止,我们只研究了一个输入和一个输出的线性处理。Akka-stream提供了用于描述扇入和扇出的图DSL(领域专用语言),能够定义有大量输入和输出的图。图形 DSL 差不多是一种图解 ASCII-许多情况下, 你可以将图形的白板图转换为 DSL。
有许多扇入和扇出GraphStages,可用于创建各种图形,例如Source、FLow、Sink。也可以创建你自己的自定义GraphStages 。
你以使用图形 DSL 创建任意Shape 的图形。在akka-stream方面,Shape定义了图有多少输入和输出(这些输入和输出称作入口(Inlets) 和出口(Outlets))。在下面的例子中,我们将创建一个Flow-shaped 图,所以它可用于像前面的POST路由。内部它使用了一个扇出形状。
3.1 广播到流
继续上一章的例子,我们按照日志事件的状态将日志事件进行拆分(一个Sink用于所有的错误,一个用于所有的警告,等等),以便每次对一个或多个状态发出GET请求时,不必过滤这些事件。下图展示了BroadcastGraphStage 如何将事件发送到不同的流。
图DSL提供了GraphDSL.Builder 来创建图中的节点,并且 ~> 方法用于将节点连接在一起,很像via方法。图中的节点是类型图, 在引用图的某个部分时可能会混淆, 因此在某些情况下, 我们将使用术语 "node"。
下面展示了代码中上图如何定义。也展示了从图开放入口和出口中定义一个流。
探究Graphs和Shapes
processStates返回类型可能不是你所期望的。不是Flow[Event, ByteString, NotUsed]类型,而是Graph[FlowShape[Event, ByteString], NotUsed]。事实上,Flow[-In, +Out, +Mat]继承于Graph[FlowShape[In, Out], Mat]。这表示一个Flow仅仅是一个有预定义Shape的Graph。如果你稍微深入akka-stream源代码,你会发现 FlowShape 是一个只有一个输入和一个输出的Shape。
所有预定义的组件定义的方法类似:所有都定义为带有一个Shape的Graph。例如,Source和Sink分别继承于 Graph[SourceShape[Out], Mat] 和Graph[SinkShape[In], Mat]。
参数builder是一个GraphDSL.Builder,它是可变的。它只打算在这里使用匿名函数来建立一个图。GraphDSL.Builder的add方法返回一个Shape,它描述了一个图的入口和出口。
过滤流写入到不同文件,如logFileSink(logId, state) 方法调用所示。例如,对于文件logId1的错误,将追加一个1-errors文件。
在POST路由里使用processStates:
src.via(processStates(logId))
.toMat(logFileSink(logId))(Keep.right)
.run()
用于返回错误日志文件的GET路由与普通的GET路由类似,除了一个命名约定(用于从[log-id]-error 文件读取)。
下一节,我们将看看合并源,以便可以返回合并在一起的所有日志,或者仅是某日志文件所有状态不是OK的日志事件。
3.2 合并流
让我们看看用于合并源的图。在第一个例子中,我们合并了所有状态不是OK的事件到一个文件中。 对/logs/[log-id]/not-ok 进行GET将返回所有状态不是OK的事件。下图展示了MergeGraphStage 如何将三个Sources合并为一个。
下面的代码展示了MergeGraphStage 如何应用图DSL。它定义了一个mergeNotOK方法,将特定 logId 的所有非OK日志源合并到一个源中。
注意,warning, error 和 critical 源,首先通过了一个JSON帧化流,否则, 您可以读取任意 ByteStrings 并将它们合并在一起, 从而导致 JSON 输出出现乱码。
三个源由带有三个入口的MergeGraphStage 合并。SourceShape 由merge.out 出口创建。Source有一个fromGraph 方便方法,将带有SourceShape 的Graph转化为Source。
MergePreferred GraphStage
MergeGraphStage 从其任何输入中随机取元素。Akka-stream 还提供了一个 MergePreferredGraphStage, 它有一个输出端口、一个首选输入端口, 以及零个或多个次级端口。当其中一个输入有可用的元素时, MergePreferred 会发出, 如果多个输入有可用的元素, 则倾向于首选的输入。
mergeNotOk 方法稍后用于getLogNotOkRoute 去创建读取的Source,如下所示
def getLogNotOkRoute =
pathPrefix("logs" / Segment /"not-ok") { logId =>
pathEndOrSingleSlash {
get {
extractRequest { req =>
complete(Marshal(mergeNotOk(logId)).toResponseFor(req))
}
}
}
}
还有一个用于合并源的简化 API, 我们将使用它来合并所有日志。请求GET /logs 将返回所有合并在一起的日志。下面代码展示了如何使用简化的 API。
Source.combine 方法从多个源中创建Source,类似于使用图形DSL进行的操作。mergeSources 用于合并具有相同类型任意数量的源。例如该方法用于/logs路由中,如下所示。
预定义和自定义GraphStages
在akka-stream 中有相当多的预定义 GraphStages, 这里没有展示负载平衡 (-Balance)、压缩 (Zip、ZipWith) 和串联流 (Concat) 等等。这些图的DSL非常类似于已经展示的示例。在所有情况下, 都需要向生成器添加节点,连接形状的入口和出口(由add方法返回),然后从函数返回某个形状,并将其传递给Graph.create 方法。也可以编写自己的自定义 GraphStage, 这超出了akka-stream的介绍性章节的范围。
本节中展示的 BroadcastGraphStage 在任何输出应用背压时都应用背压, 这意味着您只能以最慢的用户可以读取的速度进行广播。下一节将讨论如何使用缓冲来允许生产者和消费者以不同的速度运行, 以及如何在不同速度下运行的生产者和消费者之间进行调解。