Skip to content

Commit

Permalink
make multiInstanceExecute cooperative (#58)
Browse files Browse the repository at this point in the history
* make multiInstanceExecute cooperative

* bump version to 1.1.3
  • Loading branch information
himadieievsv authored Jan 13, 2024
1 parent 2219224 commit f29c052
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 110 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ plugins {

allprojects {
group = "com.himadieiev"
version = "1.1.2"
version = "1.1.3"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBack
import com.himadieiev.redpulsar.core.locks.api.CallResult
import com.himadieiev.redpulsar.core.locks.api.CountDownLatch
import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs
import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -140,46 +141,55 @@ class ListeningCountDownLatch(
}

private fun count(): List<String?> {
return backends.executeWithRetry(
scope = scope,
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
backend.count(
latchKeyName = buildKey(name),
channelName = buildKey(channelSpace, name),
clientId = clientId,
count = currentCounter.get(),
initialCount = count,
ttl = maxDuration.multipliedBy(2),
)
return runBlocking {
backends.executeWithRetry(
scope = scope,
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
) { backend ->
backend.count(
latchKeyName = buildKey(name),
channelName = buildKey(channelSpace, name),
clientId = clientId,
count = currentCounter.get(),
initialCount = count,
ttl = maxDuration.multipliedBy(2),
)
}
}
}

private fun undoCount() {
backends.executeWithRetry(
scope = scope,
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
backend.undoCount(
latchKeyName = buildKey(name),
clientId = clientId,
count = currentCounter.get(),
)
runBlocking {
backends.executeWithRetry(
scope = scope,
timeout = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
) { backend ->
backend.undoCount(
latchKeyName = buildKey(name),
clientId = clientId,
count = currentCounter.get(),
)
}
}
}

private fun checkCount(scope: CoroutineScope): List<Long?> {
return backends.executeWithRetry(
scope = scope,
timeout = maxDuration.multipliedBy(2),
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
backend.checkCount(latchKeyName = buildKey(name))
return runBlocking {
backends.executeWithRetry(
scope = scope,
timeout = maxDuration.multipliedBy(2),
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
) { backend ->
backend.checkCount(latchKeyName = buildKey(name))
}
}
}

Expand All @@ -192,8 +202,6 @@ class ListeningCountDownLatch(
timeout = timeout,
retryCount = retryCount,
retryDelay = retryDelay,
// Allow non-quorum polling here. That might need to be changed as it could lead to unexpected behavior
// if multiple instances goes down or encounter network issue.
waiter = ::waitMajorityJobs,
) { backend ->
backend.listen(channelName = buildKey(channelSpace, name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package com.himadieiev.redpulsar.core.locks.abstracts

import com.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import com.himadieiev.redpulsar.core.locks.excecutors.waitAllJobs
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import java.time.Duration

/**
Expand Down Expand Up @@ -31,19 +33,22 @@ abstract class AbstractMultiInstanceLock(
resourceName: String,
ttl: Duration,
): Boolean {
return backends.executeWithRetry(
scope = scope,
timeout = ttl,
defaultDrift = Duration.ofMillis(3L * backends.size),
retryCount = retryCount,
retryDelay = retryDelay,
cleanUp = { backend ->
unlockInstance(backend, resourceName)
},
callee = { backend ->
lockInstance(backend, resourceName, ttl)
},
).isNotEmpty()
return runBlocking {
backends.executeWithRetry(
scope = scope,
timeout = ttl,
defaultDrift = Duration.ofMillis(3L * backends.size),
retryCount = retryCount,
retryDelay = retryDelay,
cleanUp = { backend ->
unlockInstance(backend, resourceName)
},
waiter = ::waitAllJobs,
callee = { backend ->
lockInstance(backend, resourceName, ttl)
},
)
}.isNotEmpty()
}

/**
Expand All @@ -52,16 +57,19 @@ abstract class AbstractMultiInstanceLock(
* @return [Boolean] true if the lock was released, false otherwise.
*/
override fun unlock(resourceName: String): Boolean {
return backends.executeWithRetry(
scope = scope,
timeout = Duration.ofSeconds(1),
defaultDrift = Duration.ofMillis(3L * backends.size),
retryCount = retryCount,
retryDelay = retryDelay,
callee = { backend ->
unlockInstance(backend, resourceName)
},
).isNotEmpty()
return runBlocking {
backends.executeWithRetry(
scope = scope,
timeout = Duration.ofSeconds(1),
defaultDrift = Duration.ofMillis(3L * backends.size),
retryCount = retryCount,
retryDelay = retryDelay,
waiter = ::waitAllJobs,
callee = { backend ->
unlockInstance(backend, resourceName)
},
)
}.isNotEmpty()
}

protected fun backendSize(): Int = backends.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.himadieiev.redpulsar.core.utils.withTimeoutInThread
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.time.Duration
import java.util.Collections
import kotlin.system.measureTimeMillis
Expand All @@ -32,13 +31,13 @@ import kotlin.system.measureTimeMillis
* @param waiter [Function] the function to wait for results.
* @param callee [Function] the function to call on each backend.
*/
inline fun <T : Backend, R> multiInstanceExecute(
suspend inline fun <T : Backend, R> multiInstanceExecute(
backends: List<T>,
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = Duration.ofMillis(3),
crossinline cleanUp: (backend: T) -> Unit = { _ -> },
crossinline waiter: suspend (jobs: List<Job>) -> Unit = ::waitAllJobs,
crossinline waiter: suspend (jobs: List<Job>) -> Unit,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
val jobs = mutableListOf<Job>()
Expand All @@ -57,29 +56,29 @@ inline fun <T : Backend, R> multiInstanceExecute(
},
)
}
runBlocking(scope.coroutineContext) { waiter(jobs) }
waiter(jobs)
}
val validity = timeout.toMillis() - timeDiff - clockDrift
if (results.size < quorum || validity < 0) {
val cleanUpJobs = mutableListOf<Job>()
backends.forEach { backend ->
cleanUpJobs.add(scope.launch { cleanUp(backend) })
}
runBlocking(scope.coroutineContext) { waitAllJobs(cleanUpJobs) }
waitAllJobs(cleanUpJobs)
return emptyList()
}
return results
}

inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
suspend inline fun <T : Backend, R> multiInstanceExecuteWithRetry(
backends: List<T>,
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = Duration.ofMillis(3),
retryCount: Int = 3,
retryDelay: Duration = Duration.ofMillis(100),
crossinline cleanUp: (backend: T) -> Unit = { _ -> },
crossinline waiter: suspend (jobs: List<Job>) -> Unit = ::waitAllJobs,
crossinline waiter: suspend (jobs: List<Job>) -> Unit,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
return withRetry(retryCount = retryCount, retryDelay = retryDelay) {
Expand All @@ -95,17 +94,17 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
}
}

fun <T : Backend, R> List<T>.executeWithRetry(
suspend fun <T : Backend, R> List<T>.executeWithRetry(
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = Duration.ofMillis(3),
retryCount: Int = 3,
retryDelay: Duration = Duration.ofMillis(100),
cleanUp: (backend: T) -> Unit = { _ -> },
waiter: suspend (jobs: List<Job>) -> Unit = ::waitAllJobs,
waiter: suspend (jobs: List<Job>) -> Unit,
callee: suspend (backend: T) -> R,
): List<R> {
return multyInstanceExecuteWithRetry(
return multiInstanceExecuteWithRetry(
backends = this,
scope = scope,
timeout = timeout,
Expand Down
Loading

0 comments on commit f29c052

Please sign in to comment.