Skip to content

Commit

Permalink
refactor multiInstanceExecute
Browse files Browse the repository at this point in the history
  • Loading branch information
himadieievsv committed Jan 16, 2024
1 parent 42f07a7 commit 0b590d1
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 56 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ plugins {

allprojects {
group = "com.himadieiev"
version = "1.1.3"
version = "1.1.4"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package com.himadieiev.redpulsar.core.locks
import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend
import com.himadieiev.redpulsar.core.locks.api.CallResult
import com.himadieiev.redpulsar.core.locks.api.CountDownLatch
import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy
import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs
import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -147,7 +146,7 @@ class ListeningCountDownLatch(
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.MAJORITY,
) { backend ->
backend.count(
latchKeyName = buildKey(name),
Expand All @@ -168,7 +167,7 @@ class ListeningCountDownLatch(
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.MAJORITY,
) { backend ->
backend.undoCount(
latchKeyName = buildKey(name),
Expand All @@ -186,7 +185,7 @@ class ListeningCountDownLatch(
timeout = maxDuration.multipliedBy(2),
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.MAJORITY,
) { backend ->
backend.checkCount(latchKeyName = buildKey(name))
}
Expand All @@ -202,7 +201,7 @@ class ListeningCountDownLatch(
timeout = timeout,
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitMajorityJobs,
waitStrategy = WaitStrategy.MAJORITY,
) { backend ->
backend.listen(channelName = buildKey(channelSpace, name))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.himadieiev.redpulsar.core.locks.abstracts

import com.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import com.himadieiev.redpulsar.core.locks.excecutors.WaitStrategy
import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import java.time.Duration
Expand Down Expand Up @@ -43,7 +43,7 @@ abstract class AbstractMultiInstanceLock(
cleanUp = { backend ->
unlockInstance(backend, resourceName)
},
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.ALL,
callee = { backend ->
lockInstance(backend, resourceName, ttl)
},
Expand All @@ -64,7 +64,7 @@ abstract class AbstractMultiInstanceLock(
defaultDrift = Duration.ofMillis(3L * backends.size),
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.ALL,
callee = { backend ->
unlockInstance(backend, resourceName)
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
package com.himadieiev.redpulsar.core.locks.excecutors

import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger

suspend inline fun waitAllJobs(jobs: List<Job>) {
jobs.joinAll()
enum class WaitStrategy {
ALL,
MAJORITY,
}

suspend inline fun waitMajorityJobs(jobs: List<Job>) {
val quorum: Int = jobs.size / 2 + 1
val succeed = AtomicInteger(0)
val failed = AtomicInteger(0)
jobs.forEach { job ->
job.invokeOnCompletion { cause ->
if (cause == null) {
succeed.incrementAndGet()
} else {
failed.incrementAndGet()
}
}
fun requiredToSuccessCount(
waitStrategy: WaitStrategy,
backendsSize: Int,
): Int {
return when (waitStrategy) {
WaitStrategy.ALL -> backendsSize
WaitStrategy.MAJORITY -> backendsSize / 2 + 1
}
while (succeed.get() < quorum && failed.get() < quorum) {
yield()
}

fun enoughToFailCount(
waitStrategy: WaitStrategy,
backendsSize: Int,
): Int {
return when (waitStrategy) {
WaitStrategy.ALL -> 1
WaitStrategy.MAJORITY -> backendsSize - requiredToSuccessCount(waitStrategy, backendsSize) + 1
}
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ import com.himadieiev.redpulsar.core.locks.abstracts.Backend
import com.himadieiev.redpulsar.core.utils.withRetry
import com.himadieiev.redpulsar.core.utils.withTimeoutInThread
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import mu.KotlinLogging
import java.time.Duration
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

/**
* An algorithm for running closure on multiple remote instances proxied by [backends].
* Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies:
* [waitAllJobs] and [waitMajorityJobs].
* [WaitStrategy.ALL] and [WaitStrategy.MAJORITY].
* Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time
* spend for getting results is not exceeding some reasonable time difference using [timeout] and
* clock drift.
Expand All @@ -32,32 +38,45 @@ import kotlin.system.measureTimeMillis
* @param waiter [Function] the function to wait for results.
* @param callee [Function] the function to call on each backend.
*/
@OptIn(ExperimentalCoroutinesApi::class)
suspend inline fun <T : Backend, R> multiInstanceExecute(
backends: List<T>,
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = Duration.ofMillis(3),
waitStrategy: WaitStrategy = WaitStrategy.ALL,
crossinline cleanUp: (backend: T) -> Unit = { _ -> },
crossinline waiter: suspend (jobs: List<Job>) -> Unit,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
val jobs = mutableListOf<Job>()
val quorum: Int = backends.size / 2 + 1
val jobs = mutableListOf<Deferred<R>>()
val quorum = backends.size / 2 + 1
val successCount = requiredToSuccessCount(waitStrategy, backends.size)
val failedCount = enoughToFailCount(waitStrategy, backends.size)
val results = Collections.synchronizedList(mutableListOf<R>())
val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis()
val timeDiff =
measureTimeMillis {
backends.forEach { backend ->
jobs.add(
scope.launch {
val result = callee(backend)
scope.async { callee(backend) },
)
}
val failed = AtomicInteger(0)
jobs.forEach { job ->
job.invokeOnCompletion { cause ->
if (cause == null) {
val result = job.getCompleted()
if (result != null) {
results.add(result)
return@invokeOnCompletion
}
},
)
}
failed.incrementAndGet()
}
}
while (results.size < successCount && failed.get() < failedCount) {
yield()
}
waiter(jobs)
}
val validity = timeout.toMillis() - timeDiff - clockDrift
val logger = KotlinLogging.logger { }
Expand All @@ -68,7 +87,7 @@ suspend inline fun <T : Backend, R> multiInstanceExecute(
backends.forEach { backend ->
cleanUpJobs.add(scope.launch { cleanUp(backend) })
}
waitAllJobs(cleanUpJobs)
cleanUpJobs.joinAll()
return emptyList()
}
return results
Expand All @@ -81,8 +100,8 @@ suspend inline fun <T : Backend, R> multiInstanceExecuteWithRetry(
defaultDrift: Duration = Duration.ofMillis(3),
retryCount: Int = 3,
retryDelay: Duration = Duration.ofMillis(100),
waitStrategy: WaitStrategy = WaitStrategy.ALL,
crossinline cleanUp: (backend: T) -> Unit = { _ -> },
crossinline waiter: suspend (jobs: List<Job>) -> Unit,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
return withRetry(retryCount = retryCount, retryDelay = retryDelay) {
Expand All @@ -91,7 +110,7 @@ suspend inline fun <T : Backend, R> multiInstanceExecuteWithRetry(
scope = scope,
timeout = timeout,
defaultDrift = defaultDrift,
waiter = waiter,
waitStrategy = waitStrategy,
callee = callee,
cleanUp = cleanUp,
)
Expand All @@ -105,7 +124,7 @@ suspend fun <T : Backend, R> List<T>.executeWithRetry(
retryCount: Int = 3,
retryDelay: Duration = Duration.ofMillis(100),
cleanUp: (backend: T) -> Unit = { _ -> },
waiter: suspend (jobs: List<Job>) -> Unit,
waitStrategy: WaitStrategy = WaitStrategy.ALL,
callee: suspend (backend: T) -> R,
): List<R> {
return multiInstanceExecuteWithRetry(
Expand All @@ -115,7 +134,7 @@ suspend fun <T : Backend, R> List<T>.executeWithRetry(
defaultDrift = defaultDrift,
retryCount = retryCount,
retryDelay = retryDelay,
waiter = waiter,
waitStrategy = waitStrategy,
callee = callee,
cleanUp = cleanUp,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.junit.jupiter.params.provider.ValueSource
import java.time.Duration

@Tag(TestTags.UNIT)
class MultyInstanceExecutorTest {
class MultiInstanceExecutorTest {
private var backends: List<TestBackend> = emptyList()
private lateinit var scope: CoroutineScope

Expand All @@ -39,7 +39,7 @@ class MultyInstanceExecutorTest {
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.ALL,
) { backend ->
backend.test()
}
Expand All @@ -50,7 +50,7 @@ class MultyInstanceExecutorTest {
}

@ParameterizedTest(name = "quorum instances are down {0} instances")
@ValueSource(ints = [2, 3, 4, 5, 7, 10])
@ValueSource(ints = [1, 2, 3, 4, 5, 7, 10])
fun `quorum instances are down`(number: Int) {
val quorum = number / 2 + 1
backends = createBackends(number)
Expand All @@ -65,7 +65,7 @@ class MultyInstanceExecutorTest {
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.ALL,
) { backend ->
backend.test()
}
Expand All @@ -76,7 +76,7 @@ class MultyInstanceExecutorTest {
}

@ParameterizedTest(name = "non quorum instances are down {0} instances")
@ValueSource(ints = [2, 3, 4, 5, 7, 10])
@ValueSource(ints = [1, 2, 3, 4, 5, 7, 10])
fun `non quorum instances are down`(number: Int) {
val quorum = number / 2 + 1
backends = createBackends(number)
Expand All @@ -91,7 +91,7 @@ class MultyInstanceExecutorTest {
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.ALL,
) { backend ->
backend.test()
}
Expand All @@ -113,7 +113,7 @@ class MultyInstanceExecutorTest {
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitMajorityJobs,
waitStrategy = WaitStrategy.MAJORITY,
) { backend -> backend.test() }
}

Expand All @@ -136,7 +136,7 @@ class MultyInstanceExecutorTest {
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitMajorityJobs,
waitStrategy = WaitStrategy.MAJORITY,
) { backend -> backend.test() }
}

Expand All @@ -145,7 +145,7 @@ class MultyInstanceExecutorTest {
}

@ParameterizedTest(name = "retry on non quorum instance count is down {0} instances")
@ValueSource(ints = [2, 3, 4, 5, 7, 10])
@ValueSource(ints = [1, 2, 3, 4, 5, 7, 10])
fun `retry on non quorum instance count is down`(number: Int) {
val quorum = number / 2 + 1
backends = createBackends(number)
Expand All @@ -162,7 +162,7 @@ class MultyInstanceExecutorTest {
timeout = Duration.ofSeconds(1),
retryCount = 3,
retryDelay = Duration.ofMillis(1),
waiter = ::waitAllJobs,
waitStrategy = WaitStrategy.ALL,
) { backend ->
backend.test()
}
Expand Down

0 comments on commit 0b590d1

Please sign in to comment.