diff --git a/yun/2024-10-16-coroutine-flow.md b/yun/2024-10-16-coroutine-flow.md index d97ccf01..a35085ab 100644 --- a/yun/2024-10-16-coroutine-flow.md +++ b/yun/2024-10-16-coroutine-flow.md @@ -1,3 +1,5 @@ +# Kotlin 코루틴으로 성능 개선: Flow를 활용한 다중 요청 처리 + Kotlin의 코루틴을 이용한 비동기 프로그래밍은 성능을 크게 향상시킬 수 있는 강력한 도구입니다. 특히 `Flow`를 활용하여 여러 요청을 동시에 처리하는 방식은 효율적인 비동기 처리를 가능하게 합니다. 이 포스팅에서는는 `Flow`를 사용하여 다중 요청을 처리하는 방법과 이론적 배경, 그리고 이를 사용할 때 주의할 점에 대해 다루겠습니다. ## 시나리오 @@ -8,23 +10,23 @@ Kotlin의 코루틴을 이용한 비동기 프로그래밍은 성능을 크게 ```kotlin class OrderClient { - fun getOrder(orderRequest: OrderRequest): ResponseResult { - return runBlocking { - delay(300) // 300ms 지연, 실제 API를 호출하지 않고 시간만 지연 - ResponseResult.Success(OrderResponse(orderRequest.productId)) - } - } + fun getOrder(orderRequest: OrderRequest): ResponseResult { + return runBlocking { + delay(300) // 300ms 지연, 실제 API를 호출하지 않고 시간만 지연 + ResponseResult.Success(OrderResponse(orderRequest.productId)) + } + } } fun getOrderSync(orderRequests: List): List { - return orderRequests - .map { - orderClient - .getOrder(it) // 300ms 지연 - .onFailure { log.error("Failure: $it") } - .onSuccess { log.info("Success: $it") } - .getOrThrow() - } + return orderRequests + .map { + orderClient + .getOrder(it) // 300ms 지연 + .onFailure { log.error("Failure: $it") } + .onSuccess { log.info("Success: $it") } + .getOrThrow() + } } ``` @@ -34,41 +36,42 @@ fun getOrderSync(orderRequests: List): List { @Test fun getOrderSync() { - val stopWatch = StopWatch() - val flatMapMergeStudy = FlatMapMergeStudy() - val orderRequests = (1..100).map { OrderRequest("$it") } + val stopWatch = StopWatch() + val flatMapMergeStudy = FlatMapMergeStudy() + val orderRequests = (1..100).map { OrderRequest("$it") } - stopWatch.start() - val response = flatMapMergeStudy.getOrderSync(orderRequests) - stopWatch.stop() - println(stopWatch.totalTimeMillis) + stopWatch.start() + val response = flatMapMergeStudy.getOrderSync(orderRequests) + stopWatch.stop() + println(stopWatch.totalTimeMillis) - // 30,528ms - println(response) + // 30,528ms + println(response) } ``` ## 코루틴 `Flow`를 이용한 성능 개선 -Kotlin의 코루틴을 이용한 비동기 프로그래밍은 성능을 크게 향상시킬 수 있는 강력한 도구입니다. 특히 Flow를 활용하여 여러 요청을 동시에 처리하는 방식은 효율적인 비동기 처리를 가능하게 합니다. 이 블로그에서는 flatMapMerge를 사용하여 다중 요청을 처리하는 방법과 이론적 배경, 그리고 이를 사용할 때 주의할 점에 대해 다루겠습니다. +Kotlin의 코루틴은 비동기 작업을 손쉽게 처리할 수 있는 강력한 도구입니다. 특히 Flow를 활용하면 여러 비동기 요청을 효율적으로 처리할 수 있습니다. Flow는 데이터 스트림을 처리하는 코루틴 기반 API로, 여러 개의 작업을 동시에 병렬로 수행할 수 있도록 지원합니다. 이번 섹션에서는 Flow의 flatMapMerge를 사용하여 다수의 API 요청을 효율적으로 처리하는 방법과, 이를 통해 얻을 수 있는 성능 향상에 대해 다뤄보겠습니다. + ```kotlin @OptIn(FlowPreview::class) suspend fun getOrderFlow(orderRequests: List): List { - return orderRequests - .asFlow() - .flatMapMerge { request -> - flow { - orderClient - .getOrder(request) - .onFailure { log.error("Failure: $it") } - .onSuccess { - log.info("Success: $it") - emit(it) - } - } - } - .toList() + return orderRequests + .asFlow() + .flatMapMerge { request -> + flow { + orderClient + .getOrder(request) + .onFailure { log.error("Failure: $it") } + .onSuccess { + log.info("Success: $it") + emit(it) + } + } + } + .toList() } ``` @@ -79,35 +82,35 @@ suspend fun getOrderFlow(orderRequests: List): List ```kotlin @Test fun getOrderFlow(): Unit = runBlocking { - val stopWatch = StopWatch() - val flatMapMergeStudy = FlatMapMergeStudy() - val orderRequests = (1..100).map { OrderRequest("$it") } - - stopWatch.start() - val response = flatMapMergeStudy.getOrderFlow(orderRequests) - stopWatch.stop() - // 2,228ms - println(stopWatch.totalTimeMillis) - } + val stopWatch = StopWatch() + val flatMapMergeStudy = FlatMapMergeStudy() + val orderRequests = (1..100).map { OrderRequest("$it") } + + stopWatch.start() + val response = flatMapMergeStudy.getOrderFlow(orderRequests) + stopWatch.stop() + // 2,228ms + println(stopWatch.totalTimeMillis) + } ``` 이론상 100개의 요청을 동시에 처리하면 300ms 정도의 시간이 소요되어야 하지만, 실제로는 2,228ms가 소요됩니다. 이는 다음과 같은 요인들로 인한 것입니다. 1. **코루틴 생성과 컨텍스트 전환 오버헤드** - - 코루틴을 생성하고 실행할 때 발생하는 오버헤드는 무시할 수 없는 시간 지연을 초래할 수 있습니다. - - 특히, `flatMapMerge`를 사용하여 다수의 코루틴을 병렬로 실행할 때, 각 코루틴의 생성과 컨텍스트 전환 비용이 누적되어 총 실행 시간이 증가할 수 있습니다. + - 코루틴을 생성하고 실행할 때 발생하는 오버헤드는 무시할 수 없는 시간 지연을 초래할 수 있습니다. + - 특히, `flatMapMerge`를 사용하여 다수의 코루틴을 병렬로 실행할 때, 각 코루틴의 생성과 컨텍스트 전환 비용이 누적되어 총 실행 시간이 증가할 수 있습니다. 2. **`flatMapMerge`의 병합 과정** - - `flatMapMerge`는 여러 플로우를 병합하면서 각 플로우의 결과를 수집합니다. - - 이 과정에서 발생하는 추가적인 작업들, 예를 들어 플로우의 결과를 수집하고 병합하는 오버헤드가 존재할 수 있습니다. - - 이 오버헤드는 특히 플로우의 개수가 많을 때 더 크게 작용합니다. + - `flatMapMerge`는 여러 플로우를 병합하면서 각 플로우의 결과를 수집합니다. + - 이 과정에서 발생하는 추가적인 작업들, 예를 들어 플로우의 결과를 수집하고 병합하는 오버헤드가 존재할 수 있습니다. + - 이 오버헤드는 특히 플로우의 개수가 많을 때 더 크게 작용합니다. 3. **`emit` 호출과 플로우 수집의 지연** - - 각 플로우에서 `emit`을 호출하고, 최종적으로 `toList`로 수집하는 과정에서 발생하는 지연도 무시할 수 없습니다. - - `emit`은 비동기적으로 데이터를 내보내는 작업이므로, 여러 번 호출될 때 지연이 누적될 수 있습니다. + - 각 플로우에서 `emit`을 호출하고, 최종적으로 `toList`로 수집하는 과정에서 발생하는 지연도 무시할 수 없습니다. + - `emit`은 비동기적으로 데이터를 내보내는 작업이므로, 여러 번 호출될 때 지연이 누적될 수 있습니다. 4. **기본 Concurrency 설정** - - flatMapMerge의 기본 concurrency 값은 16이며, 이 코드는 기본값으로 동작합니다. - - Concurrency 16으로 동작할 때 100개의 요청을 처리하는 데 소요되는 시간은 100 / 16 * 300 = 1875ms 정도입니다. - - 이 시간은 앞서 언급한 1, 2, 3번 항목들과 함께 작업을 수행해야 하므로 추가적인 지연이 발생할 수 있습니다. - - **특히, Concurrency 16으로 처리하는 시간이 가장 오래 걸리며, 이는 전체 처리 시간에 크게 영향을 미칩니다.** + - flatMapMerge의 기본 concurrency 값은 16이며, 이 코드는 기본값으로 동작합니다. + - Concurrency 16으로 동작할 때 100개의 요청을 처리하는 데 소요되는 시간은 100 / 16 * 300 = 1875ms 정도입니다. + - 이 시간은 앞서 언급한 1, 2, 3번 항목들과 함께 작업을 수행해야 하므로 추가적인 지연이 발생할 수 있습니다. + - **특히, Concurrency 16으로 처리하는 시간이 가장 오래 걸리며, 이는 전체 처리 시간에 크게 영향을 미칩니다.** ![](https://raw.githubusercontent.com/cheese10yun/blog-sample/master/kotlin-coroutine/images/result_001.png) @@ -136,26 +139,26 @@ Kotlin 문서에서는 코루틴을 경량 스레드라고 합니다. 이는 대 ```kotlin // 코드 출처 코틀린 동시성 프로그래밍 suspend fun createCoroutines(amount: Int) { - val jobs = ArrayList() - for (i in 1..amount) { - jobs += GlobalScope.launch { - delay(1000) - } - } - jobs.forEach { it.join() } + val jobs = ArrayList() + for (i in 1..amount) { + jobs += GlobalScope.launch { + delay(1000) + } + } + jobs.forEach { it.join() } } @Test @DelicateCoroutinesApi fun `코루틴 생성 테스트`() = runBlocking { - println("${Thread.activeCount()} thread active at the start") + println("${Thread.activeCount()} thread active at the start") - val time = measureTimeMillis { - createCoroutines(100) - } + val time = measureTimeMillis { + createCoroutines(100) + } - println("${Thread.activeCount()} thread active at the end") - println("Took $time ms") + println("${Thread.activeCount()} thread active at the end") + println("Took $time ms") } ``` @@ -190,21 +193,21 @@ Kotlin의 `flatMapMerge`에서 `concurrency` 파라미터는 동시에 병렬로 ```kotlin @OptIn(FlowPreview::class) suspend fun getOrderFlow(orderRequests: List, concurrency: Int): List { - return orderRequests - .asFlow() - // concurrency 동시 실행할 코루틴 수 제한, - .flatMapMerge(concurrency) { request -> - flow { - orderClient - .getOrder(request) - .onFailure { log.error("Failure: $it") } - .onSuccess { - log.info("Success: $it") - emit(it) - } - } - } - .toList() + return orderRequests + .asFlow() + // concurrency 동시 실행할 코루틴 수 제한, + .flatMapMerge(concurrency) { request -> + flow { + orderClient + .getOrder(request) + .onFailure { log.error("Failure: $it") } + .onSuccess { + log.info("Success: $it") + emit(it) + } + } + } + .toList() } ``` @@ -243,6 +246,41 @@ flatMapMerge의 concurrency 파라미터는 동시에 실행되는 코루틴 수 또한 배치 애플리케이션처럼 특정 작업만 하고 애플리케이션이 종료되는 환경에서는 concurrency 값을 높여 처리량을 극대화하는 것이 좋습니다. 이런 경우에는 단기간에 최대한 많은 작업을 처리하는 것이 목표이므로, 가능한 한 높은 concurrency 값을 설정하여 성능을 최적화할 수 있습니다. +## 코루틴은 더 적은 스레드로 더 많은 동시성을 처리한다 + +```kotlin +private fun rxAndBulkWriter(): ItemWriter { + return ItemWriter { stores -> + stores + .toFlowable() + .parallel() + .runOn(Schedulers.io()) + .map { store -> + ... + } + .sequential() + .blockingSubscribe( + { store -> // 속도 특정 시에는 주석 ... }, + { log.error(it.message, it) }, + { ... } + ) + } +} +``` + +RxJava에서 flow 방식으로 처리하는 코드는 코루틴과 비교해볼 수 있습니다. RxJava와 코루틴의 flow를 이용한 병렬 처리 및 병합 과정은 기본적으로 유사한 개념을 공유합니다. 두 방식 모두 데이터를 비동기적으로 처리하고 결과를 수집하며, 병렬로 처리한 작업들을 하나의 흐름으로 다시 병합합니다. 그렇다면, 이 두 방식의 차이점은 무엇일까요? + +![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/parallelflowable.sequential.png) + +RxJava와 같은 라이브러리에서는 동시성 처리를 위해 CPU 코어 수에 맞춰 스레드를 생성하는 것이 일반적입니다. 예를 들어, CPU가 4코어라면 4개의 스레드를 생성해 병렬로 작업을 처리합니다. 이 방식은 각 스레드에 작업을 분배해 동시에 여러 작업을 처리할 수 있지만, 스레드의 생성과 컨텍스트 전환에서 발생하는 비용이 성능에 영향을 미칠 수 있습니다. 스레드 수가 많아질수록 시스템 자원을 더 많이 소비하게 됩니다. + +코틀린의 코루틴은 동시성 프로그래밍에 있어서 더 효율적인 대안을 제공합니다. 코루틴은 스레드보다 훨씬 가벼우며, 하나의 스레드에서 여러 개의 코루틴을 실행할 수 있습니다. 코루틴은 스레드처럼 독립적인 작업 단위이지만, 스레드보다 적은 자원을 사용하고 빠르게 컨텍스트 전환을 할 수 있습니다. + +코루틴은 I/O 작업이나 비동기 처리가 필요한 경우 특히 효과적입니다. 여러 코루틴이 동시에 실행되더라도, 이는 스레드 수와는 무관하게 적은 스레드로도 많은 작업을 처리할 수 있습니다. 즉, CPU 코어 수보다 훨씬 더 많은 동시 작업을 수행할 수 있으며, 스레드의 생성 및 컨텍스트 전환 비용도 줄어듭니다. + +결론적으로, RxJava와 같은 방식은 CPU 코어 수만큼 스레드를 생성해 동시성을 처리하는 반면, 코루틴은 하나의 스레드에서 여러 작업을 동시에 처리할 수 있어, 적은 스레드로도 더 많은 동시성을 처리할 수 있다는 장점이 있습니다. 코루틴의 이러한 특성은 자원을 절약하고, 더 높은 성능을 제공할 수 있는 강력한 도구가 됩니다. + ## 출처 -* [코틀린 동시성 프로그래밍](https://m.yes24.com/Goods/Detail/90338766) \ No newline at end of file +* [코틀린 동시성 프로그래밍](https://m.yes24.com/Goods/Detail/90338766) +* [RxJava](https://github.com/ReactiveX/RxJava) \ No newline at end of file