Skip to content

Commit

Permalink
change doc for MultyInstanceExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
himadieievsv committed Jan 5, 2024
1 parent 1c36420 commit 0c02a40
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ListeningCountDownLatch(
private fun count(): List<String?> {
return backends.executeWithRetry(
scope = scope,
releaseTime = maxDuration,
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
Expand All @@ -157,7 +157,7 @@ class ListeningCountDownLatch(
private fun undoCount() {
backends.executeWithRetry(
scope = scope,
releaseTime = maxDuration,
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
Expand All @@ -172,7 +172,7 @@ class ListeningCountDownLatch(
private fun checkCount(scope: CoroutineScope): List<Long?> {
return backends.executeWithRetry(
scope = scope,
releaseTime = maxDuration * 2,
timeout = maxDuration * 2,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
Expand All @@ -186,7 +186,7 @@ class ListeningCountDownLatch(
): List<String?> {
return backends.executeWithRetry(
scope = scope,
releaseTime = timeout,
timeout = timeout,
retryCount = retryCount,
retryDelay = retryDelay,
// Allow non-quorum polling here. That might need to be changed as it could lead to unexpected behavior
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,28 @@ import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* Algorithm for run closure on multiple remote instances proxied by [backends].
* Each call will be executed in separate [Job].
* After all calls are finished, algorithm will check whether the result is successful depends on a strategy.
* So far there are two self-explanatory strategies: [waitAllJobs] and [waitAnyJobs].
* Besides those strategies, there is a clock drift. Some operation might be time sensitive
* e.g. setting expiration date, so clock drift is used to judge whether the result is valid allowing
* some reasonable time difference in closure executions on multiple instances.
* An algorithm for running closure on multiple remote instances proxied by [backends].
* Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies:
* [waitAllJobs] and [waitAnyJobs].
* Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time
* spend for getting results is not exceeding some reasonable time difference using [timeout] and
* clok drift.
*
* Coroutine used by callee must be cooperative coroutine (not blocking).
* In order to cancel jobs forcefully, use [withTimeoutInThread] instead.
*/
inline fun <T : Backend, R> multyInstanceExecute(
backends: List<T>,
scope: CoroutineScope,
releaseTime: Duration,
timeout: Duration,
defaultDrift: Duration = 3.milliseconds,
crossinline waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
val jobs = mutableListOf<Job>()
val quorum: Int = backends.size / 2 + 1
val results = Collections.synchronizedList(mutableListOf<R>())
val clockDrift =
(releaseTime.inWholeMilliseconds * 0.01).toLong() + defaultDrift.inWholeMilliseconds * backends.size
val clockDrift = (timeout.inWholeMilliseconds * 0.01).toLong() + defaultDrift.inWholeMilliseconds
val timeDiff =
measureTimeMillis {
backends.forEach { backend ->
Expand All @@ -50,7 +49,7 @@ inline fun <T : Backend, R> multyInstanceExecute(
}
runBlocking(scope.coroutineContext) { waiter(jobs, results) }
}
val validity = releaseTime.inWholeMilliseconds - timeDiff - clockDrift
val validity = timeout.inWholeMilliseconds - timeDiff - clockDrift
if (results.size < quorum || validity < 0) {
return emptyList()
}
Expand All @@ -60,7 +59,7 @@ inline fun <T : Backend, R> multyInstanceExecute(
inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
backends: List<T>,
scope: CoroutineScope,
releaseTime: Duration,
timeout: Duration,
defaultDrift: Duration = 3.milliseconds,
retryCount: Int = 3,
retryDelay: Duration = 100.milliseconds,
Expand All @@ -71,7 +70,7 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
return@withRetry multyInstanceExecute(
backends = backends,
scope = scope,
releaseTime = releaseTime,
timeout = timeout,
defaultDrift = defaultDrift,
waiter = waiter,
callee = callee,
Expand All @@ -81,7 +80,7 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(

fun <T : Backend, R> List<T>.executeWithRetry(
scope: CoroutineScope,
releaseTime: Duration,
timeout: Duration,
defaultDrift: Duration = 3.milliseconds,
retryCount: Int = 3,
retryDelay: Duration = 100.milliseconds,
Expand All @@ -91,7 +90,7 @@ fun <T : Backend, R> List<T>.executeWithRetry(
return multyInstanceExecuteWithRetry(
backends = this,
scope = scope,
releaseTime = releaseTime,
timeout = timeout,
defaultDrift = defaultDrift,
retryCount = retryCount,
retryDelay = retryDelay,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MultyInstanceExecutorTest {
multyInstanceExecute(
backends = backends,
scope = scope,
releaseTime = 1.seconds,
timeout = 1.seconds,
waiter = ::waitAllJobs,
) { backend ->
backend.test()
Expand All @@ -61,7 +61,7 @@ class MultyInstanceExecutorTest {
multyInstanceExecute(
backends = backends,
scope = scope,
releaseTime = 1.seconds,
timeout = 1.seconds,
waiter = ::waitAllJobs,
) { backend ->
backend.test()
Expand All @@ -85,7 +85,7 @@ class MultyInstanceExecutorTest {
multyInstanceExecute(
backends = backends,
scope = scope,
releaseTime = 1.seconds,
timeout = 1.seconds,
waiter = ::waitAllJobs,
) { backend ->
backend.test()
Expand All @@ -105,7 +105,7 @@ class MultyInstanceExecutorTest {
multyInstanceExecute(
backends = backends,
scope = scope,
releaseTime = 1.seconds,
timeout = 1.seconds,
waiter = ::waitAnyJobs,
) { backend -> backend.test() }

Expand All @@ -124,7 +124,7 @@ class MultyInstanceExecutorTest {
multyInstanceExecute(
backends = backends,
scope = scope,
releaseTime = 1.seconds,
timeout = 1.seconds,
waiter = ::waitAnyJobs,
) { backend -> backend.test() }

Expand All @@ -146,7 +146,7 @@ class MultyInstanceExecutorTest {
multyInstanceExecuteWithRetry(
backends = backends,
scope = scope,
releaseTime = 1.seconds,
timeout = 1.seconds,
retryCount = 3,
retryDelay = 1.milliseconds,
waiter = ::waitAllJobs,
Expand Down

0 comments on commit 0c02a40

Please sign in to comment.