From 27a35b7576109f32b33a9a7a63047cce2be6efeb Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Wed, 24 Apr 2024 06:29:21 +0000 Subject: [PATCH] Support for BLE and BTP Fix CI to install libdbus Fix a few typos in doc comments MAX_BTP_SESSIONS needs to be public Clone for the callback Compute MTU more precisely, also based on the reported GATT MTU --- .github/workflows/ci-tlv-tool.yml | 3 + .github/workflows/ci.yml | 3 + .github/workflows/publish-dry-run.yml | 3 + .github/workflows/publish.yml | 3 + examples/onoff_light/src/main.rs | 56 +- rs-matter/Cargo.toml | 6 +- rs-matter/src/error.rs | 15 + rs-matter/src/transport/network.rs | 1 + rs-matter/src/transport/network/btp.rs | 330 +++++++++ .../src/transport/network/btp/context.rs | 380 +++++++++++ rs-matter/src/transport/network/btp/gatt.rs | 220 ++++++ .../src/transport/network/btp/gatt/bluer.rs | 383 +++++++++++ .../src/transport/network/btp/session.rs | 625 ++++++++++++++++++ .../transport/network/btp/session/packet.rs | 461 +++++++++++++ rs-matter/src/utils/mod.rs | 2 + rs-matter/src/utils/ringbuf.rs | 258 ++++++++ rs-matter/src/utils/std_mutex.rs | 42 ++ 17 files changed, 2775 insertions(+), 16 deletions(-) create mode 100644 rs-matter/src/transport/network/btp.rs create mode 100644 rs-matter/src/transport/network/btp/context.rs create mode 100644 rs-matter/src/transport/network/btp/gatt.rs create mode 100644 rs-matter/src/transport/network/btp/gatt/bluer.rs create mode 100644 rs-matter/src/transport/network/btp/session.rs create mode 100644 rs-matter/src/transport/network/btp/session/packet.rs create mode 100644 rs-matter/src/utils/ringbuf.rs create mode 100644 rs-matter/src/utils/std_mutex.rs diff --git a/.github/workflows/ci-tlv-tool.yml b/.github/workflows/ci-tlv-tool.yml index 1f74d4f8..a8611c38 100644 --- a/.github/workflows/ci-tlv-tool.yml +++ b/.github/workflows/ci-tlv-tool.yml @@ -24,6 +24,9 @@ jobs: toolchain: ${{ env.RUST_TOOLCHAIN }} components: rustfmt, clippy, rust-src + - name: Install libdbus + run: sudo apt-get install -y libdbus-1-dev + - name: Checkout uses: actions/checkout@v3 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 958214d6..6529e951 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,9 @@ jobs: toolchain: ${{ env.RUST_TOOLCHAIN }} components: rustfmt, clippy, rust-src + - name: Install libdbus + run: sudo apt-get install -y libdbus-1-dev + - name: Checkout uses: actions/checkout@v3 diff --git a/.github/workflows/publish-dry-run.yml b/.github/workflows/publish-dry-run.yml index 9803a418..cde69167 100644 --- a/.github/workflows/publish-dry-run.yml +++ b/.github/workflows/publish-dry-run.yml @@ -17,6 +17,9 @@ jobs: toolchain: ${{ env.RUST_TOOLCHAIN }} components: rustfmt, clippy, rust-src + - name: Install libdbus + run: sudo apt-get install -y libdbus-1-dev + - name: Checkout uses: actions/checkout@v3 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index d955b28d..e75d4f5a 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -17,6 +17,9 @@ jobs: toolchain: ${{ env.RUST_TOOLCHAIN }} components: rustfmt, clippy, rust-src + - name: Install libdbus + run: sudo apt-get install -y libdbus-1-dev + - name: Checkout uses: actions/checkout@v3 diff --git a/examples/onoff_light/src/main.rs b/examples/onoff_light/src/main.rs index 192d62f4..98eb4316 100644 --- a/examples/onoff_light/src/main.rs +++ b/examples/onoff_light/src/main.rs @@ -18,6 +18,7 @@ use core::borrow::Borrow; use core::pin::pin; use std::net::UdpSocket; +use std::sync::Arc; use embassy_futures::select::{select, select4}; @@ -36,16 +37,24 @@ use rs_matter::data_model::subscriptions::Subscriptions; use rs_matter::data_model::system_model::descriptor; use rs_matter::error::Error; use rs_matter::mdns::MdnsService; +use rs_matter::pairing::DiscoveryCapabilities; use rs_matter::persist::Psm; use rs_matter::respond::DefaultResponder; use rs_matter::secure_channel::spake2p::VerifierData; use rs_matter::transport::core::MATTER_SOCKET_BIND_ADDR; +use rs_matter::transport::network::{ + btp::{Btp, BtpContext}, + Address, ChainedNetwork, +}; use rs_matter::utils::buf::PooledBuffers; use rs_matter::utils::select::Coalesce; +use rs_matter::utils::std_mutex::StdRawMutex; use rs_matter::MATTER_PORT; mod dev_att; +static BTP_CONTEXT: BtpContext = BtpContext::::new(); + fn main() -> Result<(), Error> { let thread = std::thread::Builder::new() // Increase the stack size until the example can work without stack blowups. @@ -55,7 +64,7 @@ fn main() -> Result<(), Error> { // e.g., an opt-level of "0" will require a several times' larger stack. // // Optimizing/lowering `rs-matter` memory consumption is an ongoing topic. - .stack_size(95 * 1024) + .stack_size(950 * 1024) .spawn(run) .unwrap(); @@ -98,6 +107,15 @@ fn run() -> Result<(), Error> { MATTER_PORT, ); + let dev_comm = CommissioningData { + // TODO: Hard-coded for now + verifier: VerifierData::new_with_pw(123456, *matter.borrow()), + discriminator: 250, + }; + + //let discovery_caps = DiscoveryCapabilities::new(true, false, false); + let discovery_caps = DiscoveryCapabilities::new(false, true, false); + matter.initialize_transport_buffers()?; info!("Matter initialized"); @@ -141,23 +159,30 @@ fn run() -> Result<(), Error> { } }); + //let btp = Btp::new_builtin(Arc::new(BtpContext::::new())); + let btp = Btp::new_builtin(&BTP_CONTEXT); + + let mut bluetooth = pin!(btp.run("MT", &dev_det, &dev_comm)); + // NOTE: // When using a custom UDP stack (e.g. for `no_std` environments), replace with a UDP socket bind for your custom UDP stack // The returned socket should be splittable into two halves, where each half implements `UdpSend` and `UdpReceive` respectively - let socket = async_io::Async::::bind(MATTER_SOCKET_BIND_ADDR)?; + //let udp = async_io::Async::::bind(MATTER_SOCKET_BIND_ADDR)?; - // Run the Matter and mDNS transports + // let network = ChainedNetwork::new( + // Address::is_btp, + // &btp, + // &udp); + + // Run the Matter transport let mut transport = pin!(matter.run( - &socket, - &socket, - Some(( - CommissioningData { - // TODO: Hard-coded for now - verifier: VerifierData::new_with_pw(123456, *matter.borrow()), - discriminator: 250, - }, - Default::default(), - )), + // network.clone(), + // network, + &btp, + &btp, + // &udp, + // &udp, + Some((dev_comm, discovery_caps)), )); // NOTE: @@ -168,14 +193,15 @@ fn run() -> Result<(), Error> { // Combine all async tasks in a single one let all = select4( &mut transport, - &mut mdns, + &mut bluetooth, + //&mut mdns, &mut persist, select(&mut respond, &mut device).coalesce(), ); // NOTE: // Replace with a different executor for e.g. `no_std` environments - futures_lite::future::block_on(all.coalesce()) + futures_lite::future::block_on(async_compat::Compat::new(all.coalesce())) } const NODE: Node<'static> = Node { diff --git a/rs-matter/Cargo.toml b/rs-matter/Cargo.toml index d6689157..0cd26e8b 100644 --- a/rs-matter/Cargo.toml +++ b/rs-matter/Cargo.toml @@ -69,12 +69,16 @@ x509-cert = { version = "0.2", default-features = false, features = ["pem"], opt # STD rand = { version = "0.8", optional = true, default-features = false, features = ["std", "std_rng"] } async-io = { version = "2", optional = true, default-features = false } +async-compat = { version = "0.2", optional = true, default-features = false } [target.'cfg(target_os = "macos")'.dependencies] astro-dnssd = { version = "0.3" } [target.'cfg(target_os = "linux")'.dependencies] zeroconf = { version = "0.12", optional = true } +bluer = { version = "0.17", features = ["bluetoothd"] } +tokio = { version = "1" } +tokio-stream = { version = "0.1" } [dev-dependencies] env_logger = "0.11" @@ -84,7 +88,7 @@ futures-lite = "1" [[example]] name = "onoff_light" path = "../examples/onoff_light/src/main.rs" -required-features = ["std", "async-io"] +required-features = ["std", "async-io", "async-compat"] # [[example]] # name = "speaker" diff --git a/rs-matter/src/error.rs b/rs-matter/src/error.rs index 4e373cc7..7a07d583 100644 --- a/rs-matter/src/error.rs +++ b/rs-matter/src/error.rs @@ -204,6 +204,21 @@ impl From for Error { } } +#[cfg(all(feature = "std", target_os = "linux", not(feature = "backtrace")))] +impl From for Error { + fn from(e: bluer::Error) -> Self { + ::log::error!("Error in BTP: {e}"); + Self::new(ErrorCode::BtpError) + } +} + +#[cfg(all(feature = "std", target_os = "linux", feature = "backtrace"))] +impl From for Error { + fn from(e: bluer::Error) -> Self { + Self::new_with_details(ErrorCode::BtpError, Box::new(e)) + } +} + #[cfg(feature = "std")] impl From for Error { fn from(_e: std::time::SystemTimeError) -> Self { diff --git a/rs-matter/src/transport/network.rs b/rs-matter/src/transport/network.rs index f7a74baa..850e02a6 100644 --- a/rs-matter/src/transport/network.rs +++ b/rs-matter/src/transport/network.rs @@ -26,6 +26,7 @@ use embassy_futures::select::{select, Either}; use crate::error::{Error, ErrorCode}; +pub mod btp; pub mod udp; // Maximum UDP RX packet size per Matter spec diff --git a/rs-matter/src/transport/network/btp.rs b/rs-matter/src/transport/network/btp.rs new file mode 100644 index 00000000..d4fd0779 --- /dev/null +++ b/rs-matter/src/transport/network/btp.rs @@ -0,0 +1,330 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::borrow::Borrow; +use core::future::Future; +use core::marker::PhantomData; +use core::ops::DerefMut; + +use embassy_futures::select::select4; +use embassy_sync::blocking_mutex::raw::{NoopRawMutex, RawMutex}; +use embassy_time::{Duration, Instant, Timer}; + +use log::trace; + +use crate::data_model::cluster_basic_information::BasicInfoConfig; +use crate::error::{Error, ErrorCode}; +use crate::transport::network::{Address, BtAddr, NetworkReceive, NetworkSend}; +use crate::utils::ifmutex::IfMutex; +use crate::utils::select::Coalesce; +use crate::CommissioningData; + +pub use context::{BtpContext, MAX_BTP_SESSIONS}; +pub use gatt::*; + +use self::context::SessionSendLock; + +mod context; +mod gatt; +mod session; + +/// The maximum size of a BTP segment. +pub(crate) const MAX_BTP_SEGMENT_SIZE: usize = 244; +/// The size of the GATT header. `MAX_BTP_SEGMENT_SIZE` + `GATT_HEADER_SIZE` is 247 bytes, which is the maximum ATT MTU size supported by the BTP protocol. +pub(crate) const GATT_HEADER_SIZE: usize = 3; + +/// The minimum MTU that can be used as per specification. +pub(crate) const MIN_MTU: u16 = (20 + GATT_HEADER_SIZE) as u16; +/// The maximum MTU that can be used as per specification. +pub(crate) const MAX_MTU: u16 = (MAX_BTP_SEGMENT_SIZE + GATT_HEADER_SIZE) as u16; + +/// An implementation of the Matter BTP protocol. +/// This is a low-level protocol that is used to send and receive Matter messages over BLE. +/// +/// The implementation needs a `Gatt` trait implementation which is OS/platform-specific. +/// All aspects of the BTP protocol however are implemented in platform-neutral way. +pub struct Btp { + gatt: T, + context: C, + send_buf: IfMutex>, + _mutex: PhantomData, +} + +#[cfg(all(feature = "std", target_os = "linux"))] +impl Btp +where + C: Borrow> + Clone + Send + Sync + 'static, + M: RawMutex + Send + Sync, +{ + #[inline(always)] + pub fn new_builtin(context: C) -> Self { + Self::new(BuiltinGattPeripheral::new(None), context) + } +} + +impl Btp +where + C: Borrow> + Clone + Send + Sync + 'static, + M: RawMutex + Send + Sync, + T: GattPeripheral, +{ + /// Construct a new BTP object with the provided `GattPeripheral` trait implementation and with the + /// provided BTP `context`. + #[inline(always)] + pub const fn new(gatt: T, context: C) -> Self { + Self { + gatt, + context, + send_buf: IfMutex::new(heapless::Vec::new()), + _mutex: PhantomData, + } + } + + /// Run the BTP protocol + /// + /// While all sending and receiving of Matter packets (a.k.a. BTP SDUs) is done via the `recv` and `send` methods + /// on the `Btp` struct, this method is responsible for managing internal implementation aspects of + /// the BTP protocol implementation, like e.g. the sessions' keepalive logic. + /// + /// Therefore, user is expected to call this method in order to run the BTP protocol. + pub fn run<'a>( + &'a self, + service_name: &'a str, + dev_det: &BasicInfoConfig<'_>, + dev_comm: &CommissioningData, + ) -> impl Future> + 'a { + let adv_data = AdvData::new(dev_det, dev_comm); + + let context = self.context.clone(); + + async move { + select4( + self.gatt.run(service_name, &adv_data, move |event| { + context.borrow().on_event(event) + }), + self.handshake(), + self.ack(), + self.remove_expired(), + ) + .coalesce() + .await + } + } + + /// Wait until there is at least one Matter (a.k.a. BTP SDU) packet available for consumption. + pub async fn wait_available(&self) -> Result<(), Error> { + self.context.borrow().wait_available().await + } + + /// Receive a Matter (a.k.a. BTP SDU) packet. + /// + /// If there is no packet available, this method will block asynchronously until a packet is available. + /// Returns the size of the received packet, as well as the address of the BLE peer from where the packet originates. + pub async fn recv(&self, buf: &mut [u8]) -> Result<(usize, BtAddr), Error> { + self.context.borrow().recv(buf).await + } + + /// Send a Matter (a.k.a. BTP SDU) packet to the specified BLE peer. + /// + /// The `data` parameter is the data to be sent. + /// The `address` parameter is the BLE address of the peer to which the data should be sent. + /// + /// If the peer is not connected, this method will return an error. + /// If the BTP stack is busy sending data to another peer, this method will block asynchronously until the stack is ready to send the data. + pub async fn send(&self, data: &[u8], address: BtAddr) -> Result<(), Error> { + let context = self.context.borrow(); + + loop { + if let Some(session_lock) = + SessionSendLock::try_lock(context, |session| session.address() == address) + .map_err(|_| ErrorCode::NoNetworkInterface)? + { + self.do_send(&session_lock, data).await?; + break; + } + + context.send_notif.wait().await; + } + + Ok(()) + } + + /// Internal utility method that sends a BTP SDU packet on behalf of a session which is locked for sending. + /// + /// The `session_lock` parameter represents a session which had been locked for sending. + /// The `data` parameter is the data to be sent as part of the BTP SDU packet. + async fn do_send( + &self, + session_lock: &SessionSendLock<'_, M>, + data: &[u8], + ) -> Result<(), Error> { + let mut offset = 0; + + loop { + let mut buf = self.send_buf().await; + + let packet = session_lock + .with_session(|session| session.prep_tx_data(data, offset, &mut buf))?; + + if let Some((slice, new_offset)) = packet { + self.gatt.indicate(slice, session_lock.address()).await?; + offset = new_offset; + + trace!( + "Sent {slice:02x?} bytes to address {}", + session_lock.address() + ); + + if offset == data.len() { + break; + } + } else { + drop(buf); + + self.context.borrow().send_notif.wait().await; + } + } + + Ok(()) + } + + /// A job that is responsible for removing all sessions, which are considered expired due to + /// the remote peers not sending an ACK packet on time. + async fn remove_expired(&self) -> Result<(), Error> { + let context = self.context.borrow(); + + loop { + Timer::after(Duration::from_secs(1)).await; + + // Remove all timed-out sessions + context.remove(|session| session.is_timed_out(Instant::now()))?; + } + } + + /// A job that is responsible for sending ACK on behalf of all sessions, which + /// either have their receive windows full, or which would otherwise expire due to inactivity. + async fn ack(&self) -> Result<(), Error> { + let context = self.context.borrow(); + + loop { + while let Some(session_lock) = + SessionSendLock::lock(context, |session| session.is_ack_due(Instant::now())) + { + self.do_send(&session_lock, &[]).await?; + } + + context.ack_notif.wait().await; + } + } + + /// A job that is resposible for sending the Handshake Response packet to all remote peers that + /// in the meantime have connected to the peripheral, subscribed to chracteristic `C2` and had + /// written the Handshake Request packet to characteristic `C1`. + async fn handshake(&self) -> Result<(), Error> { + let context = self.context.borrow(); + + loop { + while let Some(session_lock) = + SessionSendLock::lock(context, session::Session::is_handshake_resp_due) + { + let mut buf = self.send_buf().await; + + let slice = + session_lock.with_session(|session| session.prep_tx_handshake(&mut buf))?; + + self.gatt.indicate(slice, session_lock.address()).await?; + + trace!( + "Sent {slice:02x?} bytes to address {}", + session_lock.address() + ); + } + + context.handshake_notif.wait().await; + } + } + + /// Get a mutable reference to the send buffer, asybchronously waiting for the buffer to become available, + /// in case it is used by another operation. + async fn send_buf( + &self, + ) -> impl DerefMut> + '_ { + let mut buf = self.send_buf.lock().await; + + // Unwrap is safe because the max size of the buffer is MAX_PDU_SIZE + buf.resize_default(MAX_BTP_SEGMENT_SIZE).unwrap(); + + buf + } +} + +impl NetworkSend for &Btp +where + C: Borrow> + Clone + Send + Sync + 'static, + M: RawMutex + Send + Sync, + T: GattPeripheral, +{ + async fn send_to(&mut self, data: &[u8], addr: Address) -> Result<(), Error> { + (*self) + .send(data, addr.btp().ok_or(ErrorCode::NoNetworkInterface)?) + .await + } +} + +impl NetworkReceive for &Btp +where + C: Borrow> + Clone + Send + Sync + 'static, + M: RawMutex + Send + Sync, + T: GattPeripheral, +{ + async fn wait_available(&mut self) -> Result<(), Error> { + (*self).wait_available().await + } + + async fn recv_from(&mut self, buffer: &mut [u8]) -> Result<(usize, Address), Error> { + (*self) + .recv(buffer) + .await + .map(|(len, addr)| (len, Address::Btp(addr))) + } +} + +impl NetworkSend for Btp +where + C: Borrow> + Clone + Send + Sync + 'static, + M: RawMutex + Send + Sync, + T: GattPeripheral, +{ + async fn send_to(&mut self, data: &[u8], addr: Address) -> Result<(), Error> { + (&*self).send_to(data, addr).await + } +} + +impl NetworkReceive for Btp +where + C: Borrow> + Clone + Send + Sync + 'static, + M: RawMutex + Send + Sync, + T: GattPeripheral, +{ + async fn wait_available(&mut self) -> Result<(), Error> { + (*self).wait_available().await + } + + async fn recv_from(&mut self, buffer: &mut [u8]) -> Result<(usize, Address), Error> { + (&*self).recv_from(buffer).await + } +} diff --git a/rs-matter/src/transport/network/btp/context.rs b/rs-matter/src/transport/network/btp/context.rs new file mode 100644 index 00000000..8cd0c8d3 --- /dev/null +++ b/rs-matter/src/transport/network/btp/context.rs @@ -0,0 +1,380 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::cell::RefCell; + +use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex}; +use log::{error, info, trace, warn}; + +use crate::{ + error::{Error, ErrorCode}, + transport::network::BtAddr, + utils::notification::Notification, +}; + +use super::{session::Session, GattPeripheralEvent}; + +/// The maximum number of BTP sessions that can be active at any given time. +/// This is an `rs-matter` specific limit, and is not a requirement of the Matter BTP spec, and which in future should be configurable. +/// +/// The `GattPeripheral` implementation is expected to enforce this limit as well, +/// i.e. it should not allow more than `MAX_BTP_SESSIONS` active subscriptions to characteristic `C2`. +pub const MAX_BTP_SESSIONS: usize = 2; + +/// Represents an error that occurred while trying to lock a session for sending. +#[derive(Debug)] +pub(crate) struct LockError(()); + +/// An internal utility for representing a session which is locked for sending. +/// +/// This type is used to ensure that at any moment in time, a session either is not sending anything, +/// or is sending the BTP PDUs of a single BTP SDU, which is a requirement of the Matter BTP spec. +/// +/// The send lock is removed once this object is dropped. +pub(crate) struct SessionSendLock<'a, M> +where + M: RawMutex, +{ + context: &'a BtpContext, + address: BtAddr, +} + +impl<'a, M> SessionSendLock<'a, M> +where + M: RawMutex, +{ + /// Try to find a session that matches the given condition and lock it for sending. + /// + /// If there is no session matching the given condition, the methos will return `LockError`. + /// If the first session matching the given condition is already locked for sending, the method will return `None`. + /// + /// Due to the above semantics, the condition is expected to uniquely identify a session, by - say - matching on + /// the session peer BLE address. + pub fn try_lock(context: &'a BtpContext, condition: F) -> Result, LockError> + where + F: Fn(&Session) -> bool, + { + context.sessions.lock(move |sessions| { + let mut sessions = sessions.borrow_mut(); + + let Some(session) = sessions.iter_mut().find(|session| condition(session)) else { + return Err(LockError(())); + }; + + let lock = if session.set_sending(true) { + Some(Self { + context, + address: session.address(), + }) + } else { + None + }; + + Ok(lock) + }) + } + + /// Lock one (out of potentially many) sessions matcing the provided condition for sending. + /// + /// If all sessions matching the provided condition are already locked for sending, or if there is no + /// session matching the provided condition, the method will return `None`. + pub fn lock(context: &'a BtpContext, condition: F) -> Option + where + F: Fn(&Session) -> bool, + { + context.sessions.lock(move |sessions| { + sessions.borrow_mut().iter_mut().find_map(|session| { + if condition(session) && session.set_sending(true) { + Some(Self { + context, + address: session.address(), + }) + } else { + None + } + }) + }) + } + + /// Return the peer BLE address. + pub fn address(&self) -> BtAddr { + self.address + } + + /// Execute the provided closure with a mutable reference to the session locked for sending. + /// + /// If the session is no longer present, the method will return `ErrorCode::NoNetworkInterface`. + pub fn with_session(&self, f: F) -> Result + where + F: FnOnce(&mut Session) -> Result, + { + self.context.sessions.lock(|sessions| { + let mut sessions = sessions.borrow_mut(); + let session = sessions + .iter_mut() + .find(|session| session.address() == self.address) + .ok_or(ErrorCode::NoNetworkInterface)?; + + f(session) + }) + } +} + +impl<'a, M> Drop for SessionSendLock<'a, M> +where + M: RawMutex, +{ + fn drop(&mut self) { + self.context.sessions.lock(|sessions| { + if let Some(session) = sessions + .borrow_mut() + .iter_mut() + .find(|session| session.address() == self.address) + { + if !session.set_sending(false) || !session.set_running() { + unreachable!("Should not happen") + } + } + }); + + self.context.send_notif.notify(); + } +} + +/// A structure representing a BTP "context". +/// +/// The BTP protocol implementation is split into two structures: +/// - `Btp` - the main BTP protocol implementation, which is responsible for handling the BTP protocol itself. This structure is not `Send` and `Sync` +/// and is overall a typical future-based protocol implementation, like the others in the `rs-matter` stack. +/// - `BtpContext` - a structure that holds the state of the BTP protocol shared between itself and the Gatt peripheral implementation. +/// In terms of ownership, The `Btp` instance holds a `'static` reference to the context, i.e. a `&'static BtpContext` reference, +/// or an `Arc>` instance for platforms where the Rust `alloc::sync` module is available. +/// Furthermore, the state kept in `BtpContext` is safe to share amongst multiple threads. +/// +/// The need to split the BTP implementation into two structures is due to the fact that the `GattPeripheral` trait uses a +/// `'static + Send + Sync` callback closure so as to report subscribe, unsubscribe and write events back to the BTP protocol implementation. +/// +/// While this simplifies the implementation of the `GattPeripheral` trait (as MCU-based Gatt peripheral stacks often expect a closure with these +/// precise restrictions), it complicates the implementation of the BTP protocol and necessiates the isolation of the shared state in the +/// `BtpContext` structure. +pub struct BtpContext +where + M: RawMutex, +{ + sessions: Mutex>>, + pub(crate) handshake_notif: Notification, + pub(crate) available_notif: Notification, + pub(crate) recv_notif: Notification, + pub(crate) ack_notif: Notification, + pub(crate) send_notif: Notification, +} + +impl Default for BtpContext +where + M: RawMutex, +{ + fn default() -> Self { + Self::new() + } +} + +impl BtpContext +where + M: RawMutex, +{ + /// Create a new BTP context. + #[inline(always)] + pub const fn new() -> Self { + Self { + sessions: Mutex::new(RefCell::new(heapless::Vec::new())), + handshake_notif: Notification::new(), + available_notif: Notification::new(), + recv_notif: Notification::new(), + ack_notif: Notification::new(), + send_notif: Notification::new(), + } + } +} + +impl BtpContext +where + M: RawMutex, +{ + /// The `Btp` instance passes a closure of this method to the `GattPeripheral` implementation which is in use + /// so that the peripheral can report to it subscribe, unsubscribe and write events. + pub(crate) fn on_event(&self, event: GattPeripheralEvent) { + let result = match event { + GattPeripheralEvent::NotifySubscribed(address) => self.on_subscribe(address), + GattPeripheralEvent::NotifyUnsubscribed(address) => self.on_unsubscribe(address), + GattPeripheralEvent::Write { + address, + data, + gatt_mtu, + } => self.on_write(address, data, gatt_mtu), + }; + + if let Err(e) = result { + error!("Unexpected error in GATT callback: {e:?}"); + } + } + + /// Handles a write event to characteristic `C1` from the GATT peripheral. + fn on_write(&self, address: BtAddr, data: &[u8], gatt_mtu: Option) -> Result<(), Error> { + trace!("Received {data:02x?} bytes from {address}"); + + self.sessions.lock(|sessions| { + let mut sessions = sessions.borrow_mut(); + + if Session::is_handshake(data)? { + if sessions.len() >= MAX_BTP_SESSIONS { + warn!("Too many BTP sessions, dropping a handshake request from address {address}"); + } else { + // Unwrap is safe because we checked the length above + sessions + .push(Session::process_rx_handshake(address, data, gatt_mtu)?) + .unwrap(); + } + + Ok(()) + } else { + let Some(index) = sessions + .iter_mut() + .position(|session| session.address() == address) + else { + warn!("Dropping data from address {address} because there is no session for it"); + return Ok(()); + }; + + let session = &mut sessions[index]; + let result = session.process_rx_data(data); + + if result.is_err() { + sessions.swap_remove(index); + error!("Dropping session {address} because of an error: {result:?}"); + } + + self.available_notif.notify(); + self.recv_notif.notify(); + self.ack_notif.notify(); + self.send_notif.notify(); + + result + } + }) + } + + /// Handles a subscribe event to characteristic `C2` from the GATT peripheral. + fn on_subscribe(&self, address: BtAddr) -> Result<(), Error> { + info!("Subscribe request from {address}"); + + self.sessions.lock(|sessions| { + let mut sessions = sessions.borrow_mut(); + if let Some(session) = sessions + .iter_mut() + .find(|session| session.address() == address) + { + if !session.set_subscribed() { + unreachable!(); + } + + self.handshake_notif.notify(); + } else { + warn!("No session for address {address}"); + } + + Ok(()) + }) + } + + /// Handles an unsubscribe event to characteristic `C2` from the GATT peripheral. + fn on_unsubscribe(&self, address: BtAddr) -> Result<(), Error> { + info!("Unsubscribe request from {address}"); + + self.remove(|session| session.address() == address) + } + + /// Removes all sesssions that match the provided condition. + pub(crate) fn remove(&self, condition: F) -> Result<(), Error> + where + F: Fn(&Session) -> bool, + { + self.sessions.lock(|sessions| { + let mut sessions = sessions.borrow_mut(); + while let Some(index) = sessions.iter().position(&condition) { + let session = sessions.swap_remove(index); + info!("Session {} removed", session.address()); + + self.send_notif.notify(); + } + + Ok(()) + }) + } + + /// Will wait until there is at least one session which has a BTP SDU packet ready for consumption by the Matter stack. + /// + /// `Btp::wait_available` internally delegates to this method. + pub(crate) async fn wait_available(&self) -> Result<(), Error> { + loop { + let available = self.sessions.lock(|sessions| { + sessions + .borrow() + .iter() + .any(|session| session.message_available()) + }); + + if available { + break; + } + + self.available_notif.wait().await; + } + + Ok(()) + } + + /// Receive a Matter (a.k.a. BTP SDU) packet. + /// + /// If there is no packet available, this method will block asynchronously until a packet is available. + /// Returns the size of the received packet, as well as the address of the BLE peer from where the packet originates. + /// + /// `Btp::recv` internally delegates to this method. + pub(crate) async fn recv(&self, buf: &mut [u8]) -> Result<(usize, BtAddr), Error> { + loop { + let result = self.sessions.lock(|sessions| { + let mut sessions = sessions.borrow_mut(); + + let Some(session) = sessions + .iter_mut() + .find(|session| session.message_available()) + else { + return Ok::<_, Error>(None); + }; + + let len = session.fetch_message(buf)?; + + Ok(Some((len, session.address()))) + })?; + + if let Some(result) = result { + break Ok(result); + } + + self.recv_notif.wait().await; + } + } +} diff --git a/rs-matter/src/transport/network/btp/gatt.rs b/rs-matter/src/transport/network/btp/gatt.rs new file mode 100644 index 00000000..9abdace7 --- /dev/null +++ b/rs-matter/src/transport/network/btp/gatt.rs @@ -0,0 +1,220 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::iter::{empty, once}; + +use crate::{ + data_model::cluster_basic_information::BasicInfoConfig, error::Error, + secure_channel::spake2p::VerifierOption, transport::network::BtAddr, CommissioningData, +}; + +#[cfg(all(feature = "std", target_os = "linux"))] +pub use builtin::BluerGattPeripheral as BuiltinGattPeripheral; + +use super::{GATT_HEADER_SIZE, MAX_BTP_SEGMENT_SIZE}; + +#[cfg(all(feature = "std", target_os = "linux"))] +#[path = "gatt/bluer.rs"] +mod builtin; + +// The 16-bit, registered Matter Service UUID, as per the Matter Core spec. +pub const MATTER_BLE_SERVICE_UUID16: u16 = 0xFFF6; +// A 128-bit expanded representation of the Matter Service UUID. +pub const MATTER_BLE_SERVICE_UUID: u128 = 0x0000FFF600001000800000805F9B34FB; + +/// `C1` characteristic UUID, as per the Matter Core spec. +pub const C1_CHARACTERISTIC_UUID: u128 = 0x18EE2EF5263D4559959F4F9C429F9D11; +/// `C2` characteristic UUID, as per the Matter Core spec. +pub const C2_CHARACTERISTIC_UUID: u128 = 0x18EE2EF5263D4559959F4F9C429F9D12; +/// `C3` characteristic UUID, as per the Matter Core spec. +pub const C3_CHARACTERISTIC_UUID: u128 = 0x64630238877245F2B87D748A83218F04; + +/// The maximum length of packet data written to the `C1` characteristic, as per the Matter Core spec, and as advertised in the GATT service. +pub const C1_MAX_LEN: usize = MAX_BTP_SEGMENT_SIZE + GATT_HEADER_SIZE; +/// The maximum length of packet data indicated via the `C2` characteristic, as per the Matter Core spec, and as advertised in the GATT service. +pub const C2_MAX_LEN: usize = MAX_BTP_SEGMENT_SIZE + GATT_HEADER_SIZE; +/// The maximum length of data read from the `C3` characteristic, as per the Matter Core spec, and as advertised in the GATT service. +pub const C3_MAX_LEN: usize = 512; + +/// Encapsulates the advertising data for the Matter BTP protocol. +/// +/// See section "5.4.2.5.6. Advertising Data" in the Core Matter spec +#[derive(Clone)] +pub struct AdvData { + vid: u16, + pid: u16, + discriminator: u16, + pin: u32, +} + +impl AdvData { + /// Create a new instance by using the provided `BasicInfoConfig` and `CommissioningData`. + pub const fn new(dev_det: &BasicInfoConfig, comm_data: &CommissioningData) -> Self { + Self { + vid: dev_det.vid, + pid: dev_det.pid, + discriminator: comm_data.discriminator, + pin: match comm_data.verifier.data { + VerifierOption::Password(password) => password, + _ => 0, + }, + } + } + + /// The PIN to be used when pairing. + pub const fn pin(&self) -> u32 { + self.pin + } + + /// Return an iterator over the binary representation of the advertising data. + /// + /// As per the Matter Core spec, the advertising data consists of + /// an AD1 record which is of Flags type, and an AD2 record, which is of type UUID16+Service Data + pub fn iter(&self) -> impl Iterator + '_ { + self.flags_iter().chain(self.service_iter()) + } + + /// Return an iterator over the binary representation of the AD1 advertising data (Flags). + /// Useful with GATT stacks that require the advertising data to be reported as separate AD records + pub fn flags_iter(&self) -> impl Iterator + '_ { + empty() + .chain(once(self.flags_payload_iter().count() as u8 + 1)) // 1-byte type + .chain(once(self.flags_adv_type())) + .chain(self.flags_payload_iter()) + } + + /// The AD1 advertising data type (Flags). + pub const fn flags_adv_type(&self) -> u8 { + 0x01 + } + + /// Return an iterator over the binary representation of the AD1 advertising data _payload_. + /// Useful with GATT stacks that require the advertising data to be reported as separate AD records + pub fn flags_payload_iter(&self) -> impl Iterator + '_ { + once(0x06) + } + + /// Return an iterator over the binary representation of the AD2 advertising data (UUID16+Service Data). + pub fn service_iter(&self) -> impl Iterator + '_ { + empty() + .chain(once(self.service_payload_iter().count() as u8 + 3)) // + 1-byte type and 2-bytes Matter UUID16 Service + .chain(once(self.service_adv_type())) + .chain(MATTER_BLE_SERVICE_UUID16.to_le_bytes()) + .chain(self.service_payload_iter()) + } + + /// The AD2 advertising data type (UUID16+Service Data). + pub const fn service_adv_type(&self) -> u8 { + 0x16 + } + + /// Return an iterator over the binary representation of the AD2 advertising data _payload_. + /// Useful with GATT stacks that require the advertising data to be reported as separate AD records + pub fn service_payload_iter(&self) -> impl Iterator + '_ { + [ + 0, // Always 0 = "Commissionable" + self.discriminator.to_le_bytes()[0], + self.discriminator.to_le_bytes()[1], + self.vid.to_le_bytes()[0], + self.vid.to_le_bytes()[1], + self.pid.to_le_bytes()[0], + self.pid.to_le_bytes()[1], + 0, // No additional data + ] + .into_iter() + } +} + +/// A minimal GATT peripheral event. +/// This enum is used to abstract the platform-specific GATT peripheral events. +/// +/// The abstraction is "minimal" in the sense that it is good enough for the purposes of +/// the Matter BTP protocol, but is otherwise not really having the ambition to model all +/// possible events of a generic GATT peripheral, which would result in a much larger API surface. +#[derive(Debug, Clone)] +pub enum GattPeripheralEvent<'a> { + /// A GATT central has subscribed for notifications from characteristic `C2`. + /// In other words, the GATT central is now ready to receive BTP packets. + /// + /// See the Matter Core spec w.r.t. details on characteristic `C2`. + NotifySubscribed(BtAddr), + /// A GATT central has unsubscribed for notifications from characteristic `C2`. + /// In other words, the GATT central is closing the BTP session. + /// + /// See the Matter Core spec w.r.t. details on characteristic `C2`. + NotifyUnsubscribed(BtAddr), + /// A GATT central has requested a Write to characteristic `C1`. + /// In other words, the GATT central had sent a BTP packet. + /// + /// See the Matter Core spec w.r.t. details on characteristic `C1`. + /// + /// `gatt_mtu` is the ATT MTU (contains +3 bytes for the GATT header) + /// as negotiated between the GATT central and the GATT peripheral. + /// Might be `None` if a concrete GATT peripheral implementation does + /// not provide access to this value. In that case, the minimum MTU + /// will be used (23 bytes, including the GATT header). + Write { + address: BtAddr, + data: &'a [u8], + gatt_mtu: Option, + }, +} + +/// A minimal GATT peripheral trait. +/// This trait is used to abstract the platform-specific GATT peripheral implementation. +/// +/// The abstraction is "minimal" in the sense that it is good enough for the purposes of +/// the Matter BTP protocol, but is otherwise not really having the ambition to model all +/// the aspects of a generic GATT peripheral, which would result in a much larger trait. +/// +/// The design of this trait is deliberately chosen to be simple and easy to implement +/// on top of MCU-based GATT peripherals; hence the blocking, callback-based approach for modeling +/// notification subscriptions and chracteristic writes, which - while making the BTP protocol +/// implementation more complex - is a good fit for MCUs where these operations might also be +/// implemented via a callback which cannot await. +pub trait GattPeripheral { + /// Run the GATT peripheral. + /// + /// The implementation of this method is expected to do the following: + /// - GATT peripheral lifecycle: + /// - Start avertising a GATT service with UUID `MATTER_BLE_SERVICE_UUID16`, by utilizing + /// the provided `service_name` and `adv_data` parameters. + /// - Possibly stop advertising the GATT service when the first notification subscription is received. + /// - Stop advertising and tear down the GATT service when the future of this method is dropped. + /// - Gatt peripheral incoming data: + /// - Handle incoming GATT events, and call the provided `callback` function for each event. + /// See `GattPeripheralEvent` for the possible events and their semantics. + /// + /// The callback is constraned to be `Send`, `Sync`, `Clone` and `'static` on purpose, as it might be + /// the case that the GATT implementation needs to invoke the callback from a different thread than the Matter thread, + /// as well as it might need multiple instances of it. + /// Therefore, this constraint is not a problem but an advantage for `GattPeripheral` trait implementors (it is a deliberate design decision). + async fn run( + &self, + service_name: &str, + adv_data: &AdvData, + callback: F, + ) -> Result<(), Error> + where + F: Fn(GattPeripheralEvent) + Send + Sync + Clone + 'static; + + /// Indicate data changes in characteristics `C2` to to a GATT central. + /// In other words, send a BTP packet to a GATT central. + /// + /// See the Matter Core spec w.r.t. details on characteristic C2. + async fn indicate(&self, data: &[u8], address: BtAddr) -> Result<(), Error>; +} diff --git a/rs-matter/src/transport/network/btp/gatt/bluer.rs b/rs-matter/src/transport/network/btp/gatt/bluer.rs new file mode 100644 index 00000000..5a5bf7f4 --- /dev/null +++ b/rs-matter/src/transport/network/btp/gatt/bluer.rs @@ -0,0 +1,383 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::iter::once; + +use alloc::sync::Arc; + +use bluer::{ + adv::Advertisement, + agent::Agent, + gatt::{ + local::{ + characteristic_control, Application, Characteristic, CharacteristicControl, + CharacteristicControlEvent, CharacteristicNotify, CharacteristicNotifyMethod, + CharacteristicWrite, CharacteristicWriteMethod, Service, + }, + CharacteristicWriter, + }, + Uuid, +}; + +use embassy_futures::select::{select, select_slice, Either}; + +use log::{info, trace, warn}; + +use tokio::io::AsyncWriteExt; +use tokio_stream::StreamExt; + +use crate::{ + error::{Error, ErrorCode}, + transport::network::{btp::context::MAX_BTP_SESSIONS, BtAddr}, + utils::{ifmutex::IfMutex, select::Coalesce, signal::Signal, std_mutex::StdRawMutex}, +}; + +use super::{AdvData, GattPeripheral, GattPeripheralEvent}; +use super::{C1_CHARACTERISTIC_UUID, C2_CHARACTERISTIC_UUID, MATTER_BLE_SERVICE_UUID}; + +const MAX_CONNECTIONS: usize = MAX_BTP_SESSIONS; + +/// The internal state of the peripheral. +/// Arc-ed so as to be thread-safe and to have `'static` interior, as demanded by the BlueR bindings. +struct GattState { + /// The name of the bluetooth adapter to use. If `None`, the default adapter is used. + adapter_name: Option, + /// The list of active notifiers on characteristic `C2`. + notifiers: IfMutex>, + /// A signal necessary so that we can switch between two states: + /// - Indicating data to a notifier + /// - Listening all notifiers for a closed one (i.e. a remote peer had unsubscribed from characteristic `C2`) + notifiers_listen_allowed: Signal, +} + +/// Implements the `GattPeripheral` trait using the BlueZ GATT stack. +#[derive(Clone)] +pub struct BluerGattPeripheral(Arc); + +impl Default for BluerGattPeripheral { + fn default() -> Self { + Self::new(None) + } +} + +impl BluerGattPeripheral { + /// Create a new instance. + pub fn new(adapter_name: Option<&str>) -> Self { + Self(Arc::new(GattState { + adapter_name: adapter_name.map(|name| name.into()), + notifiers: IfMutex::new(heapless::Vec::new()), + notifiers_listen_allowed: Signal::new(true), + })) + } + + /// Runs the GATT peripheral service. + /// What this means in details: + /// - Advertises the service with the provided name and advertising data, where the advertising data + /// contains the elements specified in the Matter Core spec. + /// - Serves a GATT peripheral service with the `C1`, `C2` and `C3` characteristics, as specified + /// in the Matter Core spec. + /// - Calls the provided callback with the events that occur during the service lifetime, on the `C1` + /// and `C2` characteristics. + pub async fn run( + &self, + service_name: &str, + service_adv_data: &AdvData, + callback: F, + ) -> Result<(), Error> + where + F: Fn(GattPeripheralEvent) + Send + Sync + 'static, + { + let _pin = service_adv_data.pin(); + + let session = bluer::Session::new().await?; + + // Need to register our own agent or else the Matter comissioner will not be able to pair with us + session + .register_agent(Agent { + //request_passkey: Some(Box::new(move |_| Box::pin(async move { Ok(pin) }))), + ..Default::default() + }) + .await?; + + let adapter = if let Some(adapter_name) = self.0.adapter_name.as_ref() { + session.adapter(adapter_name)? + } else { + session.default_adapter().await? + }; + + adapter.set_powered(true).await?; + + info!( + "Advertising on Bluetooth adapter {} with address {}", + adapter.name(), + BtAddr(adapter.address().await?.0) + ); + + let le_advertisement = Advertisement { + discoverable: Some(true), + local_name: Some(service_name.into()), + service_data: once(( + Uuid::from_u128(MATTER_BLE_SERVICE_UUID), + service_adv_data.service_payload_iter().collect(), + )) + .collect(), + ..Default::default() + }; + + // TODO: Stop advertizing after the first connection? + let _adv_handle = adapter.advertise(le_advertisement).await?; + + info!( + "Serving GATT echo service on Bluetooth adapter {}", + adapter.name() + ); + + let callback_w = Arc::new(callback); + let callback_n = callback_w.clone(); + let callback_s = callback_w.clone(); + + let (notify, notify_handle) = characteristic_control(); + + // Service and characteristics as per the Matter Core spec + let app = Application { + services: vec![Service { + uuid: Uuid::from_u128(MATTER_BLE_SERVICE_UUID), + primary: true, + characteristics: vec![ + Characteristic { + uuid: Uuid::from_u128(C1_CHARACTERISTIC_UUID), + write: Some(CharacteristicWrite { + write: true, + method: CharacteristicWriteMethod::Fun(Box::new( + move |new_value, req| { + let address = BtAddr(req.device_address.0); + let data = &new_value; + + trace!("Got write request from {address}: {data:02x?}"); + + // Notify the BTP protocol implementation for the write + callback_w(GattPeripheralEvent::Write { + gatt_mtu: Some(req.mtu), + address, + data, + }); + + // We don't need a future because the callback is synchronous + Box::pin(core::future::ready(Ok(()))) + }, + )), + ..Default::default() + }), + ..Default::default() + }, + Characteristic { + uuid: Uuid::from_u128(C2_CHARACTERISTIC_UUID), + notify: Some(CharacteristicNotify { + indicate: true, + // Reason why we don't use the (simpler) callback-based approach here: + // The callback approach does not provide us with access to the remote peer address + // when a notification subscription is received. This is necessary for the Matter BTP protocol + // to work correctly. + // + // Restriction seems to come from BlueZ dBus bindings, where their `StartNotify` method does not + // provide the address of the remote peer, nor any other peer properties thereof. + method: CharacteristicNotifyMethod::Io, + ..Default::default() + }), + control_handle: notify_handle, + ..Default::default() + }, + // Characteristic { + // uuid: Uuid::from_u128(C3_CHARACTERISTIC_UUID), + // read: Some(CharacteristicRead { + // method: CharacteristicReadMethod::Io, + // ..Default::default() + // }), + // control_handle: write_handle, + // ..Default::default() + // }, + ], + ..Default::default() + }], + ..Default::default() + }; + + let _app_handle = adapter.serve_gatt_application(app).await?; + + select( + self.closed(callback_s), + self.pull_notify(notify, callback_n), + ) + .coalesce() + .await + } + + /// Indicate new data on characteristic `C2` to a remote peer. + pub async fn indicate(&self, data: &[u8], address: BtAddr) -> Result<(), Error> { + self.0.notifiers_listen_allowed.modify(|listen| { + *listen = false; + + (true, ()) + }); + + let mut notifiers = self.0.notifiers.lock().await; + + let result = if let Some(notifier) = notifiers + .iter_mut() + .find(|notifier| notifier.device_address().0 == address.0) + { + notifier.write_all(data).await.map_err(|e| e.into()) + } else { + Err(Error::new(ErrorCode::NoNetworkInterface)) + }; + + self.0.notifiers_listen_allowed.modify(|listen| { + *listen = true; + + (true, ()) + }); + + result?; + + trace!("Indicated {data:02x?} bytes to address {address}"); + + Ok(()) + } + + /// Handle a new subscription to the `C2` characteristic + /// by registering the notifier in the internal state. + async fn add_notifier(&self, notifier: CharacteristicWriter) { + // Tell the `Self::closed` method to unlock the `notifiers` mutex + self.0.notifiers_listen_allowed.modify(|listen| { + *listen = false; + + (true, ()) + }); + + let mut notifiers = self.0.notifiers.lock().await; + + let address = BtAddr(notifier.device_address().0); + + if notifiers.len() < MAX_CONNECTIONS { + // Unwraping is safe because we just checked the length + notifiers.push(notifier).map_err(|_| ()).unwrap(); + trace!("Notify connection from address {address} started"); + } else { + warn!("Notifiers limit reached; ignoring notifier from address {address}"); + } + + drop(notifiers); + + // `Self::close` can listen again for closed connections + self.0.notifiers_listen_allowed.modify(|listen| { + *listen = true; + + (true, ()) + }); + } + + /// Pull new subscription notifications from the `C2` characteristic. + async fn pull_notify( + &self, + mut notify: CharacteristicControl, + callback: Arc, + ) -> Result<(), Error> + where + F: Fn(GattPeripheralEvent) + Send + Sync + 'static, + { + while let Some(event) = notify.next().await { + match event { + // Should never happen, as characteristic `C2` is not marked as capable of taking writes. + CharacteristicControlEvent::Write(_) => unreachable!(), + CharacteristicControlEvent::Notify(writer) => { + let address = BtAddr(writer.device_address().0); + + self.add_notifier(writer).await; + + // Notify the BTP protocol implementation + callback(GattPeripheralEvent::NotifySubscribed(address)); + } + } + } + + Ok(()) + } + + /// Listen for stopped connections (i.e. unsubscriptions from characteristic `C2`). + async fn closed(&self, callback: Arc) -> Result<(), Error> + where + F: Fn(GattPeripheralEvent) + Send + Sync + 'static, + { + loop { + // Wait until we are allowed to listen for closed connections + self.0 + .notifiers_listen_allowed + .wait(|allowed| (*allowed).then_some(())) + .await; + + { + let mut notifiers = self.0.notifiers.lock().await; + + let notifiers_listen_allowed = self + .0 + .notifiers_listen_allowed + .wait(|allowed| (!*allowed).then_some(())); + + let mut closed = notifiers + .iter() + .map(|notifier| notifier.closed()) + .collect::>(); + + // Await until we are no longer allowed to await (future notifiers_listen_allowed) + // or until we have a closed notifier + let result = select(notifiers_listen_allowed, select_slice(&mut closed)).await; + + match result { + // No longer allowed to await for closed connections, wait until we are allowed again + Either::First(_) => continue, + Either::Second((_, index)) => { + // Remove the closed notifier + + let address = BtAddr(notifiers[index].device_address().0); + + drop(closed); + + notifiers.swap_remove(index); + + // Notify the BTP protocol implementation + callback(GattPeripheralEvent::NotifyUnsubscribed(address)); + + trace!("Notify connection from address {address} stopped"); + } + } + } + } + } +} + +impl GattPeripheral for BluerGattPeripheral { + async fn run(&self, service_name: &str, adv_data: &AdvData, callback: F) -> Result<(), Error> + where + F: Fn(GattPeripheralEvent) + Send + Sync + 'static, + { + BluerGattPeripheral::run(self, service_name, adv_data, callback).await + } + + async fn indicate(&self, data: &[u8], address: BtAddr) -> Result<(), Error> { + BluerGattPeripheral::indicate(self, data, address).await + } +} diff --git a/rs-matter/src/transport/network/btp/session.rs b/rs-matter/src/transport/network/btp/session.rs new file mode 100644 index 00000000..ecddf903 --- /dev/null +++ b/rs-matter/src/transport/network/btp/session.rs @@ -0,0 +1,625 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::cmp::min; + +use embassy_time::{Duration, Instant}; + +use log::{info, warn}; + +use crate::error::{Error, ErrorCode}; +use crate::transport::network::btp::session::packet::{HandshakeReq, HandshakeResp}; +use crate::transport::network::btp::{GATT_HEADER_SIZE, MAX_MTU, MIN_MTU}; +use crate::transport::network::{BtAddr, MAX_RX_PACKET_SIZE}; +use crate::utils::{ringbuf::RingBuf, writebuf::WriteBuf}; + +use self::packet::BtpHdr; + +mod packet; + +/// Matter Core spec constant: +/// The maximum amount of time after receipt of a segment before a stand-alone ACK must be sent. +const BTP_ACK_TIMEOUT_SECS: usize = BTP_CONN_IDLE_TIMEOUT_SECS / 2; +/// Matter Core spec constant: +/// The maximum amount of time no unique data has been sent over a BTP session before the +/// Central Device must close the BTP session. +const BTP_CONN_IDLE_TIMEOUT_SECS: usize = 30; + +/// Represents the three possible states of each BTP session +#[derive(Debug)] +enum SessionState { + /// The session was just created as a result of a remote peer writing a BTP Handshake Request SDU + /// to characteristic `C1`. + New, + /// After sending the BTP Handshake SDU, the remote peer now also subscribed to characteristic `C2`. + Subscribed, + /// The session is fully established and data can be exchanged, as we have sent (a.k.a. indicated) + /// to the remote peer a BTP Handshake Response SDU, via characteristic `C2`. + Running, +} + +/// Represents the sending window of a BTP session, as per the Matter Core spec. +#[derive(Debug)] +struct SendWindow { + /// The negotiated window size + window_size: u8, + /// The current level of the window. 0 means the window is completely full + level: u8, + /// The last sequence number sent + last_sent_seq_num: u8, + /// The instant when the last BTP segment was sent. `Instant::MAX` means no segment was received yet. + sent_at: Instant, + /// Whether the session is currently locked for sending + sending: bool, +} + +impl SendWindow { + /// Initialize a new sending window with the provided window size + const fn new(window_size: u8) -> Self { + Self { + window_size, + level: window_size, + last_sent_seq_num: 255, + sent_at: Instant::MAX, + sending: false, + } + } + + /// Update the sending window level when a new BTP segment had arrived, + /// based on the ACK seq num in the incoming packet (if any). + fn accept_incoming(&mut self, hdr: &BtpHdr) { + let Some(ack_seq_num) = hdr.get_ack() else { + return; + }; + + if self.last_sent_seq_num == ack_seq_num { + self.level = self.window_size; + self.sent_at = Instant::MAX; + } else { + let distance = if ack_seq_num > self.last_sent_seq_num { + 255 - ack_seq_num + self.last_sent_seq_num + 1 + } else { + self.last_sent_seq_num - ack_seq_num + }; + + self.level = self.window_size - distance; + self.sent_at = Instant::now(); + } + } + + /// Return true if the sending window is full. + /// + /// A reference to the receiving window is necessary, because - as per the Matter Core spec - + /// the window is considered also full at level = 1 if the receiving window does not have + /// a pending ACK. + fn is_full(&self, recv_window: &RecvWindow) -> bool { + self.level == 0 || self.level == 1 && recv_window.ack_level == 0 + } + + /// Return the next sequence to be used when sending a BTP segment. + fn next_seq_num(&self) -> u8 { + self.last_sent_seq_num.wrapping_add(1) + } + + /// Update the state of the window after sending a BTP segment. + /// Basically decreases the window level, updates the next sequence num and + /// records the current instant as the time when the last BTP segment was sent. + fn post_send(&mut self) { + self.level -= 1; + self.last_sent_seq_num = self.last_sent_seq_num.wrapping_add(1); + self.sent_at = Instant::now(); + } +} + +/// Enough room for one full Matter message + one extra +const MAX_MESSAGE_SIZE: usize = MAX_RX_PACKET_SIZE * 2; + +/// Represents the receiving window of a BTP session, as per the Matter Core spec. +#[derive(Debug)] +struct RecvWindow { + /// A ring-buffer holding all received BTP segment' payloads, including not been fully processed yet + buf: RingBuf, + /// The number of complete Matter messages (i.e. BTP SDU payloads) currently in the buffer + buf_messages_ct: u8, + /// The current level of the window. 0 means the window is completely full + level: u8, + /// The level of the window that would be re-gained when sending ACK for the sequence kept in `ack_seq` + ack_level: u8, + /// The sequence that should be ACKed. If `ack_level` is 0, this is not used. + ack_seq: u8, + /// The instant when the last BTP segment was received. `Instant::MAX` means no packet was received yet. + received_at: Instant, + /// The remaining length of the current SDU being received. Used for packet validity checking only. + rem_msg_len: u16, +} + +impl RecvWindow { + /// Initialize a new receiving window with the provided window size. + #[inline(always)] + pub const fn new(window_size: u8) -> Self { + Self { + buf: RingBuf::new(), + buf_messages_ct: 0, + level: window_size, + ack_level: 0, + ack_seq: 255, + received_at: Instant::MAX, + rem_msg_len: 0, + } + } + + /// Process an incoming BTP segment, updating the state of the window accordingly. + fn accept_incoming(&mut self, hdr: &BtpHdr, payload: &[u8], mtu: u16) -> Result<(), Error> { + // Check received packet integrity, as per the Matter Core spec + self.check_data_integrity(hdr, payload, mtu)?; + + if let Some(msg_len) = hdr.get_msg_len() { + if msg_len <= mtu && !hdr.is_final() { + // Additional packet integrity: an SDU that fits in a single BTP segment must be final + Err(ErrorCode::InvalidData)?; + } + + self.rem_msg_len = msg_len; + + if msg_len > 0 { + if self.buf.free() >= core::mem::size_of::() { + // New SDU; skip 0-length ones as they do not contain Matter messages + self.buf.push(&u16::to_le_bytes(msg_len)); + } else { + Err(ErrorCode::NoSpace)?; + } + } + } + + if self.rem_msg_len < payload.len() as u16 { + // Additional packet integrity: the packet contains more data than the message length + Err(ErrorCode::InvalidData)?; + } else { + self.rem_msg_len -= payload.len() as u16; + if hdr.is_final() && self.rem_msg_len > 0 { + // Additional packet integrity: the packet is final but the message length is not reached + Err(ErrorCode::InvalidData)?; + } + } + + if self.buf.free() >= payload.len() { + self.buf.push(payload); + } else { + Err(ErrorCode::NoSpace)?; + } + + self.level -= 1; + // Unwrap is safe because we are only processing BTP data segments here and they always have a sequence number + self.ack_seq = hdr.get_seq().unwrap(); + self.ack_level += 1; + self.received_at = Instant::now(); + + if hdr.is_final() && !payload.is_empty() { + self.buf_messages_ct += 1; + } + + Ok(()) + } + + fn check_data_integrity(&self, hdr: &BtpHdr, payload: &[u8], mtu: u16) -> Result<(), Error> { + if hdr.is_handshake() // Handshake packets are not allowed here + || hdr.get_opcode().is_some() + { + // Data and standalone ACK packets must not have an opcode + return Err(ErrorCode::InvalidData.into()); + } + + if hdr.is_standalone_ack() { + if !payload.is_empty() { + // Standalone ACKs don't have a payload + return Err(ErrorCode::InvalidData.into()); + } + } else { + if hdr.get_msg_len().is_none() && !hdr.is_continue() && !hdr.is_final() { + // Should have at least one of the three flags raised + return Err(ErrorCode::InvalidData.into()); + } + + if hdr.get_msg_len().is_some() && hdr.is_continue() { + // Cannot be a beginning and a continuation + return Err(ErrorCode::InvalidData.into()); + } + + if !hdr.is_final() && payload.len() + hdr.len() != mtu as _ { + // Non-final packets should have a size equal to the MTU size + return Err(ErrorCode::InvalidData.into()); + } + } + + if hdr + .get_seq() + .map(|seq| self.ack_seq.wrapping_add(1) != seq) + .unwrap_or(true) + { + // Data packets must have a sequence number which is equal to the last one received + 1 + return Err(ErrorCode::InvalidData.into()); + } + + Ok(()) + } + + fn check_handshake_integrity(hdr: &BtpHdr) -> Result<(), Error> { + if !hdr.is_handshake() // Data packets are not allowed here + || !hdr.is_final() // Handshake packets must be final + || !matches!(hdr.get_opcode(), Some(0x6c)) // Handshake packets must have (the only existing) opcode 0x6c + || hdr.get_msg_len().is_some() // Handshake packets must not have a message length + || hdr.is_continue() // Handshake packets must not be continue packets + || hdr.get_seq().is_some() // Handshake packets must not have a sequence number + || hdr.get_ack().is_some() + // Handshake packets must not have an ACK + { + return Err(ErrorCode::InvalidData.into()); + } + + Ok(()) + } + + /// Return the sequence number that should be ACKed, if any. + fn pending_ack(&self) -> Option { + if self.ack_level > 0 && self.buf_messages_ct == 0 { + // Do not send ACKs when there is one complete message in the buffer, or else we risk overflowing the buffer + Some(self.ack_seq) + } else { + None + } + } + + /// Update the state of the window after sending a BTP segment. + /// Basically increases the window level, based on the last ACKed sequence num. + fn post_send(&mut self) { + if self.pending_ack().is_some() { + self.level += self.ack_level; + self.ack_level = 0; + } + } + + /// Pops and fetches one SDU payload (a.k.a. a Matter message) from the front of the buffer. + /// Returns the size of the fetched SDU payload, or the buffer size if the SDU is larger. + /// + /// If there are no complete SDUs inside the buffer, the method will return 0. + fn fetch_message(&mut self, buf: &mut [u8]) -> Result { + if self.buf_messages_ct == 0 { + return Ok(0); + } + + let len = u16::from_le_bytes([ + self.buf.pop_byte().ok_or(ErrorCode::Invalid)?, + self.buf.pop_byte().ok_or(ErrorCode::Invalid)?, + ]) as usize; + + let pop_len = min(len, buf.len()); + + if self.buf.pop(&mut buf[..pop_len]) != pop_len { + Err(ErrorCode::Invalid)?; + } + + if pop_len < len { + warn!("Truncating packet"); + + for _ in pop_len..len { + if self.buf.pop_byte().is_none() { + Err(ErrorCode::Invalid)?; + } + } + } + + self.buf_messages_ct -= 1; + + Ok(pop_len) + } +} + +/// Represents a BTP Session, as per the MAtter Core spec. +#[derive(Debug)] +pub struct Session { + address: BtAddr, + state: SessionState, + version: u8, + mtu: u16, + window_size: u8, + recv_window: RecvWindow, + send_window: SendWindow, +} + +impl Session { + /// Initialize a new BTP session with the provided address, version, MTU and window size. + /// + /// Initializing a session is done based on the data that had arrived in the Handshake Request message, + /// written by a remote peer on the `C1` characteristic. + #[inline(always)] + const fn new(address: BtAddr, version: u8, mtu: u16, window_size: u8) -> Self { + Self { + address, + state: SessionState::New, + version, + mtu, + window_size, + recv_window: RecvWindow::new(window_size), + send_window: SendWindow::new(window_size), + } + } + + /// Return the address of the remote peer. + pub fn address(&self) -> BtAddr { + self.address + } + + /// Return true if this session is in a state where we need to send a BTP Handshake Response message + /// to the remote peer (i.e. the remote peer did subscribe to characteristic `C2`). + pub fn is_handshake_resp_due(&self) -> bool { + matches!(self.state, SessionState::Subscribed) + } + + /// Return true if this session is in a state where an ACK is available and needs to be sent immediately. + /// I.e. the inactivity timeout had expired, or the window is full. + pub fn is_ack_due(&self, now: Instant) -> bool { + matches!(self.state, SessionState::Running) + && self.recv_window.pending_ack().is_some() + && (self.recv_window.level <= 1 + || self + .recv_window + .received_at + .checked_add(Duration::from_secs(BTP_ACK_TIMEOUT_SECS as _)) + .map(|expires| expires <= now) + .unwrap_or(false)) + } + + /// Return true if this session needs to be removed due to inactivity. + /// (I.e. the remote peer did not sent an ACK in due time.) + pub fn is_timed_out(&self, now: Instant) -> bool { + self.send_window + .sent_at + .checked_add(Duration::from_secs(BTP_CONN_IDLE_TIMEOUT_SECS as _)) + .map(|expires| expires < now) + .unwrap_or(false) + } + + /// Set the session in subscribed state. + /// This method should be called when the remote peer subscribes to the `C2` characteristic. + /// + /// Will return false if the current state of the session is not `New`. + pub fn set_subscribed(&mut self) -> bool { + if matches!(self.state, SessionState::New) { + self.state = SessionState::Subscribed; + true + } else { + false + } + } + + /// Set the session in running state. + /// This method should be called after we had sent the BTP Handshake Response message to the remote peer. + /// Calling this method on an already running session has no effect. + /// + /// Will return false if the current state of the session is not `Subscribed` or `Running`. + pub fn set_running(&mut self) -> bool { + if matches!(self.state, SessionState::Running | SessionState::Subscribed) { + self.state = SessionState::Running; + true + } else { + false + } + } + + /// Lock the session for sending. + /// (As per the Matter Core spec, at any moment in time, a session + /// can send the BTP segments of a single BTP SDU, hence the need for locking.) + /// + /// Will return `false` if the session is already locked for sending. + pub fn set_sending(&mut self, sending: bool) -> bool { + if sending { + if self.send_window.sending { + false + } else { + self.send_window.sending = true; + true + } + } else { + self.send_window.sending = false; + true + } + } + + /// Return `true` if the session buffer contains at least one complete DSDU, for consumption by the + /// Matter transport stack. + pub fn message_available(&self) -> bool { + matches!(self.state, SessionState::Running) && self.recv_window.buf_messages_ct > 0 + } + + /// Fetches the data of the first DSDU available in the buffer of the session. + /// Returns the size of the fetched data. + /// + /// If there is no DSDU available, the method will return 0. + pub fn fetch_message(&mut self, buf: &mut [u8]) -> Result { + self.recv_window.fetch_message(buf) + } + + /// A utility method to check if the provided BTP segment represents a Handshake Request message. + /// + /// Alleviates the need to expose the `BtpHdr` struct to the outside world. + pub fn is_handshake(data: &[u8]) -> Result { + let hdr = BtpHdr::from(data.iter().copied())?; + + Ok(hdr.is_handshake()) + } + + /// Process an incoming BTP segment of type Handshake Request, updating the state of the session accordingly. + pub fn process_rx_handshake( + address: BtAddr, + data: &[u8], + gatt_mtu: Option, + ) -> Result { + let mut iter = data.iter(); + + let hdr = BtpHdr::from((&mut iter).copied())?; + let payload = iter.as_slice(); + + // Check received packet integrity, as per the Matter Core spec + RecvWindow::check_handshake_integrity(&hdr)?; + + let req = HandshakeReq::from(payload.iter().copied())?; + + let version = req.versions().min().unwrap_or(4); + + let mtu = if gatt_mtu.map(|gatt_mtu| gatt_mtu != req.mtu).unwrap_or(true) { + // We don't know our MTU or what we know is not what the other peer reports + // => use the minimum MTU + MIN_MTU + } else { + // Used MTU should not be bigger than the maximum allowed + min(req.mtu, MAX_MTU) + }; + + // Remove the header as we need to report back the payload MTU + // and we'll use the payload MTU anyway for all operations + let mtu = mtu - GATT_HEADER_SIZE as u16; + + // Make sure we are using a window size that would allow us to receive at least one full BTP SDU + // TODO: Revisit the mtu and window_size computations + let window_size = min( + req.window_size, + min(MAX_MESSAGE_SIZE as u16 / mtu / 2, 255) as u8, + ); + + info!("\n>>>>> (BTP IO) {address} [{hdr}]\nHANDSHAKE REQ {req:?}\nSelected version: {version}, MTU: {mtu}, window size: {window_size}"); + + Ok(Self::new(address, version, mtu, window_size)) + } + + /// Process an incoming BTP segment of a regular data or ACK type, updating the state of the session accordingly. + pub fn process_rx_data(&mut self, data: &[u8]) -> Result<(), Error> { + let mut iter = data.iter(); + + let hdr = BtpHdr::from((&mut iter).copied())?; + let payload = iter.as_slice(); + + info!( + "\n>>>>> (BTP IO) {} [{hdr}]\nREAD {}B", + self.address, + payload.len() + ); + + self.recv_window.accept_incoming(&hdr, payload, self.mtu)?; + self.send_window.accept_incoming(&hdr); + + Ok(()) + } + + /// Prepare a BTP segment to be sent as a response to a Handshake Request message. + pub fn prep_tx_handshake<'s>(&mut self, buf: &'s mut [u8]) -> Result<&'s [u8], Error> { + let resp = HandshakeResp { + version: self.version, + mtu: self.mtu, + window_size: self.window_size, + }; + + let mut wb = WriteBuf::new(buf); + + let mut hdr = BtpHdr::new(); + hdr.set_handshake(); + hdr.set_opcode(Some(0x6c)); + + info!( + "\n<<<<< (BTP IO) {} [{hdr}]\nHANDSHAKE RESP {resp:?}", + self.address + ); + + hdr.encode(&mut wb)?; + resp.encode(&mut wb)?; + + self.send_window.post_send(); + + let len = wb.get_tail(); + let slice = &buf[..len]; + + Ok(slice) + } + + /// Prepare a BTP segment to be sent as a regular data or a standalone ACK message. + /// The data to be sent will be "chopped off" from the provided `data` slice, starting from offset `offset`. + /// + /// When the `data` slice is empty, the method will prepare a standalone ACK message. + /// + /// The method will return `None` if the session is not in a state where it can send data (i.e. send window full). + /// In case the session is in a state where it can send data, the method will return the slice of the input `buf` + /// filled with the binary BTP segment, and the new offset. + pub fn prep_tx_data<'s>( + &mut self, + data: &[u8], + offset: usize, + buf: &'s mut [u8], + ) -> Result, Error> { + if self.send_window.is_full(&self.recv_window) { + return Ok(None); + } + + let mut hdr = BtpHdr::new(); + + hdr.set_seq(Some(self.send_window.next_seq_num())); + hdr.set_ack(self.recv_window.pending_ack()); + + let segment_data = if !data.is_empty() { + // Enhance to a data packet + + if offset == 0 { + hdr.set_msg_len(Some(data.len() as u16)); + } else { + hdr.set_continue(); + } + + let remaining_data = &data[offset..]; + + let max_payload_len = self.mtu as usize - hdr.len(); + + let chunk_end = min(remaining_data.len(), max_payload_len); + + if chunk_end == remaining_data.len() { + hdr.set_final(); + } + + &remaining_data[..chunk_end] + } else { + // ACK packet + + &[] + }; + + let mut wb = WriteBuf::new(buf); + + hdr.encode(&mut wb)?; + wb.append(segment_data)?; + + info!( + "\n<<<<< (BTP IO) {} [{hdr}]\nWRITE {}B", + self.address, + segment_data.len() + ); + + self.send_window.post_send(); + self.recv_window.post_send(); + + let len = wb.get_tail(); + let slice = &buf[..len]; + + Ok(Some((slice, offset + segment_data.len()))) + } +} diff --git a/rs-matter/src/transport/network/btp/session/packet.rs b/rs-matter/src/transport/network/btp/session/packet.rs new file mode 100644 index 00000000..1cfef15a --- /dev/null +++ b/rs-matter/src/transport/network/btp/session/packet.rs @@ -0,0 +1,461 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::fmt; + +use bitflags::bitflags; + +use log::trace; + +use crate::error::{Error, ErrorCode}; +use crate::utils::writebuf::WriteBuf; + +bitflags! { + /// Models the flags in the BTP header. + /// + /// Consult the Matter Core Specification for more information. + #[repr(transparent)] + #[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct BtpFlags: u8 { + const HANDSHAKE = 0x40; + const MANAGEMENT = 0x20; + const ACK = 0x08; + const ENDING_SEGMENT = 0x04; + // NOTE: NOT documented in the Matter Core Spec but specified here: + // https://github.com/project-chip/connectedhomeip/blob/master/src/ble/BtpEngine.h#L83 + const CONTINUE = 0x02; + const BEGINNING_SEGMENT = 0x01; + } +} + +impl fmt::Display for BtpFlags { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut sep = false; + for flag in [ + Self::HANDSHAKE, + Self::MANAGEMENT, + Self::ACK, + Self::BEGINNING_SEGMENT, + Self::CONTINUE, + Self::ENDING_SEGMENT, + ] { + if self.contains(flag) { + if sep { + write!(f, "|")?; + } + + let str = match flag { + Self::HANDSHAKE => "H", + Self::MANAGEMENT => "M", + Self::ACK => "A", + Self::BEGINNING_SEGMENT => "B", + Self::CONTINUE => "C", + Self::ENDING_SEGMENT => "E", + _ => "?", + }; + + write!(f, "{}", str)?; + sep = true; + } + } + + Ok(()) + } +} + +/// Models the BTP header. +#[derive(Debug, Default, Clone)] +pub struct BtpHdr { + flags: BtpFlags, + opcode: u8, + ack_num: u8, + seq_num: u8, + msg_len: u16, +} + +impl BtpHdr { + /// Create a new BTP header. + #[inline(always)] + pub const fn new() -> Self { + Self { + flags: BtpFlags::empty(), + opcode: 0, + ack_num: 0, + seq_num: 0, + msg_len: 0, + } + } + + /// Decode a BTP header from an iterator of bytes. + pub fn from(msg: I) -> Result + where + I: Iterator, + { + let mut hdr = Self::new(); + + hdr.decode(msg)?; + + Ok(hdr) + } + + /// Return `true` if the BTP header indicates a handshake message (request or response). + pub fn is_handshake(&self) -> bool { + self.flags.contains(BtpFlags::HANDSHAKE) + } + + /// Set the BTP header to indicate a handshake message (request or response). + pub fn set_handshake(&mut self) { + self.flags |= BtpFlags::HANDSHAKE | BtpFlags::BEGINNING_SEGMENT | BtpFlags::ENDING_SEGMENT; + } + + /// Get the opcode from the BTP header. + /// An opcode will be present only if the header indicates a management message. + pub fn get_opcode(&self) -> Option { + self.flags + .contains(BtpFlags::MANAGEMENT) + .then_some(self.opcode) + } + + /// Set (or clear) the opcode in the BTP header. + /// This automatically marks/unmarks the message as a management message. + pub fn set_opcode(&mut self, opcode: Option) { + if let Some(opcode) = opcode { + self.flags |= BtpFlags::MANAGEMENT; + self.opcode = opcode + } else { + self.flags.remove(BtpFlags::MANAGEMENT); + self.opcode = 0; + } + } + + /// Get the acknowledgement number from the BTP header. + /// An acknowledgement number will be present only if the header indicates an acknowledgement. + pub fn get_ack(&self) -> Option { + self.flags.contains(BtpFlags::ACK).then_some(self.ack_num) + } + + /// Set (or clear) the acknowledgement number in the BTP header. + /// This automatically marks/unmarks the message with the acknowledgement flag. + pub fn set_ack(&mut self, ack_num: Option) { + if let Some(ack_num) = ack_num { + self.flags |= BtpFlags::ACK; + self.ack_num = ack_num; + } else { + self.flags.remove(BtpFlags::ACK); + self.ack_num = 0; + } + } + + /// Get the sequence number from the BTP header. + /// A sequence number will be present only if the header does not indicate a handshake message. + pub fn get_seq(&self) -> Option { + (!self.flags.contains(BtpFlags::HANDSHAKE)).then_some(self.seq_num) + } + + /// Set (or clear) the sequence number in the BTP header. + /// This automatically marks/unmarks the message as a handshake message. + pub fn set_seq(&mut self, seq_num: Option) { + if let Some(seq_num) = seq_num { + self.flags.remove(BtpFlags::HANDSHAKE); + self.seq_num = seq_num; + } else { + self.flags |= BtpFlags::HANDSHAKE; + self.seq_num = 0; + } + } + + /// Indicate that the BTP header is a standalone acknowledgement. + /// Turns out this is possible even if not clearly specified ion the Matter Core spec. + /// Standalone ACKs seem to have the following properties: + /// - No message length + /// - No continue flag + /// - No final segment + /// - An acknowledgement number + /// - A sequence number (obviously) + pub fn is_standalone_ack(&self) -> bool { + !self.is_handshake() + && self.get_msg_len().is_none() + && !self.is_continue() + && !self.is_final() + && self.get_ack().is_some() + } + + /// Get the message length from the BTP header. + /// A message length will be present only if the header indicates a beginning segment and the header does not + /// indicate a handshake message. + pub fn get_msg_len(&self) -> Option { + (self.flags.contains(BtpFlags::BEGINNING_SEGMENT) + && !self.flags.contains(BtpFlags::HANDSHAKE)) + .then_some(self.msg_len) + } + + /// Set (or clear) the message length in the BTP header. + /// This automatically marks/unmarks the message as a beginning segment. + pub fn set_msg_len(&mut self, msg_len: Option) { + if let Some(msg_len) = msg_len { + self.flags |= BtpFlags::BEGINNING_SEGMENT; + self.msg_len = msg_len; + } else { + self.flags.remove(BtpFlags::BEGINNING_SEGMENT); + self.msg_len = 0; + } + } + + /// Return `true` if the BTP header indicates a continuation segment. + /// Not specified in the Matter Core spec, but apparently exists. + pub fn is_continue(&self) -> bool { + self.flags.contains(BtpFlags::CONTINUE) + } + + /// Set the BTP header to indicate a continuation segment. + /// Not specified in the Matter Core spec, but apparently exists. + pub fn set_continue(&mut self) { + self.flags |= BtpFlags::CONTINUE; + } + + /// Return `true` if the BTP header indicates the final segment of a message. + pub fn is_final(&self) -> bool { + self.flags.contains(BtpFlags::ENDING_SEGMENT) + } + + /// Set the BTP header to indicate the final segment of a message. + pub fn set_final(&mut self) { + self.flags |= BtpFlags::ENDING_SEGMENT; + } + + /// Load the header from a byte iterator. + fn decode(&mut self, mut msg: I) -> Result<(), Error> + where + I: Iterator, + { + self.flags = BtpFlags::from_bits_truncate(msg.next().ok_or(ErrorCode::Invalid)?); + + if self.flags.contains(BtpFlags::MANAGEMENT) { + self.opcode = msg.next().ok_or(ErrorCode::Invalid)?; + } + + if self.flags.contains(BtpFlags::ACK) { + self.ack_num = msg.next().ok_or(ErrorCode::Invalid)?; + } + + if !self.flags.contains(BtpFlags::HANDSHAKE) { + self.seq_num = msg.next().ok_or(ErrorCode::Invalid)?; + } + + if self.flags.contains(BtpFlags::BEGINNING_SEGMENT) + && !self.flags.contains(BtpFlags::HANDSHAKE) + { + let msg_len = [ + msg.next().ok_or(ErrorCode::Invalid)?, + msg.next().ok_or(ErrorCode::Invalid)?, + ]; + + self.msg_len = u16::from_le_bytes(msg_len); + } + + trace!("[decode] {}", self); + Ok(()) + } + + /// Encode the header into a byte buffer. + pub fn encode(&self, resp_buf: &mut WriteBuf) -> Result<(), Error> { + trace!("[encode] {}", self); + + resp_buf.le_u8(self.flags.bits())?; + + if self.flags.contains(BtpFlags::MANAGEMENT) { + resp_buf.le_u8(self.opcode)?; + } + + if self.flags.contains(BtpFlags::ACK) { + resp_buf.le_u8(self.ack_num)?; + } + + if !self.flags.contains(BtpFlags::HANDSHAKE) { + resp_buf.le_u8(self.seq_num)?; + } + + if self.flags.contains(BtpFlags::BEGINNING_SEGMENT) + && !self.flags.contains(BtpFlags::HANDSHAKE) + { + resp_buf.le_u16(self.msg_len)?; + } + + Ok(()) + } + + /// Return the length of the encoded header in bytes. + pub fn len(&self) -> usize { + let mut len = 1; // Flags + + if self.flags.contains(BtpFlags::MANAGEMENT) { + len += 1; + } + + if self.flags.contains(BtpFlags::ACK) { + len += 1; + } + + if !self.flags.contains(BtpFlags::HANDSHAKE) { + len += 1; + } + + if self.flags.contains(BtpFlags::BEGINNING_SEGMENT) + && !self.flags.contains(BtpFlags::HANDSHAKE) + { + len += 2; + } + + len + } +} + +impl fmt::Display for BtpHdr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if !self.flags.is_empty() { + write!(f, "{}", self.flags)?; + } + + if let Some(opcode) = self.get_opcode() { + write!(f, ",OP:{:x}", opcode)?; + } + + if let Some(ack_num) = self.get_ack() { + write!(f, ",ACTR:{:x}", ack_num)?; + } + + if let Some(seq_num) = self.get_seq() { + write!(f, ",CTR:{:x}", seq_num)?; + } + + if let Some(msg_len) = self.get_msg_len() { + write!(f, ",LEN:{:x}", msg_len)?; + } + + Ok(()) + } +} + +/// Models the BTP handshake request. +#[derive(Debug, Default)] +pub struct HandshakeReq { + /// The versions supported by the BTP handshake request. + versions: u32, + /// The ATT MTU size supported by the BTP handshake request. + pub mtu: u16, + /// The window size supported by the BTP handshake request. + pub window_size: u8, +} + +impl HandshakeReq { + /// Create a new BTP handshake request from a byte iterator representing the raw BTP packet. + pub fn from(msg: I) -> Result + where + I: Iterator, + { + let mut req = Self::default(); + + req.decode(msg)?; + + Ok(req) + } + + /// Return an iterator over the versions supported by the BTP handshake request. + pub fn versions(&self) -> impl Iterator + '_ { + (0..7u8) + .map(|index| (self.versions >> (index * 4) & 0xff) as u8) + .filter(|version| *version > 0) + } + + // Future + // Set the versions supported by the BTP handshake request. + // fn set_versions(&mut self, versions: I) + // where + // I: Iterator, + // { + // for (index, version) in (0_u8..).zip(versions) { + // self.versions |= (version as u32) << (index * 4); + // } + // } + + /// Decode a BTP handshake request from a byte iterator representing the data payload of a BTP Handshake request packet. + fn decode(&mut self, mut msg: I) -> Result<(), Error> + where + I: Iterator, + { + self.versions = u32::from_le_bytes([ + msg.next().ok_or(ErrorCode::Invalid)?, + msg.next().ok_or(ErrorCode::Invalid)?, + msg.next().ok_or(ErrorCode::Invalid)?, + msg.next().ok_or(ErrorCode::Invalid)?, + ]); + self.mtu = u16::from_le_bytes([ + msg.next().ok_or(ErrorCode::Invalid)?, + msg.next().ok_or(ErrorCode::Invalid)?, + ]); + self.window_size = msg.next().ok_or(ErrorCode::Invalid)?; + + Ok(()) + } + + // Future + // Encode the BTP handshake request into a byte buffer. + // fn encode(&self, resp_buf: &mut WriteBuf) -> Result<(), Error> { + // resp_buf.le_u32(self.versions)?; + // resp_buf.le_u16(self.mtu)?; + // resp_buf.le_u8(self.window_size)?; + + // Ok(()) + // } +} + +/// Models the BTP handshake response. +#[derive(Debug, Default)] +pub struct HandshakeResp { + /// The version of the BTP protocol supported by the responder. + pub version: u8, + /// The chosen ATT MTU size by the responder. + pub mtu: u16, + /// The chosen window size by the responder. + pub window_size: u8, +} + +impl HandshakeResp { + // Future + // // Decode a BTP handshake request from a byte iterator representing the data payload of a BTP Handshake request packet. + // fn decode(&mut self, mut msg: I) -> Result<(), Error> + // where + // I: Iterator, + // { + // self.version = msg.next().ok_or(ErrorCode::Invalid)?; + // self.mtu = u16::from_le_bytes([ + // msg.next().ok_or(ErrorCode::Invalid)?, + // msg.next().ok_or(ErrorCode::Invalid)?, + // ]); + // self.window_size = msg.next().ok_or(ErrorCode::Invalid)?; + + // Ok(()) + // } + + /// Encode the BTP handshake response into a byte buffer. + pub fn encode(&self, resp_buf: &mut WriteBuf) -> Result<(), Error> { + resp_buf.le_u8(self.version)?; + resp_buf.le_u16(self.mtu)?; + resp_buf.le_u8(self.window_size)?; + Ok(()) + } +} diff --git a/rs-matter/src/utils/mod.rs b/rs-matter/src/utils/mod.rs index 0b09e5fc..b7c0136c 100644 --- a/rs-matter/src/utils/mod.rs +++ b/rs-matter/src/utils/mod.rs @@ -21,6 +21,8 @@ pub mod ifmutex; pub mod notification; pub mod parsebuf; pub mod rand; +pub mod ringbuf; pub mod select; pub mod signal; +pub mod std_mutex; pub mod writebuf; diff --git a/rs-matter/src/utils/ringbuf.rs b/rs-matter/src/utils/ringbuf.rs new file mode 100644 index 00000000..a02f8264 --- /dev/null +++ b/rs-matter/src/utils/ringbuf.rs @@ -0,0 +1,258 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use core::cmp::min; + +/// A ring buffer of a fixed capacity `N` using owned storage. +#[derive(Debug)] +pub struct RingBuf { + buf: heapless::Vec, + start: usize, + end: usize, + empty: bool, +} + +impl Default for RingBuf { + fn default() -> Self { + Self::new() + } +} + +impl RingBuf { + /// Create a new ring buffer. + #[inline(always)] + pub const fn new() -> Self { + Self { + buf: heapless::Vec::new(), + start: 0, + end: 0, + empty: true, + } + } + + /// Push new data to the end of the buffer. + /// If the data does not fit in the buffer, the oldest data is dropped to make room for the new one. + /// + /// Return the new length of data in the buffer. + #[inline(always)] + pub fn push(&mut self, data: &[u8]) -> usize { + // Unwrap is safe because the max size of the buffer is N + self.buf.resize_default(N).unwrap(); + + let mut offset = 0; + + while offset < data.len() { + let len = min(self.buf.len() - self.end, data.len() - offset); + + self.buf[self.end..self.end + len].copy_from_slice(&data[offset..offset + len]); + + offset += len; + + if !self.empty && self.start >= self.end && self.start < self.end + len { + // Dropping oldest data + self.start = self.end + len; + } + + self.end += len; + + self.wrap(); + + self.empty = false; + } + + self.len() + } + + /// Push a single byte to the end of the buffer. + /// If the buffer is full, the oldest byte is dropped to make room for the new one. + /// + /// Return the new length of data in the buffer. + #[inline(always)] + pub fn push_byte(&mut self, data: u8) -> usize { + // Unwrap is safe because the max size of the buffer is N + self.buf.resize_default(N).unwrap(); + + self.buf[self.end] = data; + + if !self.empty && self.start == self.end { + // Dropping oldest data + self.start = self.end + 1; + } + + self.end += 1; + + self.wrap(); + + self.empty = false; + + self.len() + } + + /// Pop one byte from the start of the buffer. + /// If the bufer is empty, return `None`. + #[inline(always)] + pub fn pop_byte(&mut self) -> Option { + let mut buf = [0; 1]; + + if self.pop(&mut buf) == 1 { + Some(buf[0]) + } else { + None + } + } + + /// Pop data from the start of the buffer. + /// Return the number of bytes copied to the output buffer. + #[inline(always)] + pub fn pop(&mut self, out_buf: &mut [u8]) -> usize { + let mut offset = 0; + + while offset < out_buf.len() && !self.empty { + let len = min( + if self.start < self.end { + self.end + } else { + self.buf.len() + } - self.start, + out_buf.len() - offset, + ); + + out_buf[offset..offset + len].copy_from_slice(&self.buf[self.start..self.start + len]); + + self.start += len; + + self.wrap(); + + if self.start == self.end { + self.empty = true + } + + offset += len; + } + + offset + } + + /// Return `true` when the buffer is full. + #[inline(always)] + pub fn is_full(&self) -> bool { + self.start == self.end && !self.empty + } + + /// Return `true` when the buffer is empty. + #[inline(always)] + pub fn is_empty(&self) -> bool { + self.empty + } + + /// Return the current size of the data in the buffer. + #[inline(always)] + #[allow(unused)] + pub fn len(&self) -> usize { + if self.empty { + 0 + } else if self.start < self.end { + self.end - self.start + } else { + self.buf.len() + self.end - self.start + } + } + + /// Return the free space in the buffer. + #[inline(always)] + #[allow(unused)] + pub fn free(&self) -> usize { + N - self.len() + } + + /// Clear the buffer. + #[inline(always)] + pub fn clear(&mut self) { + self.start = 0; + self.end = 0; + self.empty = true; + } + + #[inline(always)] + fn wrap(&mut self) { + if self.start == self.buf.len() { + self.start = 0; + } + + if self.end == self.buf.len() { + self.end = 0; + } + } +} + +impl Iterator for RingBuf { + type Item = u8; + + fn next(&mut self) -> Option { + self.pop_byte() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_pop() { + let mut rb = RingBuf::<4>::new(); + assert!(rb.is_empty()); + + rb.push(&[0, 1, 2]); + assert_eq!(3, rb.len()); + assert!(!rb.is_empty()); + assert!(!rb.is_full()); + + rb.push(&[3]); + assert_eq!(4, rb.len()); + assert!(!rb.is_empty()); + assert!(rb.is_full()); + + let mut buf = [0; 256]; + + let len = rb.pop(&mut buf); + assert_eq!(4, len); + assert_eq!(&buf[0..4], &[0, 1, 2, 3]); + assert!(rb.is_empty()); + + rb.push(&[0, 1, 2, 3, 4, 5]); + assert_eq!(4, rb.len()); + assert!(!rb.is_empty()); + assert!(rb.is_full()); + + let len = rb.pop(&mut buf[..3]); + assert_eq!(3, len); + assert_eq!(&buf[0..len], &[2, 3, 4]); + assert!(!rb.is_empty()); + assert!(!rb.is_full()); + + let len = rb.pop(&mut buf); + assert_eq!(1, len); + assert_eq!(&buf[0..len], &[5]); + assert!(rb.is_empty()); + assert!(!rb.is_full()); + + let len = rb.pop(&mut buf); + assert_eq!(0, len); + assert!(rb.is_empty()); + assert!(!rb.is_full()); + } +} diff --git a/rs-matter/src/utils/std_mutex.rs b/rs-matter/src/utils/std_mutex.rs new file mode 100644 index 00000000..868e2339 --- /dev/null +++ b/rs-matter/src/utils/std_mutex.rs @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2020-2022 Project CHIP Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#![cfg(feature = "std")] + +use embassy_sync::blocking_mutex::raw::RawMutex; + +/// An `embassy-sync` `RawMutex` implementation using `std::sync::Mutex`. +/// TODO: Upstream into `embassy-sync` itself. +#[derive(Default)] +pub struct StdRawMutex(std::sync::Mutex<()>); + +impl StdRawMutex { + pub const fn new() -> Self { + Self(std::sync::Mutex::new(())) + } +} + +unsafe impl RawMutex for StdRawMutex { + #[allow(clippy::declare_interior_mutable_const)] + const INIT: Self = StdRawMutex(std::sync::Mutex::new(())); + + fn lock(&self, f: impl FnOnce() -> R) -> R { + let _guard = self.0.lock().unwrap(); + + f() + } +}