diff --git a/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index bd2c0c2..17ad165 100644 --- a/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -139,7 +139,7 @@ class ListeningCountDownLatch( private fun count(): List { return backends.executeWithRetry( scope = scope, - releaseTime = maxDuration, + timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, ) { backend -> @@ -157,7 +157,7 @@ class ListeningCountDownLatch( private fun undoCount() { backends.executeWithRetry( scope = scope, - releaseTime = maxDuration, + timeout = maxDuration, retryCount = retryCount, retryDelay = retryDelay, ) { backend -> @@ -172,7 +172,7 @@ class ListeningCountDownLatch( private fun checkCount(scope: CoroutineScope): List { return backends.executeWithRetry( scope = scope, - releaseTime = maxDuration * 2, + timeout = maxDuration * 2, retryCount = retryCount, retryDelay = retryDelay, ) { backend -> @@ -186,7 +186,7 @@ class ListeningCountDownLatch( ): List { return backends.executeWithRetry( scope = scope, - releaseTime = timeout, + timeout = timeout, retryCount = retryCount, retryDelay = retryDelay, // Allow non-quorum polling here. That might need to be changed as it could lead to unexpected behavior diff --git a/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt b/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt index a8e706e..756ebee 100644 --- a/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt +++ b/redpulsar-core/src/main/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutor.kt @@ -13,20 +13,20 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds /** - * Algorithm for run closure on multiple remote instances proxied by [backends]. - * Each call will be executed in separate [Job]. - * After all calls are finished, algorithm will check whether the result is successful depends on a strategy. - * So far there are two self-explanatory strategies: [waitAllJobs] and [waitAnyJobs]. - * Besides those strategies, there is a clock drift. Some operation might be time sensitive - * e.g. setting expiration date, so clock drift is used to judge whether the result is valid allowing - * some reasonable time difference in closure executions on multiple instances. + * An algorithm for running closure on multiple remote instances proxied by [backends]. + * Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies: + * [waitAllJobs] and [waitAnyJobs]. + * Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time + * spend for getting results is not exceeding some reasonable time difference using [timeout] and + * clok drift. + * * Coroutine used by callee must be cooperative coroutine (not blocking). * In order to cancel jobs forcefully, use [withTimeoutInThread] instead. */ inline fun multyInstanceExecute( backends: List, scope: CoroutineScope, - releaseTime: Duration, + timeout: Duration, defaultDrift: Duration = 3.milliseconds, crossinline waiter: suspend (jobs: List, results: MutableList) -> Unit = ::waitAllJobs, crossinline callee: suspend (backend: T) -> R, @@ -34,8 +34,7 @@ inline fun multyInstanceExecute( val jobs = mutableListOf() val quorum: Int = backends.size / 2 + 1 val results = Collections.synchronizedList(mutableListOf()) - val clockDrift = - (releaseTime.inWholeMilliseconds * 0.01).toLong() + defaultDrift.inWholeMilliseconds * backends.size + val clockDrift = (timeout.inWholeMilliseconds * 0.01).toLong() + defaultDrift.inWholeMilliseconds val timeDiff = measureTimeMillis { backends.forEach { backend -> @@ -50,7 +49,7 @@ inline fun multyInstanceExecute( } runBlocking(scope.coroutineContext) { waiter(jobs, results) } } - val validity = releaseTime.inWholeMilliseconds - timeDiff - clockDrift + val validity = timeout.inWholeMilliseconds - timeDiff - clockDrift if (results.size < quorum || validity < 0) { return emptyList() } @@ -60,7 +59,7 @@ inline fun multyInstanceExecute( inline fun multyInstanceExecuteWithRetry( backends: List, scope: CoroutineScope, - releaseTime: Duration, + timeout: Duration, defaultDrift: Duration = 3.milliseconds, retryCount: Int = 3, retryDelay: Duration = 100.milliseconds, @@ -71,7 +70,7 @@ inline fun multyInstanceExecuteWithRetry( return@withRetry multyInstanceExecute( backends = backends, scope = scope, - releaseTime = releaseTime, + timeout = timeout, defaultDrift = defaultDrift, waiter = waiter, callee = callee, @@ -81,7 +80,7 @@ inline fun multyInstanceExecuteWithRetry( fun List.executeWithRetry( scope: CoroutineScope, - releaseTime: Duration, + timeout: Duration, defaultDrift: Duration = 3.milliseconds, retryCount: Int = 3, retryDelay: Duration = 100.milliseconds, @@ -91,7 +90,7 @@ fun List.executeWithRetry( return multyInstanceExecuteWithRetry( backends = this, scope = scope, - releaseTime = releaseTime, + timeout = timeout, defaultDrift = defaultDrift, retryCount = retryCount, retryDelay = retryDelay, diff --git a/redpulsar-core/src/test/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt b/redpulsar-core/src/test/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt index bc212d2..c583b3e 100644 --- a/redpulsar-core/src/test/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt +++ b/redpulsar-core/src/test/kotlin/me/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt @@ -37,7 +37,7 @@ class MultyInstanceExecutorTest { multyInstanceExecute( backends = backends, scope = scope, - releaseTime = 1.seconds, + timeout = 1.seconds, waiter = ::waitAllJobs, ) { backend -> backend.test() @@ -61,7 +61,7 @@ class MultyInstanceExecutorTest { multyInstanceExecute( backends = backends, scope = scope, - releaseTime = 1.seconds, + timeout = 1.seconds, waiter = ::waitAllJobs, ) { backend -> backend.test() @@ -85,7 +85,7 @@ class MultyInstanceExecutorTest { multyInstanceExecute( backends = backends, scope = scope, - releaseTime = 1.seconds, + timeout = 1.seconds, waiter = ::waitAllJobs, ) { backend -> backend.test() @@ -105,7 +105,7 @@ class MultyInstanceExecutorTest { multyInstanceExecute( backends = backends, scope = scope, - releaseTime = 1.seconds, + timeout = 1.seconds, waiter = ::waitAnyJobs, ) { backend -> backend.test() } @@ -124,7 +124,7 @@ class MultyInstanceExecutorTest { multyInstanceExecute( backends = backends, scope = scope, - releaseTime = 1.seconds, + timeout = 1.seconds, waiter = ::waitAnyJobs, ) { backend -> backend.test() } @@ -146,7 +146,7 @@ class MultyInstanceExecutorTest { multyInstanceExecuteWithRetry( backends = backends, scope = scope, - releaseTime = 1.seconds, + timeout = 1.seconds, retryCount = 3, retryDelay = 1.milliseconds, waiter = ::waitAllJobs,