Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(send queue): refactor in preparation for media local echoes #4176

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl Client {
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Respawn tasks for rooms that had unsent events. At this point we've just
// created the subscriber, so it'll be notified about errors.
q.respawn_tasks_for_rooms_with_unsent_events().await;
q.respawn_tasks_for_rooms_with_unsent_requests().await;

loop {
match subscriber.recv().await {
Expand Down
85 changes: 41 additions & 44 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ use ruma::{
};
use serde_json::{json, value::Value as JsonValue};

use super::{DependentQueuedEventKind, DynStateStore, ServerCapabilities};
use super::{DependentQueuedRequestKind, DynStateStore, ServerCapabilities};
use crate::{
deserialized_responses::MemberEvent,
store::{
traits::ChildTransactionId, QueueWedgeError, Result, SerializableEventContent,
StateStoreExt,
},
store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
};

Expand Down Expand Up @@ -1205,24 +1202,24 @@ impl StateStoreIntegrationTests for DynStateStore {
let room_id = room_id!("!test_send_queue:localhost");

// No queued event in store at first.
let events = self.load_send_queue_events(room_id).await.unwrap();
let events = self.load_send_queue_requests(room_id).await.unwrap();
assert!(events.is_empty());

// Saving one thing should work.
let txn0 = TransactionId::new();
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
.unwrap();
self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap();

// Reading it will work.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 1);
{
assert_eq!(pending[0].transaction_id, txn0);

let deserialized = pending[0].event.deserialize().unwrap();
let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "msg0");

Expand All @@ -1237,27 +1234,27 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.unwrap();

self.save_send_queue_event(room_id, txn, event).await.unwrap();
self.save_send_queue_request(room_id, txn, event).await.unwrap();
}

// Reading all the events should work.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

// All the events should be retrieved, in the same order.
assert_eq!(pending.len(), 4);

assert_eq!(pending[0].transaction_id, txn0);

for i in 0..4 {
let deserialized = pending[i].event.deserialize().unwrap();
let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));
assert!(!pending[i].is_wedged());
}

// Marking an event as wedged works.
let txn2 = &pending[2].transaction_id;
self.update_send_queue_event_status(
self.update_send_queue_request_status(
room_id,
txn2,
Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
Expand All @@ -1266,7 +1263,7 @@ impl StateStoreIntegrationTests for DynStateStore {
.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

// All the events should be retrieved, in the same order.
assert_eq!(pending.len(), 4);
Expand All @@ -1287,24 +1284,24 @@ impl StateStoreIntegrationTests for DynStateStore {
&RoomMessageEventContent::text_plain("wow that's a cool test").into(),
)
.unwrap();
self.update_send_queue_event(room_id, txn2, event0).await.unwrap();
self.update_send_queue_request(room_id, txn2, event0).await.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 4);
{
assert_eq!(pending[2].transaction_id, *txn2);

let deserialized = pending[2].event.deserialize().unwrap();
let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "wow that's a cool test");

assert!(!pending[2].is_wedged());

for i in 0..4 {
if i != 2 {
let deserialized = pending[i].event.deserialize().unwrap();
let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));

Expand All @@ -1314,10 +1311,10 @@ impl StateStoreIntegrationTests for DynStateStore {
}

// Removing an event works.
self.remove_send_queue_event(room_id, &txn0).await.unwrap();
self.remove_send_queue_request(room_id, &txn0).await.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
let pending = self.load_send_queue_requests(room_id).await.unwrap();

assert_eq!(pending.len(), 3);
assert_eq!(pending[1].transaction_id, *txn2);
Expand All @@ -1335,7 +1332,7 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
.unwrap();
self.save_send_queue_event(room_id2, txn.clone(), event).await.unwrap();
self.save_send_queue_request(room_id2, txn.clone(), event).await.unwrap();
}

// Add and remove one event for room3.
Expand All @@ -1345,14 +1342,14 @@ impl StateStoreIntegrationTests for DynStateStore {
let event =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
.unwrap();
self.save_send_queue_event(room_id3, txn.clone(), event).await.unwrap();
self.save_send_queue_request(room_id3, txn.clone(), event).await.unwrap();

self.remove_send_queue_event(room_id3, &txn).await.unwrap();
self.remove_send_queue_request(room_id3, &txn).await.unwrap();
}

// Query all the rooms which have unsent events. Per the previous steps,
// it should be room1 and room2, not room3.
let outstanding_rooms = self.load_rooms_with_unsent_events().await.unwrap();
let outstanding_rooms = self.load_rooms_with_unsent_requests().await.unwrap();
assert_eq!(outstanding_rooms.len(), 2);
assert!(outstanding_rooms.iter().any(|room| room == room_id));
assert!(outstanding_rooms.iter().any(|room| room == room_id2));
Expand All @@ -1366,77 +1363,77 @@ impl StateStoreIntegrationTests for DynStateStore {
let event0 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
.unwrap();
self.save_send_queue_event(room_id, txn0.clone(), event0).await.unwrap();
self.save_send_queue_request(room_id, txn0.clone(), event0).await.unwrap();

// No dependents, to start with.
assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty());
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());

// Save a redaction for that event.
let child_txn = ChildTransactionId::new();
self.save_dependent_send_queue_event(
self.save_dependent_queued_request(
room_id,
&txn0,
child_txn.clone(),
DependentQueuedEventKind::Redact,
DependentQueuedRequestKind::RedactEvent,
)
.await
.unwrap();

// It worked.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn0);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].event_id.is_none());
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);

// Update the event id.
let event_id = owned_event_id!("$1");
let num_updated =
self.update_dependent_send_queue_event(room_id, &txn0, event_id.clone()).await.unwrap();
self.update_dependent_queued_request(room_id, &txn0, event_id.clone()).await.unwrap();
assert_eq!(num_updated, 1);

// It worked.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn0);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert_eq!(dependents[0].event_id.as_ref(), Some(&event_id));
assert_matches!(dependents[0].kind, DependentQueuedEventKind::Redact);
assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);

