[翻译]akka in action之akka-stream ( 3 使用Graph DSL实现扇入扇出 )

3 使用Graph DSL实现扇入扇出

到目前为止,我们只研究了一个输入和一个输出的线性处理。Akka-stream提供了用于描述扇入和扇出的图DSL(领域专用语言),能够定义有大量输入和输出的图。图形 DSL 差不多是一种图解 ASCII-许多情况下, 你可以将图形的白板图转换为 DSL。

有许多扇入和扇出GraphStages,可用于创建各种图形,例如Source、FLow、Sink。也可以创建你自己的自定义GraphStages 。

你以使用图形 DSL 创建任意Shape 的图形。在akka-stream方面,Shape定义了图有多少输入和输出(这些输入和输出称作入口(Inlets) 和出口(Outlets))。在下面的例子中,我们将创建一个Flow-shaped 图,所以它可用于像前面的POST路由。内部它使用了一个扇出形状。

3.1 广播到流

继续上一章的例子,我们按照日志事件的状态将日志事件进行拆分(一个Sink用于所有的错误,一个用于所有的警告,等等),以便每次对一个或多个状态发出GET请求时,不必过滤这些事件。下图展示了BroadcastGraphStage 如何将事件发送到不同的流。

使用BroadcastGraphStage拆分事件

图DSL提供了GraphDSL.Builder 来创建图中的节点,并且 ~> 方法用于将节点连接在一起,很像via方法。图中的节点是类型图, 在引用图的某个部分时可能会混淆, 因此在某些情况下, 我们将使用术语 "node"。

下面展示了代码中上图如何定义。也展示了从图开放入口和出口中定义一个流。

Broadcast

探究Graphs和Shapes

processStates返回类型可能不是你所期望的。不是Flow[Event, ByteString, NotUsed]类型,而是Graph[FlowShape[Event, ByteString], NotUsed]。事实上,Flow[-In, +Out, +Mat]继承于Graph[FlowShape[In, Out], Mat]。这表示一个Flow仅仅是一个有预定义Shape的Graph。如果你稍微深入akka-stream源代码,你会发现 FlowShape 是一个只有一个输入和一个输出的Shape。

所有预定义的组件定义的方法类似:所有都定义为带有一个Shape的Graph。例如,Source和Sink分别继承于 Graph[SourceShape[Out], Mat] 和Graph[SinkShape[In], Mat]。

参数builder是一个GraphDSL.Builder,它是可变的。它只打算在这里使用匿名函数来建立一个图。GraphDSL.Builder的add方法返回一个Shape,它描述了一个图的入口和出口。

过滤流写入到不同文件,如logFileSink(logId, state) 方法调用所示。例如,对于文件logId1的错误,将追加一个1-errors文件。

在POST路由里使用processStates:

src.via(processStates(logId))
  .toMat(logFileSink(logId))(Keep.right)
  .run()

用于返回错误日志文件的GET路由与普通的GET路由类似,除了一个命名约定(用于从[log-id]-error 文件读取)。

下一节,我们将看看合并源,以便可以返回合并在一起的所有日志,或者仅是某日志文件所有状态不是OK的日志事件。

3.2 合并流

让我们看看用于合并源的图。在第一个例子中,我们合并了所有状态不是OK的事件到一个文件中。 对/logs/[log-id]/not-ok 进行GET将返回所有状态不是OK的事件。下图展示了MergeGraphStage 如何将三个Sources合并为一个。

使用MergeGraphStage合并非OK状态

下面的代码展示了MergeGraphStage 如何应用图DSL。它定义了一个mergeNotOK方法,将特定 logId 的所有非OK日志源合并到一个源中。

合并所有非OK状态

注意,warning, error 和 critical 源,首先通过了一个JSON帧化流,否则, 您可以读取任意 ByteStrings 并将它们合并在一起, 从而导致 JSON 输出出现乱码。

三个源由带有三个入口的MergeGraphStage 合并。SourceShape 由merge.out 出口创建。Source有一个fromGraph 方便方法,将带有SourceShape 的Graph转化为Source。

MergePreferred GraphStage
MergeGraphStage 从其任何输入中随机取元素。Akka-stream 还提供了一个 MergePreferredGraphStage, 它有一个输出端口、一个首选输入端口, 以及零个或多个次级端口。当其中一个输入有可用的元素时, MergePreferred 会发出, 如果多个输入有可用的元素, 则倾向于首选的输入。

mergeNotOk 方法稍后用于getLogNotOkRoute 去创建读取的Source,如下所示

def getLogNotOkRoute =
  pathPrefix("logs" / Segment /"not-ok") { logId =>
    pathEndOrSingleSlash {
      get {
        extractRequest { req =>
          complete(Marshal(mergeNotOk(logId)).toResponseFor(req))
        }
      }
    }
  }

还有一个用于合并源的简化 API, 我们将使用它来合并所有日志。请求GET /logs 将返回所有合并在一起的日志。下面代码展示了如何使用简化的 API。

mergeSources 方法

Source.combine 方法从多个源中创建Source,类似于使用图形DSL进行的操作。mergeSources 用于合并具有相同类型任意数量的源。例如该方法用于/logs路由中,如下所示。

GET /logs

预定义和自定义GraphStages
在akka-stream 中有相当多的预定义 GraphStages, 这里没有展示负载平衡 (-Balance)、压缩 (Zip、ZipWith) 和串联流 (Concat) 等等。这些图的DSL非常类似于已经展示的示例。在所有情况下, 都需要向生成器添加节点,连接形状的入口和出口(由add方法返回),然后从函数返回某个形状,并将其传递给Graph.create 方法。也可以编写自己的自定义 GraphStage, 这超出了akka-stream的介绍性章节的范围。

本节中展示的 BroadcastGraphStage 在任何输出应用背压时都应用背压, 这意味着您只能以最慢的用户可以读取的速度进行广播。下一节将讨论如何使用缓冲来允许生产者和消费者以不同的速度运行, 以及如何在不同速度下运行的生产者和消费者之间进行调解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容