Skip to content

Latest commit

 

History

History
587 lines (517 loc) · 31 KB

1.5.md

File metadata and controls

587 lines (517 loc) · 31 KB

#1.5 图的使用 (Working with Graphs) 在akka stream中图计算不是像线性计算那样使用流畅的DSL来表达,而是写出更像图的DSL(例如从笔记中获得设计讨论(这里我理解是讨论中的内容直接画出图形),或者从协议的插图(中获取图纸)),目的是翻译图计算的图纸,使得代码更简洁。在这一章节中我们将深入构建和重复利用图graphs的多种方式,以及解释常见缺陷和如何避免这些缺陷。 当你想执行任何种类的fan-in("多输入")或者fan-out("多输出")操作是,图Graphs是需要的。考虑到线性计算类似于路,我们能用交汇点(junctions)描绘图:多个流在单个点上被连接。有些图操作再简单不过,符合流Flows的线性计算风格,就像concat(连接两个流stream,当第一个stream被消费完后再消费第二个),在Flow或者Source本身被定义速记方法,但是你应该记住,这些依旧是图交汇点的实现。

##1.5.1 构建图( Constructing Graphs) 图是从那些在图graphs或者节点junctions内部为流Flows充当fan-in或者fan-out`节点的简单流中构建出来的。幸好基于交汇点的行为明确了元素是简单易用的而使得交汇点有意义。 akka stream目前提供这些交汇点(内置阶段以及语意的详细概述如下:)

  • Fan-out
  • Broadcast[T] --(一个输入,多个输出)将一个输入发射到每一个输出(就是广播,所有的输出都无差别的获得输入)
  • Balance[T] --(一个输入,多个输出)将一个输出发射到任意的一个输出(就是类似负载均衡,或者说路由选择吧)
  • UnzipWith[In,A,B,...]--(一个输入,多个输出)执行一个将给定输入拆分成多个类型的输出的功能函数,多个输出的类型分别为A,B...
  • UnZip[A,B] --(一个输入,两个输出),把有着(A,B)元素的元组拆分成两个stream,一个类型是A一个类型是B
  • Fan-in
  • Merge[In]--(多个输入,一个输出)从任意输入中选择元素,逐一推送至输出
  • MergePreferred[In]--类似Merge,但是元素可从首选端口采集,否则再从其他任意端口采集(拥有一个优先级高于端口的输入,优先采集该端口的)
  • zipWith[A,B,...,Out]--(多个输入,一个输出)执行一个能将给定的多个类型的输入合并成一个类型的输出元素发射的功能函数。
  • Zip[A,B]--(两个输入,一个输出)将输入的A类型的stream和B类型的stream合并成(A,B)元组类型的输出,是特殊的zipWith。
  • Concat[A]--(两个输入,一个输出),连接两个流(先消费其中一个,再消费另一个)

GraphDSL DSL的一个设计目标是类似在白板上绘制图形,所以很简单的使用它将一个白板上的设计转化为代码,而且可以关联两者。通过翻译下面的手绘图形转化来说明这一点:

手绘

由于每一个线性元素对应一个Flow,并且如果是开始或者结束Flow,每一个圆形对应到Junction或者Source或者Sink,交汇点Junctions必须以定义的类型参数创建,否则任何情况下都会被推断为Nothing类型。

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})

注意:节点Junction引用等同于同样定义的图形节点graph node(在GraphDSL中使用相同的合并实例参照所得到的图的同一位置)