// Now remove it.
let removed = self
.remove_dependent_send_queue_event(room_id, &dependents[0].own_transaction_id)
.remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
.await
.unwrap();
assert!(removed);

// It worked.
assert!(self.list_dependent_send_queue_events(room_id).await.unwrap().is_empty());
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());

// Now, inserting a dependent event and removing the original send queue event
// will NOT remove the dependent event.
let txn1 = TransactionId::new();
let event1 =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
.unwrap();
self.save_send_queue_event(room_id, txn1.clone(), event1).await.unwrap();
self.save_send_queue_request(room_id, txn1.clone(), event1).await.unwrap();

self.save_dependent_send_queue_event(
self.save_dependent_queued_request(
room_id,
&txn0,
ChildTransactionId::new(),
DependentQueuedEventKind::Redact,
DependentQueuedRequestKind::RedactEvent,
)
.await
.unwrap();
assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 1);
assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 1);

self.save_dependent_send_queue_event(
self.save_dependent_queued_request(
room_id,
&txn1,
ChildTransactionId::new(),
DependentQueuedEventKind::Edit {
DependentQueuedRequestKind::EditEvent {
new_content: SerializableEventContent::new(
&RoomMessageEventContent::text_plain("edit").into(),
)
Expand All @@ -1445,14 +1442,14 @@ impl StateStoreIntegrationTests for DynStateStore {
)
.await
.unwrap();
assert_eq!(self.list_dependent_send_queue_events(room_id).await.unwrap().len(), 2);
assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 2);

// Remove event0 / txn0.
let removed = self.remove_send_queue_event(room_id, &txn0).await.unwrap();
let removed = self.remove_send_queue_request(room_id, &txn0).await.unwrap();
assert!(removed);

// This has removed none of the dependent events.
let dependents = self.list_dependent_send_queue_events(room_id).await.unwrap();
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 2);
}
}
Expand Down
Loading
Loading