Skip to content

Latest commit

 

History

History
257 lines (209 loc) · 19.4 KB

1.4.md

File metadata and controls

257 lines (209 loc) · 19.4 KB

#1.4 流工作的基础 (Basics and working with Flows) ##1.4.1 核心概念 (Core concepts) Akka stream是一个用有限的缓存空间来处理和传输一系列元素的库。后者就是我们所说的有界的属性,这也是akka stream的决定性特征。用白话来表示就是能描述一个链(chain (或者像我们后来看到的图)的处理实体,在任何给定时间只缓存有限的元素,各个执行独立的(也可能是同时)来自其他元素。有界缓冲区的属性是和akka model不同点之一,每一个actor通常有一个无界的、或者有界的,但是是废弃的信箱。akka stream有一个有界的但是不废弃的信箱处理元素。 在我们继续之前,让我们定义一些将在文档中使用的基本术语:

  • Stream 一个包含移动的、可转化的数据的活跃进程
  • Element元素是streams中的处理单元。从上游流到下游流所有业务转化以及转移元素。 缓冲区大小总是表示实际大小的元素中独立元素的数量。
  • Back-pressure 一种流程控制的手段:一种消费者通知生产者关于他们当前可用性、有效的减缓上游生产者的速度来和他们实际的消费能力相匹配的数据的方式。在akka stream的环境中中back-pressure一直被理解为非阻塞的non-blocking,异步的asynchronous
  • Non-Blocking 意味着特定的操作不阻塞调用线程的进程,即使这需要很长时间才能完成请求的操作。
  • Graph 流处理拓扑的描述,定义当流运行时元素的流向的路径。
  • Processing Stage 在所有的构建模块中是构建图的一个通用的名字。类似例子: 像map()filter()的操作,通过类似PushStagePushPullStageStatefulStagetransform()阶段,通过Merge或者Broadcast的图连接。要了解完整的内容请查看Overview of built-in stages and their semantics

当我们谈论异步、非阻塞、backpressure意味着在akka streams中可用的处理阶段将不使用阻塞调用异步消息和其他传递交换信息,而使用异步意味着在不阻塞线程的情况下减缓生产速度。这是一个线程池友好的设计,需要等待的实体将不会阻塞线程(快速生产等待缓慢消费),传递自己以支持使用一个基本的线程池。

##1.4.2 定义和运行流 (Defining and running streams) 在akka stream中能使用如下核心抽象描述线性处理管道:

  • Source 只有一个输出的处理状态,无论下游处理状态是否做好接收准备,(Source)都会发射数据元素。
  • Sink 只有一个输入的处理状态,请求和接收元素可能减缓上流生产者生产元素
  • Flow只有一个输入和一个输出的处理状态,通过它转化数据元素将连接它的上游和下游。
  • RunnableGraph 两端分别连接SourceSinkFlow是可运行的(run()).

可能会附加一个FlowSource产生一个复合源,也可能预先给Sink设置一个Flow来获得一个新的sink。当同时拥有sourcesinkstream以恰当的方式终结, stream将会成为RunnableGraph类型,表明该流已经做好了执行的准备。

记住即使通过连接所有的sourcesink以及其他不同的处理状态来构建RunnableGraph,在它物化前都不会有数据从中经过,这一点是重要的。物化是通过图(在akka streams通常会涉及actors的启动)来描述计算所需的所有资源的分配过程。 由于Flows作为管道处理的一个简单描述,这通常是不可变的,线程安全的以及自由共享的,这意味着例如在actors之间传递以及安全的共享,有一个actor准备工作,然后代码中在一些完全不同的地方进行物化。

val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// connect the Source to the Sink, obtaining a RunnableGraph
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
// materialize the flow and get the value of the FoldSink
val sum: Future[Int] = runnable.run()

当我们运行(物化)RunnableGraph[T]后我们将得到物化值类型T。每一个流处理状态都可以提供一个物化值,这也是用户结合它们到一个新类型的职责(这里应该理解为必须提供的类型)。在上述例子中,我们使用toMat来表示我们想要改变source以及sink的物化值,我们方便的使用Keep.right来表明我们只关心sink的物化值。在我们的例子中,FoldSink物化了一个代表在流上的folding过程所产生的Future类型的物化值。在一般情况下,一个流可以暴露多个物化值,但是相当常见的情况是仅仅对只对流中Source或者Sink的一个值感兴趣。由于这个原因,有一个方便的runWith()方法适用于SinkSource以及Flow要求,一个提供的源(Source)(为了运行Sink),一个收集器(Sink)(为了运行Source)或同时有SinkSource(为了运行Flow,由于还没连接这两个)。

