Skip to content

Commit

Permalink
Mojo: Armed Watchers
Browse files Browse the repository at this point in the history
Changes the Watcher C API as described in this post:
https://groups.google.com/a/chromium.org/forum/#!topic/chromium-mojo/UcA97R4IznI

Makes watchers a first-class non-transferable handle type in Mojo,
allows multiple distinct events to be watched by a single watcher,
and requires watchers to be explicitly armed before a notification
will fire. Also now allows for cancellation (and watcher closure)
from within notification callbacks, simplifying cancellation logic
and avoiding any possibility of EDK deadlocks caused by cancellations,
during watch notification.

Updates the Watcher class in mojo/public/cpp/system to reflect the
new model, adding an explicit ArmingPolicy to allow users to select
manual or automatic arming. Also renames it to SimpleWatcher to
adequately convey that this is only a simplified helper class that
does not utilize the full power of watchers.

Automatic arming provides imperfect edge-triggered behavior, which is
still an improvement over the old behavior in many cases.

Manual arming is used in the bindings Connector to ensure that all
messages are flushed from a pipe before control returns from a
handle-ready notification, and is also now used for Watchers which
watch a data pipe handle.

Other users of the Watcher C API (namely Blink's MojoWatcher and
content's MessagePort) have also been adapted to the new API.

BUG=693595,700171

Review-Url: https://codereview.chromium.org/2725133002
Cr-Commit-Position: refs/heads/master@{#457269}
  • Loading branch information
krockot authored and Commit bot committed Mar 15, 2017
1 parent 2ca0368 commit 9eadaba
Show file tree
Hide file tree
Showing 84 changed files with 3,976 additions and 1,565 deletions.
19 changes: 12 additions & 7 deletions chrome/browser/media/cast_remoting_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ CastRemotingSender::CastRemotingSender(
latest_acked_frame_id_(media::cast::FrameId::first() - 1),
duplicate_ack_counter_(0),
input_queue_discards_remaining_(0),
pipe_watcher_(FROM_HERE),
pipe_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
flow_restart_pending_(true),
weak_factory_(this) {
// Confirm this constructor is running on the IO BrowserThread.
Expand Down Expand Up @@ -164,10 +164,11 @@ void CastRemotingSender::FindAndBind(
sender->error_callback_ = error_callback;

sender->pipe_ = std::move(pipe);
sender->pipe_watcher_.Start(
sender->pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&CastRemotingSender::ProcessInputQueue,
base::Unretained(sender)));
sender->pipe_watcher_.Watch(sender->pipe_.get(),
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
base::Bind(&CastRemotingSender::ProcessInputQueue,
base::Unretained(sender)));
sender->pipe_watcher_.ArmOrNotify();
sender->binding_.Bind(std::move(request));
sender->binding_.set_connection_error_handler(sender->error_callback_);
}
Expand Down Expand Up @@ -378,8 +379,10 @@ bool CastRemotingSender::TryConsumeDataChunk(uint32_t offset, uint32_t size,
MOJO_READ_DATA_FLAG_DISCARD | MOJO_READ_DATA_FLAG_ALL_OR_NONE);
if (result == MOJO_RESULT_OK)
return true; // Successfully discarded data.
if (result == MOJO_RESULT_OUT_OF_RANGE)
if (result == MOJO_RESULT_OUT_OF_RANGE) {
pipe_watcher_.ArmOrNotify();
return false; // Retry later.
}
LOG(ERROR) << SENDER_SSRC
<< "Unexpected result when discarding from data pipe ("
<< result << ')';
Expand All @@ -395,8 +398,10 @@ bool CastRemotingSender::TryConsumeDataChunk(uint32_t offset, uint32_t size,
MOJO_READ_DATA_FLAG_ALL_OR_NONE);
if (result == MOJO_RESULT_OK)
return true; // Successfully consumed data.
if (result == MOJO_RESULT_OUT_OF_RANGE)
if (result == MOJO_RESULT_OUT_OF_RANGE) {
pipe_watcher_.ArmOrNotify();
return false; // Retry later.
}
LOG(ERROR)
<< SENDER_SSRC << "Read from data pipe failed (" << result << ')';
} while (false);
Expand Down
4 changes: 2 additions & 2 deletions chrome/browser/media/cast_remoting_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "media/cast/net/rtcp/rtcp_defines.h"
#include "media/mojo/interfaces/remoting.mojom.h"
#include "mojo/public/cpp/bindings/binding.h"
#include "mojo/public/cpp/system/watcher.h"
#include "mojo/public/cpp/system/simple_watcher.h"

namespace cast {

Expand Down Expand Up @@ -196,7 +196,7 @@ class CastRemotingSender : public media::mojom::RemotingDataStreamSender {

// Watches |pipe_| for more data to become available, and then calls
// ProcessInputQueue().
mojo::Watcher pipe_watcher_;
mojo::SimpleWatcher pipe_watcher_;

// Set to true if the first frame has not yet been sent, or if a
// CancelInFlightData() operation just completed. This causes TrySendFrame()
Expand Down
12 changes: 9 additions & 3 deletions content/browser/loader/mojo_async_resource_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ MojoAsyncResourceHandler::MojoAsyncResourceHandler(
: ResourceHandler(request),
rdh_(rdh),
binding_(this, std::move(mojo_request)),
handle_watcher_(FROM_HERE),
handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
url_loader_client_(std::move(url_loader_client)),
weak_factory_(this) {
DCHECK(url_loader_client_);
Expand Down Expand Up @@ -245,9 +245,10 @@ void MojoAsyncResourceHandler::OnWillRead(

response_body_consumer_handle_ = std::move(data_pipe.consumer_handle);
shared_writer_ = new SharedWriter(std::move(data_pipe.producer_handle));
handle_watcher_.Start(shared_writer_->writer(), MOJO_HANDLE_SIGNAL_WRITABLE,
handle_watcher_.Watch(shared_writer_->writer(), MOJO_HANDLE_SIGNAL_WRITABLE,
base::Bind(&MojoAsyncResourceHandler::OnWritable,
base::Unretained(this)));
handle_watcher_.ArmOrNotify();

bool defer = false;
scoped_refptr<net::IOBufferWithSize> buffer;
Expand Down Expand Up @@ -388,11 +389,16 @@ MojoResult MojoAsyncResourceHandler::BeginWrite(void** data,
shared_writer_->writer(), data, available, MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK)
*available = std::min(*available, static_cast<uint32_t>(kMaxChunkSize));
else if (result == MOJO_RESULT_SHOULD_WAIT)
handle_watcher_.ArmOrNotify();
return result;
}

MojoResult MojoAsyncResourceHandler::EndWrite(uint32_t written) {
return mojo::EndWriteDataRaw(shared_writer_->writer(), written);
MojoResult result = mojo::EndWriteDataRaw(shared_writer_->writer(), written);
if (result == MOJO_RESULT_OK)
handle_watcher_.ArmOrNotify();
return result;
}

net::IOBufferWithSize* MojoAsyncResourceHandler::GetResponseMetadata(
Expand Down
4 changes: 2 additions & 2 deletions content/browser/loader/mojo_async_resource_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "content/public/common/resource_type.h"
#include "mojo/public/cpp/bindings/associated_binding.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/watcher.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "net/base/io_buffer.h"
#include "net/base/request_priority.h"

Expand Down Expand Up @@ -137,7 +137,7 @@ class CONTENT_EXPORT MojoAsyncResourceHandler
base::TimeTicks response_started_ticks_;
int64_t reported_total_received_bytes_ = 0;

mojo::Watcher handle_watcher_;
mojo::SimpleWatcher handle_watcher_;
std::unique_ptr<mojom::URLLoader> url_loader_;
mojom::URLLoaderClientPtr url_loader_client_;
scoped_refptr<net::IOBufferWithSize> buffer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#include "base/atomic_sequence_num.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/run_loop.h"
#include "base/strings/string16.h"
#include "base/strings/utf_string_conversions.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "content/browser/shared_worker/shared_worker_message_filter.h"
#include "content/browser/shared_worker/worker_storage_partition.h"
#include "content/common/view_messages.h"
Expand Down Expand Up @@ -112,11 +112,9 @@ static const unsigned long long kDocumentIDs[] = {200, 201, 202};
static const int kRenderFrameRouteIDs[] = {300, 301, 302};

void BlockingReadFromMessagePort(MessagePort port, base::string16* message) {
base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED);
port.SetCallback(
base::Bind(&base::WaitableEvent::Signal, base::Unretained(&event)));
event.Wait();
base::RunLoop run_loop;
port.SetCallback(run_loop.QuitClosure());
run_loop.Run();

std::vector<MessagePort> should_be_empty;
EXPECT_TRUE(port.GetMessage(message, &should_be_empty));
Expand Down
24 changes: 12 additions & 12 deletions content/child/url_response_body_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ URLResponseBodyConsumer::URLResponseBodyConsumer(
: request_id_(request_id),
resource_dispatcher_(resource_dispatcher),
handle_(std::move(handle)),
handle_watcher_(FROM_HERE, task_runner),
handle_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
task_runner),
task_runner_(task_runner),
has_seen_end_of_data_(!handle_.is_valid()) {
handle_watcher_.Start(
handle_watcher_.Watch(
handle_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&URLResponseBodyConsumer::OnReadable, base::Unretained(this)));
task_runner_->PostTask(
FROM_HERE, base::Bind(&URLResponseBodyConsumer::OnReadable, AsWeakPtr(),
MOJO_RESULT_OK));
handle_watcher_.ArmOrNotify();
}

URLResponseBodyConsumer::~URLResponseBodyConsumer() {}
Expand Down Expand Up @@ -91,9 +91,7 @@ void URLResponseBodyConsumer::Reclaim(uint32_t size) {
if (is_in_on_readable_)
return;

task_runner_->PostTask(
FROM_HERE, base::Bind(&URLResponseBodyConsumer::OnReadable, AsWeakPtr(),
MOJO_RESULT_OK));
handle_watcher_.ArmOrNotify();
}

void URLResponseBodyConsumer::OnReadable(MojoResult unused) {
Expand All @@ -112,7 +110,11 @@ void URLResponseBodyConsumer::OnReadable(MojoResult unused) {
uint32_t available = 0;
MojoResult result = mojo::BeginReadDataRaw(
handle_.get(), &buffer, &available, MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_SHOULD_WAIT || result == MOJO_RESULT_BUSY)
if (result == MOJO_RESULT_SHOULD_WAIT) {
handle_watcher_.ArmOrNotify();
return;
}
if (result == MOJO_RESULT_BUSY)
return;
if (result == MOJO_RESULT_FAILED_PRECONDITION) {
has_seen_end_of_data_ = true;
Expand All @@ -134,9 +136,7 @@ void URLResponseBodyConsumer::OnReadable(MojoResult unused) {
// to the next task.
result = mojo::EndReadDataRaw(handle_.get(), 0);
DCHECK_EQ(result, MOJO_RESULT_OK);
task_runner_->PostTask(FROM_HERE,
base::Bind(&URLResponseBodyConsumer::OnReadable,
AsWeakPtr(), MOJO_RESULT_OK));
handle_watcher_.ArmOrNotify();
return;
}
num_bytes_consumed += available;
Expand Down
4 changes: 2 additions & 2 deletions content/child/url_response_body_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "content/common/content_export.h"
#include "content/common/url_loader.mojom.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/watcher.h"
#include "mojo/public/cpp/system/simple_watcher.h"

namespace content {

Expand Down Expand Up @@ -71,7 +71,7 @@ class CONTENT_EXPORT URLResponseBodyConsumer final
const int request_id_;
ResourceDispatcher* resource_dispatcher_;
mojo::ScopedDataPipeConsumerHandle handle_;
mojo::Watcher handle_watcher_;
mojo::SimpleWatcher handle_watcher_;
ResourceRequestCompletionStatus completion_status_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

Expand Down
6 changes: 4 additions & 2 deletions content/child/web_data_consumer_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class WebDataConsumerHandleImpl::Context
WebDataConsumerHandleImpl::ReaderImpl::ReaderImpl(
scoped_refptr<Context> context,
Client* client)
: context_(context), handle_watcher_(FROM_HERE), client_(client) {
: context_(context),
handle_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
client_(client) {
if (client_)
StartWatching();
}
Expand Down Expand Up @@ -130,7 +132,7 @@ Result WebDataConsumerHandleImpl::ReaderImpl::HandleReadResult(
}

void WebDataConsumerHandleImpl::ReaderImpl::StartWatching() {
handle_watcher_.Start(
handle_watcher_.Watch(
context_->handle().get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&ReaderImpl::OnHandleGotReadable, base::Unretained(this)));
}
Expand Down
4 changes: 2 additions & 2 deletions content/child/web_data_consumer_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

#include "content/common/content_export.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/watcher.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/WebKit/public/platform/WebDataConsumerHandle.h"

namespace content {
Expand Down Expand Up @@ -41,7 +41,7 @@ class CONTENT_EXPORT WebDataConsumerHandleImpl final
void OnHandleGotReadable(MojoResult);

scoped_refptr<Context> context_;
mojo::Watcher handle_watcher_;
mojo::SimpleWatcher handle_watcher_;
Client* client_;

DISALLOW_COPY_AND_ASSIGN(ReaderImpl);
Expand Down
88 changes: 68 additions & 20 deletions content/common/message_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

#include "content/common/message_port.h"

#include "base/bind.h"
#include "base/logging.h"
#include "base/threading/thread_task_runner_handle.h"

namespace content {

Expand Down Expand Up @@ -147,40 +149,86 @@ void MessagePort::State::AddWatch() {
if (!callback_)
return;

DCHECK(!watcher_handle_.is_valid());
MojoResult rv = CreateWatcher(&State::CallOnHandleReady, &watcher_handle_);
DCHECK_EQ(MOJO_RESULT_OK, rv);

// We use a scoped_refptr<State> instance as the watch context. This is owned
// by the watch and deleted upon receiving a cancellation notification.
scoped_refptr<State>* state_ref = new scoped_refptr<State>(this);
context_ = reinterpret_cast<uintptr_t>(state_ref);

// NOTE: An HTML MessagePort does not receive an event to tell it when the
// peer has gone away, so we only watch for readability here.
MojoResult rv = MojoWatch(handle_.get().value(),
MOJO_HANDLE_SIGNAL_READABLE,
&MessagePort::State::OnHandleReady,
reinterpret_cast<uintptr_t>(this));
if (rv != MOJO_RESULT_OK)
DVLOG(1) << this << " MojoWatch failed: " << rv;
rv = MojoWatch(watcher_handle_.get().value(), handle_.get().value(),
MOJO_HANDLE_SIGNAL_READABLE, context_);
DCHECK_EQ(MOJO_RESULT_OK, rv);

ArmWatcher();
}

void MessagePort::State::CancelWatch() {
if (!callback_)
watcher_handle_.reset();
context_ = 0;
}

MessagePort::State::~State() = default;

void MessagePort::State::ArmWatcher() {
if (!watcher_handle_.is_valid())
return;

// NOTE: This synchronizes with the thread where OnHandleReady runs so we are
// sure to not be racing with it.
MojoCancelWatch(handle_.get().value(), reinterpret_cast<uintptr_t>(this));
uint32_t num_ready_contexts = 1;
uintptr_t ready_context;
MojoResult ready_result;
MojoHandleSignalsState ready_state;
MojoResult rv =
MojoArmWatcher(watcher_handle_.get().value(), &num_ready_contexts,
&ready_context, &ready_result, &ready_state);
if (rv == MOJO_RESULT_OK)
return;

// The watcher could not be armed because it would notify immediately.
DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, rv);
DCHECK_EQ(1u, num_ready_contexts);
DCHECK_EQ(context_, ready_context);

if (ready_result == MOJO_RESULT_OK) {
// The handle is already signaled, so we trigger a callback now.
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::Bind(&State::OnHandleReady, this, MOJO_RESULT_OK));
return;
}

if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
DVLOG(1) << this << " MojoArmWatcher failed because of a broken pipe.";
return;
}

NOTREACHED();
}

// static
void MessagePort::State::OnHandleReady(
uintptr_t context,
MojoResult result,
MojoHandleSignalsState signals_state,
MojoWatchNotificationFlags flags) {
if (result == MOJO_RESULT_OK) {
reinterpret_cast<MessagePort::State*>(context)->callback_.Run();
void MessagePort::State::OnHandleReady(MojoResult result) {
if (result == MOJO_RESULT_OK && callback_) {
callback_.Run();
ArmWatcher();
} else {
// And now his watch is ended.
}
}

MessagePort::State::~State() {
CancelWatch();
// static
void MessagePort::State::CallOnHandleReady(uintptr_t context,
MojoResult result,
MojoHandleSignalsState signals_state,
MojoWatcherNotificationFlags flags) {
auto* state_ref = reinterpret_cast<scoped_refptr<State>*>(context);
if (result == MOJO_RESULT_CANCELLED) {
// Last notification. Delete the watch's owned State ref.
delete state_ref;
} else {
(*state_ref)->OnHandleReady(result);
}
}

} // namespace content
Loading

0 comments on commit 9eadaba

Please sign in to comment.