refactor multiInstanceExecute #63

Merged: 7 commits, Jan 17, 2024
build.gradle.kts: 2 changes (1 addition, 1 deletion)
@@ -18,7 +18,7 @@ plugins {

 allprojects {
     group = "com.himadieiev"
-    version = "1.1.3"
+    version = "1.1.4"

     repositories {
         mavenCentral()
@@ -3,9 +3,8 @@ package com.himadieiev.redpulsar.core.locks
 import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend
 import com.himadieiev.redpulsar.core.locks.api.CallResult
 import com.himadieiev.redpulsar.core.locks.api.CountDownLatch
+import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy
 import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
-import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs
-import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs
 import kotlinx.coroutines.CoroutineName
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
@@ -147,7 +146,7 @@ class ListeningCountDownLatch(
             timeout = maxDuration,
             retryCount = retryCount,
             retryDelay = retryDelay,
-            waiter = ::waitAllJobs,
+            waitStrategy = WaitStrategy.ALL,
         ) { backend ->
             backend.count(
                 latchKeyName = buildKey(name),
@@ -168,7 +167,7 @@
             timeout = maxDuration,
             retryCount = retryCount,
             retryDelay = retryDelay,
-            waiter = ::waitAllJobs,
+            waitStrategy = WaitStrategy.ALL,
         ) { backend ->
             backend.undoCount(
                 latchKeyName = buildKey(name),
@@ -186,7 +185,7 @@
             timeout = maxDuration.multipliedBy(2),
             retryCount = retryCount,
             retryDelay = retryDelay,
-            waiter = ::waitAllJobs,
+            waitStrategy = WaitStrategy.ALL,
         ) { backend ->
             backend.checkCount(latchKeyName = buildKey(name))
         }
@@ -202,7 +201,7 @@
             timeout = timeout,
             retryCount = retryCount,
             retryDelay = retryDelay,
-            waiter = ::waitMajorityJobs,
+            waitStrategy = WaitStrategy.MAJORITY,
         ) { backend ->
             backend.listen(channelName = buildKey(channelSpace, name))
         }
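With this change the latch's count, undoCount, and checkCount paths still wait for every backend (WaitStrategy.ALL), while await needs only a majority of successful listen() calls. A tiny illustrative sketch of the quorum rule the executor applies; the three-backend figure is an example, not taken from this diff:

    fun main() {
        // multiInstanceExecute defines a majority as size / 2 + 1 (see the executor diff further down)
        val backendCount = 3
        val quorum = backendCount / 2 + 1 // = 2: two successful listen() results are enough
        println("quorum of $backendCount backends = $quorum")
    }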
@@ -1,8 +1,8 @@
 package com.himadieiev.redpulsar.core.locks.abstracts

 import com.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
+import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy
 import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
-import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.runBlocking
 import java.time.Duration
@@ -43,7 +43,7 @@ abstract class AbstractMultiInstanceLock(
             cleanUp = { backend ->
                 unlockInstance(backend, resourceName)
             },
-            waiter = ::waitAllJobs,
+            waitStrategy = WaitStrategy.ALL,
             callee = { backend ->
                 lockInstance(backend, resourceName, ttl)
             },
@@ -64,7 +64,7 @@
             defaultDrift = Duration.ofMillis(3L * backends.size),
             retryCount = retryCount,
             retryDelay = retryDelay,
-            waiter = ::waitAllJobs,
+            waitStrategy = WaitStrategy.ALL,
             callee = { backend ->
                 unlockInstance(backend, resourceName)
             },

This file was deleted.

@@ -4,16 +4,21 @@ import com.himadieiev.redpulsar.core.locks.abstracts.Backend
 import com.himadieiev.redpulsar.core.utils.withRetry
 import com.himadieiev.redpulsar.core.utils.withTimeoutInThread
 import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Deferred
+import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.Job
+import kotlinx.coroutines.async
+import kotlinx.coroutines.joinAll
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
 import java.time.Duration
 import java.util.Collections
-import kotlin.system.measureTimeMillis
+import java.util.concurrent.atomic.AtomicInteger

 /**
  * An algorithm for running closure on multiple remote instances proxied by [backends].
  * Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies:
- * [waitAllJobs] and [waitMajorityJobs].
+ * [WaitStrategy.ALL] and [WaitStrategy.MAJORITY].
  * Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time
  * spend for getting results is not exceeding some reasonable time difference using [timeout] and
  * clock drift.
@@ -27,44 +32,60 @@ import kotlin.system.measureTimeMillis
  * @param scope [CoroutineScope] the scope to run coroutine in.
  * @param timeout [Duration] the maximum time to wait.
  * @param defaultDrift [Duration] the default clock drift.
+ * @param waitStrategy [WaitStrategy] the strategy to wait for results.
  * @param cleanUp [Function] the function to clean up resources on each backend.
- * @param waiter [Function] the function to wait for results.
  * @param callee [Function] the function to call on each backend.
  */
+@OptIn(ExperimentalCoroutinesApi::class)
 suspend inline fun <T : Backend, R> multiInstanceExecute(
     backends: List<T>,
     scope: CoroutineScope,
     timeout: Duration,
     defaultDrift: Duration = Duration.ofMillis(3),
+    waitStrategy: WaitStrategy = WaitStrategy.ALL,
     crossinline cleanUp: (backend: T) -> Unit = { _ -> },
-    crossinline waiter: suspend (jobs: List<Job>) -> Unit,
     crossinline callee: suspend (backend: T) -> R,
 ): List<R> {
-    val jobs = mutableListOf<Job>()
-    val quorum: Int = backends.size / 2 + 1
+    val jobs = mutableListOf<Deferred<R>>()
+    val quorum = backends.size / 2 + 1
     val results = Collections.synchronizedList(mutableListOf<R>())
     val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis()
-    val timeDiff =
-        measureTimeMillis {
-            backends.forEach { backend ->
-                jobs.add(
-                    scope.launch {
-                        val result = callee(backend)
-                        if (result != null) {
-                            results.add(result)
-                        }
-                    },
-                )
-            }
-            waiter(jobs)
-        }
-    val validity = timeout.toMillis() - timeDiff - clockDrift
+    val t1 = System.currentTimeMillis()
+    backends.forEach { backend ->
+        jobs.add(
+            scope.async { callee(backend) },
+        )
+    }
+    val succeed = AtomicInteger(0)
+    val failed = AtomicInteger(0)
+    jobs.forEach { job ->
+        job.invokeOnCompletion { cause ->
+            if (cause == null) {
+                val result = job.getCompleted()
+                if (result != null) {
+                    results.add(result)
+                }
+                succeed.incrementAndGet()
+            } else {
+                failed.incrementAndGet()
+            }
+        }
+    }
+    while (succeed.get() + failed.get() < backends.size) {
+        if (waitStrategy == WaitStrategy.MAJORITY && results.size >= quorum) {
+            jobs.forEach(Job::cancel)
+            break
+        }
+        yield()
+    }
+    val t2 = System.currentTimeMillis()
+    val validity = timeout.toMillis() - (t2 - t1) - clockDrift
     if (results.size < quorum || validity < 0) {
         val cleanUpJobs = mutableListOf<Job>()
         backends.forEach { backend ->
             cleanUpJobs.add(scope.launch { cleanUp(backend) })
         }
-        waitAllJobs(cleanUpJobs)
+        cleanUpJobs.joinAll()
         return emptyList()
     }
     return results
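As a concrete reading of the timing guard above, here is a small worked sketch of the clock-drift and validity arithmetic; the numbers are illustrative, not taken from the PR:

    import java.time.Duration

    fun main() {
        val timeout = Duration.ofSeconds(1) // maximum time to wait
        val defaultDrift = Duration.ofMillis(3) // default clock drift
        // clockDrift = 1% of the timeout plus the default drift
        val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis() // 10 + 3 = 13 ms
        val elapsed = 200L // t2 - t1: how long the backend calls took
        val validity = timeout.toMillis() - elapsed - clockDrift // 1000 - 200 - 13 = 787 ms
        // Results are kept only when a quorum of non-null results arrived and validity is not negative;
        // otherwise cleanUp runs on every backend and an empty list is returned.
        println("clockDrift=$clockDrift ms, validity=$validity ms")
    }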
@@ -77,8 +98,8 @@ suspend inline fun <T : Backend, R> multiInstanceExecuteWithRetry(
     defaultDrift: Duration = Duration.ofMillis(3),
     retryCount: Int = 3,
     retryDelay: Duration = Duration.ofMillis(100),
+    waitStrategy: WaitStrategy = WaitStrategy.ALL,
     crossinline cleanUp: (backend: T) -> Unit = { _ -> },
-    crossinline waiter: suspend (jobs: List<Job>) -> Unit,
     crossinline callee: suspend (backend: T) -> R,
 ): List<R> {
     return withRetry(retryCount = retryCount, retryDelay = retryDelay) {
@@ -87,7 +108,7 @@ suspend inline fun <T : Backend, R> multiInstanceExecuteWithRetry(
             scope = scope,
             timeout = timeout,
             defaultDrift = defaultDrift,
-            waiter = waiter,
+            waitStrategy = waitStrategy,
             callee = callee,
             cleanUp = cleanUp,
         )
@@ -101,7 +122,7 @@ suspend fun <T : Backend, R> List<T>.executeWithRetry(
     retryCount: Int = 3,
     retryDelay: Duration = Duration.ofMillis(100),
     cleanUp: (backend: T) -> Unit = { _ -> },
-    waiter: suspend (jobs: List<Job>) -> Unit,
+    waitStrategy: WaitStrategy = WaitStrategy.ALL,
     callee: suspend (backend: T) -> R,
 ): List<R> {
     return multiInstanceExecuteWithRetry(
@@ -111,7 +132,7 @@ suspend fun <T : Backend, R> List<T>.executeWithRetry(
         defaultDrift = defaultDrift,
         retryCount = retryCount,
         retryDelay = retryDelay,
-        waiter = waiter,
+        waitStrategy = waitStrategy,
         callee = callee,
         cleanUp = cleanUp,
     )
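For callers the migration is mechanical: the function-reference argument waiter = ::waitAllJobs or ::waitMajorityJobs becomes waitStrategy = WaitStrategy.ALL or WaitStrategy.MAJORITY. Below is a minimal caller-side sketch of the executeWithRetry extension after this PR; the wrapper function pollQuorum and its constant "ok" result are made up for illustration:

    import com.himadieiev.redpulsar.core.locks.abstracts.Backend
    import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy
    import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
    import kotlinx.coroutines.CoroutineScope
    import java.time.Duration

    // Hypothetical wrapper: runs a suspend call against every backend and returns
    // as soon as a majority of them have produced a non-null result.
    suspend fun <T : Backend> pollQuorum(
        backends: List<T>,
        scope: CoroutineScope,
    ): List<String> =
        backends.executeWithRetry(
            scope = scope,
            timeout = Duration.ofSeconds(1),
            retryCount = 3,
            retryDelay = Duration.ofMillis(100),
            waitStrategy = WaitStrategy.MAJORITY, // previously: waiter = ::waitMajorityJobs
        ) { backend ->
            // any suspend call against a single backend instance goes here
            "ok"
        }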
@@ -0,0 +1,6 @@
+package com.himadieiev.redpulsar.core.locks.excecutors
+
+enum class WaitStrategy {
+    ALL,
+    MAJORITY,
+}
@@ -3,6 +3,8 @@ package com.himadieiev.redpulsar.core.locks
 import TestTags
 import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend
 import com.himadieiev.redpulsar.core.locks.api.CallResult
+import io.mockk.Call
+import io.mockk.MockKAnswerScope
 import io.mockk.coEvery
 import io.mockk.coVerify
 import io.mockk.every
@@ -547,6 +549,76 @@ class ListeningCountDownLatchTest {
         }
     }

+    @Test
+    fun `one instance will continue to wait`() {
+        instances.forEach { backend ->
+            backend.everyCheckCount("countdownlatch:test", 1)
+        }
+        backend1.everyListen {
+            delay(50)
+            "open"
+        }
+        backend2.everyListen {
+            delay(2000)
+            null
+        }
+        backend3.everyListen {
+            delay(100)
+            "open"
+        }
+        val latch =
+            ListeningCountDownLatch(
+                "test",
+                5,
+                backends = instances,
+                retryDelay = Duration.ofMillis(1),
+                retryCount = 1,
+            )
+        assertEquals(CallResult.SUCCESS, latch.await(Duration.ofSeconds(1)))
+
+        coVerify(atMost = 1) {
+            instances.forEach { backend -> backend.listen(any()) }
+        }
+        verify(exactly = 1) {
+            instances.forEach { backend -> backend.checkCount(any()) }
+        }
+    }
+
+    @Test
+    fun `failed instance return value first`() {
+        instances.forEach { backend ->
+            backend.everyCheckCount("countdownlatch:test", 1)
+        }
+        backend1.everyListen {
+            delay(100)
+            "open"
+        }
+        backend2.everyListen {
+            delay(200)
+            "open"
+        }
+        backend3.everyListen {
+            delay(50)
+            null
+        }
+        val latch =
+            ListeningCountDownLatch(
+                "test",
+                5,
+                backends = instances,
+                retryDelay = Duration.ofMillis(1),
+                retryCount = 1,
+            )
+        assertEquals(CallResult.SUCCESS, latch.await(Duration.ofSeconds(1)))
+
+        coVerify(atMost = 1) {
+            instances.forEach { backend -> backend.listen(any()) }
+        }
+        verify(exactly = 1) {
+            instances.forEach { backend -> backend.checkCount(any()) }
+        }
+    }
+
     @Test
     fun `quorum wasn't reach at majority`() {
         instances.forEach { backend ->
@@ -677,6 +749,13 @@ class ListeningCountDownLatchTest {
         } returns returnVal
     }

+    private fun CountDownLatchBackend.everyListen(answer: suspend MockKAnswerScope<String?, String?>.(Call) -> String?) {
+        val backend = this
+        coEvery {
+            backend.listen(eq("countdownlatch:channels:test"))
+        } coAnswers answer
+    }
+
     private fun CountDownLatchBackend.everyCheckCount(
         latchKeyName: String,
         returnVal: Long?,