From 0b590d1124ff327a47b10541e52cbe4644baa0db Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Tue, 16 Jan 2024 23:09:55 +0900 Subject: [PATCH 1/6] refactor multiInstanceExecute --- build.gradle.kts | 2 +- .../core/locks/ListeningCountDownLatch.kt | 11 ++--- .../abstracts/AbstractMultiInstanceLock.kt | 6 +-- .../locks/excecutors/ExecutorAdditions.kt | 41 ++++++++-------- ...ceExecutor.kt => MultiInstanceExecutor.kt} | 47 +++++++++++++------ ...orTest.kt => MultiInstanceExecutorTest.kt} | 20 ++++---- 6 files changed, 71 insertions(+), 56 deletions(-) rename redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/{MultyInstanceExecutor.kt => MultiInstanceExecutor.kt} (75%) rename redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/{MultyInstanceExecutorTest.kt => MultiInstanceExecutorTest.kt} (92%) diff --git a/build.gradle.kts b/build.gradle.kts index 4b6e782..02d734e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,7 @@ plugins { allprojects { group = "com.himadieiev" - version = "1.1.3" + version = "1.1.4" repositories { mavenCentral() diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index 6436906..2d608be 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -3,9 +3,8 @@ package com.himadieiev.redpulsar.core.locks import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend import com.himadieiev.redpulsar.core.locks.api.CallResult import com.himadieiev.redpulsar.core.locks.api.CountDownLatch +import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry -import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs -import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -147,7 +146,7 @@ class ListeningCountDownLatch( timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.count( latchKeyName = buildKey(name), @@ -168,7 +167,7 @@ class ListeningCountDownLatch( timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.undoCount( latchKeyName = buildKey(name), @@ -186,7 +185,7 @@ class ListeningCountDownLatch( timeout = maxDuration.multipliedBy(2), retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.checkCount(latchKeyName = buildKey(name)) } @@ -202,7 +201,7 @@ class ListeningCountDownLatch( timeout = timeout, retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitMajorityJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.listen(channelName = buildKey(channelSpace, name)) } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt index 8286dfd..40cb1b9 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt @@ -1,8 +1,8 @@ package com.himadieiev.redpulsar.core.locks.abstracts import com.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend +import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry -import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.runBlocking import java.time.Duration @@ -43,7 +43,7 @@ abstract class AbstractMultiInstanceLock( cleanUp = { backend -> unlockInstance(backend, resourceName) }, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, callee = { backend -> lockInstance(backend, resourceName, ttl) }, @@ -64,7 +64,7 @@ abstract class AbstractMultiInstanceLock( defaultDrift = Duration.ofMillis(3L * backends.size), retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, callee = { backend -> unlockInstance(backend, resourceName) }, diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt index c52c308..d75403a 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt @@ -1,29 +1,26 @@ package com.himadieiev.redpulsar.core.locks.excecutors -import kotlinx.coroutines.Job -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.yield -import java.util.concurrent.atomic.AtomicInteger - -suspend inline fun waitAllJobs(jobs: List) { - jobs.joinAll() +enum class WaitStrategy { + ALL, + MAJORITY, } -suspend inline fun waitMajorityJobs(jobs: List) { - val quorum: Int = jobs.size / 2 + 1 - val succeed = AtomicInteger(0) - val failed = AtomicInteger(0) - jobs.forEach { job -> - job.invokeOnCompletion { cause -> - if (cause == null) { - succeed.incrementAndGet() - } else { - failed.incrementAndGet() - } - } +fun requiredToSuccessCount( + waitStrategy: WaitStrategy, + backendsSize: Int, +): Int { + return when (waitStrategy) { + WaitStrategy.ALL -> backendsSize + WaitStrategy.MAJORITY -> backendsSize / 2 + 1 } - while (succeed.get() < quorum && failed.get() < quorum) { - yield() +} + +fun enoughToFailCount( + waitStrategy: WaitStrategy, + backendsSize: Int, +): Int { + return when (waitStrategy) { + WaitStrategy.ALL -> 1 + WaitStrategy.MAJORITY -> backendsSize - requiredToSuccessCount(waitStrategy, backendsSize) + 1 } - return } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt similarity index 75% rename from redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt rename to redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index 70914e4..96e9413 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -4,17 +4,23 @@ import com.himadieiev.redpulsar.core.locks.abstracts.Backend import com.himadieiev.redpulsar.core.utils.withRetry import com.himadieiev.redpulsar.core.utils.withTimeoutInThread import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlinx.coroutines.yield import mu.KotlinLogging import java.time.Duration import java.util.Collections +import java.util.concurrent.atomic.AtomicInteger import kotlin.system.measureTimeMillis /** * An algorithm for running closure on multiple remote instances proxied by [backends]. * Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies: - * [waitAllJobs] and [waitMajorityJobs]. + * [WaitStrategy.ALL] and [WaitStrategy.MAJORITY]. * Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time * spend for getting results is not exceeding some reasonable time difference using [timeout] and * clock drift. @@ -32,32 +38,45 @@ import kotlin.system.measureTimeMillis * @param waiter [Function] the function to wait for results. * @param callee [Function] the function to call on each backend. */ +@OptIn(ExperimentalCoroutinesApi::class) suspend inline fun multiInstanceExecute( backends: List, scope: CoroutineScope, timeout: Duration, defaultDrift: Duration = Duration.ofMillis(3), + waitStrategy: WaitStrategy = WaitStrategy.ALL, crossinline cleanUp: (backend: T) -> Unit = { _ -> }, - crossinline waiter: suspend (jobs: List) -> Unit, crossinline callee: suspend (backend: T) -> R, ): List { - val jobs = mutableListOf() - val quorum: Int = backends.size / 2 + 1 + val jobs = mutableListOf>() + val quorum = backends.size / 2 + 1 + val successCount = requiredToSuccessCount(waitStrategy, backends.size) + val failedCount = enoughToFailCount(waitStrategy, backends.size) val results = Collections.synchronizedList(mutableListOf()) val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis() val timeDiff = measureTimeMillis { backends.forEach { backend -> jobs.add( - scope.launch { - val result = callee(backend) + scope.async { callee(backend) }, + ) + } + val failed = AtomicInteger(0) + jobs.forEach { job -> + job.invokeOnCompletion { cause -> + if (cause == null) { + val result = job.getCompleted() if (result != null) { results.add(result) + return@invokeOnCompletion } - }, - ) + } + failed.incrementAndGet() + } + } + while (results.size < successCount && failed.get() < failedCount) { + yield() } - waiter(jobs) } val validity = timeout.toMillis() - timeDiff - clockDrift val logger = KotlinLogging.logger { } @@ -68,7 +87,7 @@ suspend inline fun multiInstanceExecute( backends.forEach { backend -> cleanUpJobs.add(scope.launch { cleanUp(backend) }) } - waitAllJobs(cleanUpJobs) + cleanUpJobs.joinAll() return emptyList() } return results @@ -81,8 +100,8 @@ suspend inline fun multiInstanceExecuteWithRetry( defaultDrift: Duration = Duration.ofMillis(3), retryCount: Int = 3, retryDelay: Duration = Duration.ofMillis(100), + waitStrategy: WaitStrategy = WaitStrategy.ALL, crossinline cleanUp: (backend: T) -> Unit = { _ -> }, - crossinline waiter: suspend (jobs: List) -> Unit, crossinline callee: suspend (backend: T) -> R, ): List { return withRetry(retryCount = retryCount, retryDelay = retryDelay) { @@ -91,7 +110,7 @@ suspend inline fun multiInstanceExecuteWithRetry( scope = scope, timeout = timeout, defaultDrift = defaultDrift, - waiter = waiter, + waitStrategy = waitStrategy, callee = callee, cleanUp = cleanUp, ) @@ -105,7 +124,7 @@ suspend fun List.executeWithRetry( retryCount: Int = 3, retryDelay: Duration = Duration.ofMillis(100), cleanUp: (backend: T) -> Unit = { _ -> }, - waiter: suspend (jobs: List) -> Unit, + waitStrategy: WaitStrategy = WaitStrategy.ALL, callee: suspend (backend: T) -> R, ): List { return multiInstanceExecuteWithRetry( @@ -115,7 +134,7 @@ suspend fun List.executeWithRetry( defaultDrift = defaultDrift, retryCount = retryCount, retryDelay = retryDelay, - waiter = waiter, + waitStrategy = waitStrategy, callee = callee, cleanUp = cleanUp, ) diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutorTest.kt similarity index 92% rename from redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt rename to redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutorTest.kt index 6d4a636..7eeb473 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutorTest.kt @@ -17,7 +17,7 @@ import org.junit.jupiter.params.provider.ValueSource import java.time.Duration @Tag(TestTags.UNIT) -class MultyInstanceExecutorTest { +class MultiInstanceExecutorTest { private var backends: List = emptyList() private lateinit var scope: CoroutineScope @@ -39,7 +39,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } @@ -50,7 +50,7 @@ class MultyInstanceExecutorTest { } @ParameterizedTest(name = "quorum instances are down {0} instances") - @ValueSource(ints = [2, 3, 4, 5, 7, 10]) + @ValueSource(ints = [1, 2, 3, 4, 5, 7, 10]) fun `quorum instances are down`(number: Int) { val quorum = number / 2 + 1 backends = createBackends(number) @@ -65,7 +65,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } @@ -76,7 +76,7 @@ class MultyInstanceExecutorTest { } @ParameterizedTest(name = "non quorum instances are down {0} instances") - @ValueSource(ints = [2, 3, 4, 5, 7, 10]) + @ValueSource(ints = [1, 2, 3, 4, 5, 7, 10]) fun `non quorum instances are down`(number: Int) { val quorum = number / 2 + 1 backends = createBackends(number) @@ -91,7 +91,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } @@ -113,7 +113,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitMajorityJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.test() } } @@ -136,7 +136,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitMajorityJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.test() } } @@ -145,7 +145,7 @@ class MultyInstanceExecutorTest { } @ParameterizedTest(name = "retry on non quorum instance count is down {0} instances") - @ValueSource(ints = [2, 3, 4, 5, 7, 10]) + @ValueSource(ints = [1, 2, 3, 4, 5, 7, 10]) fun `retry on non quorum instance count is down`(number: Int) { val quorum = number / 2 + 1 backends = createBackends(number) @@ -162,7 +162,7 @@ class MultyInstanceExecutorTest { timeout = Duration.ofSeconds(1), retryCount = 3, retryDelay = Duration.ofMillis(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } From b757e948296d019347cb35aeabc5016ba82f15ca Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Tue, 16 Jan 2024 23:10:56 +0900 Subject: [PATCH 2/6] update kdoc --- .../redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index 96e9413..c76854c 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -34,8 +34,8 @@ import kotlin.system.measureTimeMillis * @param scope [CoroutineScope] the scope to run coroutine in. * @param timeout [Duration] the maximum time to wait. * @param defaultDrift [Duration] the default clock drift. + * @param waitStrategy [WaitStrategy] the strategy to wait for results. * @param cleanUp [Function] the function to clean up resources on each backend. - * @param waiter [Function] the function to wait for results. * @param callee [Function] the function to call on each backend. */ @OptIn(ExperimentalCoroutinesApi::class) From a29914f74f0887b14ed61257d49e346b1feb321b Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Tue, 16 Jan 2024 23:13:36 +0900 Subject: [PATCH 3/6] remove logger --- .../redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index c76854c..5515118 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -11,7 +11,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.yield -import mu.KotlinLogging import java.time.Duration import java.util.Collections import java.util.concurrent.atomic.AtomicInteger @@ -79,9 +78,6 @@ suspend inline fun multiInstanceExecute( } } val validity = timeout.toMillis() - timeDiff - clockDrift - val logger = KotlinLogging.logger { } - logger.info { "Validity: $validity, Result size: ${results.size}" } - logger.info { "Results: $results" } if (results.size < quorum || validity < 0) { val cleanUpJobs = mutableListOf() backends.forEach { backend -> From b2f983e7e2e5fd779a6605a6dd013679493a2f62 Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Tue, 16 Jan 2024 23:28:04 +0900 Subject: [PATCH 4/6] wait according to strategy but success according to quorum --- .../core/locks/excecutors/MultiInstanceExecutor.kt | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index 5515118..704fd32 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -60,20 +60,22 @@ suspend inline fun multiInstanceExecute( scope.async { callee(backend) }, ) } + val succeed = AtomicInteger(0) val failed = AtomicInteger(0) jobs.forEach { job -> job.invokeOnCompletion { cause -> if (cause == null) { + succeed.incrementAndGet() val result = job.getCompleted() if (result != null) { results.add(result) - return@invokeOnCompletion } + } else { + failed.incrementAndGet() } - failed.incrementAndGet() } } - while (results.size < successCount && failed.get() < failedCount) { + while (succeed.get() < successCount && failed.get() < failedCount) { yield() } } From 9415b2b9abecbf643c053391d451a5e8832a39aa Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Tue, 16 Jan 2024 23:33:13 +0900 Subject: [PATCH 5/6] fail only if majority failed --- .../core/locks/excecutors/ExecutorAdditions.kt | 10 ---------- .../core/locks/excecutors/MultiInstanceExecutor.kt | 3 +-- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt index d75403a..df4b420 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt @@ -14,13 +14,3 @@ fun requiredToSuccessCount( WaitStrategy.MAJORITY -> backendsSize / 2 + 1 } } - -fun enoughToFailCount( - waitStrategy: WaitStrategy, - backendsSize: Int, -): Int { - return when (waitStrategy) { - WaitStrategy.ALL -> 1 - WaitStrategy.MAJORITY -> backendsSize - requiredToSuccessCount(waitStrategy, backendsSize) + 1 - } -} diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index 704fd32..8a03ebc 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -50,7 +50,6 @@ suspend inline fun multiInstanceExecute( val jobs = mutableListOf>() val quorum = backends.size / 2 + 1 val successCount = requiredToSuccessCount(waitStrategy, backends.size) - val failedCount = enoughToFailCount(waitStrategy, backends.size) val results = Collections.synchronizedList(mutableListOf()) val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis() val timeDiff = @@ -75,7 +74,7 @@ suspend inline fun multiInstanceExecute( } } } - while (succeed.get() < successCount && failed.get() < failedCount) { + while (succeed.get() < successCount && failed.get() < quorum) { yield() } } From 6fb933bde601239536cc4258955fa26ccad4a870 Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Wed, 17 Jan 2024 00:58:05 +0900 Subject: [PATCH 6/6] multi instance executor finally get result properly for MAJORITY instances --- .../core/locks/ListeningCountDownLatch.kt | 6 +- .../locks/excecutors/ExecutorAdditions.kt | 16 ---- .../locks/excecutors/MultiInstanceExecutor.kt | 53 +++++++------ .../core/locks/excecutors/WaitStrategy.kt | 6 ++ .../core/locks/ListeningCountDownLatchTest.kt | 79 +++++++++++++++++++ 5 files changed, 115 insertions(+), 45 deletions(-) delete mode 100644 redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt create mode 100644 redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/WaitStrategy.kt diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index 2d608be..d812a32 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -146,7 +146,7 @@ class ListeningCountDownLatch( timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, - waitStrategy = WaitStrategy.MAJORITY, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.count( latchKeyName = buildKey(name), @@ -167,7 +167,7 @@ class ListeningCountDownLatch( timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, - waitStrategy = WaitStrategy.MAJORITY, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.undoCount( latchKeyName = buildKey(name), @@ -185,7 +185,7 @@ class ListeningCountDownLatch( timeout = maxDuration.multipliedBy(2), retryCount = retryCount, retryDelay = retryDelay, - waitStrategy = WaitStrategy.MAJORITY, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.checkCount(latchKeyName = buildKey(name)) } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt deleted file mode 100644 index df4b420..0000000 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt +++ /dev/null @@ -1,16 +0,0 @@ -package com.himadieiev.redpulsar.core.locks.excecutors - -enum class WaitStrategy { - ALL, - MAJORITY, -} - -fun requiredToSuccessCount( - waitStrategy: WaitStrategy, - backendsSize: Int, -): Int { - return when (waitStrategy) { - WaitStrategy.ALL -> backendsSize - WaitStrategy.MAJORITY -> backendsSize / 2 + 1 - } -} diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index 8a03ebc..f1bfc10 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -14,7 +14,6 @@ import kotlinx.coroutines.yield import java.time.Duration import java.util.Collections import java.util.concurrent.atomic.AtomicInteger -import kotlin.system.measureTimeMillis /** * An algorithm for running closure on multiple remote instances proxied by [backends]. @@ -49,36 +48,38 @@ suspend inline fun multiInstanceExecute( ): List { val jobs = mutableListOf>() val quorum = backends.size / 2 + 1 - val successCount = requiredToSuccessCount(waitStrategy, backends.size) val results = Collections.synchronizedList(mutableListOf()) val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis() - val timeDiff = - measureTimeMillis { - backends.forEach { backend -> - jobs.add( - scope.async { callee(backend) }, - ) - } - val succeed = AtomicInteger(0) - val failed = AtomicInteger(0) - jobs.forEach { job -> - job.invokeOnCompletion { cause -> - if (cause == null) { - succeed.incrementAndGet() - val result = job.getCompleted() - if (result != null) { - results.add(result) - } - } else { - failed.incrementAndGet() - } + val t1 = System.currentTimeMillis() + backends.forEach { backend -> + jobs.add( + scope.async { callee(backend) }, + ) + } + val succeed = AtomicInteger(0) + val failed = AtomicInteger(0) + jobs.forEach { job -> + job.invokeOnCompletion { cause -> + if (cause == null) { + val result = job.getCompleted() + if (result != null) { + results.add(result) } - } - while (succeed.get() < successCount && failed.get() < quorum) { - yield() + succeed.incrementAndGet() + } else { + failed.incrementAndGet() } } - val validity = timeout.toMillis() - timeDiff - clockDrift + } + while (succeed.get() + failed.get() < backends.size) { + if (waitStrategy == WaitStrategy.MAJORITY && results.size >= quorum) { + jobs.forEach(Job::cancel) + break + } + yield() + } + val t2 = System.currentTimeMillis() + val validity = timeout.toMillis() - (t2 - t1) - clockDrift if (results.size < quorum || validity < 0) { val cleanUpJobs = mutableListOf() backends.forEach { backend -> diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/WaitStrategy.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/WaitStrategy.kt new file mode 100644 index 0000000..711e1ac --- /dev/null +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/WaitStrategy.kt @@ -0,0 +1,6 @@ +package com.himadieiev.redpulsar.core.locks.excecutors + +enum class WaitStrategy { + ALL, + MAJORITY, +} diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt index c036902..ac3b470 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt @@ -3,6 +3,8 @@ package com.himadieiev.redpulsar.core.locks import TestTags import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend import com.himadieiev.redpulsar.core.locks.api.CallResult +import io.mockk.Call +import io.mockk.MockKAnswerScope import io.mockk.coEvery import io.mockk.coVerify import io.mockk.every @@ -547,6 +549,76 @@ class ListeningCountDownLatchTest { } } + @Test + fun `one instance will continue to wait`() { + instances.forEach { backend -> + backend.everyCheckCount("countdownlatch:test", 1) + } + backend1.everyListen { + delay(50) + "open" + } + backend2.everyListen { + delay(2000) + null + } + backend3.everyListen { + delay(100) + "open" + } + val latch = + ListeningCountDownLatch( + "test", + 5, + backends = instances, + retryDelay = Duration.ofMillis(1), + retryCount = 1, + ) + assertEquals(CallResult.SUCCESS, latch.await(Duration.ofSeconds(1))) + + coVerify(atMost = 1) { + instances.forEach { backend -> backend.listen(any()) } + } + verify(exactly = 1) { + instances.forEach { backend -> backend.checkCount(any()) } + } + } + + @Test + fun `failed instance return value first`() { + instances.forEach { backend -> + backend.everyCheckCount("countdownlatch:test", 1) + } + backend1.everyListen { + delay(100) + "open" + } + backend2.everyListen { + delay(200) + "open" + } + backend3.everyListen { + delay(50) + null + } + val latch = + ListeningCountDownLatch( + "test", + 5, + backends = instances, + retryDelay = Duration.ofMillis(1), + retryCount = 1, + ) + assertEquals(CallResult.SUCCESS, latch.await(Duration.ofSeconds(1))) + + coVerify(atMost = 1) { + instances.forEach { backend -> backend.listen(any()) } + } + verify(exactly = 1) { + instances.forEach { backend -> backend.checkCount(any()) } + } + } + @Test fun `quorum wasn't reach at majority`() { instances.forEach { backend -> @@ -677,6 +749,13 @@ class ListeningCountDownLatchTest { } returns returnVal } + private fun CountDownLatchBackend.everyListen(answer: suspend MockKAnswerScope.(Call) -> String?) { + val backend = this + coEvery { + backend.listen(eq("countdownlatch:channels:test")) + } coAnswers answer + } + private fun CountDownLatchBackend.everyCheckCount( latchKeyName: String, returnVal: Long?,