Skip to content

Latest commit

 

History

History
246 lines (219 loc) · 18.3 KB

1.6.md

File metadata and controls

246 lines (219 loc) · 18.3 KB

#1.6 模块化,组成和层级 (Modularity, Composition and Hierarchy) akka stream为流处理图(stream processing graphs)提供了一套模型,允许灵活的组合可复用的部件。在这一章节我们将从概念和API角度展示它们的样子,显示库的模块化方面。

##1.6.1 基本组成和模块 (Basics of composition and modularity) 在akka stream的任何处理阶段都能被看作一个有着入口和出口的"盒子",元素在达到时被处理然后离开(完成)这个阶段。这个观点看来,Source就是有着单一出口的盒子,而BidiFlow就是仅有两个入口和两个出口的盒子。在下面的图中,我们用盒子的观点说明最常用的阶段。 box

Source,SinkFlow是线性阶段,它们被作为处理阶段的严格链路而使用。 Fan-inFan-out阶段有多个输入或者多个输出。因此它们运行被定义的更复杂的图布局,而不只是链路。BidiFlow阶段通常在关于输入和输出链路被处理的IO的任务中很有用。由于特殊的图形BidiFlow,例如可以很容易的在其他之上堆叠它们来构建分布式协议。在akka中TLS支持就是BidiFlow的实现案例。 这些可复用组件已经允许复杂计算过程的创建。我们所看到的虽然至今没有实现模块化。理想的是,例如,在那些隐藏内部仅仅为了与模块使用者进行交互的端口的可复用组件内部打包更大的图形实体。一个很好的例子是Http服务组件,BidiFlow的内部编码实现是用一个输入-输出input-output端口对port pair来接收和发送ByteString s,在这之上的端口发送和接收HttpRequestHttpResponse实例的客户端TCP连接接口。 下图展示了各种组合阶段,包含各种其他类型的阶段在其中,但是隐藏它们背后的形状,使它们看起来像SourceFlow等。 compose 上面一个有趣的例子是由断开的SinkSource组成的Flow。这可以通过Flowwrap()构造方法实现,需要将两部分作为参数传进去。 BidiFlow的例子表明内部模块可以是任意复杂的,以及以灵活的方式连接来暴露端口。唯一限制的是,封闭模块的端口必须是彼此连接的,或者暴露连接端口。而那些端口的数量需要和要求的形状相匹配,例如Source只允许暴露一个端口,其余的内部端口需要被正确连接。这些机制允许模块的任意嵌套。下面的例子展示了由复合Source和复合Sink构建的RunnableGraph(后者又包含了一个复合FlowcomposeRunnableGraph 上图包含我们还没看到的一个以上的图形,这就是所谓的RunnableGraph。这证明,如果我们连接所有暴露的端口,将不存在更多开放端口,我们将得到一个封闭的模块。这就是RunnableGraph表示的那样。这是Materializer能参与并变成那些执行被描述的任务的运行实体的网络(这里网络应该表示内部运行实体的复杂联系)的形状Shape。事实上,RunnableGraph是模块本身,以及更多图形的一部分(这或许有些令人惊讶)。将封闭图形嵌入更大的图是很少用到的(由于没有开放用于和图的其他部分通信的端口而形成一个孤岛),但这展示统一的底层模型。 如果我们试图建立对应上图的代码片段,我们的第一次尝试可能是这样的:

Source.single(0)
.map(_ + 1)
.filter(_ != 0)
.map(_ - 2)
.to(Sink.fold(0)(_ + _))

然而很清晰,在我们的第一次尝试上没有嵌套,因为库无法揣摩我们意图将复合模块的边界定义在哪里,而这部分的工作是我们的职责。如果我们使用由FlowSourceSink类提供的DSL然后嵌套能通过调用withAttributes()或者named()方法实现。(后者仅仅是增加name属性的一个简写) 下面的代码演示如何实现预期嵌套:

val nestedSource =
Source.single(0) // An atomic source
.map(_ + 1) // an atomic processing stage
.named("nestedSource") // wraps up the current Source and gives it a name
val nestedFlow =
Flow[Int].filter(_ != 0) // an atomic processing stage
.map(_ - 2) // another atomic processing stage
.named("nestedFlow") // wraps up the Flow, and gives it a name
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.named("nestedSink") // wrap it up
// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)

一旦我们隐藏组件的内部(细节),这将表现的像其他类似形状的内置组件。如果我们隐藏组合(组件)的内部(细节),结果是这仅仅和那些被使用过的预定义组件相似: RunnableGraph 如果我们看内建组件和自定义组件的使用,下面代码展示了它们在使用上没有区别:

// Create a RunnableGraph from our components
val runnableGraph = nestedSource.to(nestedSink)
// Usage is uniform, no matter if modules are composite or atomic
val runnableGraph2 = Source.single(0).to(Sink.fold(0)(_ + _))

##1.6.2组成复杂系统 (Composing complex systems) 在上一节,我们探讨了组成composition和分层hierarchy的可能性,但是这远离了非线性、通用图形组件。在akka stream中并没有强制要求流处理布局只能是线性的。对于Source的DSL正如它们最常见的实践方式那样,在创建线性链路的优化上是友好的。还有一种更先进的DSL来构建复杂的图形,如果需要更多的灵活性,这将是可使用的。我们将看这两个DSL的区别只停留在表面: 它们在操作概念上和所有的DSL是统一的,可以很好的结合在一起。 dsl 简图展示了封装了一个意义非凡的流处理网络的RunnableGraph(记住,如果没有移动的端口,那么图是完全的(封闭的),因此可以被物化)。这包含fan-infan-out,直接directed以及间接non-directed的循环。GraphDSL对象的runnable()方法允许创建一般的、封闭的、可运行的图形。例如,简图的网络能通过以下实现:

import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val A: Outlet[Int] = builder.add(Source.single(0)).out
val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
val G: Inlet[Any] = builder.add(Sink.foreach(println)).in
C <~ F
A ~> B ~> C ~> F
B ~> D ~> E ~> F
E ~> G
ClosedShape
})

