Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor listening backend api #55

Merged
merged 2 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ plugins {

allprojects {
group = "com.himadieiev"
version = "1.1.0"
version = "1.1.1"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import com.himadieiev.redpulsar.core.locks.api.CallResult
import com.himadieiev.redpulsar.core.locks.api.CountDownLatch
import com.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import com.himadieiev.redpulsar.core.locks.excecutors.waitAnyJobs
import com.himadieiev.redpulsar.core.utils.failsafe
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import java.time.Duration
Expand Down Expand Up @@ -198,10 +196,7 @@ class ListeningCountDownLatch(
// if multiple instances goes down or encounter network issue.
waiter = ::waitAnyJobs,
) { backend ->
failsafe(null) {
val flow = backend.listen(channelName = buildKey(channelSpace, name))
flow.first()
}
backend.listen(channelName = buildKey(channelSpace, name))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.himadieiev.redpulsar.core.locks.abstracts.backends

import com.himadieiev.redpulsar.core.locks.abstracts.Backend
import kotlinx.coroutines.flow.Flow
import java.time.Duration

/**
Expand Down Expand Up @@ -31,6 +30,6 @@ abstract class CountDownLatchBackend : Backend() {

abstract fun checkCount(latchKeyName: String): Long?

/** Receive notification about count down latch is opened now. This is supposed to be a blocking call*/
abstract fun listen(channelName: String): Flow<String>
/** Receive notification about count down latch is opened now. */
abstract suspend fun listen(channelName: String): String?
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import TestTags
import com.himadieiev.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend
import com.himadieiev.redpulsar.core.locks.api.CallResult
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkObject
import io.mockk.verify
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.assertDoesNotThrow
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
Expand All @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.io.IOException
import java.time.Duration
import java.util.concurrent.CancellationException

Expand Down Expand Up @@ -94,8 +93,7 @@ class ListeningCountDownLatchTest {

@Test
fun await() {
val flow = flow { emit("open") }
backend.everyListen(flow)
backend.everyListen("open")
backend.everyCheckCount("countdownlatch:test", 2)
val latch =
ListeningCountDownLatch(
Expand All @@ -107,8 +105,10 @@ class ListeningCountDownLatchTest {

assertEquals(CallResult.SUCCESS, latch.await())

verify(exactly = 1) {
coVerify(exactly = 1) {
backend.listen(any())
}
verify(exactly = 1) {
backend.checkCount(any())
}
}
Expand All @@ -129,7 +129,7 @@ class ListeningCountDownLatchTest {
verify(exactly = 1) {
backend.checkCount(any())
}
verify(exactly = 0) {
coVerify(exactly = 0) {
backend.listen(any())
}
}
Expand All @@ -150,7 +150,7 @@ class ListeningCountDownLatchTest {
assertEquals(CallResult.FAILED, latch.await())

verify(exactly = 2) { backend.checkCount(any()) }
verify(exactly = 0) { backend.listen(any()) }
coVerify(exactly = 0) { backend.listen(any()) }
}

@Test
Expand All @@ -164,6 +164,8 @@ class ListeningCountDownLatchTest {
assertEquals(CallResult.FAILED, latch.await())
verify(exactly = 0) {
backend.checkCount(any())
}
coVerify(exactly = 0) {
backend.listen(any())
}
}
Expand All @@ -172,29 +174,26 @@ class ListeningCountDownLatchTest {
fun `await timed out`() {
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
} returns
flow {
delay(1000)
}
} answers {
runBlocking { delay(1000) }
null
}

backend.everyCheckCount("countdownlatch:test", 2)
val latch = ListeningCountDownLatch("test", 4, listOf(backend))

assertEquals(CallResult.FAILED, latch.await(Duration.ofMillis(110)))
verify(exactly = 1) {
backend.checkCount(any())
}
coVerify(exactly = 1) {
backend.listen(any())
}
}

@Test
fun `await failed`() {
val flow =
flow<String> {
delay(10)
IOException("test exception")
}
backend.everyListen(flow)
backend.everyListen(null)
backend.everyCheckCount("countdownlatch:test", 3)
val latch =
ListeningCountDownLatch(
Expand All @@ -209,26 +208,7 @@ class ListeningCountDownLatchTest {
assertEquals(CallResult.FAILED, latch.await())

verify(exactly = 1) { backend.checkCount(any()) }
verify(exactly = 5) { backend.listen(any()) }
}

@Test
fun `await in flow throws cancellation exception`() {
val flow = flow<String> { CancellationException("test exception") }
backend.everyListen(flow)
backend.everyCheckCount("countdownlatch:test", 2)
val latch =
ListeningCountDownLatch(
"test",
4,
listOf(backend),
retryDelay = Duration.ofMillis(1),
)

assertEquals(CallResult.FAILED, latch.await())

verify(exactly = 1) { backend.checkCount(any()) }
verify(exactly = 3) { backend.listen(any()) }
coVerify(exactly = 5) { backend.listen(any()) }
}

@ParameterizedTest(name = "current count: {0}")
Expand Down Expand Up @@ -408,7 +388,14 @@ class ListeningCountDownLatchTest {
@Test
fun `all instances are in quorum for count down`() {
instances.forEach { backend ->
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK")
backend.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
"OK",
)
}
val latch =
ListeningCountDownLatch(
Expand All @@ -427,9 +414,30 @@ class ListeningCountDownLatchTest {

@Test
fun `two instances are in quorum for count down`() {
backend1.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK")
backend2.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), null)
backend3.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK")
backend1.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
"OK",
)
backend2.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
null,
)
backend3.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
"OK",
)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -447,9 +455,30 @@ class ListeningCountDownLatchTest {

@Test
fun `quorum wasn't reach for count down`() {
backend1.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), null)
backend2.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), null)
backend3.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, Duration.ofMinutes(10), "OK")
backend1.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
null,
)
backend2.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
null,
)
backend3.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
4,
4,
Duration.ofMinutes(10),
"OK",
)
instances.forEach { backend ->
backend.everyUndoCount("countdownlatch:test", 4, 1)
}
Expand All @@ -470,9 +499,8 @@ class ListeningCountDownLatchTest {

@Test
fun `all instances are in quorum for await`() {
val flow = flow { emit("open") }
instances.forEach { backend ->
backend.everyListen(flow)
backend.everyListen("open")
backend.everyCheckCount("countdownlatch:test", 1)
}
val latch =
Expand All @@ -484,26 +512,22 @@ class ListeningCountDownLatchTest {
)
assertEquals(CallResult.SUCCESS, latch.await())

verify(exactly = 1) {
coVerify(atMost = 1) {
instances.forEach { backend -> backend.listen(any()) }
}
verify(exactly = 1) {
instances.forEach { backend -> backend.checkCount(any()) }
}
}

@Test
fun `two instances are in quorum for await`() {
val flow =
flow<String> {
delay(50)
IOException("test exception 2")
}
val okFlow = flow { emit("open") }
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
}
backend1.everyListen(okFlow)
backend2.everyListen(flow)
backend3.everyListen(okFlow)
backend1.everyListen("open")
backend2.everyListen(null)
backend3.everyListen("open")
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -513,26 +537,22 @@ class ListeningCountDownLatchTest {
)
assertEquals(CallResult.SUCCESS, latch.await())

verify(exactly = 1) {
coVerify(exactly = 1) {
instances.forEach { backend -> backend.listen(any()) }
}
verify(exactly = 1) {
instances.forEach { backend -> backend.checkCount(any()) }
}
}

@Test
fun `quorum wasn't reach but await succeed`() {
val flow =
flow<String> {
delay(10)
IOException("test exception")
}
val okFlow = flow { emit("open") }
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
}
backend1.everyListen(flow)
backend2.everyListen(okFlow)
backend3.everyListen(flow)
backend1.everyListen(null)
backend2.everyListen("open")
backend3.everyListen(null)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -542,22 +562,19 @@ class ListeningCountDownLatchTest {
)
assertEquals(CallResult.SUCCESS, latch.await())

verify(exactly = 1) {
coVerify(atMost = 1) {
instances.forEach { backend -> backend.listen(any()) }
}
verify(exactly = 1) {
instances.forEach { backend -> backend.checkCount(any()) }
}
}

@Test
fun `all instances are down`() {
val flow =
flow<String> {
delay(10)
IOException("test exception")
}
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyListen(flow)
backend.everyListen(null)
}
val latch =
ListeningCountDownLatch(
Expand All @@ -568,7 +585,7 @@ class ListeningCountDownLatchTest {
)
assertEquals(CallResult.FAILED, latch.await())

verify(exactly = 3) {
coVerify(exactly = 3) {
instances.forEach { backend -> backend.listen(any()) }
}
verify(exactly = 1) {
Expand Down Expand Up @@ -651,7 +668,7 @@ class ListeningCountDownLatchTest {
} returns returnVal
}

private fun CountDownLatchBackend.everyListen(returnVal: Flow<String>) {
private fun CountDownLatchBackend.everyListen(returnVal: String?) {
val backend = this
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
Expand Down
Loading