Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fn to await a synced room from Client #3979

Merged
merged 5 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,16 @@ impl Client {

Ok(RoomPreview::from_sdk(sdk_room_preview))
}

/// Waits until an at least partially synced room is received, and returns
/// it.
///
/// **Note: this function will loop endlessly until either it finds the room
/// or an externally set timeout happens.**
Comment on lines +1009 to +1010
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd do a slightly different API at the FFI level: add an Option<Duration> as a parameter, and add a timeout here, so the consumer doesn't have to. Does it make sense? (You get to decide if it's a good idea, based on your usage of the API in the app :))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked @stefanceriu and we decided to have the client implement the timeout.

pub async fn await_room_remote_echo(&self, room_id: String) -> Result<Arc<Room>, ClientError> {
let room_id = RoomId::parse(room_id)?;
Ok(Arc::new(Room::new(self.inner.await_room_remote_echo(&room_id).await)))
}
}

#[uniffi::export(callback_interface)]
Expand Down
7 changes: 7 additions & 0 deletions crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ impl Room {
self.inner.read().sync_info == SyncInfo::FullySynced
}

/// Check if the room state has been at least partially synced.
///
/// See [`Room::is_state_fully_synced`] for more info.
pub fn is_state_partially_or_fully_synced(&self) -> bool {
self.inner.read().sync_info != SyncInfo::NoState
}

/// Check if the room has its encryption event synced.
///
/// The encryption event can be missing when the room hasn't appeared in
Expand Down
38 changes: 22 additions & 16 deletions crates/matrix-sdk-ui/src/room_list_service/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{ops::Deref, sync::Arc};
use async_once_cell::OnceCell as AsyncOnceCell;
use matrix_sdk::SlidingSync;
use ruma::RoomId;
use tracing::info;