val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// materialize the flow, getting the Sinks materialized value
val sum: Future[Int] = source.runWith(sink)

值得一提的是:由于处理阶段是不可变的,连接它们返回一个新的处理阶段而不是修改现有实例,所以在构造长的流(long flows),记得要分配新的变量或者运行它。

val source = Source(1 to 10)
source.map(_ => 0) // has no effect on source, since it's immutable
source.runWith(Sink.fold(0)(_ + _)) // 55
val zeroes = source.map(_ => 0) // returns new Source[Int], with `map()` appended
zeroes.runWith(Sink.fold(0)(_ + _)) // 0

注意默认情况下akka stream元素只支持一个下游的处理阶段。让默认流元素变得更简单和更有效,使用fan-out(支持多个下游处理阶段)是一个明确的选择。此外,它也允许更加灵活的处理多播的方案,比如broadcast(示意所有下游元素)或者balance(示意下流元素中的任意一个)


在上面的例子中,我们使用了runWith()方法,该方法既物化流同时也返回给定sink或者source的物化值。由于流可以被物化多次,每一次的物化将重新计算一个新的物化值。通常导致每次返回不同的物化值。在我们下面的例子中,在可运行变量中,我们创建两个流的物化实例即使我们使用的相同的sink来接收future,两次物化映射了产生的不同future

// connect the Source to the Sink, obtaining a RunnableGraph
val sink = Sink.fold[Int, Int](0)(_ + _)
val runnable: RunnableGraph[Future[Int]] =
Source(1 to 10).toMat(sink)(Keep.right)
// get the materialized value of the FoldSink
val sum1: Future[Int] = runnable.run()
val sum2: Future[Int] = runnable.run()
// sum1 and sum2 are different Futures!

###定义 sources,sinks以及flows(Defining sources,sinks and flows) SourceSink对象定义各种方式创建sourcessinks元素。下面示例将列举有用的构造(需要了解更多信息请参考API):

// Create a source from an Iterable
Source(List(1, 2, 3))
// Create a source from a Future
Source.fromFuture(Future.successful("Hello Streams!"))
// Create a source from a single element
Source.single("only one element")
// an empty source
Source.empty
// Sink that folds over the stream and returns a Future
// of the final result as its materialized value
Sink.fold[Int, Int](0)(_ + _)
// Sink that returns a Future as its materialized value,
// containing the first element of the stream
Sink.head
// A Sink that consumes a stream without doing anything with the elements
Sink.ignore
// A Sink that executes a side-effecting call for every element of the stream
Sink.foreach[String](println(_))

有多种方式可用来连接不同部分的流,以下示例展示了一些可用的举措:

// Explicitly creating and wiring up a Source, Sink and Flow
Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_)))
// Starting from a Source
val source = Source(1 to 6).map(_ * 2)
source.to(Sink.foreach(println(_)))
// Starting from a Sink
val sink: Sink[Int, Unit] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_)))
Source(1 to 6).to(sink)
// Broadcast to a sink inline
val otherSink: Sink[Int, Unit] =
    Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)

Source(1 to 6).to(otherSink)

###非法流元素 (Illegal stream elements) 按照Reactive Streams规范(第2.13条),akka stream不允许null作为元素在流中传递,如果你要定义没有值的模型,使用scala.Option或者scala.util.Either

##1.4.3 Back-pressure解释 遵循Reactive Stream规范,akka stream实现了异步的、非阻塞的back-pressure协议,akka是创始成员之一。库的使用者不必明确的写任何关于backpressure处理的代码--这是akka stream的任何建立和处理中自动提供的。这是可能的但需要补充说明的是缓冲区基友溢出的策略可以影响流的行为。在可能包含循环的复杂的图处理(由于图周期,活跃度,死锁的问题,要非常小心的使用)中这一点是非常重要的。backpressure协议是下游订阅者参照需求接收和缓存一定数量的元素的术语。在Reactive Stream中被叫做发布者的术语以及在akka strem作为source的实现的数据源保证它不会发送任何超过订阅者总需求的元素。


注意 Reactive Stream规范中定义的发布者和订阅者并不是面向用户界别的API,而是面向底层的不同Reactive Stream实现。akka stream在不暴露Reactive Stream的前提下用SourceFlow(在Reactive Stream中简称为Processor)以及Sink实现了这些概念。如果你需要了解Reactive stream其他的库,去阅读Integrating with Reactive Streams


