可以使用多种代码模式和类库来验证Akka Stream
的Source
, Flow
和Sink
的行为是否正确. 在这里我们讨论以下的方法来测试这些元素:
- 简单的
Source
,Sink
和Flow
; Source
和Sink
结合了akka-testkit
模块中的TestProbe
;Source
和Sink
由akka-stream-testkit
模块特意构造出;
把你的数据处理过程流水线保持成分离的source
,flow
以及sink
是非常重要的. 这使得它们很容易被测试, 通过把它们组合到其他的source
和sink
中, 或者一些测试框架, 如akka-testkit
或者akka-stream-testkit
锁提供的.
// Testing a custom sink can be as simple as attaching a source that emits elements from a predefined collection,
测试一个自定义的Sink
十分容易, 只需要附着一个预定义输出元素的集合, 运行一个创建好的测试Flow
并且断言被测试Sink
的输出值即可. 下面是一个测试Sink
的例子:
val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right)
val future = Source(1 to 4).runWith(sinkUnderTest)
val result = Await.result(future, 100.millis)
assert(result == 20)
同样可以对Source
使用这种方法. 在下一个例子中我们有一个产生无限元素的流的Source
. 这样的源可以通过断言任意一个满足条件的第一个元素的值. 在这里grouped
组合子和Sink.head
非常有用.
import system.dispatcher
import akka.pattern.pipe
val sourceUnderTest = Source.repeat(1).map(_ * 2)
val future = sourceUnderTest.grouped(10).runWith(Sink.head)
val result = Await.result(future, 100.millis)
assert(result == Seq.fill(10)(2))
当我们测试一个Flow
时, 我们需要附加一个Source
和一个Sink
. 当流的两个端都在我们控制之下时, 我们可以选择不同的Source
来测试Flow
各种各样的边界情况而且使用Sink
来容易的使用断言.
val flowUnderTest = Flow[Int].takeWhile(_ < 5)
val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
val result = Await.result(future, 100.millis)
assert(result == (1 to 4))
AkkaStream
提供了与Actor
现成的整合. 这个支持可以使用在akka-testkit API
熟悉的TestProbe
写流测试
一种更直接的测试方法是把流具象化成一个Future
然后使用pipe
模式来把这个Future
的内容管道输出到这个观察者中(Probe).
import system.dispatcher
import akka.pattern.pipe
val sourceUnderTest = Source(1 to 4).grouped(2)
val probe = TestProbe()
sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref)
probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4)))
我们可以使用一个输出所有到来的元素到一个给定的ActorRef
的Sink.actorRef
, 来替代具象化到一个Future
. 现在我们可以在TestProbe
上使用使用断言方法并且在元素一个接一个到来的时候进行期望. 我们还能断言流的完成, 使用期望给定给Sink.actorRef
的onCompleteMessage
.
case object Tick
val sourceUnderTest = Source.tick(0.seconds, 200.millis, Tick)
val probe = TestProbe()
val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run()
probe.expectMsg(1.second, Tick)
probe.expectNoMsg(100.millis)
probe.expectMsg(200.millis, Tick)
cancellable.cancel()
probe.expectMsg(200.millis, "completed")
和Sink.actorRef
一样提供了可以控制接收到的元素的功能, 我们可以使用Source.actorRef
然后对输出的元素进行完全的控制.
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(8, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 100.millis)
assert(result == "123")
你可能发现多种代码模式已经在测试流的流水线时出现了. Akka Stream
有一个独立的akka-stream-testkit
模块提供了专门写流测试的工具. 这个模块有两个主要的组件TestSource
和TestSink
, 它们提供了把Source
和Sink
具象化到Probes
的流式API.
注意: 确认在你的项目的依赖中添加了
akka-stream-testkit
.
一个由TestSink.probe
返回的Sink
允许人工控制需求和断言将要输出到下流的到来元素.
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
sourceUnderTest
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(4, 8)
.expectComplete()
一个由TestSource.probe
返回的Source
可以用来断言需求或者控制何时流完成或者以一个错误结束.
val sinkUnderTest = Sink.cancelled
TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.left)
.run()
.expectCancellation()
你还可以注入异常然后测试sink
在错误条件下的行为.
val sinkUnderTest = Sink.head[Int]
val (probe, future) = TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.both)
.run()
probe.sendError(new Exception("boom"))
Await.ready(future, 100.millis)
val Failure(exception) = future.value.get
assert(exception.getMessage == "boom")
当测试flow
时可以把测试source
和sink
在一起使用.
val flowUnderTest = Flow[Int].mapAsyncUnordered(2) { sleep =>
pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep))
}
val (pub, sub) = TestSource.probe[Int]
.via(flowUnderTest)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
sub.request(n = 3)
pub.sendNext(3)
pub.sendNext(2)
pub.sendNext(1)
sub.expectNextUnordered(1, 2, 3)
pub.sendError(new Exception("Power surge in the linear subroutine C-47!"))
val ex = sub.expectError()
assert(ex.getMessage.contains("C-47"))
为了测试, 可以使用一个特殊的流的执行模式, 该模式下并发执行会更加积极(以减少性能的代价), 为了帮助在测试中暴露竞争情况(race condition). 为了启用这个设定, 在你的配置文件中添加下行:
akka.stream.materializer.debug.fuzzing-mode = on
警告: 永远不要在生产环境或者benchmark中使用该设定. 这是一个测试工具, 它使得你的代码在测试中更具有更多覆盖, 但是它减少了流的通量. 如果你启用了该设定, 一个警告消息将记录在日志中.