feat: implement a generation counter for the CryptoStore lock #2155

Merged (3 commits) on Jun 30, 2023

6 changes: 0 additions & 6 deletions bindings/matrix-sdk-ffi/src/encryption_sync.rs
@@ -73,12 +73,6 @@ impl EncryptionSync {
error!("Error when stopping the encryption sync: {err}");
}
}

pub fn reload_caches(&self) {
if let Err(err) = RUNTIME.block_on(self.sync.reload_caches()) {
error!("Error when reloading caches: {err}");
}
}
}

impl Client {
127 changes: 125 additions & 2 deletions crates/matrix-sdk-crypto/src/machine.rs
@@ -67,8 +67,8 @@ use crate::{
requests::{IncomingResponse, OutgoingRequest, UploadSigningKeysRequest},
session_manager::{GroupSessionManager, SessionManager},
store::{
Changes, DeviceChanges, DynCryptoStore, IdentityChanges, IntoCryptoStore, MemoryStore,
Result as StoreResult, SecretImportError, Store,
locks::LockStoreError, Changes, DeviceChanges, DynCryptoStore, IdentityChanges,
IntoCryptoStore, MemoryStore, Result as StoreResult, SecretImportError, Store,
},
types::{
events::{
@@ -129,6 +129,18 @@ pub struct OlmMachineInner {
/// A state machine that handles creating room key backups.
#[cfg(feature = "backups_v1")]
backup_machine: BackupMachine,
/// Latest "generation" of data known by the crypto store.
///
/// This is a counter stored in the database that only increments (and can
/// wrap). It's incremented whenever some process acquires a lock for the
/// first time. *This assumes the crypto store lock is being held, to
/// avoid data races on writing to this value in the store*.
///
/// The current process will maintain this value in local memory and in the
/// DB over time. Reading a value from the store that differs from the one
/// held in memory indicates that another process has written into the
/// database under our feet.
pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
Contributor

Does it make sense to use a SequenceNumber for this?

/// A numeric type that can represent an infinite ordered sequence.
///
/// It uses wrapping arithmetic to make sure we never run out of numbers. (2**64
/// should be enough for anyone, but it's easy enough just to make it wrap.)
//
/// Internally it uses a *signed* counter so that we can compare values via a
/// subtraction. For example, suppose we've just overflowed from i64::MAX to
/// i64::MIN. (i64::MAX.wrapping_sub(i64::MIN)) is -1, which tells us that
/// i64::MAX comes before i64::MIN in the sequence.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct SequenceNumber(i64);

Member Author

I could use it, and it does impl PartialEq/Eq, but I'm really just using a single wrapping_add here 😁
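
To illustrate the trade-off discussed in this thread, here is a minimal sketch. The `SequenceNumber` body below is a hypothetical stand-in: only the compare-by-wrapping-subtraction idea comes from the quoted doc comment, and the method names `next` and `is_before` as well as the helper `bump_generation` are assumptions made for illustration. It suggests why a bare `u64` with `wrapping_add` may be enough when values are only ever compared for equality.

```rust
/// Hypothetical stand-in for the `SequenceNumber` quoted above.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct SequenceNumber(i64);

impl SequenceNumber {
    /// Next value in the sequence, wrapping from i64::MAX to i64::MIN.
    fn next(self) -> Self {
        SequenceNumber(self.0.wrapping_add(1))
    }

    /// True if `self` comes before `other`, decided by the sign of the
    /// wrapping difference rather than by a plain `<`.
    fn is_before(self, other: Self) -> bool {
        self.0.wrapping_sub(other.0) < 0
    }
}

/// What the generation counter needs: bump a `u64` and let it wrap.
/// Ordering is never consulted, only equality.
fn bump_generation(generation: u64) -> u64 {
    generation.wrapping_add(1)
}
```

An ordered type pays off when callers need to know which of two values is newer; the generation counter only asks whether the stored value still equals the remembered one.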

}

#[cfg(not(tarpaulin_include))]
@@ -142,6 +154,8 @@ impl std::fmt::Debug for OlmMachine {
}

impl OlmMachine {
const CURRENT_GENERATION_STORE_KEY: &str = "generation-counter";

/// Create a new memory based OlmMachine.
///
/// The created machine will keep the encryption keys only in memory and
@@ -212,6 +226,7 @@ impl OlmMachine {
identity_manager,
#[cfg(feature = "backups_v1")]
backup_machine,
crypto_store_generation: Arc::new(Mutex::new(None)),
});

Self { inner }
@@ -1728,6 +1743,114 @@ impl OlmMachine {
pub fn backup_machine(&self) -> &BackupMachine {
&self.inner.backup_machine
}

/// Syncs the database and in-memory generation counter.
///
/// This requires that the crypto store lock has been acquired already.
pub async fn initialize_crypto_store_generation(&self) -> StoreResult<()> {
// Avoid reentrant initialization by taking the lock for the entire
// function's scope.
let mut gen_guard = self.inner.crypto_store_generation.lock().await;

let prev_generation =
self.inner.store.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY).await?;

let gen = match prev_generation {
Some(val) => {
// There was a value in the store. We need to signal that we're a different
// process, so we don't just reuse the value but increment it.
u64::from_le_bytes(
val.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
)
.wrapping_add(1)
}
None => 0,
};

self.inner
.store
.set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
.await?;

*gen_guard = Some(gen);

Ok(())
}

/// If need be, update the local and on-disk crypto store generation.
///
/// Returns `true` if another process has modified the internal generation
/// counter, in which case we've incremented it and updated it in the
/// database.
///
/// ## Requirements
///
/// - This assumes that `initialize_crypto_store_generation` has been called
/// beforehand.
/// - This requires that the crypto store lock has been acquired.
pub async fn maintain_crypto_store_generation(&self) -> StoreResult<bool> {
let mut gen_guard = self.inner.crypto_store_generation.lock().await;

// The database value must be there:
// - either we could initialize beforehand, and thus wrote into the database,
// - or we couldn't, because another process was holding onto the database's
// lock and has thus written a generation counter in there.
let actual_gen = self
.inner
.store
.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
.await?
.ok_or(LockStoreError::MissingGeneration)?;

let actual_gen = u64::from_le_bytes(
actual_gen.try_into().map_err(|_| LockStoreError::InvalidGenerationFormat)?,
);

let expected_gen = match gen_guard.as_ref() {
Some(expected_gen) => {
if actual_gen == *expected_gen {
return Ok(false);
}
// Increment the biggest, and store it everywhere.
actual_gen.max(*expected_gen).wrapping_add(1)
}
None => {
// Some other process held onto the lock when initializing, so we must reload.
// Increment database value, and store it everywhere.
actual_gen.wrapping_add(1)
}
};

tracing::debug!(
"Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
*gen_guard,
actual_gen,
expected_gen
);

// Update known value.
*gen_guard = Some(expected_gen);

// Update value in database.
self.inner
.store
.set_custom_value(
Self::CURRENT_GENERATION_STORE_KEY,
expected_gen.to_le_bytes().to_vec(),
)
.await?;

Ok(true)
}

#[cfg(any(feature = "testing", test))]
/// Returns whether this `OlmMachine` is the same as another one.
///
/// Useful for testing purposes only.
pub fn same_as(&self, other: &OlmMachine) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}
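
The two methods added above can be exercised in isolation with a small synchronous model. This is only a sketch under stated assumptions: `FakeStore`, `GenError`, `read_generation`, `write_generation`, `initialize` and `maintain` are hypothetical names, the store is a plain `HashMap` instead of the real async crypto store, and no locking is modelled (the real code requires the crypto store lock to be held).

```rust
use std::collections::HashMap;
use std::convert::TryFrom;

const KEY: &str = "generation-counter";

/// Hypothetical in-memory stand-in for the store's custom-value API.
#[derive(Default)]
struct FakeStore {
    values: HashMap<String, Vec<u8>>,
}

/// Hypothetical mirror of the two new `LockStoreError` variants.
#[derive(Debug, PartialEq)]
enum GenError {
    Missing,
    InvalidFormat,
}

/// Reads the counter, stored as 8 little-endian bytes.
fn read_generation(store: &FakeStore) -> Result<Option<u64>, GenError> {
    match store.values.get(KEY) {
        None => Ok(None),
        Some(bytes) => {
            let array =
                <[u8; 8]>::try_from(bytes.as_slice()).map_err(|_| GenError::InvalidFormat)?;
            Ok(Some(u64::from_le_bytes(array)))
        }
    }
}

fn write_generation(store: &mut FakeStore, generation: u64) {
    store.values.insert(KEY.to_owned(), generation.to_le_bytes().to_vec());
}

/// Models `initialize_crypto_store_generation`: bump whatever is stored,
/// or start at 0, then remember the result in `known`.
fn initialize(store: &mut FakeStore, known: &mut Option<u64>) -> Result<(), GenError> {
    let generation = match read_generation(store)? {
        Some(previous) => previous.wrapping_add(1),
        None => 0,
    };
    write_generation(store, generation);
    *known = Some(generation);
    Ok(())
}

/// Models `maintain_crypto_store_generation`: returns `true` when the stored
/// value differs from the remembered one, after bumping both.
fn maintain(store: &mut FakeStore, known: &mut Option<u64>) -> Result<bool, GenError> {
    let actual = read_generation(store)?.ok_or(GenError::Missing)?;
    let next = match *known {
        Some(expected) if expected == actual => return Ok(false),
        Some(expected) => actual.max(expected).wrapping_add(1),
        None => actual.wrapping_add(1),
    };
    *known = Some(next);
    write_generation(store, next);
    Ok(true)
}
```

With two simulated processes sharing one `FakeStore`, each keeping its own `Option<u64>`, a call to `maintain` returns `true` exactly when the other process has initialized or bumped the counter since the caller last looked. That is the signal a caller could use to reload its caches.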

#[cfg(any(feature = "testing", test))]
9 changes: 9 additions & 0 deletions crates/matrix-sdk-crypto/src/store/locks.rs
@@ -314,6 +314,15 @@ pub enum LockStoreError {
/// Spent too long waiting for a database lock.
#[error("a lock timed out")]
LockTimeout,

/// The generation counter is missing, and should always be present.
#[error("missing generation counter in the store")]
MissingGeneration,

/// Unexpected format for the generation counter. Is someone tampering with
/// the database?
#[error("invalid format of the generation counter")]
InvalidGenerationFormat,
}
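
As a rough illustration of when each new variant would surface, the sketch below decodes an optional stored value the way `machine.rs` does: a missing entry maps to one error, and a byte vector that is not exactly 8 bytes long maps to the other. `GenerationError` and `decode_generation` are hypothetical names, and `Display` is written by hand here so the example needs no external crate (the real enum uses `#[error(...)]` attributes).

```rust
use std::convert::TryInto;
use std::fmt;

/// Hypothetical, trimmed-down mirror of the two new `LockStoreError` variants.
#[derive(Debug, PartialEq)]
enum GenerationError {
    MissingGeneration,
    InvalidGenerationFormat,
}

impl fmt::Display for GenerationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MissingGeneration => f.write_str("missing generation counter in the store"),
            Self::InvalidGenerationFormat => {
                f.write_str("invalid format of the generation counter")
            }
        }
    }
}

impl std::error::Error for GenerationError {}

/// Decodes a stored custom value into a generation counter.
fn decode_generation(stored: Option<Vec<u8>>) -> Result<u64, GenerationError> {
    let bytes = stored.ok_or(GenerationError::MissingGeneration)?;
    // `Vec<u8>` converts into `[u8; 8]` only if it holds exactly 8 bytes.
    let array: [u8; 8] =
        bytes.try_into().map_err(|_| GenerationError::InvalidGenerationFormat)?;
    Ok(u64::from_le_bytes(array))
}
```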

#[cfg(test)]
1 change: 1 addition & 0 deletions crates/matrix-sdk-sqlite/Cargo.toml
@@ -9,6 +9,7 @@ rust-version = { workspace = true }

[features]
default = ["state-store"]
testing = ["matrix-sdk-crypto?/testing"]

bundled = ["rusqlite/bundled"]
crypto-store = [
13 changes: 0 additions & 13 deletions crates/matrix-sdk-ui/src/encryption_sync/mod.rs
@@ -213,19 +213,6 @@ impl EncryptionSync {

Ok(())
}

/// Request a reload of the internal caches used by this sync.
///
/// This must be called every time the process running this loop was
/// suspended and got back into the foreground, and another process may have
/// written to the same underlying store (e.g. notification process vs
/// main process).
pub async fn reload_caches(&self) -> Result<(), Error> {
// Regenerate the crypto store caches first.
self.client.encryption().reload_caches().await.map_err(Error::ClientError)?;

Ok(())
}
}

/// Errors for the [`EncryptionSync`].
2 changes: 1 addition & 1 deletion crates/matrix-sdk/Cargo.toml
@@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]

[features]
default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "native-tls"]
testing = []
testing = ["matrix-sdk-sqlite?/testing"]

e2e-encryption = [
"matrix-sdk-base/e2e-encryption",