From 39115574cbd1303b03a97c91cb1c5a2c753db7d0 Mon Sep 17 00:00:00 2001 From: himadieievsv Date: Wed, 14 Feb 2024 19:28:03 +0900 Subject: [PATCH] clustered keys for CountDownLatch --- .../core/locks/ListeningCountDownLatch.kt | 8 +- .../core/locks/ListeningCountDownLatchTest.kt | 88 +++++++++---------- .../JedisCountDownLatchBackendTest.kt | 18 ++-- .../LettuceCountDownLatchBackendTest.kt | 26 +++--- 4 files changed, 70 insertions(+), 70 deletions(-) diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index d812a32..f9c58fe 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -40,7 +40,7 @@ class ListeningCountDownLatch( private val scope: CoroutineScope = CoroutineScope(CoroutineName("listeningCountDownLatch") + Dispatchers.IO) private val clientId: String = UUID.randomUUID().toString() private val keySpace = "countdownlatch" - private val channelSpace = "channels" + private val channelSpace = "channel" private val currentCounter = AtomicInteger(count) private val minimalMaxDuration = Duration.ofMillis(100) @@ -150,7 +150,7 @@ class ListeningCountDownLatch( ) { backend -> backend.count( latchKeyName = buildKey(name), - channelName = buildKey(channelSpace, name), + channelName = buildKey(name, channelSpace), clientId = clientId, count = currentCounter.get(), initialCount = count, @@ -203,10 +203,10 @@ class ListeningCountDownLatch( retryDelay = retryDelay, waitStrategy = WaitStrategy.MAJORITY, ) { backend -> - backend.listen(channelName = buildKey(channelSpace, name)) + backend.listen(channelName = buildKey(name, channelSpace)) } } @Suppress("NOTHING_TO_INLINE") - private inline fun buildKey(vararg parts: String) = keySpace + ":" + parts.joinToString(":") + private inline fun buildKey(name: String, vararg parts: String) = "{$keySpace:$name}:" + parts.joinToString(":") } diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt index ac3b470..bee15ea 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt @@ -39,9 +39,9 @@ class ListeningCountDownLatchTest { @Test fun `count down`() { - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, returnVal = "OK") - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 3, 4, returnVal = "OK") - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 4, returnVal = "OK") + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 4, 4, returnVal = "OK") + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 3, 4, returnVal = "OK") + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 4, returnVal = "OK") val latch = ListeningCountDownLatch( "test", @@ -57,8 +57,8 @@ class ListeningCountDownLatchTest { @Test fun `count down failing`() { - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null) - backend.everyUndoCount("countdownlatch:test", 2, 1) + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null) + backend.everyUndoCount("{countdownlatch:test}:", 2, 1) val latch = ListeningCountDownLatch( "test", @@ -76,8 +76,8 @@ class ListeningCountDownLatchTest { @Test fun `undo count failing`() { - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null) - backend.everyUndoCount("countdownlatch:test", 2, null) + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null) + backend.everyUndoCount("{countdownlatch:test}:", 2, null) val latch = ListeningCountDownLatch( "test", @@ -96,7 +96,7 @@ class ListeningCountDownLatchTest { @Test fun await() { backend.everyListen("open") - backend.everyCheckCount("countdownlatch:test", 2) + backend.everyCheckCount("{countdownlatch:test}:", 2) val latch = ListeningCountDownLatch( "test", @@ -117,7 +117,7 @@ class ListeningCountDownLatchTest { @Test fun `await finished by global count`() { - backend.everyCheckCount("countdownlatch:test", 4) + backend.everyCheckCount("{countdownlatch:test}:", 4) val latch = ListeningCountDownLatch( "test", @@ -138,7 +138,7 @@ class ListeningCountDownLatchTest { @Test fun `await - check count failed`() { - backend.everyCheckCount("countdownlatch:test", null) + backend.everyCheckCount("{countdownlatch:test}:", null) val latch = ListeningCountDownLatch( "test", @@ -175,13 +175,13 @@ class ListeningCountDownLatchTest { @Test fun `await timed out`() { coEvery { - backend.listen(eq("countdownlatch:channels:test")) + backend.listen(eq("{countdownlatch:test}:channel")) } answers { runBlocking { delay(1000) } null } - backend.everyCheckCount("countdownlatch:test", 2) + backend.everyCheckCount("{countdownlatch:test}:", 2) val latch = ListeningCountDownLatch("test", 4, listOf(backend)) assertEquals(CallResult.FAILED, latch.await(Duration.ofMillis(110))) @@ -196,7 +196,7 @@ class ListeningCountDownLatchTest { @Test fun `await failed`() { backend.everyListen(null) - backend.everyCheckCount("countdownlatch:test", 3) + backend.everyCheckCount("{countdownlatch:test}:", 3) val latch = ListeningCountDownLatch( "test", @@ -216,7 +216,7 @@ class ListeningCountDownLatchTest { @ParameterizedTest(name = "current count: {0}") @ValueSource(ints = [-123, -1, 0, 1, 2, 5]) fun `check count`(count: Long) { - backend.everyCheckCount("countdownlatch:test", count) + backend.everyCheckCount("{countdownlatch:test}:", count) val latch = ListeningCountDownLatch("test", 5, listOf(backend)) assertEquals(5 - count.toInt(), latch.getCount()) @@ -226,7 +226,7 @@ class ListeningCountDownLatchTest { @Test fun `check count failed`() { - backend.everyCheckCount("countdownlatch:test", null) + backend.everyCheckCount("{countdownlatch:test}:", null) val latch = ListeningCountDownLatch( "test", @@ -391,8 +391,8 @@ class ListeningCountDownLatchTest { fun `all instances are in quorum for count down`() { instances.forEach { backend -> backend.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), @@ -417,24 +417,24 @@ class ListeningCountDownLatchTest { @Test fun `two instances are in quorum for count down`() { backend1.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), "OK", ) backend2.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), null, ) backend3.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), @@ -458,31 +458,31 @@ class ListeningCountDownLatchTest { @Test fun `quorum wasn't reach for count down`() { backend1.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), null, ) backend2.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), null, ) backend3.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), "OK", ) instances.forEach { backend -> - backend.everyUndoCount("countdownlatch:test", 4, 1) + backend.everyUndoCount("{countdownlatch:test}:", 4, 1) } val latch = ListeningCountDownLatch( @@ -503,7 +503,7 @@ class ListeningCountDownLatchTest { fun `all instances are in quorum for await`() { instances.forEach { backend -> backend.everyListen("open") - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } val latch = ListeningCountDownLatch( @@ -526,7 +526,7 @@ class ListeningCountDownLatchTest { @Test fun `two instances are in quorum for await`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen("open") backend2.everyListen(null) @@ -552,7 +552,7 @@ class ListeningCountDownLatchTest { @Test fun `one instance will continue to wait`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen { delay(50) @@ -587,7 +587,7 @@ class ListeningCountDownLatchTest { @Test fun `failed instance return value first`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen { delay(100) @@ -622,7 +622,7 @@ class ListeningCountDownLatchTest { @Test fun `quorum wasn't reach at majority`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen(null) backend2.everyListen("open") @@ -647,7 +647,7 @@ class ListeningCountDownLatchTest { @Test fun `all instances are down`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) backend.everyListen(null) } val latch = @@ -669,9 +669,9 @@ class ListeningCountDownLatchTest { @Test fun `check count return max value of majority`() { - backend1.everyCheckCount("countdownlatch:test", 1) - backend2.everyCheckCount("countdownlatch:test", 2) - backend3.everyCheckCount("countdownlatch:test", 1) + backend1.everyCheckCount("{countdownlatch:test}:", 1) + backend2.everyCheckCount("{countdownlatch:test}:", 2) + backend3.everyCheckCount("{countdownlatch:test}:", 1) val latch = ListeningCountDownLatch( @@ -687,9 +687,9 @@ class ListeningCountDownLatchTest { @Test fun `check count return min int`() { - backend1.everyCheckCount("countdownlatch:test", null) - backend2.everyCheckCount("countdownlatch:test", null) - backend3.everyCheckCount("countdownlatch:test", 2) + backend1.everyCheckCount("{countdownlatch:test}:", null) + backend2.everyCheckCount("{countdownlatch:test}:", null) + backend3.everyCheckCount("{countdownlatch:test}:", 2) val latch = ListeningCountDownLatch( @@ -745,14 +745,14 @@ class ListeningCountDownLatchTest { private fun CountDownLatchBackend.everyListen(returnVal: String?) { val backend = this coEvery { - backend.listen(eq("countdownlatch:channels:test")) + backend.listen(eq("{countdownlatch:test}:channel")) } returns returnVal } private fun CountDownLatchBackend.everyListen(answer: suspend MockKAnswerScope.(Call) -> String?) { val backend = this coEvery { - backend.listen(eq("countdownlatch:channels:test")) + backend.listen(eq("{countdownlatch:test}:channel")) } coAnswers answer } diff --git a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt index a1f1bee..a6d0a59 100644 --- a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt +++ b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt @@ -44,12 +44,12 @@ class JedisCountDownLatchBackendTest { every { redis.eval( any(), - eq(listOf("latch:test", "latch:channel:test")), + eq(listOf("latch:test", "latch:test:channel")), eq(listOf("${clientId}0", "5000", "4")), ) } returns "OK" val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) assertEquals("OK", callResult) verify(exactly = 1) { @@ -63,12 +63,12 @@ class JedisCountDownLatchBackendTest { every { redis.eval( any(), - eq(listOf("latch:test", "latch:channel:test")), + eq(listOf("latch:test", "latch:test:channel")), eq(listOf("${clientId}0", "5000", "4")), ) } throws IOException("test exception") val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) assertNull(callResult) verify(exactly = 1) { @@ -132,14 +132,14 @@ class JedisCountDownLatchBackendTest { val channel = slot() every { redis.subscribe(capture(pubSubSlot), capture(channel)) } returns Unit CoroutineScope(CoroutineName("test")).launch { - val result = countDownLatchBackend.listen("latch:channel:test") + val result = countDownLatchBackend.listen("latch:test:channel") assertEquals("open", result) } runBlocking { delay(200) } repeat(messageCount) { - pubSubSlot.captured.onMessage("latch:channel:test", "open") + pubSubSlot.captured.onMessage("latch:test:channel", "open") } - assertEquals("latch:channel:test", channel.captured) + assertEquals("latch:test:channel", channel.captured) verify(exactly = 1) { redis.subscribe(any(), any()) } @@ -147,11 +147,11 @@ class JedisCountDownLatchBackendTest { @Test fun `message not received`() { - every { redis.subscribe(any(), eq("latch:channel:test")) } returns Unit + every { redis.subscribe(any(), eq("latch:test:channel")) } returns Unit runBlocking { assertThrows { - withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") } + withTimeout(100) { countDownLatchBackend.listen("latch:test:channel") } } } verify(exactly = 1) { diff --git a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt index 2829ae4..162ba02 100644 --- a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt +++ b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt @@ -55,12 +55,12 @@ class LettuceCountDownLatchBackendTest { sync.eval( any(), eq(ScriptOutputType.STATUS), - eq(arrayOf("latch:test", "latch:channel:test")), + eq(arrayOf("latch:test", "latch:test:channel")), eq("${clientId}0"), eq("5000"), eq("4"), ) } returns "OK" val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) Assertions.assertEquals("OK", callResult) verify(exactly = 1) { @@ -75,12 +75,12 @@ class LettuceCountDownLatchBackendTest { sync.eval( any(), eq(ScriptOutputType.STATUS), - eq(arrayOf("latch:test", "latch:channel:test")), + eq(arrayOf("latch:test", "latch:test:channel")), eq("${clientId}0"), eq("5000"), eq("4"), ) } throws IOException("test exception") val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) Assertions.assertNull(callResult) verify(exactly = 1) { @@ -142,8 +142,8 @@ class LettuceCountDownLatchBackendTest { @BeforeEach fun setUp() { connection = mockk() - every { sync.subscribe(eq("latch:channel:test")) } returns Unit - every { sync.unsubscribe(eq("latch:channel:test")) } returns Unit + every { sync.subscribe(eq("latch:test:channel")) } returns Unit + every { sync.unsubscribe(eq("latch:test:channel")) } returns Unit every { sync.statefulConnection } returns connection every { connection.addListener(any>()) } returns Unit every { connection.removeListener(any>()) } returns Unit @@ -154,19 +154,19 @@ class LettuceCountDownLatchBackendTest { fun `listen produce value`(messageCount: Int) { val listener = slot>() every { connection.addListener(capture(listener)) } returns Unit - CoroutineScope(CoroutineName("latch:channel:test")).launch { - val result = countDownLatchBackend.listen("latch:channel:test") + CoroutineScope(CoroutineName("latch:test:channel")).launch { + val result = countDownLatchBackend.listen("latch:test:channel") Assertions.assertEquals("open", result) } runBlocking { Thread.sleep(200) } repeat(messageCount) { - listener.captured.message("latch:channel:test", "open") + listener.captured.message("latch:test:channel", "open") } runBlocking { Thread.sleep(100) } verify(exactly = 1) { - sync.subscribe(eq("latch:channel:test")) - sync.unsubscribe(eq("latch:channel:test")) + sync.subscribe(eq("latch:test:channel")) + sync.unsubscribe(eq("latch:test:channel")) } } @@ -174,11 +174,11 @@ class LettuceCountDownLatchBackendTest { fun `message not received`() { runBlocking { assertThrows { - withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") } + withTimeout(100) { countDownLatchBackend.listen("latch:test:channel") } } } verify(exactly = 1) { - sync.subscribe("latch:channel:test") + sync.subscribe("latch:test:channel") } } }