请注意,import GraphDSL.Implicits._在作用域中引入了~>操作符(读作"edge","via"或者"to")以及对应它的相反操作<~(在适当情况下记作在反方向的流flows) 通过上述代码,可以轻易的发现GraphDSL.Builder对象是可变的。(隐式的)通过~>操作使用,同样也是一个可变操作。这样设计的原因是使得创建复杂的图形更简单,甚至包含循环。一旦GraphDSL完成构建,GraphDSL实例是不可变的,线程安全的,易共享的。对于所有的图形构件--sourcessinks以及flows一旦完成构建,都遵循这一点。这意味着你在图处理中可以在多个地方反复使用给定的Flow或者junction。 上面例子中体现了re-use:合并和广播节点通过builder.add(...)被导入图graph, 这个操作将拷贝那些传递给它并且返回拷贝结果的入口和出口以便于它们能连线的蓝图副本。另一种方式是传递任何已经存在的图形进入工厂方法来产生新图形。这些方法的差别是通过builder.add(...)的输忽略导入图的物化值,而通过工厂方法导入允许(将物化值)纳入其中。详细的信息请参照物化流。 在下面的例子中,我们准备由两个平行流streams组成的图形graph,当我们再次使用流Flow的相同实例,但它会作为相应的Sources以及Sinks的两个连接而被适时的物化:

val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
ClosedShape
})

##1.5.2 构建和组合部分图 (Constructing and combining Partial Graphs) 有的时候在一个地方是不可能(或必要)去构建整个的图计算。而是在不同的地方构造它们的所有不同点,在最后将它们连接成一个完整的图形并运行。这样可以实现返回不同的图而非封闭图This can be achieved by returning a different Shape than ClosedShape,例如从给定的GraphDSL.create函数得到FlowShape(in,out)通过阅读Predefined shapes查看预定义图形的详细列表。

构建一个图形Graph、一个可运行图形RunnableGraph需要所有的端口被连接,如果没有(连接上),将会在施工(此处应该是运行时)抛出一个异常,这将有助于避免在图形工作过程中的简单接线错误。然而部分图Partial graph允许你返回在内部线路执行的代码块中尚未连接的一组端口。让我们想像一下,我们想要为用户提供一个将三个int输入压缩获得其中最大值的特定元素。我们希望公开三个输入端口(尚未连接的sources)以及一个输出端口(尚未连接的sink)。

val pickMaxOfThree = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
zip1.out ~> zip2.in0
UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
}
val resultSink = Sink.head[Int]
val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
sink =>
import GraphDSL.Implicits._
// importing the partial graph will return its shape (inlets & outlets)
val pm3 = b.add(pickMaxOfThree)
Source.single(1) ~> pm3.in(0)
Source.single(2) ~> pm3.in(1)
Source.single(3) ~> pm3.in(2)
pm3.out ~> sink.in
ClosedShape
})
val max: Future[Int] = g.run()
Await.result(max, 300.millis) should equal(3)

正如你所有看到的,我们首先构造了一个包含所有对流的元素进行压缩和比较的部分图。这个部分图将有三个输入和一个输出,因此我们使用UniformFanInShape。然后我们在将所有未定义的元素重新连接到真正的sources以及sinks的第二步中的封闭图形中将其导入(上一步定义的部分图)。图就可以运行并产生预期的结果。


警告:请注意,关于所有元素是否被正确连接上,GraphDSL是不能提供编译时的类型安全的--这个校验将在图实例的运行时执行检查。一个部分图还确认所有的端口已连接或者作为返回图形的一部分。


##1.5.3从部分图中构造 Sources、Sinks以及Flows (Constructing Sources,Sinks and Flows from Partial Graphs) 有时候使用复杂图形作为简明的结构,比如SourceSink或者Flow而取代那些尚未连接的一批flows或者junctions的部分图形处理是有用的。 实际上,部分连通图的特殊情况的概念是很容易解释的:

  • Source是仅有一个输出,返回SourceShape的部分图
  • Sink是仅有一个输入,返回SinkShape的部分图
  • Flow是仅有一个输入以及一个输出,返回FlowShape的部分图

用简单的元素类似Sink/Source/Flow来隐藏复杂图形能使你更容易的创建复杂图形,并且把它看作线性计算的复合阶段的简单实现。 使用Source.fromGraph方法来从一个图graph中创建Source,我们必须有Graph[SourceShape,T]才能使用它。这个构造使用GraphDSL.create从函数调用中返回SourceShape。单一出口必须提供SourceShape.of方法,这将会与"sink 必须在 Source运行前被联系上"相符合。 参照前面的例子,我们可以创建一个能压缩两个数字的Source,看一下构建过程吧:

