Skip to content

Commit

Permalink
[Mojo] Do not read mojo messages until they can be dispatched
Browse files Browse the repository at this point in the history
Before this change, logic allowed dequeuing messages from mojo pipe and
holding them inside internal queue (|Connector::dispatch_queue_|) until
a future post-task would dispatch them. If - however - the receiver
would be unbound between these moments, all dequeued messages would be
lost. For async mojo calls, this may lead to logic problems. For sync
calls, it will deadlock the whole calling thread.

This change removes the |dispatch_queue_|, simplifies the logic of
dispatching messages from mojo pipe and ensures that no mojo message
can be dequeued without dispatching it.

A new unit test is added to ensure that implementation is not
vulnerable to such a problem anymore. Without the patch, it deadlocks
as the sync message is dropped on the receiver's thread during
rebinding of the |mojo::Receiver| objects.

Bug: 1066761
Change-Id: I94af241dca6656c8dc0e21fb8c7ec75c9f9a0b32
Tbr: wez@chromium.org
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2139704
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#765123}
  • Loading branch information
Damian Dyńdo authored and Commit Bot committed May 4, 2020
1 parent ba7c696 commit a0aa431
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 155 deletions.
1 change: 1 addition & 0 deletions fuchsia/base/message_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <vector>

#include "base/bind.h"
#include "base/containers/circular_deque.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/macros.h"
#include "fuchsia/base/mem_buffer_util.h"
Expand Down
50 changes: 15 additions & 35 deletions mojo/public/cpp/bindings/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/component_export.h"
#include "base/containers/queue.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/optional.h"
#include "base/sequence_checker.h"
#include "base/sequenced_task_runner.h"
#include "mojo/public/cpp/bindings/connection_group.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
#include "mojo/public/cpp/system/core.h"
#include "mojo/public/cpp/system/handle_signal_tracker.h"
Expand Down Expand Up @@ -223,6 +221,8 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {

void WaitToReadMore();

uint64_t QueryPendingMessageCount() const;

// Attempts to read a single Message from the pipe. Returns |MOJO_RESULT_OK|
// and a valid message in |*message| iff a message was successfully read and
// prepared for dispatch.
Expand All @@ -233,24 +233,17 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// validation).
bool DispatchMessage(Message message);

// Posts a task to dispatch the next message in |dispatch_queue_|. These two
// functions keep |num_pending_dispatch_tasks_| up to date, so as to allow
// bounding the number of posted tasks when the Connector is e.g. paused and
// resumed repeatedly.
void PostDispatchNextMessageInQueue();
void CallDispatchNextMessageInQueue();

// Used to schedule dispatch of a single message from the front of
// |dispatch_queue_|. Returns |true| if the dispatch succeeded and |false|
// otherwise (e.g. if the message failed validation).
bool DispatchNextMessageInQueue();

// Dispatches all queued messages to the receiver immediately. This is
// necessary to ensure proper ordering when beginning to wait for a sync
// response, because new incoming messages need to be dispatched as they
// arrive. Returns |true| if all queued messages were successfully dispatched,
// and |false| if any dispatch fails.
bool DispatchAllQueuedMessages();
// Posts a task to read the next message from the pipe. These two functions
// keep |num_pending_read_tasks_| up to date to limit the number of posted
// tasks when the Connector is e.g. paused and resumed repeatedly.
void PostDispatchNextMessageFromPipe();
void CallDispatchNextMessageFromPipe();

// Ensures that enough tasks are posted to dispatch |pending_message_count|
// messages based on current |num_pending_dispatch_tasks_| value. If there are
// no more pending messages, it will call ArmOrNotify() on |handle_watcher_|.
void ScheduleDispatchOfPendingMessagesOrWaitForMore(
uint64_t pending_message_count);

// Reads all available messages off of the pipe, possibly dispatching one or
// more of them depending on the state of the Connector when this is called.
Expand All @@ -262,7 +255,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// asynchronously.
void HandleError(bool force_pipe_reset, bool force_async_handler);

// Cancels any calls made to |waiter_|.
// Cancels any calls made to |handle_watcher_|.
void CancelWait();

void EnsureSyncWatcherExists();
Expand Down Expand Up @@ -292,18 +285,6 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// See |set_force_immediate_dispatch()|.
bool force_immediate_dispatch_;

// Messages which have been read off the pipe but not yet dispatched. This
// exists so that we can schedule individual dispatch tasks for each read
// message in parallel rather than having to do it in series as each message
// is read off the pipe.
base::queue<Message> dispatch_queue_;

// Indicates whether a non-fatal pipe error (i.e. peer closure and no more
// incoming messages) was detected while |dispatch_queue_| was non-empty.
// When |true|, ensures that an error will be propagated outward as soon as
// |dispatch_queue_| is fully flushed.
bool pending_error_dispatch_ = false;

OutgoingSerializationMode outgoing_serialization_mode_;
IncomingSerializationMode incoming_serialization_mode_;

Expand All @@ -312,7 +293,6 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
base::Optional<base::Lock> lock_;

std::unique_ptr<SyncHandleWatcher> sync_watcher_;
std::unique_ptr<SequenceLocalSyncEventWatcher> dispatch_queue_watcher_;

bool allow_woken_up_by_others_ = false;
// If non-zero, currently the control flow is inside the sync handle watcher
Expand All @@ -339,7 +319,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// nested dispatch operations.
bool is_dispatching_ = false;

// The number of outstanding tasks for CallDispatchNextMessageInQueue.
// The number of pending tasks for |CallDispatchNextMessageFromPipe|.
size_t num_pending_dispatch_tasks_ = 0;

#if defined(ENABLE_IPC_FUZZER)
Expand Down
181 changes: 61 additions & 120 deletions mojo/public/cpp/bindings/lib/connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,6 @@ bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
// INDEFINITE deadlines at present, so we only support those.
DCHECK(deadline == 0 || deadline == MOJO_DEADLINE_INDEFINITE);

if (!dispatch_queue_.empty())
return DispatchNextMessageInQueue();

MojoResult rv = MOJO_RESULT_UNKNOWN;
if (deadline == 0 && !message_pipe_->QuerySignalsState().readable())
return false;
Expand Down Expand Up @@ -286,20 +283,6 @@ void Connector::ResumeIncomingMethodCallProcessing() {
if (!paused_)
return;

// Some number of queued dispatch tasks may have been aborted due to the
// Connector being paused at task execution time. We either dispatch them all
// now (if immediate dispatch is enabled) or schedule new tasks for each of
// them. Some of the scheduled tasks may be redundant, but that's OK.
if (should_dispatch_messages_immediately()) {
base::WeakPtr<Connector> weak_self = weak_self_;
DispatchAllQueuedMessages();
if (!weak_self)
return;
} else {
while (num_pending_dispatch_tasks_ < dispatch_queue_.size())
PostDispatchNextMessageInQueue();
}

paused_ = false;
WaitToReadMore();
}
Expand Down Expand Up @@ -376,7 +359,6 @@ void Connector::AllowWokenUpBySyncWatchOnSameThread() {

EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
dispatch_queue_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
}

void Connector::SetWatcherHeapProfilerTag(const char* tag) {
Expand Down Expand Up @@ -424,25 +406,15 @@ void Connector::OnHandleReadyInternal(MojoResult result) {

if (result == MOJO_RESULT_FAILED_PRECONDITION) {
// No more messages on the pipe and the peer is closed.
if (dispatch_queue_.empty()) {
HandleError(false /* force_pipe_reset */,
false /* force_async_handler */);
return;
} else {
// We don't want to propagate an error signal yet because we still have
// queued messages to dispatch.
pending_error_dispatch_ = true;
}
HandleError(false /* force_pipe_reset */, false /* force_async_handler */);
return;
} else if (result != MOJO_RESULT_OK) {
// Some other fatal error condition was encountered. We can propagate this
// immediately.
HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
return;
}

if (dispatch_queue_watcher_)
dispatch_queue_watcher_->ResetEvent();

ReadAllAvailableMessages();
// At this point, this object might have been deleted. Return.
}
Expand Down Expand Up @@ -478,10 +450,18 @@ void Connector::WaitToReadMore() {
if (allow_woken_up_by_others_) {
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
dispatch_queue_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
}
}

uint64_t Connector::QueryPendingMessageCount() const {
uint64_t unused_current_limit = 0;
uint64_t pending_message_count = 0;
MojoQueryQuota(
message_pipe_.get().value(), MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH,
/*options=*/nullptr, &unused_current_limit, &pending_message_count);
return pending_message_count;
}

MojoResult Connector::ReadMessage(Message* message) {
ScopedMessageHandle handle;
MojoResult result =
Expand Down Expand Up @@ -553,117 +533,83 @@ bool Connector::DispatchMessage(Message message) {
return true;
}

void Connector::PostDispatchNextMessageInQueue() {
DCHECK_LT(num_pending_dispatch_tasks_, dispatch_queue_.size());
void Connector::PostDispatchNextMessageFromPipe() {
++num_pending_dispatch_tasks_;
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&Connector::CallDispatchNextMessageInQueue, weak_self_));
base::BindOnce(&Connector::CallDispatchNextMessageFromPipe, weak_self_));
}

void Connector::CallDispatchNextMessageInQueue() {
void Connector::CallDispatchNextMessageFromPipe() {
DCHECK_GT(num_pending_dispatch_tasks_, 0u);
--num_pending_dispatch_tasks_;
DispatchNextMessageInQueue();
ReadAllAvailableMessages();
}

bool Connector::DispatchNextMessageInQueue() {
if (error_ || paused_)
return false;

if (dispatch_queue_.empty())
return true;

Message message = std::move(dispatch_queue_.front());
dispatch_queue_.pop();

base::WeakPtr<Connector> weak_self = weak_self_;

// NOTE: May delete |this|.
bool result = DispatchMessage(std::move(message));
if (weak_self) {
// If that was our last queued message and we've detected a pipe error, we
// can propagate it now.
if (dispatch_queue_.empty() && pending_error_dispatch_) {
HandleError(false /* force_pipe_reset */,
false /* force_async_handler */);
}
void Connector::ScheduleDispatchOfPendingMessagesOrWaitForMore(
uint64_t pending_message_count) {
if (pending_message_count == 0) {
// We're done only because there are no more messages to read, so go back to
// watching the pipe for more.
handle_watcher_->ArmOrNotify();
return;
}

return result;
}

bool Connector::DispatchAllQueuedMessages() {
base::WeakPtr<Connector> weak_self = weak_self_;
while (weak_self && !dispatch_queue_.empty()) {
if (!DispatchNextMessageInQueue())
return false;
while (pending_message_count > num_pending_dispatch_tasks_) {
PostDispatchNextMessageFromPipe();
}

return true;
}

void Connector::ReadAllAvailableMessages() {
base::WeakPtr<Connector> weak_self = weak_self_;
if (should_dispatch_messages_immediately()) {
// If we're dispatching messages immediately, we have to ensure that the
// pending dispatch queue is flushed before we started reading and
// dispatching messages fresh off the pipe. Otherwise messages would get
// reordered.
if (!DispatchAllQueuedMessages() || !weak_self)
return;
if (paused_) {
return;
}

// Flush all messages from the pipe.
Message message;
MojoResult rv;
bool first_message_in_batch = dispatch_queue_.empty();
while ((rv = ReadMessage(&message)) == MOJO_RESULT_OK) {
DCHECK(!message.IsNull());

if (first_message_in_batch || should_dispatch_messages_immediately()) {
// Dispatch immediately if this is the first available message or if
// immediate dispatch is currently enabled for whatever reason.
DCHECK(dispatch_queue_.empty());
if (!DispatchMessage(std::move(message)) || !weak_self || paused_)
return;
} else {
dispatch_queue_.push(std::move(message));
if (num_pending_dispatch_tasks_ < dispatch_queue_.size())
PostDispatchNextMessageInQueue();
}
base::WeakPtr<Connector> weak_self = weak_self_;

first_message_in_batch = false;
}
do {
Message message;
MojoResult rv = ReadMessage(&message);

switch (rv) {
case MOJO_RESULT_OK:
DCHECK(!message.IsNull());
if (!DispatchMessage(std::move(message)) || !weak_self || paused_) {
return;
}
break;

case MOJO_RESULT_SHOULD_WAIT:
// No more messages - we need to wait for new ones to arrive.
ScheduleDispatchOfPendingMessagesOrWaitForMore(
/*pending_message_count*/ 0u);
return;

if (!dispatch_queue_.empty() && dispatch_queue_watcher_)
dispatch_queue_watcher_->SignalEvent();
case MOJO_RESULT_FAILED_PRECONDITION:
// The peer endpoint was closed and there are no more messages to read.
// We can signal an error right away.
HandleError(false /* force_pipe_reset */,
false /* force_async_handler */);
return;

if (rv == MOJO_RESULT_SHOULD_WAIT) {
// We're done only because there are no more messages to read, so go back to
// watching the pipe for more.
handle_watcher_->ArmOrNotify();
return;
}
default:
// A fatal error occurred on the pipe, handle it immediately.
HandleError(true /* force_pipe_reset */,
false /* force_async_handler */);
return;
}
} while (weak_self && should_dispatch_messages_immediately());

if (rv != MOJO_RESULT_FAILED_PRECONDITION) {
// A fatal error occurred on the pipe, handle it immediately.
HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
} else if (dispatch_queue_.empty()) {
// The peer endpoint was closed and there are no more messages to read, and
// our dispatch queue is empty. We can signal an error right away.
HandleError(false /* force_pipe_reset */, false /* force_async_handler */);
} else {
// Peer closed but we still have messages to dispatch. Defer error
// propagation.
pending_error_dispatch_ = true;
if (weak_self) {
const auto pending_message_count = QueryPendingMessageCount();
ScheduleDispatchOfPendingMessagesOrWaitForMore(pending_message_count);
}
}

void Connector::CancelWait() {
peer_remoteness_tracker_.reset();
handle_watcher_.reset();
sync_watcher_.reset();
dispatch_queue_watcher_.reset();
}

void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
Expand Down Expand Up @@ -707,11 +653,6 @@ void Connector::EnsureSyncWatcherExists() {
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::BindRepeating(&Connector::OnSyncHandleWatcherHandleReady,
base::Unretained(this))));
dispatch_queue_watcher_ = std::make_unique<SequenceLocalSyncEventWatcher>(
base::BindRepeating(&Connector::OnSyncHandleWatcherHandleReady,
base::Unretained(this), MOJO_RESULT_OK));
if (!dispatch_queue_.empty())
dispatch_queue_watcher_->SignalEvent();
}

} // namespace mojo
Loading

0 comments on commit a0aa431

Please sign in to comment.