diff --git a/build.gradle.kts b/build.gradle.kts index 5552fcd..d5c158b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,7 @@ plugins { allprojects { group = "com.himadieiev" - version = "1.1.0" + version = "1.1.1" repositories { mavenCentral() diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index ce74cff..a847fae 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -5,12 +5,10 @@ import com.himadieiev.redpulsar.core.locks.api.CallResult import com.himadieiev.redpulsar.core.locks.api.CountDownLatch import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry import com.himadieiev.redpulsar.core.locks.excecutors.waitAnyJobs -import com.himadieiev.redpulsar.core.utils.failsafe import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async -import kotlinx.coroutines.flow.first import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout import java.time.Duration @@ -198,10 +196,7 @@ class ListeningCountDownLatch( // if multiple instances goes down or encounter network issue. waiter = ::waitAnyJobs, ) { backend -> - failsafe(null) { - val flow = backend.listen(channelName = buildKey(channelSpace, name)) - flow.first() - } + backend.listen(channelName = buildKey(channelSpace, name)) } } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/backends/CountDownLatchBackend.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/backends/CountDownLatchBackend.kt index ba5646a..2f6c22e 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/backends/CountDownLatchBackend.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/abstracts/backends/CountDownLatchBackend.kt @@ -1,7 +1,6 @@ package com.himadieiev.redpulsar.core.locks.abstracts.backends import com.himadieiev.redpulsar.core.locks.abstracts.Backend -import kotlinx.coroutines.flow.Flow import java.time.Duration /** @@ -31,6 +30,6 @@ abstract class CountDownLatchBackend : Backend() { abstract fun checkCount(latchKeyName: String): Long? - /** Receive notification about count down latch is opened now. This is supposed to be a blocking call*/ - abstract fun listen(channelName: String): Flow + /** Receive notification about count down latch is opened now. */ + abstract suspend fun listen(channelName: String): String? } diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt index 8a21129..60e5f84 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt @@ -4,14 +4,14 @@ import TestTags import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend import com.himadieiev.redpulsar.core.locks.api.CallResult import io.mockk.coEvery +import io.mockk.coVerify import io.mockk.every import io.mockk.mockk import io.mockk.mockkObject import io.mockk.verify import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertDoesNotThrow import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import java.io.IOException import java.time.Duration import java.util.concurrent.CancellationException @@ -94,8 +93,7 @@ class ListeningCountDownLatchTest { @Test fun await() { - val flow = flow { emit("open") } - backend.everyListen(flow) + backend.everyListen("open") backend.everyCheckCount("countdownlatch:test", 2) val latch = ListeningCountDownLatch( @@ -107,8 +105,10 @@ class ListeningCountDownLatchTest { assertEquals(CallResult.SUCCESS, latch.await()) - verify(exactly = 1) { + coVerify(exactly = 1) { backend.listen(any()) + } + verify(exactly = 1) { backend.checkCount(any()) } } @@ -129,7 +129,7 @@ class ListeningCountDownLatchTest { verify(exactly = 1) { backend.checkCount(any()) } - verify(exactly = 0) { + coVerify(exactly = 0) { backend.listen(any()) } } @@ -150,7 +150,7 @@ class ListeningCountDownLatchTest { assertEquals(CallResult.FAILED, latch.await()) verify(exactly = 2) { backend.checkCount(any()) } - verify(exactly = 0) { backend.listen(any()) } + coVerify(exactly = 0) { backend.listen(any()) } } @Test @@ -164,6 +164,8 @@ class ListeningCountDownLatchTest { assertEquals(CallResult.FAILED, latch.await()) verify(exactly = 0) { backend.checkCount(any()) + } + coVerify(exactly = 0) { backend.listen(any()) } } @@ -172,10 +174,10 @@ class ListeningCountDownLatchTest { fun `await timed out`() { coEvery { backend.listen(eq("countdownlatch:channels:test")) - } returns - flow { - delay(1000) - } + } answers { + runBlocking { delay(1000) } + null + } backend.everyCheckCount("countdownlatch:test", 2) val latch = ListeningCountDownLatch("test", 4, listOf(backend)) @@ -183,18 +185,15 @@ class ListeningCountDownLatchTest { assertEquals(CallResult.FAILED, latch.await(Duration.ofMillis(110))) verify(exactly = 1) { backend.checkCount(any()) + } + coVerify(exactly = 1) { backend.listen(any()) } } @Test fun `await failed`() { - val flow = - flow { - delay(10) - IOException("test exception") - } - backend.everyListen(flow) + backend.everyListen(null) backend.everyCheckCount("countdownlatch:test", 3) val latch = ListeningCountDownLatch( @@ -209,26 +208,7 @@ class ListeningCountDownLatchTest { assertEquals(CallResult.FAILED, latch.await()) verify(exactly = 1) { backend.checkCount(any()) } - verify(exactly = 5) { backend.listen(any()) } - } - - @Test - fun `await in flow throws cancellation exception`() { - val flow = flow { CancellationException("test exception") } - backend.everyListen(flow) - backend.everyCheckCount("countdownlatch:test", 2) - val latch = - ListeningCountDownLatch( - "test", - 4, - listOf(backend), - retryDelay = Duration.ofMillis(1), - ) - - assertEquals(CallResult.FAILED, latch.await()) - - verify(exactly = 1) { backend.checkCount(any()) } - verify(exactly = 3) { backend.listen(any()) } + coVerify(exactly = 5) { backend.listen(any()) } } @ParameterizedTest(name = "current count: {0}") @@ -408,7 +388,14 @@ class ListeningCountDownLatchTest { @Test fun `all instances are in quorum for count down`() { instances.forEach { backend -> - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK") + backend.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + "OK", + ) } val latch = ListeningCountDownLatch( @@ -427,9 +414,30 @@ class ListeningCountDownLatchTest { @Test fun `two instances are in quorum for count down`() { - backend1.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK") - backend2.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), null) - backend3.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK") + backend1.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + "OK", + ) + backend2.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + null, + ) + backend3.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + "OK", + ) val latch = ListeningCountDownLatch( "test", @@ -447,9 +455,30 @@ class ListeningCountDownLatchTest { @Test fun `quorum wasn't reach for count down`() { - backend1.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), null) - backend2.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), null) - backend3.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK") + backend1.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + null, + ) + backend2.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + null, + ) + backend3.everyCount( + "countdownlatch:test", + "countdownlatch:channels:test", + 4, + 4, + Duration.ofMinutes(10), + "OK", + ) instances.forEach { backend -> backend.everyUndoCount("countdownlatch:test", 4, 1) } @@ -470,9 +499,8 @@ class ListeningCountDownLatchTest { @Test fun `all instances are in quorum for await`() { - val flow = flow { emit("open") } instances.forEach { backend -> - backend.everyListen(flow) + backend.everyListen("open") backend.everyCheckCount("countdownlatch:test", 1) } val latch = @@ -484,26 +512,22 @@ class ListeningCountDownLatchTest { ) assertEquals(CallResult.SUCCESS, latch.await()) - verify(exactly = 1) { + coVerify(atMost = 1) { instances.forEach { backend -> backend.listen(any()) } + } + verify(exactly = 1) { instances.forEach { backend -> backend.checkCount(any()) } } } @Test fun `two instances are in quorum for await`() { - val flow = - flow { - delay(50) - IOException("test exception 2") - } - val okFlow = flow { emit("open") } instances.forEach { backend -> backend.everyCheckCount("countdownlatch:test", 1) } - backend1.everyListen(okFlow) - backend2.everyListen(flow) - backend3.everyListen(okFlow) + backend1.everyListen("open") + backend2.everyListen(null) + backend3.everyListen("open") val latch = ListeningCountDownLatch( "test", @@ -513,26 +537,22 @@ class ListeningCountDownLatchTest { ) assertEquals(CallResult.SUCCESS, latch.await()) - verify(exactly = 1) { + coVerify(exactly = 1) { instances.forEach { backend -> backend.listen(any()) } + } + verify(exactly = 1) { instances.forEach { backend -> backend.checkCount(any()) } } } @Test fun `quorum wasn't reach but await succeed`() { - val flow = - flow { - delay(10) - IOException("test exception") - } - val okFlow = flow { emit("open") } instances.forEach { backend -> backend.everyCheckCount("countdownlatch:test", 1) } - backend1.everyListen(flow) - backend2.everyListen(okFlow) - backend3.everyListen(flow) + backend1.everyListen(null) + backend2.everyListen("open") + backend3.everyListen(null) val latch = ListeningCountDownLatch( "test", @@ -542,22 +562,19 @@ class ListeningCountDownLatchTest { ) assertEquals(CallResult.SUCCESS, latch.await()) - verify(exactly = 1) { + coVerify(atMost = 1) { instances.forEach { backend -> backend.listen(any()) } + } + verify(exactly = 1) { instances.forEach { backend -> backend.checkCount(any()) } } } @Test fun `all instances are down`() { - val flow = - flow { - delay(10) - IOException("test exception") - } instances.forEach { backend -> backend.everyCheckCount("countdownlatch:test", 1) - backend.everyListen(flow) + backend.everyListen(null) } val latch = ListeningCountDownLatch( @@ -568,7 +585,7 @@ class ListeningCountDownLatchTest { ) assertEquals(CallResult.FAILED, latch.await()) - verify(exactly = 3) { + coVerify(exactly = 3) { instances.forEach { backend -> backend.listen(any()) } } verify(exactly = 1) { @@ -651,7 +668,7 @@ class ListeningCountDownLatchTest { } returns returnVal } - private fun CountDownLatchBackend.everyListen(returnVal: Flow) { + private fun CountDownLatchBackend.everyListen(returnVal: String?) { val backend = this coEvery { backend.listen(eq("countdownlatch:channels:test")) diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt index 33727e2..c305664 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/excecutors/MultyInstanceExecutorTest.kt @@ -8,6 +8,7 @@ import io.mockk.verify import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Tag import org.junit.jupiter.params.ParameterizedTest @@ -108,7 +109,7 @@ class MultyInstanceExecutorTest { waiter = ::waitAnyJobs, ) { backend -> backend.test() } - assertEquals(createResponses(number), result) + assertTrue(createResponses(number).size <= result.size) verify(exactly = 1) { backends.forEach { backend -> backend.test() } } } diff --git a/redpulsar-jedis/src/main/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackend.kt b/redpulsar-jedis/src/main/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackend.kt index 3e894eb..1cc848d 100644 --- a/redpulsar-jedis/src/main/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackend.kt +++ b/redpulsar-jedis/src/main/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackend.kt @@ -3,8 +3,8 @@ package com.himadieiev.redpulsar.jedis.locks.backends import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend import com.himadieiev.redpulsar.core.utils.failsafe import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import mu.KotlinLogging import redis.clients.jedis.JedisPubSub @@ -64,8 +64,8 @@ internal class JedisCountDownLatchBackend(private val jedis: UnifiedJedis) : Cou } } - override fun listen(channelName: String): Flow { - val flow = + override suspend fun listen(channelName: String): String? { + return failsafe(null) { callbackFlow { val pubSub = object : JedisPubSub() { @@ -83,13 +83,13 @@ internal class JedisCountDownLatchBackend(private val jedis: UnifiedJedis) : Cou try { pubSub.unsubscribe(channelName) } catch (e: Exception) { - // supress unsubscribe errors as it might be uew to lost connection + // suppress unsubscribe errors as it might be uew to lost connection val logger = KotlinLogging.logger {} logger.info { "Unsubscribe failed: ${e.message}" } } job.cancel() } - } - return flow + }.first() + } } } diff --git a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt index a2abdcd..ec1004e 100644 --- a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt +++ b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt @@ -7,7 +7,7 @@ import io.mockk.verify import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.flow.first +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout @@ -23,7 +23,6 @@ import org.junit.jupiter.params.provider.ValueSource import redis.clients.jedis.JedisPubSub import redis.clients.jedis.UnifiedJedis import java.io.IOException -import java.lang.Thread.sleep import java.time.Duration @Tag(TestTags.UNIT) @@ -49,7 +48,8 @@ class JedisCountDownLatchBackendTest { eq(listOf("${clientId}0", "5000", "4")), ) } returns "OK" - val callResult = countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + val callResult = + countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) assertEquals("OK", callResult) verify(exactly = 1) { @@ -67,7 +67,8 @@ class JedisCountDownLatchBackendTest { eq(listOf("${clientId}0", "5000", "4")), ) } throws IOException("test exception") - val callResult = countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + val callResult = + countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) assertNull(callResult) verify(exactly = 1) { @@ -130,16 +131,14 @@ class JedisCountDownLatchBackendTest { val pubSubSlot = slot() val channel = slot() every { redis.subscribe(capture(pubSubSlot), capture(channel)) } returns Unit - val flow = countDownLatchBackend.listen("latch:channel:test") - CoroutineScope(CoroutineName("latch:channel:test")).launch { - val first = flow.first() - assertEquals("open", first) + CoroutineScope(CoroutineName("test")).launch { + val result = countDownLatchBackend.listen("latch:channel:test") + assertEquals("open", result) } - runBlocking { sleep(100) } + runBlocking { delay(100) } repeat(messageCount) { - pubSubSlot.captured.onMessage("latch:channel:test", "test") + pubSubSlot.captured.onMessage("latch:channel:test", "open") } - assertEquals("latch:channel:test", channel.captured) verify(exactly = 1) { redis.subscribe(any(), any()) @@ -149,11 +148,10 @@ class JedisCountDownLatchBackendTest { @Test fun `message not received`() { every { redis.subscribe(any(), eq("latch:channel:test")) } returns Unit - val flow = countDownLatchBackend.listen("latch:channel:test") runBlocking { assertThrows { - withTimeout(100) { flow.first() } + withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") } } } verify(exactly = 1) { diff --git a/redpulsar-lettuce/src/main/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackend.kt b/redpulsar-lettuce/src/main/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackend.kt index 3964a52..d7a9905 100644 --- a/redpulsar-lettuce/src/main/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackend.kt +++ b/redpulsar-lettuce/src/main/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackend.kt @@ -6,8 +6,8 @@ import com.himadieiev.redpulsar.lettuce.LettucePubSubPooled import io.lettuce.core.ScriptOutputType import io.lettuce.core.pubsub.RedisPubSubListener import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.first import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import java.time.Duration @@ -78,8 +78,8 @@ internal class LettuceCountDownLatchBackend(private val redis: LettucePubSubPool } } - override fun listen(channelName: String): Flow { - val flow = + override suspend fun listen(channelName: String): String? { + return failsafe(null) { callbackFlow { val pubSub = object : RedisPubSubListener { @@ -138,7 +138,7 @@ internal class LettuceCountDownLatchBackend(private val redis: LettucePubSubPool awaitClose { job.cancel() } - } - return flow + }.first() + } } } diff --git a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt index 54cd5d8..6862706 100644 --- a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt +++ b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt @@ -12,7 +12,6 @@ import io.mockk.verify import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeout @@ -155,19 +154,16 @@ class LettuceCountDownLatchBackendTest { fun `listen produce value`(messageCount: Int) { val listener = slot>() every { connection.addListener(capture(listener)) } returns Unit - val flow = countDownLatchBackend.listen("latch:channel:test") - val job = - CoroutineScope(CoroutineName("latch:channel:test")).launch { - val first = flow.first() - Assertions.assertEquals("open", first) - } - runBlocking { Thread.sleep(50) } - job.cancel() - runBlocking { Thread.sleep(50) } - repeat(messageCount) { - listener.captured.message("latch:channel:test", "test") + CoroutineScope(CoroutineName("latch:channel:test")).launch { + val result = countDownLatchBackend.listen("latch:channel:test") + Assertions.assertEquals("open", result) } + runBlocking { Thread.sleep(100) } + repeat(messageCount) { + listener.captured.message("latch:channel:test", "open") + } + runBlocking { Thread.sleep(100) } verify(exactly = 1) { sync.subscribe(eq("latch:channel:test")) sync.unsubscribe(eq("latch:channel:test")) @@ -176,11 +172,9 @@ class LettuceCountDownLatchBackendTest { @Test fun `message not received`() { - val flow = countDownLatchBackend.listen("latch:channel:test") - runBlocking { assertThrows { - withTimeout(100) { flow.first() } + withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") } } } verify(exactly = 1) {