From f29c0528fea1cbda40142931ffcf42292bfd39f4 Mon Sep 17 00:00:00 2001 From: Serhii Date: Sat, 13 Jan 2024 22:17:56 +0900 Subject: [PATCH] make multiInstanceExecute cooperative (#58) * make multiInstanceExecute cooperative * bump version to 1.1.3 --- build.gradle.kts | 2 +- .../core/locks/ListeningCountDownLatch.kt | 76 ++++++++------- .../abstracts/AbstractMultiInstanceLock.kt | 54 ++++++----- .../locks/excecutors/MultyInstanceExecutor.kt | 19 ++-- .../excecutors/MultyInstanceExecutorTest.kt | 97 +++++++++++-------- 5 files changed, 138 insertions(+), 110 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 49ccb51..4b6e782 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,7 @@ plugins { allprojects { group = "com.himadieiev" - version = "1.1.2" + version = "1.1.3" repositories { mavenCentral() diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index 1e9e2a1..6436906 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -4,6 +4,7 @@ import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBack import com.himadieiev.redpulsar.core.locks.api.CallResult import com.himadieiev.redpulsar.core.locks.api.CountDownLatch import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry +import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope @@ -140,46 +141,55 @@ class ListeningCountDownLatch( } private fun count(): List { - return backends.executeWithRetry( - scope = scope, - timeout = maxDuration, - retryCount = retryCount, - retryDelay = retryDelay, - ) { backend -> - backend.count( - latchKeyName = buildKey(name), - channelName = buildKey(channelSpace, name), - clientId = clientId, - count = currentCounter.get(), - initialCount = count, - ttl = maxDuration.multipliedBy(2), - ) + return runBlocking { + backends.executeWithRetry( + scope = scope, + timeout = maxDuration, + retryCount = retryCount, + retryDelay = retryDelay, + waiter = ::waitAllJobs, + ) { backend -> + backend.count( + latchKeyName = buildKey(name), + channelName = buildKey(channelSpace, name), + clientId = clientId, + count = currentCounter.get(), + initialCount = count, + ttl = maxDuration.multipliedBy(2), + ) + } } } private fun undoCount() { - backends.executeWithRetry( - scope = scope, - timeout = maxDuration, - retryCount = retryCount, - retryDelay = retryDelay, - ) { backend -> - backend.undoCount( - latchKeyName = buildKey(name), - clientId = clientId, - count = currentCounter.get(), - ) + runBlocking { + backends.executeWithRetry( + scope = scope, + timeout = maxDuration, + retryCount = retryCount, + retryDelay = retryDelay, + waiter = ::waitAllJobs, + ) { backend -> + backend.undoCount( + latchKeyName = buildKey(name), + clientId = clientId, + count = currentCounter.get(), + ) + } } } private fun checkCount(scope: CoroutineScope): List { - return backends.executeWithRetry( - scope = scope, - timeout = maxDuration.multipliedBy(2), - retryCount = retryCount, - retryDelay = retryDelay, - ) { backend -> - backend.checkCount(latchKeyName = buildKey(name)) + return runBlocking { + backends.executeWithRetry( + scope = scope, + timeout = maxDuration.multipliedBy(2), + retryCount = retryCount, + retryDelay = retryDelay, + waiter = ::waitAllJobs, + ) { backend -> + backend.checkCount(latchKeyName = buildKey(name)) + } } } @@ -192,8 +202,6 @@ class ListeningCountDownLatch( timeout = timeout, retryCount = retryCount, retryDelay = retryDelay, - // Allow non-quorum polling here. That might need to be changed as it could lead to unexpected behavior - // if multiple instances goes down or encounter network issue. waiter = ::waitMajorityJobs, ) { backend -> backend.listen(channelName = buildKey(channelSpace, name)) diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt index e9d3dfa..8286dfd 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/AbstractMultiInstanceLock.kt @@ -2,7 +2,9 @@ package com.himadieiev.redpulsar.core.locks.abstracts import com.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry +import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.runBlocking import java.time.Duration /** @@ -31,19 +33,22 @@ abstract class AbstractMultiInstanceLock( resourceName: String, ttl: Duration, ): Boolean { - return backends.executeWithRetry( - scope = scope, - timeout = ttl, - defaultDrift = Duration.ofMillis(3L * backends.size), - retryCount = retryCount, - retryDelay = retryDelay, - cleanUp = { backend -> - unlockInstance(backend, resourceName) - }, - callee = { backend -> - lockInstance(backend, resourceName, ttl) - }, - ).isNotEmpty() + return runBlocking { + backends.executeWithRetry( + scope = scope, + timeout = ttl, + defaultDrift = Duration.ofMillis(3L * backends.size), + retryCount = retryCount, + retryDelay = retryDelay, + cleanUp = { backend -> + unlockInstance(backend, resourceName) + }, + waiter = ::waitAllJobs, + callee = { backend -> + lockInstance(backend, resourceName, ttl) + }, + ) + }.isNotEmpty() } /** @@ -52,16 +57,19 @@ abstract class AbstractMultiInstanceLock( * @return [Boolean] true if the lock was released, false otherwise. */ override fun unlock(resourceName: String): Boolean { - return backends.executeWithRetry( - scope = scope, - timeout = Duration.ofSeconds(1), - defaultDrift = Duration.ofMillis(3L * backends.size), - retryCount = retryCount, - retryDelay = retryDelay, - callee = { backend -> - unlockInstance(backend, resourceName) - }, - ).isNotEmpty() + return runBlocking { + backends.executeWithRetry( + scope = scope, + timeout = Duration.ofSeconds(1), + defaultDrift = Duration.ofMillis(3L * backends.size), + retryCount = retryCount, + retryDelay = retryDelay, + waiter = ::waitAllJobs, + callee = { backend -> + unlockInstance(backend, resourceName) + }, + ) + }.isNotEmpty() } protected fun backendSize(): Int = backends.size diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt index e3ae9ad..816a5cf 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt @@ -6,7 +6,6 @@ import com.himadieiev.redpulsar.core.utils.withTimeoutInThread import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import java.time.Duration import java.util.Collections import kotlin.system.measureTimeMillis @@ -32,13 +31,13 @@ import kotlin.system.measureTimeMillis * @param waiter [Function] the function to wait for results. * @param callee [Function] the function to call on each backend. */ -inline fun multiInstanceExecute( +suspend inline fun multiInstanceExecute( backends: List, scope: CoroutineScope, timeout: Duration, defaultDrift: Duration = Duration.ofMillis(3), crossinline cleanUp: (backend: T) -> Unit = { _ -> }, - crossinline waiter: suspend (jobs: List) -> Unit = ::waitAllJobs, + crossinline waiter: suspend (jobs: List) -> Unit, crossinline callee: suspend (backend: T) -> R, ): List { val jobs = mutableListOf() @@ -57,7 +56,7 @@ inline fun multiInstanceExecute( }, ) } - runBlocking(scope.coroutineContext) { waiter(jobs) } + waiter(jobs) } val validity = timeout.toMillis() - timeDiff - clockDrift if (results.size < quorum || validity < 0) { @@ -65,13 +64,13 @@ inline fun multiInstanceExecute( backends.forEach { backend -> cleanUpJobs.add(scope.launch { cleanUp(backend) }) } - runBlocking(scope.coroutineContext) { waitAllJobs(cleanUpJobs) } + waitAllJobs(cleanUpJobs) return emptyList() } return results } -inline fun multyInstanceExecuteWithRetry( +suspend inline fun multiInstanceExecuteWithRetry( backends: List, scope: CoroutineScope, timeout: Duration, @@ -79,7 +78,7 @@ inline fun multyInstanceExecuteWithRetry( retryCount: Int = 3, retryDelay: Duration = Duration.ofMillis(100), crossinline cleanUp: (backend: T) -> Unit = { _ -> }, - crossinline waiter: suspend (jobs: List) -> Unit = ::waitAllJobs, + crossinline waiter: suspend (jobs: List) -> Unit, crossinline callee: suspend (backend: T) -> R, ): List { return withRetry(retryCount = retryCount, retryDelay = retryDelay) { @@ -95,17 +94,17 @@ inline fun multyInstanceExecuteWithRetry( } } -fun List.executeWithRetry( +suspend fun List.executeWithRetry( scope: CoroutineScope, timeout: Duration, defaultDrift: Duration = Duration.ofMillis(3), retryCount: Int = 3, retryDelay: Duration = Duration.ofMillis(100), cleanUp: (backend: T) -> Unit = { _ -> }, - waiter: suspend (jobs: List) -> Unit = ::waitAllJobs, + waiter: suspend (jobs: List) -> Unit, callee: suspend (backend: T) -> R, ): List { - return multyInstanceExecuteWithRetry( + return multiInstanceExecuteWithRetry( backends = this, scope = scope, timeout = timeout, diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt index ebcb67c..6d4a636 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt @@ -7,6 +7,7 @@ import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach @@ -33,13 +34,15 @@ class MultyInstanceExecutorTest { backends.forEach { backend -> every { backend.test() } returns "OK" } val result = - multiInstanceExecute( - backends = backends, - scope = scope, - timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, - ) { backend -> - backend.test() + runBlocking { + multiInstanceExecute( + backends = backends, + scope = scope, + timeout = Duration.ofSeconds(1), + waiter = ::waitAllJobs, + ) { backend -> + backend.test() + } } assertEquals(number, result.size) @@ -57,13 +60,15 @@ class MultyInstanceExecutorTest { } val result = - multiInstanceExecute( - backends = backends, - scope = scope, - timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, - ) { backend -> - backend.test() + runBlocking { + multiInstanceExecute( + backends = backends, + scope = scope, + timeout = Duration.ofSeconds(1), + waiter = ::waitAllJobs, + ) { backend -> + backend.test() + } } assertEquals(emptyList(), result) @@ -81,13 +86,15 @@ class MultyInstanceExecutorTest { } val result = - multiInstanceExecute( - backends = backends, - scope = scope, - timeout = Duration.ofSeconds(1), - waiter = ::waitAllJobs, - ) { backend -> - backend.test() + runBlocking { + multiInstanceExecute( + backends = backends, + scope = scope, + timeout = Duration.ofSeconds(1), + waiter = ::waitAllJobs, + ) { backend -> + backend.test() + } } assertTrue(number / 2 + 1 <= result.size) @@ -101,12 +108,14 @@ class MultyInstanceExecutorTest { backends.forEach { backend -> every { backend.test() } returns "OK" } val result = - multiInstanceExecute( - backends = backends, - scope = scope, - timeout = Duration.ofSeconds(1), - waiter = ::waitMajorityJobs, - ) { backend -> backend.test() } + runBlocking { + multiInstanceExecute( + backends = backends, + scope = scope, + timeout = Duration.ofSeconds(1), + waiter = ::waitMajorityJobs, + ) { backend -> backend.test() } + } assertTrue(number / 2 + 1 <= result.size) verify(exactly = 1) { backends.forEach { backend -> backend.test() } } @@ -122,12 +131,14 @@ class MultyInstanceExecutorTest { } val result = - multiInstanceExecute( - backends = backends, - scope = scope, - timeout = Duration.ofSeconds(1), - waiter = ::waitMajorityJobs, - ) { backend -> backend.test() } + runBlocking { + multiInstanceExecute( + backends = backends, + scope = scope, + timeout = Duration.ofSeconds(1), + waiter = ::waitMajorityJobs, + ) { backend -> backend.test() } + } assertTrue(number / 2 + 1 <= result.size) verify(exactly = 1) { backends.forEach { backend -> backend.test() } } @@ -144,15 +155,17 @@ class MultyInstanceExecutorTest { } val result = - multyInstanceExecuteWithRetry( - backends = backends, - scope = scope, - timeout = Duration.ofSeconds(1), - retryCount = 3, - retryDelay = Duration.ofMillis(1), - waiter = ::waitAllJobs, - ) { backend -> - backend.test() + runBlocking { + multiInstanceExecuteWithRetry( + backends = backends, + scope = scope, + timeout = Duration.ofSeconds(1), + retryCount = 3, + retryDelay = Duration.ofMillis(1), + waiter = ::waitAllJobs, + ) { backend -> + backend.test() + } } assertEquals(emptyList(), result)