在Reactive Stream中backpressure的工作能通俗的描述成动态推\拉模式,取决于下游应付上游发布的速率而动态的切换推和拉的backpressure模型。为了进一步说明这一点,我们考虑这两种情况下,如何使用backpressure处理:

###缓慢生产,快速消费 这种情况下当然是幸福的--我们并不需要在这种情况下减缓发布。但是极少的情况下信号是恒定的,并且在任何时间点上都有可能改变,这种情况突然结束了,现在订阅者比发布者者的速率慢了。为了确保这种情况(这种情况下高可用),backpressure在这种情况下仍需要启用,但我们并不想为这个安全措施而付出高昂的代价。Reactive Stream协议通过从订阅者到发布者的异步信号Request(n:Int)解决了这个问题。该协议保证了发布者永远不会发出多于需求的元素。由于订阅者比目前更快,则发出请求更多元素的信号(可能也批量共同投放需求--在同一信号中需要组合元素)。这意味着发布者在生产进入的元素时永远不需要等待(be back-pressured)。 正如我们所看到的,这种情况就是我们所说的推模式,发布者可以尽可能快的生产元素。当它发射元素时,待处理的需求将重新恢复(这里我的理解是类似订阅者不停的下订单,发布者生产完刚才的订单又得重新生产订单)

###快速生产,缓慢消费 这种情况下backpressuring发布者是必须的,因为订阅者没有足够能力来处理它的上游想要发射的数据元素。 由于发布者不允许发出超过由订阅者挂起的需求信号的元素,它必须通过应用以下策略来遵循backpressure:

  • 如果能控制其生产速率,则不生产元素。
  • 尝试以有限的方式缓冲元素,直到有更多需求的信号。
  • 删除元素直到有更多需求的信号。
  • 如果无法采取以上策略,则摧毁流。

正如我们所看到的,这种情况实际上意味着订阅者将从发布者拉元素--这种操作模式被成为基于拉的backpressure

##1.4.4 流物化 (Stream Materialization) 在akka stream中将构造流和图看作是准备一个蓝图,一个执行计划。流物化是接受流的说明(图)和分配所有在运行中需要的资源的过程。在 akka stream中,这往往意味着启动有着处理能力的actor,但并不限于此--这也可能表明根据流的需要,打开文件或者socket等。物化在所谓的"业务终端"被触发。最值得注意的是,这包含在SourceFlow上被定义的run()runWith()方法,以及少数在我们所知的sinks上有着语法糖的方法,比如runForeach(el => ...)(作为runWith(SInk.foreach(el => ...))的别名)。 物化在物化线程上同步进行,实际上流物化中的流处理是由actor的处理发起的,将会在线程池上运行已经完成的运行配置--构建ActorMaterializerMaterializationSettings作为默认的调度的设置。


注意:在复合图中线性计算阶段(SourceSinkFlow)重用实例是合法的,但一定多次会物化那个阶段。


###融合操作 akka stream2.0中包含支持融合操作的最初版本,这意味着,流程或者图可以在同一个actor中运行,并且有三种可能

  • 启动一个流由于要启动一个融合算法而比以前耗时更多
  • 从一个处理阶段到下一个阶段如果避免融合算法,传递将会快很多
  • 融合处理将不再并行运行,将在一个cpu core中运行融合

第一点可以通过预融合来解决,然后使用如下蓝图的流:

import akka.stream.Fusing
val flow = Flow[Int].map(_ * 2).filter(_ > 500)
val fused = Fusing.aggressive(flow)
Source.fromIterator { () => Iterator from 0 }
    .via(fused)
    .take(1000)

为了平衡上面的第二点和第三点,你将不得不通过加入Attributes.asyncBoundaryflowsgraphs中实现插入异步的界限来确保和其它部分以异步方式通信。

import akka.stream.Attributes.asyncBoundary
Source(List(1, 2, 3))
.map(_ + 1)
.withAttributes(asyncBoundary)
.map(_ * 2)
.to(Sink.ignore)

在这个例子中,我们创建了流中的两个区域,这将在每个actor上执行--假如加法和乘法是代价很高的操作,由于使用两个cpu并行的执行这些任务将实现新能的提升。要注意的是,异步边界不是flow内单独的存在(其他流媒体中),而是通过添加这一点的属性信息到那些已经构建的flow graph

