Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: notify when publish/subscribe/unsubscribe packets are acked #916

Open
wants to merge 25 commits into
base: main
Choose a base branch
from

Conversation

de-sh
Copy link
Member

@de-sh de-sh commented Sep 29, 2024

Solves #805

Type of change

New feature, that notifies the requester when their publish(QoS 1/2)/subscribe/unsubscribe request are acknowledged by the broker.

BREAKING: pending definition changes from VecDeque<Request> to VecDeque<(Request, Option<PromiseTx>)>.

Checklist:

  • Formatted with cargo fmt
  • Make an entry to CHANGELOG.md if it's relevant to the users of the library. If it's not relevant mention why.

@de-sh de-sh changed the title feat: acknowledge notification feat: notify when publish/subscribe/unsubscribe packets are acked Sep 29, 2024
@coveralls
Copy link

coveralls commented Sep 29, 2024

Pull Request Test Coverage Report for Build 11231284962

Details

  • 173 of 492 (35.16%) changed or added relevant lines in 7 files are covered.
  • 131 unchanged lines in 8 files lost coverage.
  • Overall coverage increased (+1.4%) to 37.48%

Changes Missing Coverage Covered Lines Changed/Added Lines %
rumqttc/src/v5/eventloop.rs 0 7 0.0%
rumqttc/src/state.rs 80 90 88.89%
rumqttc/src/lib.rs 16 38 42.11%
rumqttc/src/v5/state.rs 37 106 34.91%
rumqttc/src/client.rs 26 109 23.85%
rumqttc/src/v5/client.rs 7 135 5.19%
Files with Coverage Reduction New Missed Lines %
rumqttc/src/v5/state.rs 5 63.14%
rumqttc/src/eventloop.rs 5 85.27%
rumqttd/src/link/timer.rs 8 0.0%
rumqttc/src/v5/client.rs 10 12.83%
rumqttc/src/client.rs 10 32.89%
rumqttc/src/v5/eventloop.rs 25 7.51%
rumqttd/src/link/remote.rs 25 46.03%
rumqttd/src/link/bridge.rs 43 0.0%
Totals Coverage Status
Change from base Build 10511632669: 1.4%
Covered Lines: 6303
Relevant Lines: 16817

💛 - Coveralls

@de-sh de-sh marked this pull request as ready for review September 29, 2024 20:08
@xiaocq2001
Copy link
Contributor

xiaocq2001 commented Sep 30, 2024

Glad to see the progress here. Some of the things to discuss:

  1. Is there any example on how the feature is used?
    I tried to use following code to test
    println!("--- Publishing messages and wait for ack ---");
    let mut set = JoinSet::new();

    let ack_promise = client
        .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    let ack_promise = client
        .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    let ack_promise = client
        .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });

    while let Some(res) = set.join_next().await {
        println!("Acknoledged = {:?}", res?);
    }

The output shows "RecvError"

--- Publishing messages and wait for ack ---
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 0, Payload Size = 1
Event = Outgoing(Publish(0))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 4, Payload Size = 2
Event = Outgoing(Publish(4))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 5, Payload Size = 3
Event = Outgoing(Publish(5))
Acknoledged = Err(RecvError(()))
Acknoledged = Err(RecvError(()))
Acknoledged = Err(RecvError(()))
Event = Incoming(Publish(Publish { dup: false, qos: AtMostOnce, retain: false, topic: b"hello/world", pkid: 0, payload: b"\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 4, reason: Success, properties: None }))
Event = Incoming(PubRec(PubRec { pkid: 5, reason: Success, properties: None }))
Event = Outgoing(PubRel(5))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 1, payload: b"\x01\x01", properties: None }))
Event = Outgoing(PubAck(1))
Event = Incoming(Publish(Publish { dup: false, qos: ExactlyOnce, retain: false, topic: b"hello/world", pkid: 2, payload: b"\x01\x01\x01", properties: None }))
Event = Outgoing(PubRec(2))
Event = Incoming(PubComp(PubComp { pkid: 5, reason: Success, properties: None }))
Event = Incoming(PubRel(PubRel { pkid: 2, reason: Success, properties: None }))
Event = Outgoing(PubComp(2))

In outgoing_publish, the tx is not saved to ack_waiter, it's dropped!
Maybe you can add (with QoS0 notification in discuss 2)

        if publish.qos != QoS::AtMostOnce {
            self.ack_waiter[pkid as usize] = tx;
        } else {
            if let Some(tx) = tx {
                tx.resolve();
            }
        }

after

        let event = Event::Outgoing(Outgoing::Publish(pkid));
        self.events.push_back(event);
  1. It seems in QoS0, there is no notification, is it worthy that we have notification on QoS0 packet sent (outgoing_publish).

@de-sh
Copy link
Member Author

de-sh commented Sep 30, 2024

Thanks for the review, two more things I have been thinking about with respect to the interface are as follows:

  1. Returning pkid instead of ().
  2. Errors with more context about why a request was refused by the broker, both subscribe and unsub acks have reason codes as response. Should this just return the acknowledgement packets received from the broker or repackage the same into a more presentable type?

@@ -75,11 +75,11 @@ pub struct EventLoop {
/// Current state of the connection
pub state: MqttState,
/// Request stream
requests_rx: Receiver<Request>,
requests_rx: Receiver<(Request, Option<PromiseTx>)>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
requests_rx: Receiver<(Request, Option<PromiseTx>)>,
requests_rx: Receiver<PendingRequest>,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved in #917

/// Pending packets from last session
pub pending: VecDeque<Request>,
pub pending: VecDeque<(Request, Option<PromiseTx>)>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change, note on release

@de-sh de-sh mentioned this pull request Oct 1, 2024
2 tasks
@xiaocq2001
Copy link
Contributor

Thanks for the review, two more things I have been thinking about with respect to the interface are as follows:

  1. Returning pkid instead of ().
  2. Errors with more context about why a request was refused by the broker, both subscribe and unsub acks have reason codes as response. Should this just return the acknowledgement packets received from the broker or repackage the same into a more presentable type?

For 1, that's great. That also helps on 2, before we decide to put more things in error cases, the pkid helps to extract the error packet from event loop events.
For 2, I vote the acknowledgement packets, since that's not only the reason code, but also with reason string and maybe other user defined properties.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants