Skip to content

Commit

Permalink
Fix races with MessagePipeReader due to the Mojo IPC channel being th…
Browse files Browse the repository at this point in the history
…read-safe.

This change does three things:
1. Checks the threading of MessagePipeReader using a ThreadChecker.
2. Eliminates races on |pending_send_error_| by using atomic ops.
3. Documents and removes checks which are no longer valid because Send()
   can be called on a non-IO thread.

BUG=522888,492867
TESTED=Enabled Mojo IPC channel and ran StatsTableBrowserTest.StartWithStatTable
       with tsan 100 times.
       Before: IPC-related races and DCHECK failures
       After: No IPC-related races and DCHECK failures.

Review URL: https://codereview.chromium.org/1318453002

Cr-Commit-Position: refs/heads/master@{#346967}
  • Loading branch information
akmistry authored and Commit bot committed Sep 2, 2015
1 parent 52c41cb commit 0b0e748
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
32 changes: 22 additions & 10 deletions ipc/mojo/ipc_message_pipe_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace internal {
MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
MessagePipeReader::Delegate* delegate)
: pipe_(handle.Pass()),
handle_copy_(pipe_.get().value()),
delegate_(delegate),
async_waiter_(
new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
Expand All @@ -27,40 +28,45 @@ MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
}

MessagePipeReader::~MessagePipeReader() {
DCHECK(thread_checker_.CalledOnValidThread());
// The pipe should be closed before deletion.
CHECK(!IsValid());
DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
}

void MessagePipeReader::Close() {
// All pending errors should be signaled before Close().
DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
DCHECK(thread_checker_.CalledOnValidThread());
async_waiter_.reset();
pipe_.reset();
OnPipeClosed();
}

void MessagePipeReader::CloseWithError(MojoResult error) {
DCHECK(thread_checker_.CalledOnValidThread());
OnPipeError(error);
Close();
}

void MessagePipeReader::CloseWithErrorIfPending() {
if (pending_send_error_ == MOJO_RESULT_OK)
DCHECK(thread_checker_.CalledOnValidThread());
MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_);
if (pending_error == MOJO_RESULT_OK)
return;
MojoResult error = pending_send_error_;
pending_send_error_ = MOJO_RESULT_OK;
CloseWithError(error);
// NOTE: This races with Send(), and therefore the value of
// pending_send_error() can change.
CloseWithError(pending_error);
return;
}

void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
pending_send_error_ = error;
DCHECK_NE(error, MOJO_RESULT_OK);
// NOTE: No assumptions about the value of |pending_send_error_| or whether or
// not the error has been signaled can be made. If Send() is called
// immediately before Close() and errors, it's possible for the error to not
// be signaled.
base::subtle::NoBarrier_Store(&pending_send_error_, error);
}

bool MessagePipeReader::Send(scoped_ptr<Message> message) {
DCHECK(IsValid());

TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
"MessagePipeReader::Send",
message->flags(),
Expand Down Expand Up @@ -111,19 +117,22 @@ void MessagePipeReader::OnMessageReceived() {
}

void MessagePipeReader::OnPipeClosed() {
DCHECK(thread_checker_.CalledOnValidThread());
if (!delegate_)
return;
delegate_->OnPipeClosed(this);
delegate_ = nullptr;
}

void MessagePipeReader::OnPipeError(MojoResult error) {
DCHECK(thread_checker_.CalledOnValidThread());
if (!delegate_)
return;
delegate_->OnPipeError(this);
}

MojoResult MessagePipeReader::ReadMessageBytes() {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(handle_buffer_.empty());

uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
Expand Down Expand Up @@ -153,6 +162,7 @@ MojoResult MessagePipeReader::ReadMessageBytes() {
}

void MessagePipeReader::ReadAvailableMessages() {
DCHECK(thread_checker_.CalledOnValidThread());
while (pipe_.is_valid()) {
MojoResult read_result = ReadMessageBytes();
if (read_result == MOJO_RESULT_SHOULD_WAIT)
Expand All @@ -171,6 +181,7 @@ void MessagePipeReader::ReadAvailableMessages() {
}

void MessagePipeReader::ReadMessagesThenWait() {
DCHECK(thread_checker_.CalledOnValidThread());
while (true) {
ReadAvailableMessages();
if (!pipe_.is_valid())
Expand All @@ -197,6 +208,7 @@ void MessagePipeReader::ReadMessagesThenWait() {
}

void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
DCHECK(thread_checker_.CalledOnValidThread());
CloseWithErrorIfPending();
if (!IsValid()) {
// There was a pending error and it closed the pipe.
Expand Down
18 changes: 15 additions & 3 deletions ipc/mojo/ipc_message_pipe_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

#include <vector>

#include "base/atomicops.h"
#include "base/compiler_specific.h"
#include "base/memory/scoped_ptr.h"
#include "base/threading/thread_checker.h"
#include "ipc/ipc_message.h"
#include "third_party/mojo/src/mojo/public/c/environment/async_waiter.h"
#include "third_party/mojo/src/mojo/public/cpp/system/core.h"
Expand All @@ -30,7 +32,9 @@ class AsyncHandleWaiter;
// * Create the subclass instance with a MessagePipeHandle.
// The constructor automatically start listening on the pipe.
//
// MessageReader has to be used in IO thread. It isn't thread-safe.
// All functions must be called on the IO thread, except for Send(), which can
// be called on any thread. All |Delegate| functions will be called on the IO
// thread.
//
class MessagePipeReader {
public:
Expand Down Expand Up @@ -62,7 +66,7 @@ class MessagePipeReader {
MessagePipeReader(mojo::ScopedMessagePipeHandle handle, Delegate* delegate);
virtual ~MessagePipeReader();

MojoHandle handle() const { return pipe_.get().value(); }
MojoHandle handle() const { return handle_copy_; }

// Returns received bytes.
const std::vector<char>& data_buffer() const {
Expand Down Expand Up @@ -100,10 +104,18 @@ class MessagePipeReader {
std::vector<char> data_buffer_;
std::vector<MojoHandle> handle_buffer_;
mojo::ScopedMessagePipeHandle pipe_;
// Constant copy of the message pipe handle. For use by Send(), which can run
// concurrently on non-IO threads.
// TODO(amistry): This isn't quite right because handles can be re-used and
// using this can run into the ABA problem. Currently, this is highly unlikely
// because Mojo internally uses an increasing uint32_t as handle values, but
// this could change. See crbug.com/524894.
const MojoHandle handle_copy_;
// |delegate_| and |async_waiter_| are null once the message pipe is closed.
Delegate* delegate_;
scoped_ptr<AsyncHandleWaiter> async_waiter_;
MojoResult pending_send_error_;
base::subtle::Atomic32 pending_send_error_;
base::ThreadChecker thread_checker_;

DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
};
Expand Down

0 comments on commit 0b0e748

Please sign in to comment.