use super::Error;
use crate::{
Expand Down Expand Up @@ -150,27 +151,32 @@ impl Room {
}

/// Create a new [`TimelineBuilder`] with the default configuration.
///
/// If the room was synced before some initial events will be added to the
/// [`TimelineBuilder`].
bnjbvr marked this conversation as resolved.
Show resolved Hide resolved
pub async fn default_room_timeline_builder(&self) -> Result<TimelineBuilder, Error> {
// TODO we can remove this once the event cache handles his own cache.

let sliding_sync_room =
let sliding_sync_room = self.inner.sliding_sync.get_room(self.inner.room.room_id()).await;
bnjbvr marked this conversation as resolved.
Show resolved Hide resolved

if let Some(sliding_sync_room) = sliding_sync_room {
bnjbvr marked this conversation as resolved.
Show resolved Hide resolved
self.inner
.sliding_sync
.get_room(self.inner.room.room_id())
.room
.client()
.event_cache()
.add_initial_events(
self.inner.room.room_id(),
sliding_sync_room.timeline_queue().iter().cloned().collect(),
sliding_sync_room.prev_batch(),
)
.await
.ok_or_else(|| Error::RoomNotFound(self.inner.room.room_id().to_owned()))?;

self.inner
.room
.client()
.event_cache()
.add_initial_events(
self.inner.room.room_id(),
sliding_sync_room.timeline_queue().iter().cloned().collect(),
sliding_sync_room.prev_batch(),
)
.await
.map_err(Error::EventCache)?;
.map_err(Error::EventCache)?;
} else {
info!(
"No cached sliding sync room found for `{}`, the timeline will be empty.",
self.room_id()
);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some info! statement in the else case, to indicate that we didn't have a timeline cache for this room? That will help debunk issues when people are in offline mode.


Ok(Timeline::builder(&self.inner.room).track_read_marker_and_receipts())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
// now we may want to replace a populated timeline with an empty one.
if !state.items.is_empty() || !events.is_empty() {
state
.replace_with_remove_events(
.replace_with_remote_events(
events,
TimelineEnd::Back,
origin,
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/src/timeline/controller/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl TimelineState {
/// Note: when the `position` is [`TimelineEnd::Front`], prepended events
/// should be ordered in *reverse* topological order, that is, `events[0]`
/// is the most recent.
pub(super) async fn replace_with_remove_events<P: RoomDataProvider>(
pub(super) async fn replace_with_remote_events<P: RoomDataProvider>(
&mut self,
events: Vec<SyncTimelineEvent>,
position: TimelineEnd,
Expand Down
33 changes: 32 additions & 1 deletion crates/matrix-sdk-ui/tests/integration/room_list_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ use matrix_sdk_ui::{
RoomListService,
};
use ruma::{
api::client::room::create_room::v3::Request as CreateRoomRequest,
assign, event_id,
events::{room::message::RoomMessageEventContent, StateEventType},
mxc_uri, room_id, uint,
};
use serde_json::json;
use stream_assert::{assert_next_matches, assert_pending};
use tokio::{spawn, sync::mpsc::channel, task::yield_now};
use wiremock::MockServer;
use wiremock::{
matchers::{header, method, path},
Mock, MockServer, ResponseTemplate,
};

use crate::timeline::sliding_sync::{assert_timeline_stream, timeline_event};

Expand Down Expand Up @@ -2404,6 +2408,33 @@ async fn test_room_timeline() -> Result<(), Error> {
Ok(())
}

#[async_test]
async fn test_room_empty_timeline() {
let (client, server, room_list) = new_room_list_service().await.unwrap();
mock_encryption_state(&server, false).await;

Mock::given(method("POST"))
.and(path("_matrix/client/r0/createRoom"))
.and(header("authorization", "Bearer 1234"))
.respond_with(
ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!example:localhost"})),
)
.mount(&server)
.await;

let room = client.create_room(CreateRoomRequest::default()).await.unwrap();
let room_id = room.room_id().to_owned();

// The room wasn't synced, but it will be available
let room = room_list.room(&room_id).unwrap();
let timeline = room.default_room_timeline_builder().await.unwrap().build().await.unwrap();
let (prev_items, _) = timeline.subscribe().await;

// However, since the room wasn't synced its timeline won't have any initial
// items
assert!(prev_items.is_empty());
}

#[async_test]
async fn test_room_latest_event() -> Result<(), Error> {
let (_, server, room_list) = new_room_list_service().await?;
Expand Down
148 changes: 143 additions & 5 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,26 @@ impl Client {
// SAFETY: always initialized in the `Client` ctor.
self.inner.event_cache.get().unwrap()
}

/// Waits until an at least partially synced room is received, and returns
/// it.
///
/// **Note: this function will loop endlessly until either it finds the room
/// or an externally set timeout happens.**
pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
loop {
if let Some(room) = self.get_room(room_id) {
if room.is_state_partially_or_fully_synced() {
debug!("Found just created room!");
return room;
}
debug!("Room wasn't partially synced, waiting for sync beat to try again");
} else {
debug!("Room wasn't found, waiting for sync beat to try again");
}
self.inner.sync_beat.listen().await;
}
}
}

/// A weak reference to the inner client, useful when trying to get a handle
Expand Down Expand Up @@ -2288,6 +2308,7 @@ pub(crate) mod tests {
use std::{sync::Arc, time::Duration};

use assert_matches::assert_matches;
use futures_util::FutureExt;
use matrix_sdk_base::{
store::{MemoryStore, StoreConfig},
RoomState,
Expand All @@ -2300,12 +2321,19 @@ pub(crate) mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

use ruma::{
api::MatrixVersion, events::ignored_user_list::IgnoredUserListEventContent, owned_room_id,
room_id, RoomId, ServerName, UserId,
api::{client::room::create_room::v3::Request as CreateRoomRequest, MatrixVersion},
assign,
events::ignored_user_list::IgnoredUserListEventContent,
owned_room_id, room_id, RoomId, ServerName, UserId,
};
use serde_json::json;
use tokio::{
spawn,
time::{sleep, timeout},
};
use url::Url;
use wiremock::{
matchers::{body_json, header, method, path},
matchers::{body_json, header, method, path, query_param_is_missing},
Mock, MockServer, ResponseTemplate,
};

Expand All @@ -2315,6 +2343,7 @@ pub(crate) mod tests {
config::{RequestConfig, SyncSettings},
test_utils::{
logged_in_client, no_retry_test_client, set_client_session, test_client_builder,
test_client_builder_with_server,
},
Error,
};
Expand Down Expand Up @@ -2642,7 +2671,7 @@ pub(crate) mod tests {
let client = logged_in_client(None).await;

// Wait for the init tasks to die.
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;

let weak_client = WeakClient::from_client(&client);
assert_eq!(weak_client.strong_count(), 1);
Expand All @@ -2665,7 +2694,7 @@ pub(crate) mod tests {
drop(client);

// Give a bit of time for background tasks to die.
tokio::time::sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;

// The weak client must be the last reference to the client now.
assert_eq!(weak_client.strong_count(), 0);
Expand Down Expand Up @@ -2773,4 +2802,113 @@ pub(crate) mod tests {
// network error.
client.whoami().await.unwrap_err();
}

#[async_test]
async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
let (client_builder, server) = test_client_builder_with_server().await;
let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
set_client_session(&client).await;

let builder = Mock::given(method("GET"))
.and(path("/_matrix/client/r0/sync"))
.and(header("authorization", "Bearer 1234"))
.and(query_param_is_missing("since"));

let room_id = room_id!("!room:example.org");
let joined_room_builder = JoinedRoomBuilder::new(room_id);
let mut sync_response_builder = SyncResponseBuilder::new();
sync_response_builder.add_joined_room(joined_room_builder);
let response_body = sync_response_builder.build_json_sync_response();

builder
.respond_with(ResponseTemplate::new(200).set_body_json(response_body))
.mount(&server)
.await;

client.sync_once(SyncSettings::default()).await.unwrap();

let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
assert_eq!(room.room_id(), room_id);
}

#[async_test]
async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
let (client_builder, server) = test_client_builder_with_server().await;
let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
set_client_session(&client).await;

let builder = Mock::given(method("GET"))
.and(path("/_matrix/client/r0/sync"))
.and(header("authorization", "Bearer 1234"))
.and(query_param_is_missing("since"));

let room_id = room_id!("!room:example.org");
let joined_room_builder = JoinedRoomBuilder::new(room_id);
let mut sync_response_builder = SyncResponseBuilder::new();
sync_response_builder.add_joined_room(joined_room_builder);
let response_body = sync_response_builder.build_json_sync_response();

builder
.respond_with(ResponseTemplate::new(200).set_body_json(response_body))
.mount(&server)
.await;

let client = Arc::new(client);

// Perform the /sync request with a delay so it starts after the
// `await_room_remote_echo` call has happened
spawn({
let client = client.clone();
async move {
sleep(Duration::from_millis(100)).await;
client.sync_once(SyncSettings::default()).await.unwrap();
}
});

let room =
timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap();
assert_eq!(room.room_id(), room_id);
}

#[async_test]
async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
let (client_builder, _) = test_client_builder_with_server().await;
let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
set_client_session(&client).await;

let room_id = room_id!("!room:example.org");
// Room is not present so the client won't be able to find it. The call will
// timeout.
timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
}

#[async_test]
async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
let (client_builder, server) = test_client_builder_with_server().await;
let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
set_client_session(&client).await;

Mock::given(method("POST"))
.and(path("_matrix/client/r0/createRoom"))
.and(header("authorization", "Bearer 1234"))
.respond_with(
ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
)
.mount(&server)
.await;

// Create a room in the internal store
let room = client
.create_room(assign!(CreateRoomRequest::new(), {
invite: vec![],
is_direct: false,
}))
.await
.unwrap();

// Room is locally present, but not synced, the call will timeout
timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
.await
.unwrap_err();
}
}
Loading