Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[김남윤] 코루틴 Flow을 통한 성능 개선 #1351

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 126 additions & 88 deletions yun/2024-10-16-coroutine-flow.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Kotlin 코루틴으로 성능 개선: Flow를 활용한 다중 요청 처리

Kotlin의 코루틴을 이용한 비동기 프로그래밍은 성능을 크게 향상시킬 수 있는 강력한 도구입니다. 특히 `Flow`를 활용하여 여러 요청을 동시에 처리하는 방식은 효율적인 비동기 처리를 가능하게 합니다. 이 포스팅에서는는 `Flow`를 사용하여 다중 요청을 처리하는 방법과 이론적 배경, 그리고 이를 사용할 때 주의할 점에 대해 다루겠습니다.

## 시나리오
Expand All @@ -8,23 +10,23 @@ Kotlin의 코루틴을 이용한 비동기 프로그래밍은 성능을 크게

```kotlin
class OrderClient {
fun getOrder(orderRequest: OrderRequest): ResponseResult<OrderResponse> {
return runBlocking {
delay(300) // 300ms 지연, 실제 API를 호출하지 않고 시간만 지연
ResponseResult.Success(OrderResponse(orderRequest.productId))
}
}
fun getOrder(orderRequest: OrderRequest): ResponseResult<OrderResponse> {
return runBlocking {
delay(300) // 300ms 지연, 실제 API를 호출하지 않고 시간만 지연
ResponseResult.Success(OrderResponse(orderRequest.productId))
}
}
}

fun getOrderSync(orderRequests: List<OrderRequest>): List<OrderResponse> {
return orderRequests
.map {
orderClient
.getOrder(it) // 300ms 지연
.onFailure { log.error("Failure: $it") }
.onSuccess { log.info("Success: $it") }
.getOrThrow()
}
return orderRequests
.map {
orderClient
.getOrder(it) // 300ms 지연
.onFailure { log.error("Failure: $it") }
.onSuccess { log.info("Success: $it") }
.getOrThrow()
}
}
```