val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val zip = b.add(Zip[Int, Int]())
def ints = Source.fromIterator(() => Iterator.from(1))
// connect the graph
ints.filter(_ % 2 != 0) ~> zip.in0
ints.filter(_ % 2 == 0) ~> zip.in1
// expose port
SourceShape(zip.out)
})
val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)

Sink[T]上有着同样的情况,在这种情况下必须提供Inlet[T]类型的值来使用SinkShape.of。如果定义Flow[T],那么必须显示一个inlet和一个outlet:

val pairUpWithToString =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val broadcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Int, String]())
// connect the graph
broadcast.out(0).map(identity) ~> zip.in0
broadcast.out(1).map(_.toString) ~> zip.in1
// expose ports
FlowShape(broadcast.in, zip.out)
})
pairUpWithToString.runWith(Source(List(1)), Sink.head)

##1.5.4 通过简化的API 结合Sources和Sinks (combining Sources and Sinks with simplified API) 这是一个简化的API以至于你在不使用GraphDSL的情况通过像Broadcast[T]Balance[T]Merge[In]Concat[A]这样的交汇点junctions结合sourcessinks。组合方法关心必要的图的底层构造。在接下来的例子中我们将两个sources组合成一个(fan-in):

val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))

这同样能作用于Sink[T],不过这种情况下将是fan-out:

val sendRmotely = Sink.actorRef(actorRef, "Done")
val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ())
val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)

##1.5.5 构建可重用的图部件 (Building reusable Graph components) 通过图DSL可以做到构建任意输入和输出端口的可重用的、封装的元素。举个例子,我们将构建代表工作者池(represents a pool of workers)的图节点graph junction,每一个工作者标识一个Flow[I,O,_],也就是说作一个类型I的简单的转化工作,结果是类型O(正如你已经看到的,这个流内部已经包含了复杂的图)。我们可复用的工作者池将不会存储那些进来的任务的订单(它们假定有适当的ID字段),将会使用Balance交汇点来调度任务到可达的工作者。在这之上,我们的交汇点将起"fastlane"的特殊作用,一个已定义端口被用来发送高优先级的任务。 总之,我我们的交汇点将有着类型I(为了普通和高优先级的任务)和类型O的两个输入端口。为了表示这个接口,我们将定义一个特定的形状,下面展示如何实现:

// A shape represents the input and output ports of a reusable
// processing module
case class PriorityWorkerPoolShape[In, Out](
jobsIn: Inlet[In],
priorityJobsIn: Inlet[In],
resultsOut: Outlet[Out]) extends Shape {
// It is important to provide the list of all input and output
// ports with a stable order. Duplicates are not allowed.
override val inlets: immutable.Seq[Inlet[_]] =
jobsIn :: priorityJobsIn :: Nil
override val outlets: immutable.Seq[Outlet[_]] =
resultsOut :: Nil
// A Shape must be able to create a copy of itself. Basically
// it means a new instance with copies of the ports
override def deepCopy() = PriorityWorkerPoolShape(
jobsIn.carbonCopy(),
priorityJobsIn.carbonCopy(),
resultsOut.carbonCopy())
// A Shape must also be able to create itself from existing ports
override def copyFromPorts(
inlets: immutable.Seq[Inlet[_]],
outlets: immutable.Seq[Outlet[_]]) = {
assert(inlets.size == this.inlets.size)
assert(outlets.size == this.outlets.size)
// This is why order matters when overriding inlets and outlets.
PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out])
}
}

