diff --git a/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt b/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt index 7c2090f7..bdce4588 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/DmTest.kt @@ -91,8 +91,14 @@ class DmTest { @Test fun testCanCreateADm() { - val dm = runBlocking { - boClient.conversations.findOrCreateDm(alix.walletAddress) + runBlocking { + val convo1 = boClient.conversations.findOrCreateDm(alix.walletAddress) + val convo2 = caroClient.conversations.newConversation(alix.walletAddress) + alixClient.conversations.syncConversations() + val sameConvo1 = alixClient.conversations.findOrCreateDm(bo.walletAddress) + val sameConvo2 = alixClient.conversations.newConversation(caro.walletAddress) + assertEquals(convo1.id, sameConvo1.id) + assertEquals(convo2.id, sameConvo2.id) } } @Test @@ -158,17 +164,6 @@ class DmTest { assertEquals(groups.size, 2) } - @Test - fun testCanListAllConversations() { - runBlocking { - boClient.conversations.newGroup(listOf(alix.walletAddress)) - boClient.conversations.newGroup(listOf(caro.walletAddress)) - boClient.conversations.newConversation(alix.walletAddress) - } - val convos = runBlocking { boClient.conversations.list(includeGroups = true) } - assertEquals(convos.size, 3) - } - @Test fun testCannotCreateDmWithMemberNotOnV3() { val chuxAccount = PrivateKeyBuilder() @@ -328,38 +323,6 @@ class DmTest { job.cancel() } - @Test - fun testCanStreamAllMessages() { - val group = runBlocking { caroClient.conversations.newGroup(listOf(alix.walletAddress)) } - val conversation = - runBlocking { boClient.conversations.newConversation(alix.walletAddress) } - runBlocking { alixClient.conversations.syncGroups() } - - val allMessages = mutableListOf() - - val job = CoroutineScope(Dispatchers.IO).launch { - try { - alixClient.conversations.streamAllMessages(includeGroups = true) - .collect { message -> - allMessages.add(message) - } - } catch (e: Exception) { - } - } - Thread.sleep(2500) - - runBlocking { - group.send("hi") - conversation.send("hi") - } - - Thread.sleep(1000) - - assertEquals(2, allMessages.size) - - job.cancel() - } - @Test fun testCanStreamDecryptedDmMessages() = kotlinx.coroutines.test.runTest { val group = boClient.conversations.newGroup(listOf(alix.walletAddress)) @@ -410,38 +373,6 @@ class DmTest { job.cancel() } - @Test - fun testCanStreamAllDecryptedMessages() { - val group = runBlocking { caroClient.conversations.newGroup(listOf(alix.walletAddress)) } - val conversation = - runBlocking { boClient.conversations.newConversation(alix.walletAddress) } - runBlocking { alixClient.conversations.syncGroups() } - - val allMessages = mutableListOf() - - val job = CoroutineScope(Dispatchers.IO).launch { - try { - alixClient.conversations.streamAllDecryptedMessages(includeGroups = true) - .collect { message -> - allMessages.add(message) - } - } catch (e: Exception) { - } - } - Thread.sleep(2500) - - runBlocking { - group.send("hi") - conversation.send("hi") - } - - Thread.sleep(1000) - - assertEquals(2, allMessages.size) - - job.cancel() - } - @Test fun testCanStreamDms() = kotlinx.coroutines.test.runTest { boClient.conversations.streamGroups().test { @@ -454,34 +385,6 @@ class DmTest { } } - @Test - fun testCanStreamAllConversations() { - val allMessages = mutableListOf() - - val job = CoroutineScope(Dispatchers.IO).launch { - try { - alixClient.conversations.streamAll() - .collect { message -> - allMessages.add(message.topic) - } - } catch (e: Exception) { - } - } - Thread.sleep(2500) - - runBlocking { - alixClient.conversations.newConversation(bo.walletAddress) - Thread.sleep(2500) - caroClient.conversations.newGroup(listOf(alix.walletAddress)) - } - - Thread.sleep(2500) - - assertEquals(2, allMessages.size) - - job.cancel() - } - @Test fun testDmConsent() { runBlocking { @@ -505,37 +408,6 @@ class DmTest { } } - @Test - fun testCanAllowAndDenyInboxId() { - runBlocking { - val boGroup = boClient.conversations.newGroup(listOf(alix.walletAddress)) - assert(!boClient.contacts.isInboxAllowed(alixClient.inboxId)) - assert(!boClient.contacts.isInboxDenied(alixClient.inboxId)) - - boClient.contacts.allowInboxes(listOf(alixClient.inboxId)) - var alixMember = boGroup.members().firstOrNull { it.inboxId == alixClient.inboxId } - assertEquals(alixMember!!.consentState, ConsentState.ALLOWED) - - assert(boClient.contacts.isInboxAllowed(alixClient.inboxId)) - assert(!boClient.contacts.isInboxDenied(alixClient.inboxId)) - - boClient.contacts.denyInboxes(listOf(alixClient.inboxId)) - alixMember = boGroup.members().firstOrNull { it.inboxId == alixClient.inboxId } - assertEquals(alixMember!!.consentState, ConsentState.DENIED) - - assert(!boClient.contacts.isInboxAllowed(alixClient.inboxId)) - assert(boClient.contacts.isInboxDenied(alixClient.inboxId)) - - boClient.contacts.allow(listOf(alixClient.address)) - alixMember = boGroup.members().firstOrNull { it.inboxId == alixClient.inboxId } - assertEquals(alixMember!!.consentState, ConsentState.ALLOWED) - assert(boClient.contacts.isInboxAllowed(alixClient.inboxId)) - assert(!boClient.contacts.isInboxDenied(alixClient.inboxId)) - assert(boClient.contacts.isAllowed(alixClient.address)) - assert(!boClient.contacts.isDenied(alixClient.address)) - } - } - @Test fun testCanFetchDmById() { val boGroup = runBlocking { @@ -551,111 +423,4 @@ class DmTest { assertEquals(alixGroup?.id, boGroup.id) } - - @Test - fun testCanFetchMessageById() { - val boGroup = runBlocking { - boClient.conversations.newGroup( - listOf( - alix.walletAddress, - caro.walletAddress - ) - ) - } - val boMessageId = runBlocking { boGroup.send("Hello") } - runBlocking { alixClient.conversations.syncGroups() } - val alixGroup = alixClient.findGroup(boGroup.id) - runBlocking { alixGroup?.sync() } - val alixMessage = alixClient.findMessage(boMessageId) - - assertEquals(alixMessage?.id, boMessageId) - } - - @Test - fun testUnpublishedMessages() { - val boGroup = runBlocking { - boClient.conversations.newGroup( - listOf( - alix.walletAddress, - caro.walletAddress - ) - ) - } - runBlocking { alixClient.conversations.syncGroups() } - val alixGroup: Group = alixClient.findGroup(boGroup.id)!! - runBlocking { assert(!alixClient.contacts.isGroupAllowed(boGroup.id)) } - val preparedMessageId = runBlocking { alixGroup.prepareMessage("Test text") } - runBlocking { assert(alixClient.contacts.isGroupAllowed(boGroup.id)) } - assertEquals(alixGroup.messages().size, 1) - assertEquals(alixGroup.messages(deliveryStatus = MessageDeliveryStatus.PUBLISHED).size, 0) - assertEquals(alixGroup.messages(deliveryStatus = MessageDeliveryStatus.UNPUBLISHED).size, 1) - - runBlocking { - alixGroup.publishMessages() - alixGroup.sync() - } - - assertEquals(alixGroup.messages(deliveryStatus = MessageDeliveryStatus.PUBLISHED).size, 1) - assertEquals(alixGroup.messages(deliveryStatus = MessageDeliveryStatus.UNPUBLISHED).size, 0) - assertEquals(alixGroup.messages().size, 1) - - val message = alixGroup.messages().first() - - assertEquals(preparedMessageId, message.id) - } - - @Test - fun testSyncsAllDmsInParallel() { - val boGroup = runBlocking { - boClient.conversations.newGroup( - listOf( - alix.walletAddress, - ) - ) - } - val boGroup2 = runBlocking { - boClient.conversations.newGroup( - listOf( - alix.walletAddress, - ) - ) - } - runBlocking { alixClient.conversations.syncGroups() } - val alixGroup: Group = alixClient.findGroup(boGroup.id)!! - val alixGroup2: Group = alixClient.findGroup(boGroup2.id)!! - var numGroups: UInt? - - assertEquals(alixGroup.messages().size, 0) - assertEquals(alixGroup2.messages().size, 0) - - runBlocking { - boGroup.send("hi") - boGroup2.send("hi") - numGroups = alixClient.conversations.syncAllGroups() - } - - assertEquals(alixGroup.messages().size, 1) - assertEquals(alixGroup2.messages().size, 1) - assertEquals(numGroups, 2u) - - runBlocking { - boGroup2.removeMembers(listOf(alix.walletAddress)) - boGroup.send("hi") - boGroup.send("hi") - boGroup2.send("hi") - boGroup2.send("hi") - numGroups = alixClient.conversations.syncAllGroups() - } - - assertEquals(alixGroup.messages().size, 3) - assertEquals(alixGroup2.messages().size, 2) - // First syncAllGroups after remove includes the group you're removed from - assertEquals(numGroups, 2u) - - runBlocking { - numGroups = alixClient.conversations.syncAllGroups() - } - // Next syncAllGroups will not include the inactive group - assertEquals(numGroups, 1u) - } } diff --git a/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt b/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt index be31c2c2..d49072db 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/V3ClientTest.kt @@ -281,9 +281,9 @@ class V3ClientTest { Thread.sleep(1000) runBlocking { - caroV2V3Client.conversations.newConversation(boV3.walletAddress) - Thread.sleep(1000) caroV2V3Client.conversations.newGroup(listOf(boV3.walletAddress)) + Thread.sleep(1000) + caroV2V3Client.conversations.newConversation(boV3.walletAddress) } Thread.sleep(2000) diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 551adbcd..dec40716 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -546,15 +546,34 @@ data class Conversations( ) awaitClose { launch { stream.end() } } - } else if (client.v3Client != null) { + } + if (client.v3Client != null) { streamDmConversations() } } fun streamAll(): Flow { + if (!client.hasV2Client) { + streamConversations() + } return merge(streamGroupConversations(), stream()) } + private fun streamConversations(): Flow = callbackFlow { + val conversationCallback = object : FfiConversationCallback { + override fun onConversation(conversation: FfiConversation) { + if(conversation.groupMetadata().conversationType() == "dm") { + trySend(Conversation.Dm(Dm(client, conversation))) + } else { + trySend(Conversation.Group(Group(client, conversation))) + } + } + } + val stream = libXMTPConversations?.stream(conversationCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + fun streamGroups(): Flow = callbackFlow { val groupCallback = object : FfiConversationCallback { override fun onConversation(conversation: FfiConversation) { @@ -687,12 +706,12 @@ data class Conversations( } private fun streamDmConversations(): Flow = callbackFlow { - val groupCallback = object : FfiConversationCallback { + val dmCallback = object : FfiConversationCallback { override fun onConversation(conversation: FfiConversation) { trySend(Conversation.Dm(Dm(client, conversation))) } } - val stream = libXMTPConversations?.streamDms(groupCallback) + val stream = libXMTPConversations?.streamDms(dmCallback) ?: throw XMTPException("Client does not support V3") awaitClose { stream.end() } }