在上面的代码中,我们使用了隐式端口编号功能implicit port numbering feature(使得图形更具可读性以及和简图更像),我们明确的引入Source s,Sink s 以及 Flow s.明确的参照端口是可能的,没有必要通过add导入我们的线性阶段,所以另一个版本可能是:

import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val B = builder.add(Broadcast[Int](2))
val C = builder.add(Merge[Int](2))
val E = builder.add(Balance[Int](2))
val F = builder.add(Merge[Int](2))
Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
C.in(0) <~ F.out
B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
E.out(1) ~> Sink.foreach(println)
ClosedShape
})

与第一部分类似,到目前位置,我们还没有考虑模块化。我们创建了一个复杂的图形,但是布局是平的flat,而不是模块化的。我们将修改例子,通过图DSL创建一个可复用组件。通过GraphDSLcreate()工厂方法可以做到这一点。如果我们从前面的例子中移除了sourcessinks,剩下的就是个部分图: partialGraph 我们能在代码中再次创建类似的图,使用类似以前的DSL:

import GraphDSL.Implicits._
val partial = GraphDSL.create() { implicit builder =>
val B = builder.add(Broadcast[Int](2))
val C = builder.add(Merge[Int](2))
val E = builder.add(Balance[Int](2))
val F = builder.add(Merge[Int](2))
C <~ F
B ~> C ~> F
B ~> Flow[Int].map(_ + 1) ~> E ~> F
FlowShape(B.in, E.out(1))
}.named("partial")

唯一新增加的就是返回builder block的值,而builder block是一个形状Shape。所有的图graphs(包括Source,BidiFlow等)有一个形状Shape,对模块的类型化端口编码实现encodes the typed ports of the module。在我们的例子中,仅有一个输入和输出端口,所以我们可以通过返回的实例定义它有FlowShape。虽然可以创建一个新的形状类型,但通常建议使用那些能匹配的内置形状。 由此产生的图已经是一个正确的封装模块,所以没有必要调用named()来封装图。但给模块名字在很好的方式,以便于调试。 flowlike 由于我们的部分图有着正确的形状,所以能用更简单的,线性的DSL使用它:

Source.single(0).via(partial).to(Sink.ignore)

虽然还不能把它作为一个Flow(比如我们不能在这之上调用.filter方法),但是Flow拥有只是增加DSL到FlowShapewrap()方法。同样的情况出现在SourceSink以及BidiFlow,所以如果图有着正确的形状,那么得到一个简单的DSL是容易的。为了方便起见,也可以跳过部分图的创建而使用简便创造者convenience creator的方法之一。为了证明这一点,我们将创建以下的图: convenienceCreator 上述封闭图形的代码版本可能是这样的:

// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)
// Simple way to create a graph backed Source
val source = Source.fromGraph( GraphDSL.create() { implicit builder =>
val merge = builder.add(Merge[Int](2))
Source.single(0) ~> merge
Source(List(2, 3, 4)) ~> merge
// Exposing exactly one output port
SourceShape(merge.out)
})
// Building a Sink with a nested Flow, using the fluid DSL
val sink = {
val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
nestedFlow.to(Sink.head)
}
// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)

注意:所有的图生成器部分将检查生成的图的除了暴露的端口外的所有其他端口是否被连接,如果违反这,将抛出异常。


我们仍在试图证明RunnableGraph是;类似其他的一个组件,可以在图中嵌套。在下面的代码中我们将把一个封闭图嵌入另一个:

val closed1 = Source.single(0).to(Sink.foreach(println))
val closed2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val embeddedClosed: ClosedShape = builder.add(closed1)
// ...
embeddedClosed
})

导入模块的类型表明导入的模块有一个封闭的形状,所以我我们不能连接这个到其他内部的封闭图。然而,这个"岛"被正确的嵌入(由于不能和其他连接而形成了孤岛),可以和模块一样被物化而作为图的一部分。正如我们证明的,这两个DSL是可以完全互操作的,因为是"有着端口的盒子"的简单嵌套结构的编码实现。 DSL之间仅有的不同是在给定抽象级别上有没有可能更为强大。 在流体DSLfluid DSL中嵌入复杂图形是可能的,在较大的,结构复杂(的Flow中),将容易被导入和嵌入。我们也看到,每一个模块都有一个独立的形状(例如SinkSinkShape),DSL被用来创建它(这里应该是Shape)。这种统一描述允许在各种流处理实体的丰富的组合性上有着便捷的处理方式。 ##1.6.3 物化值 (Materialized values) 意识到RunnableGraph就是没有尚未使用端口的模块(是一个"岛屿")后,可以清晰的认识到物化后唯一和运行流处理逻辑沟通是通过一些side-channel。这些side channel被表示为物化值。这种情况类似ActorsProps实例描述actor逻辑,但是通过调用actorOf()来创建实际运行的actor,返回能和运行的actor自身沟通的ActorRef。因此,Props能被复用,每一次调用将返回不同的引用。 当涉及到流streams,每一个物化创建一个新的与RunnableGraph中编码实现蓝图一致的运行网络。为了与运行网络能相互作用,每一个物化需要返回提供了交互能力的不同对象。换句话说,RunnableGraph能看作工厂,可以创建:

  • 外部不可达的运行处理实体的网络
  • 通过网络根据需要提供可控的交互能力的物化值

虽然不像actors,每个处理阶段可能提供一个物化值,所以当我们组成多阶段或者多模块时,我们也需要连接物化值(有使这更容易的默认规则,比如to()via()维护了提取物化值的最常见情况,阅读Combining materialized values以了解更多细节)我们通过代码示例和一个表示发生了什么的简图演示这将如何工作: combinemat 为了实现上述情况,首先我们创建了一个复合Source,其中封闭的SourcePromise[Unit]的物化类型。通过使用组合功能combiner functionKeep.left,物化类型是由嵌套模块而来的(由图上红色区域表示):

// Materializes to Promise[Option[Int]] (red)
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
// Materializes to Unit (black)
val flow1: Flow[Int, Int, Unit] = Flow[Int].take(100)
// Materializes to Promise[Int] (red)
val nestedSource: Source[Int, Promise[Option[Int]]] =
source.viaMat(flow1)(Keep.left).named("nestedSource")