##1.5.6 预定义的图形 (Predefined shapes) 通常一个定制的图形需要能提供所有的输入输出端口,能拷贝自己,能从给定的端口创建新的实例。提供了一些已经定义的形状来避免无谓的引用。

  • SourceShapeSinkShapeFlowShape适用于简单图形。
  • UniformFanInShapeUniformFanOutShape适用于有着相同类型的多个输入(或者输出)端口的交汇点。
  • FanInShape1FanInShape2...、FanOutShape1FanOutShape2...适用于有着不同类型的多个输入(或者输出)端口的交汇点。

由于我们的图形有着两个输入端口和一个输出端口,所以我们可以用FanInShape DSL来定义我们的特殊图形:

import FanInShape.Name
import FanInShape.Init
class PriorityWorkerPoolShape2[In, Out](_init: Init[Out] = Name("PriorityWorkerPool"))
extends FanInShape[Out](_init) {
protected override def construct(i: Init[Out]) = new PriorityWorkerPoolShape2(i)
val jobsIn = newInlet[In]("jobsIn")
val priorityJobsIn = newInlet[In]("priorityJobsIn")
// Outlet[Out] with name "out" is automatically created

此时我们有一个连接代表我们工作者池worker pool的图Graph的图形Shape。首先我们将通过MergePreferred混合进来的普通和高优先级的任务,接着我们将发送这些任务到Balance节点junction,这将fan-out到若干可配置的wokers(flows),最后我们将所有结果混合然后发送它们到我们唯一的输出端口。代码如下:

object PriorityWorkerPool {
def apply[In, Out](
worker: Flow[In, Out, Any],
workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], Unit] = {
GraphDSL.create() { implicit b 
import GraphDSL.Implicits._
val priorityMerge = b.add(MergePreferred[In](1))
val balance = b.add(Balance[In](workerCount))
val resultsMerge = b.add(Merge[Out](workerCount))
// After merging priority and ordinary jobs, we feed them to the balancer
priorityMerge ~> balance
// Wire up each of the outputs of the balancer to a worker flow
// then merge them back
for (i <- 0 until workerCount)
balance.out(i) ~> worker ~> resultsMerge.in(i)
// We now expose the input ports of the priorityMerge and the output
// of the resultsMerge as our PriorityWorkerPool ports
// -- all neatly wrapped in our domain specific Shape
PriorityWorkerPoolShape(
jobsIn = priorityMerge.in(0),
priorityJobsIn = priorityMerge.preferred,
resultsOut = resultsMerge.out)
}
}
}

现在我们需要做的就是在图中使用我们的特殊节点。接下来的例子中通过简单的字符串和打印它们的结果模拟了一些简单的工作者和任务。实际上我们通过使用add()两次使用了工作者节点的两个实例。

val worker1 = Flow[String].map("step 1 " + _)
val worker2 = Flow[String].map("step 2 " + _)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4))
val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2))
Source(1 to 100).map("job: " + _) ~> priorityPool1.jobsIn
Source(1 to 100).map("priority job: " + _) ~> priorityPool1.priorityJobsIn
priorityPool1.resultsOut ~> priorityPool2.jobsIn
Source(1 to 100).map("one-step, priority " + _) ~> priorityPool2.priorityJobsIn
priorityPool2.resultsOut ~> Sink.foreach(println)
ClosedShape
}).run()

##1.5.7 双向流 (Bidirectional Flows) 两个flows来自相反方向的图拓扑通常是有用的。以解码器中传出消息的序列化和传入消息的反序列化为例。另一个这一的阶段是附加头部长度信息到输出数据的帧协议和分析数据帧成原字节流。这两个阶段不得不被组合,作为协议栈的一部分。将一个作用在另一个之上。出于这个目的,存在有着两个开放输入和两个开放输出的类型为BidiFlow的特殊的图。这样类似的图形叫做BidiShape,定义方式如下:

/**
* A bidirectional flow of elements that consequently has two inputs and two
* outputs, arranged like this:
*
* {{{
* +------+
* In1 ~>| |~> Out1
* | bidi |
* Out2 <~| |<~ In2
* +------+
* }}}
*/
final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVariance],
out1: Outlet[Out1 @uncheckedVariance],
in2: Inlet[In2 @uncheckedVariance],
out2: Outlet[Out2 @uncheckedVariance])extends Shape {
  // implementation details elided ...
}

