Skip to content

Latest commit

 

History

History
595 lines (386 loc) · 26.1 KB

1.15.md

File metadata and controls

595 lines (386 loc) · 26.1 KB

1.15 流的Cookbook

1.15.1 介绍

这是一个, 用解决一些以配方形式展现的针对小目标的问题, 来展示Akka Stream API的多种多样的用法的模式的集合. 这页的意图在于给予一些如何用流来着手解决一些小任务的灵感和想法. 这页展示的配方可以直接拿来使用, 但是它们更多是作为一个起点: 极度鼓励去自定义一些代码片段.

这个部分可以作为文档主体的一个补充材料. 当阅读手册时同时参看该页是一个很好的主意, 并且寻找各种在文档主体中出现的各种流概念的例子.

如果你需要这些配方中使用的可用处理步骤的快速引用, 可以参看1.14 内置步骤概览和相关语义.

1.15.2 用Flow来工作

在这个集合中我们展现一些包线性Flow的简单配方. 在本节中的配方是相当的基础, 更有针对性的配方在另一个独立的章节中(缓存和速率, 用流IO来工作).

记录一个流的元素

场景: 在开发过程中, 能够看到流的某一些特殊部分发生了什么,有时候非常有用.

最简单的解法是使用map操作和使用println来把接收到的元素打印到控制台. 由于这个配方非常简单, 它适用于快速debug的时候.

val loggedSource = mySource.map { elem => println(elem); elem }

另一个记录的方法是使用log()操作, 它允许配置记录在流中的元素, 同样包括完成和错误中的.

//自定义日志等级
mySource.log("before-map")
  .withAttributes(Attributes.logLevels(onElement = Logging.WarningLevel))
  .map(analyse)

//或者提供一个自定义的日志适配器
implicit val adapter = Logging(system, "customLogger")
mySource.log("custom")

序列流的扁平化

场景: 给定的一个流里包含的是序列元素, 但是需要一个包含基本元素的流, 分别把序列中嵌套的所有元素都流化.

可以使用mapConcat操作来实现一对多的元素转化, 使用一个以In => immutable.Seq[Out]形式的函数作为一个映射函数. 在这个例子中我们希望使用能够映射一个Seq的元素到元素本身, 所以我们只要调用mapConcat(identity).

val myData: Source[List[Message], Unit] = someDataSource

val flattened: Source[Message, Unit] = myData.mapConcat(identity)

流转化成严格集合

场景: 给定一个有限个数的元素序列流, 但是需要一个scala的集合.

在这个配方中, 我们将使用grouped流操作, 它将到来的元素组团成一个有限大小的集合的流(这个可以视为序列流的扁平化的反操作). 通过使用grouped(MaxAllowedSeqSize)我们创建了一个以MaxAllowedSeqSize为最大值的组的流, 然后我们通过加上Sink.head来获取这个流的第一个元素. 我们获取到的是一个包含了MaxAllowedSeqSize个数的原来流中元素序列的Future(其余更多的元素被丢弃了).

val strict: Future[immutable.Seq[Message]] =
  myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)

计算一个ByteString流的摘要信息

场景: 给定一个字节流以ByteString流的形式, 我们想要计算出该流的加密摘要.

这个配方使用一个PushPullStage来持有一个可变的MessageDigest类(java Cryptography API的一部分), 然后随着字节流的到来更新它. 当流开始时, 该步骤的OnPull处理机被调用, 它只是把Pull事件冒泡给它的上游. 作为这个拉取的响应, 一个ByteString的块将到达(OnPush), 我们使用它来更新这个摘要, 然后该步骤将拉取下一个块.

最终这个ByteString将被耗尽并且我们将通过onUpstreamFinish收到该事件的消息通知. 当这个时候我们希望输出摘要值, 但是我们无法在这个处理者中直接实现. 相应的我们调用ctx.absorbTermination()来传递信号给上下文, 告知它我们并不想结束. 当环境决定我们可以输出更多元素时onPull再次被调用, 我们看到ctx.isFinishing返回了true(因为上游源已经被消耗光了). 由于我们只是希望输出一个最终的元素, 只需要调用ctx.pushAndFinish传递入想要输出的ByteString的摘要.