示例图

这意味着,红色气泡部分将由一个actor执行,而在它之外的部分由另一个执行。该方案可连续使用 -- 总是使得一个类似的边界囊括(边界)之前的Stage,然后附加所有其它的在(被囊括的处理Stage)之后被添加的处理Stage。


警告: 如果没有融合(即最高为2.0-M2版本)的每一个流的处理阶段出于效率的因素隐含一个输入缓冲区。如果你的流图包含循环,那么为了避免死锁,这些缓冲区具有至关重要的作用。由于融合(fusing)这些隐式缓冲区已经不再存在。数据元素在融合阶段将不再缓存。为了让流运行所有(元素或者阶段),缓冲是需要的,你将不得不通过.buffer()组合子以明确的方式插入--通常是大小为2足够将循环反馈给函数(allow a feedback loop to function)

新的融合行为通过配置参数禁用融合:akka.stream.materializer.auto-fusing=off,你仍然可以通过运行更少的actor手动融合那些图(Graphs),唯一例外的是SslTlsStage和groupBy,操作者在所有阶段都可以融合。

###组合物化值 (Combining materialized values) 由于akka stream每一个处理阶段在物化以后都会产生一个物化值。在表示当一起阻塞这些阶段,以某种方式将这些值转化为最终值时这是有必要的。对于这一点,很多组合子有额外的参数、函数的变量,这将被用于组合所得的值,下面是使用这些组合子的示例:

// An source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler
// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]
// By default, the materialized value of the leftmost stage is preserved
val r1: RunnableGraph[Promise[Option[Int]]] = source.via(flow).to(sink)
// Simple selection of materialized values by using Keep.right
val r2: RunnableGraph[Cancellable] = source.viaMat(flow)(Keep.right).to(sink)
val r3: RunnableGraph[Future[Int]] = source.via(flow).toMat(sink)(Keep.right)
// Using runWith will always give the materialized values of the stages added
// by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink)
val r5: Promise[Option[Int]] = flow.to(sink).runWith(source)
val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)
// Using more complext combinations
val r7: RunnableGraph[(Promise[Option[Int]], Cancellable)] =
source.viaMat(flow)(Keep.both).to(sink)
val r8: RunnableGraph[(Promise[Option[Int]], Future[Int])] =
source.via(flow).toMat(sink)(Keep.both)
val r9: RunnableGraph[((Promise[Option[Int]], Cancellable), Future[Int])] =
source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)
val r10: RunnableGraph[(Cancellable, Future[Int])] =
source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)
// It is also possible to map over the materialized values. In r9 we had a
// doubly nested pair, but we want to flatten it out
val r11: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
r9.mapMaterializedValue {
case ((promise, cancellable), future) =>
(promise, cancellable, future)
}
// Now we can use pattern matching to get the resulting materialized values
val (promise, cancellable, future) = r11.run()
// Type inference works as expected
promise.success(None)
cancellable.cancel()
future.map(_ + 3)
// The result of r11 can be also achieved by using the Graph API
val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder =>  
(src, f, dst) =>
import GraphDSL.Implicits._
src ~> f ~> dst
ClosedShape
})  

注意:在流处理图的内部,图可以访问到物化值,详细内容请参考 Accessing the materialized value inside the Graph


##1.4.5 流排序(Stream ordering) 在akka stream中几乎所有的计算阶段都维护元素的输入顺序。这意味着,如果输入{{IA1,IA2,...,IAn} 导致(“cause”)输出{OA1,OA2,...,OAk}},以及如果输入{IB1,IB2,...,IBm} 引发(“cause”)输出{OB1,OB2,...,OBl}以及所有的IAiIBi之前接着OAiOBi之前。这个属性即使通过异步操作,如mapAsync操作维护,但是存在所谓mapAsyncUnordered的不保留顺序的无序版本。然而,在存在多个输入流(类似合并Merge)的交汇点Junctions,通常情况下没有定义不同输入端口的到达情况的输出顺序。这种融合操作merge-like可能会在发射Bi元素前发射Ai,这个顺序是内部逻辑决定的发射元素的顺序。特殊的元素乐死Zip是如何保证他们的输出顺序,每一个输出元素取决于上游的元素已经收到信号--从而通过该属性定义的顺序进行zipping。如果你发现需要在fin-in情况下考虑细粒度的控制发射元素的顺序,使用MergePreferred或者 GraphStage,这将让你完全控制合并的执行