import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
object SubStreamExample {
implicit val system = ActorSystem("QuickStart2")
implicit val materializer = ActorMaterializer()
def main(args: Array[String]): Unit = {
// exampleOne()
// mergeStream()
// mergeSubstreamsWithParallelism()
// splitWhen()
flatMapConcatExample()
// flatMapMergeExample()
}
def exampleOne(): Unit ={
//分成3组;然后每一组后面都添加一个sink;3个sink造成每个sink轮流输出是不能保证输出循序的
Source(1 to 10).groupBy(3, _ % 3).to(Sink.ignore).run()
//1 4 7 10 (1 + 4 + 7 + 10 ) = 22
//2 5 8 (2 + 5 + 8) = 15
//3 6 9 (3 + 6 + 9) = 18
Source(1 to 10).groupBy(3, _ % 3).flatMapConcat( i => Source(List.fill(1)(i))).reduce((x,y) => x +y).to(Sink.foreach(println)).run() //22 28 15
}
def mergeStream(): Unit ={
//groupBy(3, _ % 3) 根据后面的函数,将输入流进行分组;第一个参数最大子流数量
//mergeSubstreams;合并3组到一个组;顺序是不能保证的
Source(1 to 10)
.groupBy(3, _ % 3)
.mergeSubstreams
.runWith(Sink.foreach(println))
}
def mergeSubstreamsWithParallelism(): Unit ={
//groupBy(3, _ % 3) 根据后面的函数,将输入流进行分组;第一个参数最大子流数量
//mergeSubstreamsWithParallelism;limit the number of active substreams running and being merged at a time
//如果并行度小于子流数,会发生阻塞;第三类元素出来之后没有子流;会一直阻塞;其他的输出流的元素也不能出来;得到有效处理
Source(1 to 10)
.groupBy(3, _ % 3)
.mergeSubstreamsWithParallelism(2)
.runWith(Sink.foreach(println))
//concatSubstreams is equivalent to mergeSubstreamsWithParallelism(1)
Source(1 to 10)
.groupBy(3, _ % 3)
.concatSubstreams
.runWith(Sink.ignore)
}
def splitWhen(): Unit ={
val text =
"This is the first line.\n" +
"The second line.\n" +
"There is also the 3rd line\n"
//splitWhen 和 splitAfter 返回true时,都会产生一个新的子流;when是当前元素进入新的子流;after是当前的下一个元素进入子流
val charCount = Source(text.toList)
.splitAfter { _ == '\n' }
.filter(_ != '\n')
.map(_ ⇒ 1)
.reduce(_ + _)
.to(Sink.foreach(println))
.run()
}
def flatMapConcatExample(): Unit ={
// List.fill(3)(1) 一个list,充满3个1元素
//flatMapConcat 处理多个子流;处理完后一个流,再处理另一个流; 1 to 2;起两个流,完成后流作废
Source(1 to 2)
.flatMapConcat(i ⇒ Source(List.fill(3)(i)))
.runWith(Sink.foreach(println))
}
def flatMapMergeExample(): Unit ={
//flatMapMerge 多个子流处理数据;第一个参数为宽度;子流个数
Source(1 to 2)
.flatMapMerge(3, i ⇒ Source(List.fill(3)(i)))
.runWith(Sink.foreach(println))
}
}
akka Stream SubStreams 操作
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 这是Stream操作的中的最后一步,终止操作,如果没有看过前面的创建和中间操作的同学可以去看看。ok,接下来我们直...
- 题目描述 LeetCode 500 Example 1: Note: You may use one charac...