import akka.stream.stage._
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
  val digest = MessageDigest.getInstance(algorithm)

  override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
    digest.update(chunk.toArray)
    ctx.pull()
  }

  override def onPull(ctx: Context[ByteString]): SyncDirective = {
    if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
    else ctx.pull()
  }

  override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
    // 如果流完成了, 我们需要在onPull块中输出最后的元素
    // 无法在一个终止块中直接输出元素
    // (onUpstreamFinish 或者 onUpstreamFailure)
    ctx.absorbTermination()
    }
  }

val digest: Source[ByteString, Unit] = data.transform(() => digestCalculator("SHA-256"))  

从ByteString流中解析行

场景: 给定一个字节流以ByteString流的形式, 包含了需要解析的以行结束符为结尾的行(或者, 可选的, 包含以一个特殊分隔字节符分隔的二进制帧).

Framing帮助对象包含了一个便捷的方法来从ByteStrings中解析消息:

import akka.stream.io.Framing
val linesStream = rawData.via(Framing.delimiter(
  ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
  .map(_.utf8String)

实现 redcue-by-key

场景: 给定一个包含元素的流, 我们希望在元素的不同分组中计算一些聚合值.

reduce-by-key类型操作的hello worldwordcount, 我们将在下面展现. 给定一个包含词语的流, 我们首先创建一个依据identity函数创建出的分组的新流, 也就是说我们现在有了一个包含流的流, 每一个子流中包含相同的词语.

为了对词语计数, 我们需要处理这个包含流的流(实际上包含相同词语的分组). groupBy返回一个SubFlow, 意味着我们可以直接变换结果子流. 在这个例子中, 我们使用fold这个组合子来聚合词语本身和它出现的次数在一个二元组中(String, Integer). 每一个子流将输出一个最终值--准确的说是一个对--当所有的输入结束的完成时. 作为最后一步我们把从子流返回的值合并到一个单一的输出流.

一个值得记录下的细节存在于MaximumDistinctWords参数: 它定义了groupBy和merge操作的适用性. Akka Stream关注在有边界的资源消耗并且打开的并发输入到merge操作的个数, 描述了merge本身需要的资源个数. 因此在任意给定时间内只有一个有限个数的子流是活跃的. 如果groupBy操作遇到了超过这个数字的key的个数, 那么流是无法在没有违法它边界资源的情况下继续下去, 在这种情况下groupBy将一个失败终结.

val counts: Source[(String, Int), Unit] = words
  // 首先把词语分割到不同的流中
  .groupBy(MaximumDistinctWords, identity)
  // 为这些流添加计数逻辑
  .fold(("", 0)) {
    case ((_, count), word) => (word, count + 1)
  }
  // 获取一个词语计数的流
  .mergeSubstreams

从这个wordcount中提取特殊的部分到

  • 一个groupKey函数定义了如何进行分组
  • 一个foldZero定义了作用在给定group key的子流上fold的初始元素
  • 一个fold函数用来实施实际约简

我们得到了以下一个泛用版本:

def reduceByKey[In, K, Out](
  maximumGroupSize: Int,
  groupKey: (In) => K,
  foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out), Unit] = {

  Flow[In]
    .groupBy(maximumGroupSize, groupKey)
    .fold(Option.empty[(K, Out)]) {
      case (None, elem) =>
        val key = groupKey(elem)
        Some((key, fold(foldZero(key), elem)))
      case (Some((key, out)), elem) =>
        Some((key, fold(out, elem)))
    }
    .map(_.get)
    .mergeSubstreams
}

val wordCounts = words.via(reduceByKey(
  MaximumDistinctWords,
  groupKey = (word: String) => word,
  foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1))

注意: 请注意我们上面讨论的reduce-by-key版本是以串行的方式读取所有的输入, 也就是说它不是一个像MapReduce和其他相似框架的并行模式.


用groupBy将元素排序到多个组中

场景: groupBy操作严格的划分到来的元素, 每一个元素都仅属于一个分组. 有时候我们希望同时把元素映射到多个分组中.

为了获取想要的值, 我们把问题分成了两步:

  • 首先, 使用一个topicMapper函数来提供一个消息隶属的话题(分组)列表, 我们变换Message流成为一个(Message, Topic)流, 在这里消息属于的每一个话题都将成为一个独立的对被输出. 这步由使用mapConcat来完成.

  • 然后我们得到一个新的包含消息话题对元素的流(包含一个独立的对, 对于消息属于的每一个话题)并且传入到groupBy中, 把话题当成group key.

val topicMapper: (Message) => immutable.Seq[Topic] = extractTopics

val messageAndTopic: Source[(Message, Topic), Unit] = elems.mapConcat { msg: Message =>
  val topicsForMessage = topicMapper(msg)
  // Create a (Msg, Topic) pair for each of the topics
  // the message belongs to
  topicsForMessage.map(msg -> _)
}

val multiGroups = messageAndTopic
  .groupBy(2, _._2).map {
    case (msg, topic) =>
      // 做后续需要做的事
}

1.15.3 用Graph来工作

在这个集合中我们展示一些使用了Graph流元素来完成一些目标的配方.

程序化触发Flow的元素

场景: 给定了一个流我们希望根据一个触发信号控制流元素的输出. 换句话说, 即使这个流可以流动(没有产生背压), 我们也希望控制住元素直到一个触发器信号到来.

这个配方用简单的zip了Trigger信号流和Message流来解决这个问题. 因为Zip生成两元对, 我们简单的映射输出流为选择两元对的第一个元素.

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val zip = builder.add(Zip[Message, Trigger]())
  elements ~> zip.in0
  triggerSource ~> zip.in1
  zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
  ClosedShape
})

