From 0b590d1124ff327a47b10541e52cbe4644baa0db Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Tue, 16 Jan 2024 23:09:55 +0900 Subject: [PATCH] refactor multiInstanceExecute --- build.gradle.kts | 2 +- .../core/locks/ListeningCountDownLatch.kt | 11 ++--- .../abstracts/AbstractMultiInstanceLock.kt | 6 +-- .../locks/excecutors/ExecutorAdditions.kt | 41 ++++++++-------- ...ceExecutor.kt => MultiInstanceExecutor.kt} | 47 +++++++++++++------ ...orTest.kt => MultiInstanceExecutorTest.kt} | 20 ++++---- 6 files changed, 71 insertions(+), 56 deletions(-) rename redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/{MultyInstanceExecutor.kt => MultiInstanceExecutor.kt} (75%) rename redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/{MultyInstanceExecutorTest.kt => MultiInstanceExecutorTest.kt} (92%) diff --git a/build.gradle.kts b/build.gradle.kts index 4b6e782..02d734e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,7 @@ plugins { allprojects { group = "com.himadieiev" - version = "1.1.3" + version = "1.1.4" repositories { mavenCentral() diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index 6436906..2d608be 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -3,9 +3,8 @@ package com.himadieiev.redpulsar.core.locks import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend import com.himadieiev.redpulsar.core.locks.api.CallResult import com.himadieiev.redpulsar.core.locks.api.CountDownLatch +import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry -import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs -import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -147,7 +146,7 @@ class ListeningCountDownLatch( timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.count( latchKeyName = buildKey(name), @@ -168,7 +167,7 @@ class ListeningCountDownLatch( timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.undoCount( latchKeyName = buildKey(name), @@ -186,7 +185,7 @@ class ListeningCountDownLatch( timeout = maxDuration.multipliedBy(2), retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.checkCount(latchKeyName = buildKey(name)) } @@ -202,7 +201,7 @@ class ListeningCountDownLatch( timeout = timeout, retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitMajorityJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.listen(channelName = buildKey(channelSpace, name)) } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt index 8286dfd..40cb1b9 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt @@ -1,8 +1,8 @@ package com.himadieiev.redpulsar.core.locks.abstracts import com.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend +import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry -import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.runBlocking import java.time.Duration @@ -43,7 +43,7 @@ abstract class AbstractMultiInstanceLock( cleanUp = { backend -> unlockInstance(backend, resourceName) }, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, callee = { backend -> lockInstance(backend, resourceName, ttl) }, @@ -64,7 +64,7 @@ abstract class AbstractMultiInstanceLock( defaultDrift = Duration.ofMillis(3L * backends.size), retryCount = retryCount, retryDelay = retryDelay, - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, callee = { backend -> unlockInstance(backend, resourceName) }, diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt index c52c308..d75403a 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/ExecutorAdditions.kt @@ -1,29 +1,26 @@ package com.himadieiev.redpulsar.core.locks.excecutors -import kotlinx.coroutines.Job -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.yield -import java.util.concurrent.atomic.AtomicInteger - -suspend inline fun waitAllJobs(jobs: List) { - jobs.joinAll() +enum class WaitStrategy { + ALL, + MAJORITY, } -suspend inline fun waitMajorityJobs(jobs: List) { - val quorum: Int = jobs.size / 2 + 1 - val succeed = AtomicInteger(0) - val failed = AtomicInteger(0) - jobs.forEach { job -> - job.invokeOnCompletion { cause -> - if (cause == null) { - succeed.incrementAndGet() - } else { - failed.incrementAndGet() - } - } +fun requiredToSuccessCount( + waitStrategy: WaitStrategy, + backendsSize: Int, +): Int { + return when (waitStrategy) { + WaitStrategy.ALL -> backendsSize + WaitStrategy.MAJORITY -> backendsSize / 2 + 1 } - while (succeed.get() < quorum && failed.get() < quorum) { - yield() +} + +fun enoughToFailCount( + waitStrategy: WaitStrategy, + backendsSize: Int, +): Int { + return when (waitStrategy) { + WaitStrategy.ALL -> 1 + WaitStrategy.MAJORITY -> backendsSize - requiredToSuccessCount(waitStrategy, backendsSize) + 1 } - return } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt similarity index 75% rename from redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt rename to redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt index 70914e4..96e9413 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutor.kt @@ -4,17 +4,23 @@ import com.himadieiev.redpulsar.core.locks.abstracts.Backend import com.himadieiev.redpulsar.core.utils.withRetry import com.himadieiev.redpulsar.core.utils.withTimeoutInThread import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch +import kotlinx.coroutines.yield import mu.KotlinLogging import java.time.Duration import java.util.Collections +import java.util.concurrent.atomic.AtomicInteger import kotlin.system.measureTimeMillis /** * An algorithm for running closure on multiple remote instances proxied by [backends]. * Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies: - * [waitAllJobs] and [waitMajorityJobs]. + * [WaitStrategy.ALL] and [WaitStrategy.MAJORITY]. * Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time * spend for getting results is not exceeding some reasonable time difference using [timeout] and * clock drift. @@ -32,32 +38,45 @@ import kotlin.system.measureTimeMillis * @param waiter [Function] the function to wait for results. * @param callee [Function] the function to call on each backend. */ +@OptIn(ExperimentalCoroutinesApi::class) suspend inline fun multiInstanceExecute( backends: List, scope: CoroutineScope, timeout: Duration, defaultDrift: Duration = Duration.ofMillis(3), + waitStrategy: WaitStrategy = WaitStrategy.ALL, crossinline cleanUp: (backend: T) -> Unit = { _ -> }, - crossinline waiter: suspend (jobs: List) -> Unit, crossinline callee: suspend (backend: T) -> R, ): List { - val jobs = mutableListOf() - val quorum: Int = backends.size / 2 + 1 + val jobs = mutableListOf>() + val quorum = backends.size / 2 + 1 + val successCount = requiredToSuccessCount(waitStrategy, backends.size) + val failedCount = enoughToFailCount(waitStrategy, backends.size) val results = Collections.synchronizedList(mutableListOf()) val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis() val timeDiff = measureTimeMillis { backends.forEach { backend -> jobs.add( - scope.launch { - val result = callee(backend) + scope.async { callee(backend) }, + ) + } + val failed = AtomicInteger(0) + jobs.forEach { job -> + job.invokeOnCompletion { cause -> + if (cause == null) { + val result = job.getCompleted() if (result != null) { results.add(result) + return@invokeOnCompletion } - }, - ) + } + failed.incrementAndGet() + } + } + while (results.size < successCount && failed.get() < failedCount) { + yield() } - waiter(jobs) } val validity = timeout.toMillis() - timeDiff - clockDrift val logger = KotlinLogging.logger { } @@ -68,7 +87,7 @@ suspend inline fun multiInstanceExecute( backends.forEach { backend -> cleanUpJobs.add(scope.launch { cleanUp(backend) }) } - waitAllJobs(cleanUpJobs) + cleanUpJobs.joinAll() return emptyList() } return results @@ -81,8 +100,8 @@ suspend inline fun multiInstanceExecuteWithRetry( defaultDrift: Duration = Duration.ofMillis(3), retryCount: Int = 3, retryDelay: Duration = Duration.ofMillis(100), + waitStrategy: WaitStrategy = WaitStrategy.ALL, crossinline cleanUp: (backend: T) -> Unit = { _ -> }, - crossinline waiter: suspend (jobs: List) -> Unit, crossinline callee: suspend (backend: T) -> R, ): List { return withRetry(retryCount = retryCount, retryDelay = retryDelay) { @@ -91,7 +110,7 @@ suspend inline fun multiInstanceExecuteWithRetry( scope = scope, timeout = timeout, defaultDrift = defaultDrift, - waiter = waiter, + waitStrategy = waitStrategy, callee = callee, cleanUp = cleanUp, ) @@ -105,7 +124,7 @@ suspend fun List.executeWithRetry( retryCount: Int = 3, retryDelay: Duration = Duration.ofMillis(100), cleanUp: (backend: T) -> Unit = { _ -> }, - waiter: suspend (jobs: List) -> Unit, + waitStrategy: WaitStrategy = WaitStrategy.ALL, callee: suspend (backend: T) -> R, ): List { return multiInstanceExecuteWithRetry( @@ -115,7 +134,7 @@ suspend fun List.executeWithRetry( defaultDrift = defaultDrift, retryCount = retryCount, retryDelay = retryDelay, - waiter = waiter, + waitStrategy = waitStrategy, callee = callee, cleanUp = cleanUp, ) diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutorTest.kt similarity index 92% rename from redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt rename to redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutorTest.kt index 6d4a636..7eeb473 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultiInstanceExecutorTest.kt @@ -17,7 +17,7 @@ import org.junit.jupiter.params.provider.ValueSource import java.time.Duration @Tag(TestTags.UNIT) -class MultyInstanceExecutorTest { +class MultiInstanceExecutorTest { private var backends: List = emptyList() private lateinit var scope: CoroutineScope @@ -39,7 +39,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } @@ -50,7 +50,7 @@ class MultyInstanceExecutorTest { } @ParameterizedTest(name = "quorum instances are down {0} instances") - @ValueSource(ints = [2, 3, 4, 5, 7, 10]) + @ValueSource(ints = [1, 2, 3, 4, 5, 7, 10]) fun `quorum instances are down`(number: Int) { val quorum = number / 2 + 1 backends = createBackends(number) @@ -65,7 +65,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } @@ -76,7 +76,7 @@ class MultyInstanceExecutorTest { } @ParameterizedTest(name = "non quorum instances are down {0} instances") - @ValueSource(ints = [2, 3, 4, 5, 7, 10]) + @ValueSource(ints = [1, 2, 3, 4, 5, 7, 10]) fun `non quorum instances are down`(number: Int) { val quorum = number / 2 + 1 backends = createBackends(number) @@ -91,7 +91,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() } @@ -113,7 +113,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitMajorityJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.test() } } @@ -136,7 +136,7 @@ class MultyInstanceExecutorTest { backends = backends, scope = scope, timeout = Duration.ofSeconds(1), - waiter = ::waitMajorityJobs, + waitStrategy = WaitStrategy.MAJORITY, ) { backend -> backend.test() } } @@ -145,7 +145,7 @@ class MultyInstanceExecutorTest { } @ParameterizedTest(name = "retry on non quorum instance count is down {0} instances") - @ValueSource(ints = [2, 3, 4, 5, 7, 10]) + @ValueSource(ints = [1, 2, 3, 4, 5, 7, 10]) fun `retry on non quorum instance count is down`(number: Int) { val quorum = number / 2 + 1 backends = createBackends(number) @@ -162,7 +162,7 @@ class MultyInstanceExecutorTest { timeout = Duration.ofSeconds(1), retryCount = 3, retryDelay = Duration.ofMillis(1), - waiter = ::waitAllJobs, + waitStrategy = WaitStrategy.ALL, ) { backend -> backend.test() }