Skip to content

Commit

Permalink
Make mojo::Callback safe to copy across threads and destroy on any th…
Browse files Browse the repository at this point in the history
…read.

This brings the behaviour of mojo::Callback close to that of base::Callback.

BUG=595939

Review URL: https://codereview.chromium.org/1819463002

Cr-Commit-Position: refs/heads/master@{#382349}
  • Loading branch information
akmistry authored and Commit bot committed Mar 21, 2016
1 parent e233d14 commit 314a150
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 32 deletions.
27 changes: 19 additions & 8 deletions mojo/public/cpp/bindings/callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

#include <utility>

#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "mojo/public/cpp/bindings/lib/callback_internal.h"
#include "mojo/public/cpp/bindings/lib/shared_ptr.h"
#include "mojo/public/cpp/bindings/lib/template_util.h"

namespace mojo {
Expand Down Expand Up @@ -40,7 +41,7 @@ class Callback<void(Args...)> {

// Constructs a callback that will run |runnable|. The callback takes
// ownership of |runnable|.
explicit Callback(Runnable* runnable) : sink_(runnable) {}
explicit Callback(Runnable* runnable) : sink_(new RunnableHolder(runnable)) {}

// As above, but can take an object that isn't derived from Runnable, so long
// as it has a compatible operator() or Run() method. operator() will be
Expand All @@ -50,27 +51,27 @@ class Callback<void(Args...)> {
using sink_type = typename internal::Conditional<
internal::HasCompatibleCallOperator<Sink, Args...>::value,
FunctorAdapter<Sink>, RunnableAdapter<Sink>>::type;
sink_ = internal::SharedPtr<Runnable>(new sink_type(sink));
sink_ = new RunnableHolder(new sink_type(sink));
}

// As above, but can take a compatible function pointer.
Callback(void (*function_ptr)(
typename internal::Callback_ParamTraits<Args>::ForwardType...))
: sink_(new FunctionPtrAdapter(function_ptr)) {}
: sink_(new RunnableHolder(new FunctionPtrAdapter(function_ptr))) {}

// Executes the callback function.
void Run(typename internal::Callback_ParamTraits<Args>::ForwardType... args)
const {
if (sink_.get())
sink_->Run(std::forward<
if (sink_)
sink_->runnable->Run(std::forward<
typename internal::Callback_ParamTraits<Args>::ForwardType>(
args)...);
}

bool is_null() const { return !sink_.get(); }

// Resets the callback to the "null" state.
void reset() { sink_.reset(); }
void reset() { sink_ = nullptr; }

private:
// Adapts a class that has a Run() method but is not derived from Runnable to
Expand Down Expand Up @@ -123,7 +124,17 @@ class Callback<void(Args...)> {
FunctionPtr function_ptr;
};

internal::SharedPtr<Runnable> sink_;
struct RunnableHolder : public base::RefCountedThreadSafe<RunnableHolder> {
explicit RunnableHolder(Runnable* runnable) : runnable(runnable) {}

scoped_ptr<Runnable> runnable;

private:
friend class base::RefCountedThreadSafe<RunnableHolder>;
~RunnableHolder() {}
};

scoped_refptr<RunnableHolder> sink_;
};

// A specialization of Callback which takes no parameters.
Expand Down
43 changes: 32 additions & 11 deletions mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
#include <utility>

#include "base/bind.h"
#include "base/location.h"
#include "base/macros.h"
#include "base/message_loop/message_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/thread_task_runner_handle.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/lib/multiplex_router.h"

Expand All @@ -21,6 +24,12 @@ namespace internal {

namespace {

void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client,
const std::string& message) {
bool is_valid = client && !client->encountered_error();
DCHECK(!is_valid) << message;
}

// When receiving an incoming message which expects a repsonse,
// InterfaceEndpointClient creates a ResponderThunk object and passes it to the
// incoming message receiver. When the receiver finishes processing the message,
Expand All @@ -29,21 +38,21 @@ class ResponderThunk : public MessageReceiverWithStatus {
public:
explicit ResponderThunk(
const base::WeakPtr<InterfaceEndpointClient>& endpoint_client)
: endpoint_client_(endpoint_client), accept_was_invoked_(false) {}
: endpoint_client_(endpoint_client), accept_was_invoked_(false),
task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
~ResponderThunk() override {
if (!accept_was_invoked_) {
// The Mojo application handled a message that was expecting a response
// but did not send a response.
if (endpoint_client_) {
// We raise an error to signal the calling application that an error
// condition occurred. Without this the calling application would have
// no way of knowing it should stop waiting for a response.
//
// We raise the error asynchronously and only if |endpoint_client_| is
// still alive when the task is run. That way it won't break the case
// where the user abandons the interface endpoint client soon after
// he/she abandons the callback.
base::MessageLoop::current()->PostTask(
if (task_runner_->RunsTasksOnCurrentThread()) {
if (endpoint_client_) {
// We raise an error to signal the calling application that an error
// condition occurred. Without this the calling application would have
// no way of knowing it should stop waiting for a response.
endpoint_client_->RaiseError();
}
} else {
task_runner_->PostTask(
FROM_HERE,
base::Bind(&InterfaceEndpointClient::RaiseError, endpoint_client_));
}
Expand All @@ -52,6 +61,7 @@ class ResponderThunk : public MessageReceiverWithStatus {

// MessageReceiver implementation:
bool Accept(Message* message) override {
DCHECK(task_runner_->RunsTasksOnCurrentThread());
accept_was_invoked_ = true;
DCHECK(message->has_flag(kMessageIsResponse));

Expand All @@ -65,12 +75,23 @@ class ResponderThunk : public MessageReceiverWithStatus {

// MessageReceiverWithStatus implementation:
bool IsValid() override {
DCHECK(task_runner_->RunsTasksOnCurrentThread());
return endpoint_client_ && !endpoint_client_->encountered_error();
}

void DCheckInvalid(const std::string& message) override {
if (task_runner_->RunsTasksOnCurrentThread()) {
DCheckIfInvalid(endpoint_client_, message);
} else {
task_runner_->PostTask(
FROM_HERE, base::Bind(&DCheckIfInvalid, endpoint_client_, message));
}
}

private:
base::WeakPtr<InterfaceEndpointClient> endpoint_client_;
bool accept_was_invoked_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

DISALLOW_COPY_AND_ASSIGN(ResponderThunk);
};
Expand Down
39 changes: 33 additions & 6 deletions mojo/public/cpp/bindings/lib/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
#include <utility>

#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/thread_task_runner_handle.h"

namespace mojo {
namespace internal {
Expand All @@ -19,25 +22,38 @@ namespace internal {

namespace {

void DCheckIfInvalid(const base::WeakPtr<Router>& router,
const std::string& message) {
bool is_valid = router && !router->encountered_error() && router->is_valid();
DCHECK(!is_valid) << message;
}

class ResponderThunk : public MessageReceiverWithStatus {
public:
explicit ResponderThunk(const base::WeakPtr<Router>& router)
: router_(router), accept_was_invoked_(false) {}
: router_(router), accept_was_invoked_(false),
task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
~ResponderThunk() override {
if (!accept_was_invoked_) {
// The Mojo application handled a message that was expecting a response
// but did not send a response.
if (router_) {
// We raise an error to signal the calling application that an error
// condition occurred. Without this the calling application would have
// no way of knowing it should stop waiting for a response.
router_->RaiseError();
if (task_runner_->RunsTasksOnCurrentThread()) {
if (router_) {
// We raise an error to signal the calling application that an error
// condition occurred. Without this the calling application would have
// no way of knowing it should stop waiting for a response.
router_->RaiseError();
}
} else {
task_runner_->PostTask(FROM_HERE,
base::Bind(&Router::RaiseError, router_));
}
}
}

// MessageReceiver implementation:
bool Accept(Message* message) override {
DCHECK(task_runner_->RunsTasksOnCurrentThread());
accept_was_invoked_ = true;
DCHECK(message->has_flag(kMessageIsResponse));

Expand All @@ -51,12 +67,23 @@ class ResponderThunk : public MessageReceiverWithStatus {

// MessageReceiverWithStatus implementation:
bool IsValid() override {
DCHECK(task_runner_->RunsTasksOnCurrentThread());
return router_ && !router_->encountered_error() && router_->is_valid();
}

void DCheckInvalid(const std::string& message) override {
if (task_runner_->RunsTasksOnCurrentThread()) {
DCheckIfInvalid(router_, message);
} else {
task_runner_->PostTask(FROM_HERE,
base::Bind(&DCheckIfInvalid, router_, message));
}
}

private:
base::WeakPtr<Router> router_;
bool accept_was_invoked_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
};

} // namespace
Expand Down
5 changes: 5 additions & 0 deletions mojo/public/cpp/bindings/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ class MessageReceiverWithStatus : public MessageReceiver {
// Returns |true| if this MessageReceiver is currently bound to a MessagePipe,
// the pipe has not been closed, and the pipe has not encountered an error.
virtual bool IsValid() = 0;

// DCHECKs if this MessageReceiver is currently bound to a MessagePipe, the
// pipe has not been closed, and the pipe has not encountered an error.
// This function may be called on any thread.
virtual void DCheckInvalid(const std::string& message) = 0;
};

// An alternative to MessageReceiverWithResponder for cases in which it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ TEST_F(BindingCallbackTest, DeleteCallbackBeforeBindingDeathTest) {
#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)
// Delete the callback without running it. This should cause a crash in debug
// builds due to a DCHECK.
std::string regex("Check failed: !callback_was_dropped.");
std::string regex("Check failed: !is_valid");
#if defined(OS_WIN)
// TODO(msw): Fix MOJO_DCHECK logs and EXPECT_DEATH* on Win: crbug.com/535014
regex.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,18 @@ class {{class_name}}_{{method.name}}_ProxyToResponder
: public {{class_name}}::{{method.name}}Callback::Runnable {
public:
~{{class_name}}_{{method.name}}_ProxyToResponder() override {
// Is the Mojo application destroying the callback without running it
// and without first closing the pipe?
bool callback_was_dropped = responder_ && responder_->IsValid();
#if DCHECK_IS_ON()
if (responder_) {
// Is the Mojo application destroying the callback without running it
// and without first closing the pipe?
responder_->DCheckInvalid("The callback passed to "
"{{class_name}}::{{method.name}}({%- if method.parameters -%}{{pass_params(method.parameters)}}, {% endif -%}callback) "
"was never run.");
}
#endif
// If the Callback was dropped then deleting the responder will close
// the pipe so the calling application knows to stop waiting for a reply.
delete responder_;
DCHECK(!callback_was_dropped) << "The callback passed to "
"{{class_name}}::{{method.name}}({%- if method.parameters -%}{{pass_params(method.parameters)}}, {% endif -%}callback) "
"was never run.";
}

{{class_name}}_{{method.name}}_ProxyToResponder(
Expand Down

0 comments on commit 314a150

Please sign in to comment.