双向流的定义就和上面索索的表现在解码器上的单向流一样:

trait Message
case class Ping(id: Int) extends Message
case class Pong(id: Int) extends Message
def toBytes(msg: Message): ByteString = {
// implementation details elided ...
}
def fromBytes(bytes: ByteString): Message = {
// implementation details elided ...
}
val codecVerbose = BidiFlow.fromGraph(GraphDSL.create() { b =>
// construct and add the top flow, going outbound
val outbound = b.add(Flow[Message].map(toBytes))
// construct and add the bottom flow, going inbound
val inbound = b.add(Flow[ByteString].map(fromBytes))
// fuse them together into a BidiShape
BidiShape.fromFlows(outbound, inbound)
})
// this is the same as the above
val codec = BidiFlow.fromFunctions(toBytes _, fromBytes _)

第一个版本类似部分图的构造,然而在1对1转化的简单情况下,类似最后一行那样给出一个简明方便的方法。这两个函数的实现也不难:

def toBytes(msg: Message): ByteString = {
implicit val order = ByteOrder.LITTLE_ENDIAN
msg match {
case Ping(id) => ByteString.newBuilder.putByte(1).putInt(id).result()
case Pong(id) => ByteString.newBuilder.putByte(2).putInt(id).result()
}
}
def fromBytes(bytes: ByteString): Message = {
implicit val order = ByteOrder.LITTLE_ENDIAN
val it = bytes.iterator
it.getByte match {
case 1 => Ping(it.getInt)
case 2 => Pong(it.getInt)
case other => throw new RuntimeException(s"parse error: expected 1|2 got $other")
}
}

通过这个方法,你能很容易的完善任意将一个对象转化为一系列字节的序列化库。 另一个阶段,我们还谈到一点关于反转帧协议,这意味着收到的字节块相当于0或者更多的消息。最好的实现方式是用GraphStage(参阅 Custom processing with GraphStage)。

val framing = BidiFlow.fromGraph(GraphDSL.create() { b =>
implicit val order = ByteOrder.LITTLE_ENDIAN
def addLengthHeader(bytes: ByteString) = {
val len = bytes.length
ByteString.newBuilder.putInt(len).append(bytes).result()
}
class FrameParser extends PushPullStage[ByteString, ByteString] {
// this holds the received but not yet parsed bytes
var stash = ByteString.empty
// this holds the current message length or -1 if at a boundary
var needed = -1
override def onPush(bytes: ByteString, ctx: Context[ByteString]) = {
stash ++= bytes
run(ctx)
}
override def onPull(ctx: Context[ByteString]) = run(ctx)
override def onUpstreamFinish(ctx: Context[ByteString]) =
if (stash.isEmpty) ctx.finish()
else ctx.absorbTermination() // we still have bytes to emit
private def run(ctx: Context[ByteString]): SyncDirective =
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) pullOrFinish(ctx)
else {
needed = stash.iterator.getInt
stash = stash.drop(4)
run(ctx) // cycle back to possibly already emit the next chunk
}
} else if (stash.length < needed) {
// we are in the middle of a message, need more bytes
pullOrFinish(ctx)
} else {
// we have enough to emit at least one message, so do it
val emit = stash.take(needed)
stash = stash.drop(needed)
needed = -1
ctx.push(emit)
}

/*
After having called absorbTermination() we cannot pull any more, so if we need
* more data we will just have to give up.
*/
private def pullOrFinish(ctx: Context[ByteString]) =
if (ctx.isFinishing) ctx.finish()
else ctx.pull()
}
val outbound = b.add(Flow[ByteString].map(addLengthHeader))
val inbound = b.add(Flow[ByteString].transform(() => new FrameParser))
BidiShape.fromFlows(outbound, inbound)
})

有了这些实现,我们能构建一个协议栈并且测试它:

/* construct protocol stack
* +------------------------------------+
* | stack |
* | |
* | +-------+ +---------+ |
* ~> O~~o | ~> | o~~O ~>
* Message | | codec | ByteString | framing | | ByteString
* <~ O~~o | <~ | o~~O <~
* | +-------+ +---------+ |
* +------------------------------------+
*/
val stack = codec.atop(framing)
// test it by plugging it into its own inverse and closing the right end
val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) }
val flow = stack.atop(stack.reversed).join(pingpong)
val result = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(result, 1.second) should ===((0 to 9).map(Pong))

##1.5.8图的内部访问物化值 (Accessing the materialized value inside the Graph) 在某种情况下,必须返回图(部分图,完全图,或者支持Source,Sink,Flow,BidiFlow)的物化值。builder.materializedValue产生一个被用作图的普通source或者outlet的最终将发射物化值的输出Outlet而使这点变得可能。如果物化值在不止一个地方需要,无论调用多少次的materializedValue都能取得必要的输出

  import GraphDSL.Implicits._
  val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _))
  {implicit builder =>
    fold =>
      FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})

注意不要引入那些物化值实际上促成(其他)物化值的循环。接下来的例子延时了将fold的物化Future反馈给fold自己:

import GraphDSL.Implicits._
// This cannot produce any value:
val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
  implicit builder =>
    fold =>
      // - Fold cannot complete until its upstream mapAsync completes
      // - mapAsync cannot complete until the materialized Future produced by
      //   fold completes
      // As a result this Source will never emit anything, and its materialited
      // Future will never complete
      builder.materializedValue.mapAsync(4)(identity) ~> fold
      SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
})

##1.5.9 图循环、活跃性以及死锁 (Graph cycles, liveness and deadlocks) 在有限流拓扑中的循环需要特别注意避免死锁以及其他活跃度的问题。这部分是几个在流处理图中由死锁的反馈回路引发问题的例子。 第一个例子演示了包含naïve循环的图。图graph从源source获取元素,打印他们,然后广播这些元素到消费者(此时我们只要使用Sink.ignore)使用一个Merge节点通过混入主干main stream来反馈。


注意:图DSL允许连接方向反转,当写循环的时候这一点是非常便利的--正如我们看到的,很多情况下这很有用


// WARNING! The graph below deadlocks!
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
 
  val merge = b.add(Merge[Int](2))
  val bcast = b.add(Broadcast[Int](2))
 
  source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
            merge                    <~                      bcast
  ClosedShape
})

通过运行我们观察到在一些数字被打印后,没有更多的元素被输出到控制台--一段时间后,所有的处理将停止。经过一番调查,我们发现:

  • 通过从source混合元素,在循环中增加了元素流动的数目。
  • 通过广播返回循环,没有降低在循环中的元素数目。 由于Akka stream(和 Reactive Stream一样)保证的限制处理(详细的信息查看"Buffering"一节),这意味着超过时间跨度,只有数量有限的元素被缓冲。由于循环获得了越来越多的元素,最终所有的内部缓冲区都会变满,不断的backpressure源。为了能处理更多来自source的元素,需要以某种方式跳出循环。 如果我们通过用MergePreferred替换Merge节点的方式定义反馈回路,这将可以避免死锁。MergePreferred是非公平的,因为它总是在尝试优先级级别较低的端口前,如果优先级高的输入端口有元素,将试图从优先级高的输入端口消费。由于我们的反馈通过高优先级的端口,将总能保证在循环中的元素是可流动。
// WARNING! The graph below stops consuming from "source" after a few steps
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
 
  val merge = b.add(MergePreferred[Int](1))
  val bcast = b.add(Broadcast[Int](2))
 
  source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
            merge.preferred              <~                  bcast
  ClosedShape
})

如果我们运行这个例子,我们将看到相同的数字序列被一遍又一遍的打印,但是处理不会停止。因此我们避免了死锁,但source仍然不断的处于backpressure,因为缓冲空间再也没有恢复:我们唯一能看到的行为是一对来自source的初始元素。


