Skip to content

Commit

Permalink
clustered keys for CountDownLatch
Browse files Browse the repository at this point in the history
  • Loading branch information
himadieievsv committed Feb 14, 2024
1 parent aec83f5 commit 3911557
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ListeningCountDownLatch(
private val scope: CoroutineScope = CoroutineScope(CoroutineName("listeningCountDownLatch") + Dispatchers.IO)
private val clientId: String = UUID.randomUUID().toString()
private val keySpace = "countdownlatch"
private val channelSpace = "channels"
private val channelSpace = "channel"
private val currentCounter = AtomicInteger(count)
private val minimalMaxDuration = Duration.ofMillis(100)

Expand Down Expand Up @@ -150,7 +150,7 @@ class ListeningCountDownLatch(
) { backend ->
backend.count(
latchKeyName = buildKey(name),
channelName = buildKey(channelSpace, name),
channelName = buildKey(name, channelSpace),
clientId = clientId,
count = currentCounter.get(),
initialCount = count,
Expand Down Expand Up @@ -203,10 +203,10 @@ class ListeningCountDownLatch(
retryDelay = retryDelay,
waitStrategy = WaitStrategy.MAJORITY,
) { backend ->
backend.listen(channelName = buildKey(channelSpace, name))
backend.listen(channelName = buildKey(name, channelSpace))
}
}

@Suppress("NOTHING_TO_INLINE")
private inline fun buildKey(vararg parts: String) = keySpace + ":" + parts.joinToString(":")
private inline fun buildKey(name: String, vararg parts: String) = "{$keySpace:$name}:" + parts.joinToString(":")
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class ListeningCountDownLatchTest {

@Test
fun `count down`() {
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, returnVal = "OK")
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 3, 4, returnVal = "OK")
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 4, returnVal = "OK")
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 4, 4, returnVal = "OK")
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 3, 4, returnVal = "OK")
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 4, returnVal = "OK")
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -57,8 +57,8 @@ class ListeningCountDownLatchTest {

@Test
fun `count down failing`() {
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null)
backend.everyUndoCount("countdownlatch:test", 2, 1)
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null)
backend.everyUndoCount("{countdownlatch:test}:", 2, 1)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -76,8 +76,8 @@ class ListeningCountDownLatchTest {

@Test
fun `undo count failing`() {
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null)
backend.everyUndoCount("countdownlatch:test", 2, null)
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null)
backend.everyUndoCount("{countdownlatch:test}:", 2, null)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -96,7 +96,7 @@ class ListeningCountDownLatchTest {
@Test
fun await() {
backend.everyListen("open")
backend.everyCheckCount("countdownlatch:test", 2)
backend.everyCheckCount("{countdownlatch:test}:", 2)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -117,7 +117,7 @@ class ListeningCountDownLatchTest {

@Test
fun `await finished by global count`() {
backend.everyCheckCount("countdownlatch:test", 4)
backend.everyCheckCount("{countdownlatch:test}:", 4)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -138,7 +138,7 @@ class ListeningCountDownLatchTest {

@Test
fun `await - check count failed`() {
backend.everyCheckCount("countdownlatch:test", null)
backend.everyCheckCount("{countdownlatch:test}:", null)
val latch =
ListeningCountDownLatch(
"test",
Expand Down Expand Up @@ -175,13 +175,13 @@ class ListeningCountDownLatchTest {
@Test
fun `await timed out`() {
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
backend.listen(eq("{countdownlatch:test}:channel"))
} answers {
runBlocking { delay(1000) }
null
}

backend.everyCheckCount("countdownlatch:test", 2)
backend.everyCheckCount("{countdownlatch:test}:", 2)
val latch = ListeningCountDownLatch("test", 4, listOf(backend))

assertEquals(CallResult.FAILED, latch.await(Duration.ofMillis(110)))
Expand All @@ -196,7 +196,7 @@ class ListeningCountDownLatchTest {
@Test
fun `await failed`() {
backend.everyListen(null)
backend.everyCheckCount("countdownlatch:test", 3)
backend.everyCheckCount("{countdownlatch:test}:", 3)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -216,7 +216,7 @@ class ListeningCountDownLatchTest {
@ParameterizedTest(name = "current count: {0}")
@ValueSource(ints = [-123, -1, 0, 1, 2, 5])
fun `check count`(count: Long) {
backend.everyCheckCount("countdownlatch:test", count)
backend.everyCheckCount("{countdownlatch:test}:", count)
val latch = ListeningCountDownLatch("test", 5, listOf(backend))

assertEquals(5 - count.toInt(), latch.getCount())
Expand All @@ -226,7 +226,7 @@ class ListeningCountDownLatchTest {

@Test
fun `check count failed`() {
backend.everyCheckCount("countdownlatch:test", null)
backend.everyCheckCount("{countdownlatch:test}:", null)
val latch =
ListeningCountDownLatch(
"test",
Expand Down Expand Up @@ -391,8 +391,8 @@ class ListeningCountDownLatchTest {
fun `all instances are in quorum for count down`() {
instances.forEach { backend ->
backend.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
Expand All @@ -417,24 +417,24 @@ class ListeningCountDownLatchTest {
@Test
fun `two instances are in quorum for count down`() {
backend1.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
"OK",
)
backend2.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
null,
)
backend3.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
Expand All @@ -458,31 +458,31 @@ class ListeningCountDownLatchTest {
@Test
fun `quorum wasn't reach for count down`() {
backend1.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
null,
)
backend2.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
null,
)
backend3.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
"OK",
)
instances.forEach { backend ->
backend.everyUndoCount("countdownlatch:test", 4, 1)
backend.everyUndoCount("{countdownlatch:test}:", 4, 1)
}
val latch =
ListeningCountDownLatch(
Expand All @@ -503,7 +503,7 @@ class ListeningCountDownLatchTest {
fun `all instances are in quorum for await`() {
instances.forEach { backend ->
backend.everyListen("open")
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
val latch =
ListeningCountDownLatch(
Expand All @@ -526,7 +526,7 @@ class ListeningCountDownLatchTest {
@Test
fun `two instances are in quorum for await`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen("open")
backend2.everyListen(null)
Expand All @@ -552,7 +552,7 @@ class ListeningCountDownLatchTest {
@Test
fun `one instance will continue to wait`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen {
delay(50)
Expand Down Expand Up @@ -587,7 +587,7 @@ class ListeningCountDownLatchTest {
@Test
fun `failed instance return value first`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen {
delay(100)
Expand Down Expand Up @@ -622,7 +622,7 @@ class ListeningCountDownLatchTest {
@Test
fun `quorum wasn't reach at majority`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen(null)
backend2.everyListen("open")
Expand All @@ -647,7 +647,7 @@ class ListeningCountDownLatchTest {
@Test
fun `all instances are down`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
backend.everyListen(null)
}
val latch =
Expand All @@ -669,9 +669,9 @@ class ListeningCountDownLatchTest {

@Test
fun `check count return max value of majority`() {
backend1.everyCheckCount("countdownlatch:test", 1)
backend2.everyCheckCount("countdownlatch:test", 2)
backend3.everyCheckCount("countdownlatch:test", 1)
backend1.everyCheckCount("{countdownlatch:test}:", 1)
backend2.everyCheckCount("{countdownlatch:test}:", 2)
backend3.everyCheckCount("{countdownlatch:test}:", 1)

val latch =
ListeningCountDownLatch(
Expand All @@ -687,9 +687,9 @@ class ListeningCountDownLatchTest {

@Test
fun `check count return min int`() {
backend1.everyCheckCount("countdownlatch:test", null)
backend2.everyCheckCount("countdownlatch:test", null)
backend3.everyCheckCount("countdownlatch:test", 2)
backend1.everyCheckCount("{countdownlatch:test}:", null)
backend2.everyCheckCount("{countdownlatch:test}:", null)
backend3.everyCheckCount("{countdownlatch:test}:", 2)

val latch =
ListeningCountDownLatch(
Expand Down Expand Up @@ -745,14 +745,14 @@ class ListeningCountDownLatchTest {
private fun CountDownLatchBackend.everyListen(returnVal: String?) {
val backend = this
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
backend.listen(eq("{countdownlatch:test}:channel"))
} returns returnVal
}

private fun CountDownLatchBackend.everyListen(answer: suspend MockKAnswerScope<String?, String?>.(Call) -> String?) {
val backend = this
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
backend.listen(eq("{countdownlatch:test}:channel"))
} coAnswers answer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ class JedisCountDownLatchBackendTest {
every {
redis.eval(
any(),
eq(listOf("latch:test", "latch:channel:test")),
eq(listOf("latch:test", "latch:test:channel")),
eq(listOf("${clientId}0", "5000", "4")),
)
} returns "OK"
val callResult =
countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5))
countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5))

assertEquals("OK", callResult)
verify(exactly = 1) {
Expand All @@ -63,12 +63,12 @@ class JedisCountDownLatchBackendTest {
every {
redis.eval(
any(),
eq(listOf("latch:test", "latch:channel:test")),
eq(listOf("latch:test", "latch:test:channel")),
eq(listOf("${clientId}0", "5000", "4")),
)
} throws IOException("test exception")
val callResult =
countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5))
countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5))

assertNull(callResult)
verify(exactly = 1) {
Expand Down Expand Up @@ -132,26 +132,26 @@ class JedisCountDownLatchBackendTest {
val channel = slot<String>()
every { redis.subscribe(capture(pubSubSlot), capture(channel)) } returns Unit
CoroutineScope(CoroutineName("test")).launch {
val result = countDownLatchBackend.listen("latch:channel:test")
val result = countDownLatchBackend.listen("latch:test:channel")
assertEquals("open", result)
}
runBlocking { delay(200) }
repeat(messageCount) {
pubSubSlot.captured.onMessage("latch:channel:test", "open")
pubSubSlot.captured.onMessage("latch:test:channel", "open")
}
assertEquals("latch:channel:test", channel.captured)
assertEquals("latch:test:channel", channel.captured)
verify(exactly = 1) {
redis.subscribe(any<JedisPubSub>(), any<String>())
}
}

@Test
fun `message not received`() {
every { redis.subscribe(any(), eq("latch:channel:test")) } returns Unit
every { redis.subscribe(any(), eq("latch:test:channel")) } returns Unit

runBlocking {
assertThrows<TimeoutCancellationException> {
withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") }
withTimeout(100) { countDownLatchBackend.listen("latch:test:channel") }
}
}
verify(exactly = 1) {
Expand Down
Loading

0 comments on commit 3911557

Please sign in to comment.