另一种方案, 替换使用一个Zip, 然后使用map来获取二元对的第一个元素, 我们可以使用ZipWith来避免在第一步时创建一个二元对, 这个方法以一个接受两个参数的函数作为参数来创建输出元素. 如果 这个函数返回一个两个参数的二元对, 那么就和Zip的行为完全一致, 所以ZipWithZip的更泛用版本.

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val zip = builder.add(ZipWith((msg: Message, trigger: Trigger) => msg))
  
  elements ~> zip.in0
  triggerSource ~> zip.in1
  zip.out ~> sink
  ClosedShape
})

平均分配任务到一个固定工作池

场景: 给定一个包含工作的流和一个用Flow描述的工人进程, 创建了一个工人池能够自动的平衡到来的工作到可用的工人上, 然后合并结果.

我们将把解法描述成一个函数, 该函数以一个工人流和分配的工人个数作为参数然后给定一个内部包含这些工人的一个池的Flow. 为了达成这个目标我们从一个Graph创建一个Flow.

这个Graph由一个Balance节点组成, 该节点是一个特殊的扇出操作, 它试图路由元素到可用的下游消费者. 在一个For循环中我们把所有的工人组装成输出给这个balaner的元素, 然后我们组装这些工人的输出到一个Merge元素, 它会从工人那搜集所有的结果.

def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, Unit] = {
  import GraphDSL.Implicits._

  Flow.fromGraph(GraphDSL.create() { implicit b =>
    val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
    val merge = b.add(Merge[Out](workerCount))

    for (_ <- 1 to workerCount) {
      // for each worker, add an edge from the balancer to the worker, then wire
      // it to the merge element
      balancer ~> worker ~> merge
    }

    FlowShape(balancer.in, merge.out)
  })
}

val processedJobs: Source[Result, Unit] = myJobs.via(balancer(worker, 3))

1.15.4 处理速率

这个集合的配方展示了上下游不同速率需要用其他方式处理而不仅仅是简单的背压的多种模式.

丢弃元素

场景: 给定一个快速的生产者和一个慢速的消费者, 我们希望在必要时候丢弃元素使得生产者不会变慢太多.

这个可以使用最多功能的速率转换操作conflate. Conflate可以看成是一个特殊的fold操作, 它在折叠多个上游元素到一个聚合元素时, 会保持上游的速度不受下游影响.