注意:在这里我们看到的是,在某种情况下,我们需要在有界性和活跃性之间做出选择。在第一例子中,如果在环路中有个无限缓冲器,将不会造成死锁,反之亦然,如果元素在循环中是平衡的(有些元素将被移除,有些元素将被注入),那么也不会有死锁。 ——— 我们可以在反馈回路中引入dropping元素来保证我们的循环活跃(非死锁)且公平。在这种情况下,我们给buffer()操作一个OverflowStrategy.dropHead的丢弃策略:

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast
ClosedShape
})

如果运行这个例子,我们可以看到:

  • 元素的流动不会停止,总会有元素被打印
  • 我们看到一些数字随着时间的推移将被多次打印,但平均数量在长期的增长 这个例子强调了一个通过丢弃元素(其中元素的数目是无限的循环)避免在潜在不平衡循环的情况下死锁的解决方案。另一种方案是通过OverflowStrategy.fail定义一个更大的缓冲区:在所有的缓冲区被消耗时破坏流stream而不是锁住它。 我们发现,在前面的例子中,核心为题是在反馈回路上的不均衡性。我们通过加入dropping元素来规避这个问题。但现在在开始阶段建立一个平衡的循环来替代上面的方案,我们通过用ZipWith取代Merge节点修改我们第一个图来实现这个目的。由于ZipWith从源和反馈回路提取元素注入到循环,因此我们保持了元素的平衡。
// WARNING! The graph below never processes any elements
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip = b.add(ZipWith[Int, Int, Int]((left, right) => right))
val bcast = b.add(Broadcast[Int](2))
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ bcast
ClosedShape
})

然而,当我们尝试运行例子,事实证明所有的元素没有被打印!经过一番调查,我们认识到:

  • 为了从源source到循环cycle得到第一个元素,我们需要一个已经存在在循环中的元素。
  • 为了得到循环的初始元素,我们需要从源获取元素 这两个条件是典型的“先有鸡还是先有蛋的问题”。解决措施是为循环注入一个独立于源的初始元素。为此我们通过在逆回路backwards arc中使用Concat节点,通过Source.single注入一个元素。
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip = b.add(ZipWith((left: Int, right: Int) => left))
val bcast = b.add(Broadcast[Int](2))
val concat = b.add(Concat[Int]())
val start = Source.single(0)
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ concat <~ start
concat <~ bcast
ClosedShape
})

当我们运行上述例子,我们将看到处理开始并永不停止。从这个例子侧面看出:平衡的循环往往需要一个"kick-off"(揭幕,开始)元素被注入循环。


*注意:*由于2.x 有了融合特性,而这里需要将物化融合特性关闭,或者加入异步边界才能正常工作 具体代码为

  implicit  val system = ActorSystem("lock")
  implicit  val mat = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
  val source = Source(1 to 10)
  // WARNING! The graph below stops consuming from "source" after a few steps
  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(MergePreferred[Int](1))
    val bcast = b.add(Broadcast[Int](2))

   source ~> merge ~> Flow[Int].map{s => println(s);s} ~> bcast ~> Sink.ignore
    merge.preferred <~ bcast
    ClosedShape
  }).run()

   Thread sleep 1000
  system.shutdown()

或者

 implicit  val system = ActorSystem("lock")
  implicit  val mat = ActorMaterializer()
//  implicit  val mat = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
  val source = Source(1 to 10).withAttributes(Attributes.asyncBoundary)
  // WARNING! The graph below stops consuming from "source" after a few steps
  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(MergePreferred[Int](1).withAttributes(Attributes.asyncBoundary))
    val bcast = b.add(Broadcast[Int](2).withAttributes(Attributes.asyncBoundary))

    source ~> merge ~> Flow[Int].map{s => println(s);s} ~> bcast ~> Sink.ignore
    merge.preferred <~ bcast
    ClosedShape
  }).run()

  Thread sleep 1000
  system.shutdown()