Skip to content

Commit

Permalink
Listen to majority (#57)
Browse files Browse the repository at this point in the history
* listener will wait for the majority of instances

* remove unused argument

* atomic integers for concurrent counts
  • Loading branch information
himadieievsv authored Jan 13, 2024
1 parent 011f3a7 commit 2219224
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 53 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ plugins {

allprojects {
group = "com.himadieiev"
version = "1.1.1"
version = "1.1.2"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBack
import com.himadieiev.redpulsar.core.locks.api.CallResult
import com.himadieiev.redpulsar.core.locks.api.CountDownLatch
import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import com.himadieiev.redpulsar.core.locks.excecutors.waitAnyJobs
import com.himadieiev.redpulsar.core.locks.excecutors.waitMajorityJobs
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -194,7 +194,7 @@ class ListeningCountDownLatch(
retryDelay = retryDelay,
// Allow non-quorum polling here. That might need to be changed as it could lead to unexpected behavior
// if multiple instances goes down or encounter network issue.
waiter = ::waitAnyJobs,
waiter = ::waitMajorityJobs,
) { backend ->
backend.listen(channelName = buildKey(channelSpace, name))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ package com.himadieiev.redpulsar.core.locks.excecutors

import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger

suspend inline fun <T> waitAllJobs(
jobs: List<Job>,
@Suppress("UNUSED_PARAMETER") results: MutableList<T>,
) {
suspend inline fun waitAllJobs(jobs: List<Job>) {
jobs.joinAll()
}

suspend inline fun <T> waitAnyJobs(
jobs: List<Job>,
results: MutableList<T>,
) {
select { jobs.forEach { job -> job.onJoin { } } }
jobs.forEach { job -> job.cancel() }
// enough one success result to consider latch opened
if (results.isNotEmpty()) {
repeat(jobs.size - results.size) { results.add(results.first()) }
suspend inline fun waitMajorityJobs(jobs: List<Job>) {
val quorum: Int = jobs.size / 2 + 1
val succeed = AtomicInteger(0)
val failed = AtomicInteger(0)
jobs.forEach { job ->
job.invokeOnCompletion { cause ->
if (cause == null) {
succeed.incrementAndGet()
} else {
failed.incrementAndGet()
}
}
}
while (succeed.get() < quorum && failed.get() < quorum) {
yield()
}
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.system.measureTimeMillis
/**
* An algorithm for running closure on multiple remote instances proxied by [backends].
* Each call will be executed in separate [Job] and wait for the result using one of two self-explanatory strategies:
* [waitAllJobs] and [waitAnyJobs].
* [waitAllJobs] and [waitMajorityJobs].
* Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time
* spend for getting results is not exceeding some reasonable time difference using [timeout] and
* clock drift.
Expand All @@ -38,7 +38,7 @@ inline fun <T : Backend, R> multiInstanceExecute(
timeout: Duration,
defaultDrift: Duration = Duration.ofMillis(3),
crossinline cleanUp: (backend: T) -> Unit = { _ -> },
crossinline waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
crossinline waiter: suspend (jobs: List<Job>) -> Unit = ::waitAllJobs,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
val jobs = mutableListOf<Job>()
Expand All @@ -57,15 +57,15 @@ inline fun <T : Backend, R> multiInstanceExecute(
},
)
}
runBlocking(scope.coroutineContext) { waiter(jobs, results) }
runBlocking(scope.coroutineContext) { waiter(jobs) }
}
val validity = timeout.toMillis() - timeDiff - clockDrift
if (results.size < quorum || validity < 0) {
val cleanUpJobs = mutableListOf<Job>()
backends.forEach { backend ->
cleanUpJobs.add(scope.launch { cleanUp(backend) })
}
runBlocking(scope.coroutineContext) { waitAllJobs(cleanUpJobs, Collections.emptyList<String>()) }
runBlocking(scope.coroutineContext) { waitAllJobs(cleanUpJobs) }
return emptyList()
}
return results
Expand All @@ -79,7 +79,7 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
retryCount: Int = 3,
retryDelay: Duration = Duration.ofMillis(100),
crossinline cleanUp: (backend: T) -> Unit = { _ -> },
crossinline waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
crossinline waiter: suspend (jobs: List<Job>) -> Unit = ::waitAllJobs,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
return withRetry(retryCount = retryCount, retryDelay = retryDelay) {
Expand All @@ -102,7 +102,7 @@ fun <T : Backend, R> List<T>.executeWithRetry(
retryCount: Int = 3,
retryDelay: Duration = Duration.ofMillis(100),
cleanUp: (backend: T) -> Unit = { _ -> },
waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
waiter: suspend (jobs: List<Job>) -> Unit = ::waitAllJobs,
callee: suspend (backend: T) -> R,
): List<R> {
return multyInstanceExecuteWithRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ class ListeningCountDownLatchTest {
2,
backends = instances,
retryDelay = Duration.ofMillis(1),
retryCount = 1,
)
assertEquals(CallResult.SUCCESS, latch.await())

Expand All @@ -534,10 +535,11 @@ class ListeningCountDownLatchTest {
5,
backends = instances,
retryDelay = Duration.ofMillis(1),
retryCount = 1,
)
assertEquals(CallResult.SUCCESS, latch.await())

coVerify(exactly = 1) {
coVerify(atMost = 1) {
instances.forEach { backend -> backend.listen(any()) }
}
verify(exactly = 1) {
Expand All @@ -546,7 +548,7 @@ class ListeningCountDownLatchTest {
}

@Test
fun `quorum wasn't reach but await succeed`() {
fun `quorum wasn't reach at majority`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
}
Expand All @@ -560,9 +562,9 @@ class ListeningCountDownLatchTest {
backends = instances,
retryDelay = Duration.ofMillis(1),
)
assertEquals(CallResult.SUCCESS, latch.await())
assertEquals(CallResult.FAILED, latch.await())

coVerify(atMost = 1) {
coVerify(atMost = 3) {
instances.forEach { backend -> backend.listen(any()) }
}
verify(exactly = 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.time.Duration
import kotlin.random.Random

@Tag(TestTags.UNIT)
class MultyInstanceExecutorTest {
Expand Down Expand Up @@ -43,13 +42,13 @@ class MultyInstanceExecutorTest {
backend.test()
}

assertEquals(createResponses(number), result)
assertEquals(number, result.size)
verify(exactly = 1) { backends.forEach { backend -> backend.test() } }
}

@ParameterizedTest(name = "quorum instance count is down {0} instances")
@ParameterizedTest(name = "quorum instances are down {0} instances")
@ValueSource(ints = [2, 3, 4, 5, 7, 10])
fun `quorum instance count is down`(number: Int) {
fun `quorum instances are down`(number: Int) {
val quorum = number / 2 + 1
backends = createBackends(number)
backends.forEach { backend -> every { backend.test() } returns "OK" }
Expand All @@ -71,9 +70,9 @@ class MultyInstanceExecutorTest {
verify(exactly = 1) { backends.forEach { backend -> backend.test() } }
}

@ParameterizedTest(name = "non quorum instance count is down {0} instances")
@ParameterizedTest(name = "non quorum instances are down {0} instances")
@ValueSource(ints = [2, 3, 4, 5, 7, 10])
fun `non quorum instance count is down`(number: Int) {
fun `non quorum instances are down`(number: Int) {
val quorum = number / 2 + 1
backends = createBackends(number)
backends.forEach { backend -> every { backend.test() } returns "OK" }
Expand All @@ -91,13 +90,13 @@ class MultyInstanceExecutorTest {
backend.test()
}

assertEquals(createResponses(quorum), result)
assertTrue(number / 2 + 1 <= result.size)
verify(exactly = 1) { backends.forEach { backend -> backend.test() } }
}

@ParameterizedTest(name = "all instances are ok, wait any with {0} instances")
@ParameterizedTest(name = "all instances are ok, wait majority with {0} instances")
@ValueSource(ints = [1, 2, 3, 4, 5, 7, 10])
fun `all instances are ok, wait any`(number: Int) {
fun `all instances are ok, wait majority`(number: Int) {
backends = createBackends(number)
backends.forEach { backend -> every { backend.test() } returns "OK" }

Expand All @@ -106,29 +105,31 @@ class MultyInstanceExecutorTest {
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitAnyJobs,
waiter = ::waitMajorityJobs,
) { backend -> backend.test() }

assertTrue(createResponses(number).size <= result.size)
assertTrue(number / 2 + 1 <= result.size)
verify(exactly = 1) { backends.forEach { backend -> backend.test() } }
}

@ParameterizedTest(name = "all instances are ok, wait any with {0} instances")
@ParameterizedTest(name = "all instances are ok, wait majority with {0} instances")
@ValueSource(ints = [1, 2, 3, 4, 5, 7, 10])
fun `one instance is up, wait any`(number: Int) {
fun `one instance is up, wait majority`(number: Int) {
backends = createBackends(number)
backends.forEach { backend -> every { backend.test() } returns null }
every { backends[Random.nextInt(0, number)].test() } returns "OK"
repeat(number) { i ->
every { backends[i].test() } returns "OK"
}

val result =
multiInstanceExecute(
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
waiter = ::waitAnyJobs,
waiter = ::waitMajorityJobs,
) { backend -> backend.test() }

assertEquals(createResponses(number), result)
assertTrue(number / 2 + 1 <= result.size)
verify(exactly = 1) { backends.forEach { backend -> backend.test() } }
}

Expand Down Expand Up @@ -169,12 +170,4 @@ class MultyInstanceExecutorTest {
}
return backends
}

private fun createResponses(number: Int): List<String> {
val responses = mutableListOf<String>()
repeat(number) {
responses.add("OK")
}
return responses
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class JedisCountDownLatchBackendTest {
val result = countDownLatchBackend.listen("latch:channel:test")
assertEquals("open", result)
}
runBlocking { delay(100) }
runBlocking { delay(200) }
repeat(messageCount) {
pubSubSlot.captured.onMessage("latch:channel:test", "open")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class LettuceCountDownLatchBackendTest {
Assertions.assertEquals("open", result)
}

runBlocking { Thread.sleep(100) }
runBlocking { Thread.sleep(200) }
repeat(messageCount) {
listener.captured.message("latch:channel:test", "open")
}
Expand Down

0 comments on commit 2219224

Please sign in to comment.