当上游速度很快时, conflate的折叠过程开始. 这个折叠需要一个初始元素, 该初始元素由一个种子函数提供, 这个函数拿走当前的元素并且生产出一个折叠过程的初始值. 在这个例子中是identity, 所以我们的折叠状态由消息本身开始. 而折叠函数也十分特殊: 给定了聚合值(最近的消息)和一个新的元素(最新的元素), 我们的聚合状态简单的转换成为最新的元素. 这个函数的选择实现了一个简单的元素丢弃操作.

val droppyStream: Flow[Message, Message, Unit] = 
  Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)

丢弃广播

场景: 默认的Broadcast这个Graph元素会在合适的时候产生背压, 但是这意味着一个慢速的下游消费者会拖慢其他的下游消费者, 这导致了一个较低级的通量. 换句话说Broadcast的速率就是下游最慢消费者的速率. 在某些情况下, 如果有需求那么能够允许快一点的消费者可以通过丢弃元素从而独立与他的兄弟消费者来处理.

我们解决这个问题的方法是通过在所有的下游消费者前添加一个buffer元素, 定义一个丢弃策略来取代默认的背压. 这允许了在不同消费者间暂时性较小的速率差异(缓存平滑了小的速率变化), 但同时如果有需要也允许了更快的消费者处理时丢弃慢速消费者的缓存.

val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
  (sink1, sink2, sink3) =>
    import GraphDSL.Implicits._

    val bcast = b.add(Broadcast[Int](3))
    myElements ~> bcast

    bcast.buffer(10, OverflowStrategy.dropHead) ~> sink1
    bcast.buffer(10, OverflowStrategy.dropHead) ~> sink2
    bcast.buffer(10, OverflowStrategy.dropHead) ~> sink3
    ClosedShape
})

搜集丢失的滴答(ticks)

场景: 给定一个常规的滴答源(流的形式), 我们想保持一个丢失的滴答的计数并在可能的时候传递下去, 来替代对滴答的生产者产生背压.

我们将使用conflate来解决这个问题. Conflate以两个函数作为参数:

  • 为折叠过程提供初始元素Seed函数, 该过程作用在上游速度下游快的时候. 在这个例子中seed函数是一个返回0的常量函数, 因为开始时没有任何丢失的滴答.

  • 一个折叠函数, 在由于下游处理速度的不足, 多个上游消息需要折叠到一个聚合值的时候被调用. 我们的折叠函数简单的增加当前存储的丢失滴答的计数.

作为结果, 我们有一个IntFlow, 里面的数值代表丢失的滴答. 0数值意味着我们可以以足够的速度消耗滴答(也就是说 0意味着: 1个没有丢失的滴答 + 0 个丢失的滴答)

val missedTicks: Flow[Tick, Int, Unit] =
  Flow[Tick].conflate(seed = (_) => 0)(
    (missedTicks, tick) => missedTicks + 1)

创建一个重复最后可见元素的流处理器

场景: 给定一个生产者和一个消费者, 它们的速率提前并不知道, 必要时我们想要通过丢弃从上游来的最早的未消费元素来确认没有任何一者在放慢速度, 并且必要时重复下游的最后的值.

我们有两个选项实现这个特性. 两个场景下我们都将使用DetachedStage来构建我们的自定义元素(DetachedStage主要应用于元素的速率转换, 就像conflate, expandbuffer一样). 在第一个版本中我们将使用一个已经提供的初始值initial, 它用于当上游没有准备好的元素时传递给下游. 在onPush()处理块中我们只要重写currentValue变量并且通过调用onPush()立即缓解上游(记住, DetachedStage的实现不允许把调用push()作为onPush()的回应 或者 调用pull()作为onPull()的回应). 下游的onPull处理块也十分的相似, 我们通过输出currentValue来立即缓解下游.

import akka.stream.stage._
class HoldWithInitial[T](initial: T) extends DetachedStage[T, T] {
  private var currentValue: T = initial

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
    currentValue = elem
    ctx.pull()
  }

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
    ctx.push(currentValue)
  }
}