Expand All @@ -34,41 +36,42 @@ fun getOrderSync(orderRequests: List<OrderRequest>): List<OrderResponse> {

@Test
fun getOrderSync() {
val stopWatch = StopWatch()
val flatMapMergeStudy = FlatMapMergeStudy()
val orderRequests = (1..100).map { OrderRequest("$it") }
val stopWatch = StopWatch()
val flatMapMergeStudy = FlatMapMergeStudy()
val orderRequests = (1..100).map { OrderRequest("$it") }

stopWatch.start()
val response = flatMapMergeStudy.getOrderSync(orderRequests)
stopWatch.stop()
println(stopWatch.totalTimeMillis)
stopWatch.start()
val response = flatMapMergeStudy.getOrderSync(orderRequests)
stopWatch.stop()
println(stopWatch.totalTimeMillis)

// 30,528ms
println(response)
// 30,528ms
println(response)
}
```

## 코루틴 `Flow`를 이용한 성능 개선

Kotlin의 코루틴을 이용한 비동기 프로그래밍은 성능을 크게 향상시킬 수 있는 강력한 도구입니다. 특히 Flow를 활용하여 여러 요청을 동시에 처리하는 방식은 효율적인 비동기 처리를 가능하게 합니다. 이 블로그에서는 flatMapMerge를 사용하여 다중 요청을 처리하는 방법과 이론적 배경, 그리고 이를 사용할 때 주의할 점에 대해 다루겠습니다.
Kotlin의 코루틴은 비동기 작업을 손쉽게 처리할 수 있는 강력한 도구입니다. 특히 Flow를 활용하면 여러 비동기 요청을 효율적으로 처리할 수 있습니다. Flow는 데이터 스트림을 처리하는 코루틴 기반 API로, 여러 개의 작업을 동시에 병렬로 수행할 수 있도록 지원합니다. 이번 섹션에서는 Flow의 flatMapMerge를 사용하여 다수의 API 요청을 효율적으로 처리하는 방법과, 이를 통해 얻을 수 있는 성능 향상에 대해 다뤄보겠습니다.


```kotlin
@OptIn(FlowPreview::class)
suspend fun getOrderFlow(orderRequests: List<OrderRequest>): List<OrderResponse> {
return orderRequests
.asFlow()
.flatMapMerge { request ->
flow {
orderClient
.getOrder(request)
.onFailure { log.error("Failure: $it") }
.onSuccess {
log.info("Success: $it")
emit(it)
}
}
}
.toList()
return orderRequests
.asFlow()
.flatMapMerge { request ->
flow {
orderClient
.getOrder(request)
.onFailure { log.error("Failure: $it") }
.onSuccess {
log.info("Success: $it")
emit(it)
}
}
}
.toList()
}
```

Expand All @@ -79,35 +82,35 @@ suspend fun getOrderFlow(orderRequests: List<OrderRequest>): List<OrderResponse>
```kotlin
@Test
fun getOrderFlow(): Unit = runBlocking {
val stopWatch = StopWatch()
val flatMapMergeStudy = FlatMapMergeStudy()
val orderRequests = (1..100).map { OrderRequest("$it") }

stopWatch.start()
val response = flatMapMergeStudy.getOrderFlow(orderRequests)
stopWatch.stop()
// 2,228ms
println(stopWatch.totalTimeMillis)
}
val stopWatch = StopWatch()
val flatMapMergeStudy = FlatMapMergeStudy()
val orderRequests = (1..100).map { OrderRequest("$it") }

stopWatch.start()
val response = flatMapMergeStudy.getOrderFlow(orderRequests)
stopWatch.stop()
// 2,228ms
println(stopWatch.totalTimeMillis)
}
```

이론상 100개의 요청을 동시에 처리하면 300ms 정도의 시간이 소요되어야 하지만, 실제로는 2,228ms가 소요됩니다. 이는 다음과 같은 요인들로 인한 것입니다.

1. **코루틴 생성과 컨텍스트 전환 오버헤드**
- 코루틴을 생성하고 실행할 때 발생하는 오버헤드는 무시할 수 없는 시간 지연을 초래할 수 있습니다.
- 특히, `flatMapMerge`를 사용하여 다수의 코루틴을 병렬로 실행할 때, 각 코루틴의 생성과 컨텍스트 전환 비용이 누적되어 총 실행 시간이 증가할 수 있습니다.
- 코루틴을 생성하고 실행할 때 발생하는 오버헤드는 무시할 수 없는 시간 지연을 초래할 수 있습니다.
- 특히, `flatMapMerge`를 사용하여 다수의 코루틴을 병렬로 실행할 때, 각 코루틴의 생성과 컨텍스트 전환 비용이 누적되어 총 실행 시간이 증가할 수 있습니다.
2. **`flatMapMerge`의 병합 과정**
- `flatMapMerge`는 여러 플로우를 병합하면서 각 플로우의 결과를 수집합니다.
- 이 과정에서 발생하는 추가적인 작업들, 예를 들어 플로우의 결과를 수집하고 병합하는 오버헤드가 존재할 수 있습니다.
- 이 오버헤드는 특히 플로우의 개수가 많을 때 더 크게 작용합니다.
- `flatMapMerge`는 여러 플로우를 병합하면서 각 플로우의 결과를 수집합니다.
- 이 과정에서 발생하는 추가적인 작업들, 예를 들어 플로우의 결과를 수집하고 병합하는 오버헤드가 존재할 수 있습니다.
- 이 오버헤드는 특히 플로우의 개수가 많을 때 더 크게 작용합니다.
3. **`emit` 호출과 플로우 수집의 지연**
- 각 플로우에서 `emit`을 호출하고, 최종적으로 `toList`로 수집하는 과정에서 발생하는 지연도 무시할 수 없습니다.
- `emit`은 비동기적으로 데이터를 내보내는 작업이므로, 여러 번 호출될 때 지연이 누적될 수 있습니다.
- 각 플로우에서 `emit`을 호출하고, 최종적으로 `toList`로 수집하는 과정에서 발생하는 지연도 무시할 수 없습니다.
- `emit`은 비동기적으로 데이터를 내보내는 작업이므로, 여러 번 호출될 때 지연이 누적될 수 있습니다.
4. **기본 Concurrency 설정**
- flatMapMerge의 기본 concurrency 값은 16이며, 이 코드는 기본값으로 동작합니다.
- Concurrency 16으로 동작할 때 100개의 요청을 처리하는 데 소요되는 시간은 100 / 16 * 300 = 1875ms 정도입니다.
- 이 시간은 앞서 언급한 1, 2, 3번 항목들과 함께 작업을 수행해야 하므로 추가적인 지연이 발생할 수 있습니다.
- **특히, Concurrency 16으로 처리하는 시간이 가장 오래 걸리며, 이는 전체 처리 시간에 크게 영향을 미칩니다.**
- flatMapMerge의 기본 concurrency 값은 16이며, 이 코드는 기본값으로 동작합니다.
- Concurrency 16으로 동작할 때 100개의 요청을 처리하는 데 소요되는 시간은 100 / 16 * 300 = 1875ms 정도입니다.
- 이 시간은 앞서 언급한 1, 2, 3번 항목들과 함께 작업을 수행해야 하므로 추가적인 지연이 발생할 수 있습니다.
- **특히, Concurrency 16으로 처리하는 시간이 가장 오래 걸리며, 이는 전체 처리 시간에 크게 영향을 미칩니다.**

![](https://raw.githubusercontent.com/cheese10yun/blog-sample/master/kotlin-coroutine/images/result_001.png)

Expand Down Expand Up @@ -136,26 +139,26 @@ Kotlin 문서에서는 코루틴을 경량 스레드라고 합니다. 이는 대
```kotlin
// 코드 출처 코틀린 동시성 프로그래밍
suspend fun createCoroutines(amount: Int) {
val jobs = ArrayList<Job>()
for (i in 1..amount) {
jobs += GlobalScope.launch {
delay(1000)
}
}
jobs.forEach { it.join() }
val jobs = ArrayList<Job>()
for (i in 1..amount) {
jobs += GlobalScope.launch {
delay(1000)
}
}
jobs.forEach { it.join() }
}

