Skip to content

Commit

Permalink
Refactor message history sync, add to CLI (#930)
Browse files Browse the repository at this point in the history
* fix: stubs support for SyncHistory

* fix: send the history_request

* fix: stubs handling the reply as well

* fix: amend request-history-sync command

* wip on the correct placement for the streams

* fix: handle stream out in cli

* fix: stream sync groups

* fix: cleanup wip notes

* fix: cleanup command

* fix: formatting

* fix: remove redundant commented fn call

* fix: remove redundant log

* fix: removes unused fn

* Update Cargo.lock

* Change how message history messages are stored

* Add helper to get sync group

* Only create sync group if not present

* Refactor PIN verification

* Add checks before sending history requests

* Add checks before sending history reply

* Update test_send_history_request test

* Apply patch

* Remove sync groups from stream_all_messages

* Replace bincode with serde_json

* Refactor history message checks

* Update send history reply test

* Add new_request_id test

* Remove bundle_hash and signing_key

* Simplify prepare groups

* Refactor error handling

* Add message history processing

* Make message_history public

* Remove redundant history URL check

* Add official message history URLs

* Use and require history_sync_url

* Fix FFI bindings error type

* Update CLI client for message history commands

* Remove unused fn

* Return group without assignment

* Update CLI README

* Update var name

* Remove history message processing during sync

* Add logging and more message processing

* Add more message history CLI commands

* Update README

* Update history bundle download path

* Fix test

* Prevent processing reply from self

* Add temp file cleanup

* Remove ring dependency

* Only return MlsGroup from get_sync_group

* allow_history_sync => enable_history_sync

* Add serde_json errors, remove map_err

* Expose serde_json errors

* Refactor match

* Update CLI

* Use chained calls

* Fix clippy

* Use log::error

* Add get_pending_history_request test

* Add get_latest_history_reply test

* Add reply_to_history_request test

---------

Co-authored-by: Ry Racherbaumer <ry@xmtp.com>
  • Loading branch information
tuddman and rygine authored Aug 28, 2024
1 parent 0a3787f commit 6acf381
Show file tree
Hide file tree
Showing 13 changed files with 698 additions and 302 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion bindings_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ impl FfiXmtpClient {
}

pub async fn request_history_sync(&self) -> Result<(), GenericError> {
self.inner_client.send_history_request().await?;
self.inner_client
.send_history_request()
.await
.map_err(GenericError::from_error)?;
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion bindings_node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde_json.workspace = true
thiserror.workspace = true
timeago = "0.4.1"
tokio = "1.28.1"
tokio-stream = "0.1.15"
url = "2.3.1"
xmtp_api_grpc = { path = "../../xmtp_api_grpc" }
xmtp_cryptography = { path = "../../xmtp_cryptography" }
Expand Down
24 changes: 24 additions & 0 deletions examples/cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,30 @@ Use the CLI to send a [double ratchet message](https://github.com/xmtp/libxmtp/b
./xli.sh --db user2.db3 list-group-messages $GROUP_ID
```
9. Request a message history sync
```bash
./xli.sh --db user1.db3 request-history-sync
```
10. Reply to the history sync request
```bash
./xli.sh --db user1.db3 reply-to-history-sync-request
```
11. Process the history sync reply
```bash
./xli.sh --db user1.db3 process-history-sync-reply
```
12. List the history sync messages
```bash
./xli.sh --db user1.db3 list-history-sync-messages
```
If you want to run the CLI against localhost, go to the root directory and run `dev/up` to start a local server. Then run the CLI commands using the `--local` flag.
## Structured logging
Expand Down
99 changes: 87 additions & 12 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ extern crate ethers;
extern crate log;
extern crate xmtp_mls;

use std::iter::Iterator;
use std::{fs, path::PathBuf, time::Duration};

use clap::{Parser, Subcommand, ValueEnum};
use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder};
use kv_log_macro::{error, info};
use prost::Message;
use xmtp_id::associations::RecoverableEcdsaSignature;
use xmtp_mls::groups::message_history::MessageHistoryContent;
use xmtp_mls::storage::group_message::GroupMessageKind;

use crate::{
json_logger::make_value,
Expand All @@ -34,7 +37,7 @@ use xmtp_mls::{
builder::ClientBuilderError,
client::ClientError,
codecs::{text::TextCodec, ContentCodec},
groups::{GroupMetadataOptions, MlsGroup},
groups::{message_history::MessageHistoryUrls, GroupMetadataOptions, MlsGroup},
identity::IdentityStrategy,
storage::{
group_message::StoredGroupMessage, EncryptedMessageStore, EncryptionKey, StorageError,
Expand Down Expand Up @@ -108,6 +111,10 @@ enum Commands {
#[clap(short, long, value_parser, num_args = 1.., value_delimiter = ' ')]
account_addresses: Vec<String>,
},
RequestHistorySync {},
ReplyToHistorySyncRequest {},
ProcessHistorySyncReply {},
ListHistorySyncMessages {},
/// Information about the account that owns the DB
Info {},
Clear {},
Expand Down Expand Up @@ -333,6 +340,68 @@ async fn main() {
let serializable: SerializableGroup = group.into();
info!("Group {}", group_id, { command_output: true, group_id: group_id, group_info: make_value(&serializable) })
}
Commands::RequestHistorySync {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
client.sync_welcomes().await.unwrap();
client.enable_history_sync().await.unwrap();
let (group_id, _) = client.send_history_request().await.unwrap();
let group_id_str = hex::encode(group_id);
info!("Sent history sync request in sync group {group_id_str}", { group_id: group_id_str})
}
Commands::ReplyToHistorySyncRequest {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
let group = client.get_sync_group().unwrap();
let group_id_str = hex::encode(group.group_id);
let reply = client.reply_to_history_request().await.unwrap();

info!("Sent history sync reply in sync group {group_id_str}", { group_id: group_id_str});
info!("Reply: {:?}", reply);
}
Commands::ProcessHistorySyncReply {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
client.sync_welcomes().await.unwrap();
client.enable_history_sync().await.unwrap();
client.process_history_reply().await.unwrap();

info!("History bundle downloaded and inserted into user DB", {})
}
Commands::ListHistorySyncMessages {} => {
let client = create_client(&cli, IdentityStrategy::CachedOnly)
.await
.unwrap();
client.sync_welcomes().await.unwrap();
client.enable_history_sync().await.unwrap();
let group = client.get_sync_group().unwrap();
let group_id_str = hex::encode(group.group_id.clone());
group.sync(&client).await.unwrap();
let messages = group
.find_messages(Some(GroupMessageKind::Application), None, None, None, None)
.unwrap();
info!("Listing history sync messages", { group_id: group_id_str, messages: messages.len()});
for message in messages {
let message_history_content = serde_json::from_slice::<MessageHistoryContent>(
&message.decrypted_message_bytes,
);

match message_history_content {
Ok(MessageHistoryContent::Request(ref request)) => {
info!("Request: {:?}", request);
}
Ok(MessageHistoryContent::Reply(ref reply)) => {
info!("Reply: {:?}", reply);
}
_ => {
info!("Unknown message type: {:?}", message);
}
}
}
}
Commands::Clear {} => {
fs::remove_file(cli.db.unwrap()).unwrap();
}
Expand All @@ -345,21 +414,27 @@ async fn create_client(cli: &Cli, account: IdentityStrategy) -> Result<Client, C

if cli.local {
info!("Using local network");
builder = builder.api_client(
ApiClient::create("http://localhost:5556".into(), false)
.await
.unwrap(),
);
builder = builder
.api_client(
ApiClient::create("http://localhost:5556".into(), false)
.await
.unwrap(),
)
.history_sync_url(MessageHistoryUrls::LOCAL_ADDRESS);
} else {
info!("Using dev network");
builder = builder.api_client(
ApiClient::create("https://grpc.dev.xmtp.network:443".into(), true)
.await
.unwrap(),
);
builder = builder
.api_client(
ApiClient::create("https://grpc.dev.xmtp.network:443".into(), true)
.await
.unwrap(),
)
.history_sync_url(MessageHistoryUrls::DEV_ADDRESS);
}

builder.build().await.map_err(CliError::ClientBuilder)
let client = builder.build().await.map_err(CliError::ClientBuilder)?;

Ok(client)
}

async fn register(cli: &Cli, maybe_seed_phrase: Option<String>) -> Result<(), CliError> {
Expand Down
1 change: 0 additions & 1 deletion xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ parking_lot = "0.12.3"
prost = { workspace = true, features = ["prost-derive"] }
rand = { workspace = true }
reqwest = { version = "0.12.4", features = ["stream"] }
ring = "0.17.8"
serde = { workspace = true }
serde_json.workspace = true
sha2.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ pub enum MessageProcessingError {
ClearPendingCommit(#[from] sql_key_store::SqlKeyStoreError),
#[error(transparent)]
Group(#[from] Box<GroupError>),
#[error("Serialization/Deserialization Error {0}")]
Serde(#[from] serde_json::Error),
#[error("generic:{0}")]
Generic(String),
#[error("intent is missing staged_commit field")]
Expand Down
2 changes: 0 additions & 2 deletions xmtp_mls/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ pub const MAX_PAST_EPOCHS: usize = 3;
/// we leave 5 * 1024 * 1024 as extra buffer room
pub const GRPC_DATA_LIMIT: usize = 45 * 1024 * 1024;

pub const DELIMITER: char = '\x01';

/// MLS Extension Types
///
/// Copied from draft-ietf-mls-protocol-16:
Expand Down
Loading

0 comments on commit 6acf381

Please sign in to comment.