虽然这个相对比较简单, 第一版的缺点是它需要一个任意的初始化值, 但有时候这个值可能无法提供. 因此, 我们创建了第二个版本, 下游只可能在一种场景下等待: 如果最初始的元素还没有准备好.

我们引入了一个布尔类型waitingFirstValue来表示第一个元素是否已经提供(或者 可以把currentValue当成一个Option使用亦或元素类型是AnyRef的子类那么使用一个null来提供相同功能). 在下游的onPull处理块中, 和上一版本不同之处是我们调用了holdDownStream(), 如果第一个元素还未可用那么阻塞了我们的下游. 上游的onPush()处理块设定waitingFirstValuefalse, 然后当检查了holdDownStream()是否已经被调用, 它要么释放上游生产者或者通过调用pushAndPull()同时释放上游和下游

import akka.stream.stage._
class HoldWithWait[T] extends DetachedStage[T, T] {
  private var currentValue: T = _
  private var waitingFirstValue = true

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
    currentValue = elem
    waitingFirstValue = false
    if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
    else ctx.pull()
  }

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
    if (waitingFirstValue) ctx.holdDownstream()
    else ctx.push(currentValue)
  }

}


### 全局的限制多个流的速率

**场景**: 给定了一些我们无法合并的非独立的流, 我们希望全局的限制这些流的聚集通量.

一个可行的方法是使用一个共享的`actor`来作为一个全局的限制器并结合`mapAsync`创建一个可复用的`Flow`, 它可以插入一个流中来限制流的速率.

作为第一步我们定义一个会记录全局速率限制的`actor`. 这个`actor`维护一个定时器, 一个就绪许可证计数器和一个潜在成员加入的等待队列. 这个`actor`有一个`open`和`close`的状态. 当它仍有就绪许可证时这个`actor`处于`open`状态. 每当有一个许可请求到来时, 将以`WantToPass`的消息传递给这个`actor`, 可用的许可数将减少并且我们会通知发送方被许可了通过发送一个`MayPass`消息. 如果许可数到达了0, `actor`就转换成`close`状态. 在这个状态中请求不会立即被回应, 相应的发送方的引用将会添加到队列. 一旦就绪许可证重新补足的计时器触发了, 通过发送`ReplenishTokens`消息, 我们增加就绪许可的计数并发送给每一个等待的发送方一个回复. 如果还存在比可用许可更多的发送方, 那么我们仍处于`closed`状态.