接下来,我们从两个更小的组件创建一个组合流Flow。在这里,第二个封闭的流Flow有一个Future[OutgoingConnection]的物化类型,我们通过组合功能combiner functionKeep.right把者传播到父节点(在简图中用黄色表示)

// Materializes to Unit                                                   (orange)
val flow2: Flow[Int, ByteString, Unit] = Flow[Int].map { i => ByteString(i.toString) }
 
// Materializes to Future[OutgoingConnection]                             (yellow)
val flow3: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
  Tcp().outgoingConnection("localhost", 8080)
 
// Materializes to Future[OutgoingConnection]                             (yellow)
val nestedFlow: Flow[Int, ByteString, Future[OutgoingConnection]] =
  flow2.viaMat(flow3)(Keep.right).named("nestedFlow")

第三步,我们创建一个复合Sink,通过使用嵌套流nestedFlow作为构建块building block。在这个片段,无论是封闭的Flow还是叠加的Sink的物化值,我们都感兴趣,所以我们用Keep.both来获取作为叠加SinknestedSink的物化值的物化值对(在简图中用蓝色表示)。

// Materializes to Future[String]                                         (green)
val sink: Sink[ByteString, Future[String]] = Sink.fold("")(_ + _.utf8String)
 
// Materializes to (Future[OutgoingConnection], Future[String])           (blue)
val nestedSink: Sink[Int, (Future[OutgoingConnection], Future[String])] =
  nestedFlow.toMat(sink)(Keep.both)

在例子最后,我们将nestedSOurcenestedSink联系在一起,我们使用自定义组合函数来创建RunnableGraph的结果的另一个物化类型。这个组合函数只是忽略了Future[Sink]部分,通过自定义的样例类MyClass包装了两个其他的值(在简图中用紫色表示):

case class MyClass(private val p: Promise[Option[Int]], conn: OutgoingConnection) {
  def close() = p.trySuccess(None)
}
 
def f(p: Promise[Option[Int]],
      rest: (Future[OutgoingConnection], Future[String])): Future[MyClass] = {
 
  val connFuture = rest._1
  connFuture.map(MyClass(p, _))
}
 
// Materializes to Future[MyClass]                                        (purple)
val runnableGraph: RunnableGraph[Future[MyClass]] =
  nestedSource.toMat(nestedSink)(f)

—————— 注意:在上面例子中对于组合物化值,嵌套结构是没有必要的。这只是表明两个功能如何协同工作。组合物化值而没有嵌套和深层次参与的深入案例请查看Combining materialized values —————— ##1.6.4属性(Attributes) 我们已经看到使用named()来引入嵌套层次到流体DSLfluid DSL(也可以通过来自GraphDSLcreate()来明确嵌套)。除了有增加嵌套层次的影响,named()也是调用withAttributes(Attributes.name("someName"))的简写。属性提供物化运行实体materialized running entity的某些方面的一种微调方式。例如,缓冲区大小可以通过属性控制(见Buffers in Akka Streams)。当谈及层次组合,字段通过嵌套模块继承,除非它们通过自定义值复写。 下面的代码,是早期例子的一个改进,在某些模块设置inputBuffer,而其他模块没有设置:

import Attributes._
val nestedSource =
  Source.single(0)
    .map(_ + 1)
    .named("nestedSource") // Wrap, no inputBuffer set
 
val nestedFlow =
  Flow[Int].filter(_ != 0)
    .via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
    .named("nestedFlow") // Wrap, no inputBuffer set
 
val nestedSink =
  nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override

其影响是,每个模块从其封闭父模块继承inputBuffer属性,除非有明确设置相同的属性。nestedSource从物化本身获得默认属性。另一方面nestedSink有这些属性设置,所以能在所有的嵌套模块中使用。nestedFlow除了被明确提供来复写继承的map阶段,其他都继承nestedSinknested

这个简图说明案例代码的继承过程(红色代表物化默认属性,nestSink的属性设置是蓝色,而nestedFlow的属性设置是绿色)