@Test
@DelicateCoroutinesApi
fun `코루틴 생성 테스트`() = runBlocking {
println("${Thread.activeCount()} thread active at the start")
println("${Thread.activeCount()} thread active at the start")

val time = measureTimeMillis {
createCoroutines(100)
}
val time = measureTimeMillis {
createCoroutines(100)
}

println("${Thread.activeCount()} thread active at the end")
println("Took $time ms")
println("${Thread.activeCount()} thread active at the end")
println("Took $time ms")
}
```

Expand Down Expand Up @@ -190,21 +193,21 @@ Kotlin의 `flatMapMerge`에서 `concurrency` 파라미터는 동시에 병렬로
```kotlin
@OptIn(FlowPreview::class)
suspend fun getOrderFlow(orderRequests: List<OrderRequest>, concurrency: Int): List<OrderResponse> {
return orderRequests
.asFlow()
// concurrency 동시 실행할 코루틴 수 제한,
.flatMapMerge(concurrency) { request ->
flow {
orderClient
.getOrder(request)
.onFailure { log.error("Failure: $it") }
.onSuccess {
log.info("Success: $it")
emit(it)
}
}
}
.toList()
return orderRequests
.asFlow()
// concurrency 동시 실행할 코루틴 수 제한,
.flatMapMerge(concurrency) { request ->
flow {
orderClient
.getOrder(request)
.onFailure { log.error("Failure: $it") }
.onSuccess {
log.info("Success: $it")
emit(it)
}
}
}
.toList()
}
```

Expand Down Expand Up @@ -243,6 +246,41 @@ flatMapMerge의 concurrency 파라미터는 동시에 실행되는 코루틴 수

또한 배치 애플리케이션처럼 특정 작업만 하고 애플리케이션이 종료되는 환경에서는 concurrency 값을 높여 처리량을 극대화하는 것이 좋습니다. 이런 경우에는 단기간에 최대한 많은 작업을 처리하는 것이 목표이므로, 가능한 한 높은 concurrency 값을 설정하여 성능을 최적화할 수 있습니다.

## 코루틴은 더 적은 스레드로 더 많은 동시성을 처리한다

```kotlin
private fun rxAndBulkWriter(): ItemWriter<StoreProjection> {
return ItemWriter { stores ->
stores
.toFlowable()
.parallel()
.runOn(Schedulers.io())
.map { store ->
...
}
.sequential()
.blockingSubscribe(
{ store -> // 속도 특정 시에는 주석 ... },
{ log.error(it.message, it) },
{ ... }
)
}
}
```

RxJava에서 flow 방식으로 처리하는 코드는 코루틴과 비교해볼 수 있습니다. RxJava와 코루틴의 flow를 이용한 병렬 처리 및 병합 과정은 기본적으로 유사한 개념을 공유합니다. 두 방식 모두 데이터를 비동기적으로 처리하고 결과를 수집하며, 병렬로 처리한 작업들을 하나의 흐름으로 다시 병합합니다. 그렇다면, 이 두 방식의 차이점은 무엇일까요?

![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/parallelflowable.sequential.png)

RxJava와 같은 라이브러리에서는 동시성 처리를 위해 CPU 코어 수에 맞춰 스레드를 생성하는 것이 일반적입니다. 예를 들어, CPU가 4코어라면 4개의 스레드를 생성해 병렬로 작업을 처리합니다. 이 방식은 각 스레드에 작업을 분배해 동시에 여러 작업을 처리할 수 있지만, 스레드의 생성과 컨텍스트 전환에서 발생하는 비용이 성능에 영향을 미칠 수 있습니다. 스레드 수가 많아질수록 시스템 자원을 더 많이 소비하게 됩니다.

코틀린의 코루틴은 동시성 프로그래밍에 있어서 더 효율적인 대안을 제공합니다. 코루틴은 스레드보다 훨씬 가벼우며, 하나의 스레드에서 여러 개의 코루틴을 실행할 수 있습니다. 코루틴은 스레드처럼 독립적인 작업 단위이지만, 스레드보다 적은 자원을 사용하고 빠르게 컨텍스트 전환을 할 수 있습니다.

코루틴은 I/O 작업이나 비동기 처리가 필요한 경우 특히 효과적입니다. 여러 코루틴이 동시에 실행되더라도, 이는 스레드 수와는 무관하게 적은 스레드로도 많은 작업을 처리할 수 있습니다. 즉, CPU 코어 수보다 훨씬 더 많은 동시 작업을 수행할 수 있으며, 스레드의 생성 및 컨텍스트 전환 비용도 줄어듭니다.

결론적으로, RxJava와 같은 방식은 CPU 코어 수만큼 스레드를 생성해 동시성을 처리하는 반면, 코루틴은 하나의 스레드에서 여러 작업을 동시에 처리할 수 있어, 적은 스레드로도 더 많은 동시성을 처리할 수 있다는 장점이 있습니다. 코루틴의 이러한 특성은 자원을 절약하고, 더 높은 성능을 제공할 수 있는 강력한 도구가 됩니다.


## 출처
* [코틀린 동시성 프로그래밍](https://m.yes24.com/Goods/Detail/90338766)
* [코틀린 동시성 프로그래밍](https://m.yes24.com/Goods/Detail/90338766)
* [RxJava](https://github.com/ReactiveX/RxJava)