```scala

object Limiter {
  case object WantToPass
  case object MayPass

  case object ReplenishTokens

  def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration,
            tokenRefreshAmount: Int): Props =
  Props(new Limiter(maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount))
}

class Limiter(
  val maxAvailableTokens: Int,
  val tokenRefreshPeriod: FiniteDuration,
  val tokenRefreshAmount: Int) extends Actor {
  import Limiter._
  import context.dispatcher
  import akka.actor.Status

  private var waitQueue = immutable.Queue.empty[ActorRef]
  private var permitTokens = maxAvailableTokens
  private val replenishTimer = system.scheduler.schedule(
    initialDelay = tokenRefreshPeriod,
    interval = tokenRefreshPeriod,
    receiver = self,
    ReplenishTokens)

  override def receive: Receive = open

  val open: Receive = {
    case ReplenishTokens =>
      permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
    case WantToPass =>
      permitTokens -= 1
      sender() ! MayPass
      if (permitTokens == 0) context.become(closed)
  }

  val closed: Receive = {
    case ReplenishTokens =>
      permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
      releaseWaiting()
    case WantToPass =>
      waitQueue = waitQueue.enqueue(sender())
  }

  private def releaseWaiting(): Unit = {
    val (toBeReleased, remainingQueue) = waitQueue.splitAt(permitTokens)
    waitQueue = remainingQueue
    permitTokens -= toBeReleased.size
    toBeReleased foreach (_ ! MayPass)
    if (permitTokens > 0) context.become(open)
  }

  override def postStop(): Unit = {
    replenishTimer.cancel()
    waitQueue foreach (_ ! Status.Failure(new IllegalStateException("limiter stopped")))
  }
}

我们使用mapAsync函数并结合ask模式来创建使用这个全局限制器actorFlow. 我们还定义一个超时, 如果在配置的最大的等待时间段内没有得到回复, 通过ask得到的future会失败, 同样也会使得相应的流失败.

def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = {
  import akka.pattern.ask
  import akka.util.Timeout
  Flow[T].mapAsync(4)((element: T) => {
    import system.dispatcher
    implicit val triggerTimeout = Timeout(maxAllowedWait)
    val limiterTriggerFuture = limiter ? Limiter.WantToPass
    limiterTriggerFuture.map((_) => element)
  })
}

注意: 用来限制的全局actor也引入了一个全局的瓶颈. 你可能需要为这个actor配置一个专用的dispatcher


1.15.5 处理IO

把ByteString流组装成固定大小的ByteString

情况: 给定一个ByteString的流, 我们希望创建一个包含和该序列流相同字节的ByteString流, 但是对ByteString的大小进行限制. 换句话说我们希望当ByteString超过一个大小阈值时, 把它切片成更小的块.

这个问题可以通过使用单个PushPullStage来解决. 我们步骤的主要逻辑在emitChunkOrPull()块中, 该块实现了以下逻辑:

  • 如果缓存为空, 我们拉取更多的字节

  • 如果缓存不为空, 我们根据chunkSize来切割. 这样的结果就是我们将得到一个将要输出的下一个块, 和一个空的或者非空的剩余缓存.

onPush()onPull()同样都调用了emitChunkOrPull(), 唯一不同的就是推送的处理块还通过把到来的块附加到缓存的最后来保存它.

import akka.stream.stage._

class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
  private var buffer = ByteString.empty

  override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
    buffer ++= elem
    emitChunkOrPull(ctx)
  }

  override def onPull(ctx: Context[ByteString]): SyncDirective = emitChunkOrPull(ctx)

  override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
    if (buffer.nonEmpty) ctx.absorbTermination()
    else ctx.finish()

  private def emitChunkOrPull(ctx: Context[ByteString]): SyncDirective = {
    if (buffer.isEmpty) {
      if (ctx.isFinishing) ctx.finish()
      else ctx.pull()
    } else {
      val (emit, nextBuffer) = buffer.splitAt(chunkSize)
      buffer = nextBuffer
      ctx.push(emit)
    }
  }
}

val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))

限制在一个ByteStrings流中通过的字节数

情况: 给定了一个ByteStrings流如果消耗超过了一个给定最大字节数那么我们希望这个流以失败告终.

这个配方使用一个PushStage来实现这个特性. 在唯一个我们重载的处理块中, onPush(), 我们只是更新一个计数器并且观察它是否已经变得比maximumBytes大. 如果违反发生了我们输出一个失败信号, 否则则传递我们接受到的块.

import akka.stream.stage._
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
  private var count = 0

  override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
    count += chunk.size
    if (count > maximumBytes) ctx.fail(new IllegalStateException("Too much bytes"))
    else ctx.push(chunk)
  }
}

val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))

ByteString流压缩

情况: 在一长串的流变换后, 因为它们的不可变性, 结构上可共享的特性, ByteStrings可能指向了多个没有作用但是在内存中的源ByteString实例. 作为最后一步变化, 我们希望清除掉那些不再被引用的源ByteString的拷贝.

这个配方简单的使用map来在ByteString元素上调用compact()方法. 这回拷贝潜在的数组, 所以如果要使用它请置于整个调用链的最后.

val compacted: Source[ByteString, Unit] = data.map(_.compact)

ByteString流中注入keep-alive信息

情况: 给定一个以ByteString流为形式的沟通通道, 我们希望在当且仅当没有对正常信息交通有妨碍的时候, 注入keep-live信息.

这里有一个内置的操作, 它直接提供这样的功能:

import scala.concurrent.duration._
val injectKeepAlive: Flow[ByteString, ByteString, Unit] =
  Flow[ByteString].keepAlive(1.second, () => keepaliveMessage)