Skip to content

Commit

Permalink
Provide a Mojo equivalent of ThreadSafeSender.
Browse files Browse the repository at this point in the history
Introduces the ThreadSafeAssociatedInterfacePtrProvider class that
provides functionalities similar to the content::ThreadSafeSender.
You create it with a ChannelProxy and you can then retrieve
ThreadSafeInterfacePtr's from it that you can call methods on from any
thread and even before the actual channel is connected.

BUG=668317

Review-Url: https://codereview.chromium.org/2522333002
Cr-Commit-Position: refs/heads/master@{#435005}
  • Loading branch information
jcivelli authored and Commit bot committed Nov 29, 2016
1 parent 6e97ef0 commit 315d17f
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 20 deletions.
1 change: 1 addition & 0 deletions content/renderer/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ target(link_target_type, "renderer") {
"mojo/blink_interface_registry_impl.h",
"mojo/interface_provider_js_wrapper.cc",
"mojo/interface_provider_js_wrapper.h",
"mojo/thread_safe_associated_interface_ptr_provider.h",
"mojo_bindings_controller.cc",
"mojo_bindings_controller.h",
"mojo_context_state.cc",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CONTENT_RENDERER_MOJO_THREAD_SAFE_ASSOCIATED_INTERFACE_PTR_PROVIDER_H_
#define CONTENT_RENDERER_MOJO_THREAD_SAFE_ASSOCIATED_INTERFACE_PTR_PROVIDER_H_

#include "base/bind.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "ipc/ipc_channel_proxy.h"
#include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h"

namespace content {

// This class provides a way to create ThreadSafeAssociatedInterfacePtr's from
// the main thread that can be used right away (even though the backing
// AssociatedInterfacePtr is created on the IO thread and the channel may not be
// connected yet).
class ThreadSafeAssociatedInterfacePtrProvider {
public:
// Note that this does not take ownership of |channel_proxy|. It's the
// caller responsibility to ensure |channel_proxy| outlives this object.
explicit ThreadSafeAssociatedInterfacePtrProvider(
IPC::ChannelProxy* channel_proxy)
: channel_proxy_(channel_proxy) {}

template <typename Interface>
scoped_refptr<mojo::ThreadSafeAssociatedInterfacePtr<Interface>>
CreateInterfacePtr() {
scoped_refptr<mojo::ThreadSafeAssociatedInterfacePtr<Interface>> ptr =
mojo::ThreadSafeAssociatedInterfacePtr<Interface>::CreateUnbound();
channel_proxy_->RetrieveAssociatedInterfaceOnIOThread<Interface>(base::Bind(
&ThreadSafeAssociatedInterfacePtrProvider::BindInterfacePtr<Interface>,
ptr));
return ptr;
}

private:
template <typename Interface>
static void BindInterfacePtr(
const scoped_refptr<mojo::ThreadSafeAssociatedInterfacePtr<Interface>>&
ptr,
mojo::AssociatedInterfacePtr<Interface> interface_ptr) {
bool success = ptr->Bind(std::move(interface_ptr));
DCHECK(success);
}

IPC::ChannelProxy* channel_proxy_;

DISALLOW_COPY_AND_ASSIGN(ThreadSafeAssociatedInterfacePtrProvider);
};

} // namespace content

#endif // CONTENT_RENDERER_MOJO_THREAD_SAFE_ASSOCIATED_INTERFACE_PTR_PROVIDER_H_
19 changes: 19 additions & 0 deletions content/renderer/render_thread_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
#include "content/renderer/media/render_media_client.h"
#include "content/renderer/media/renderer_gpu_video_accelerator_factories.h"
#include "content/renderer/media/video_capture_impl_manager.h"
#include "content/renderer/mojo/thread_safe_associated_interface_ptr_provider.h"
#include "content/renderer/net_info_helper.h"
#include "content/renderer/p2p/socket_dispatcher.h"
#include "content/renderer/render_frame_proxy.h"
Expand Down Expand Up @@ -580,6 +581,13 @@ mojom::RenderMessageFilter* RenderThreadImpl::current_render_message_filter() {
return current()->render_message_filter();
}

// static
const scoped_refptr<mojom::ThreadSafeRenderMessageFilterAssociatedPtr>&
RenderThreadImpl::current_thread_safe_render_message_filter() {
DCHECK(current());
return current()->thread_safe_render_message_filter();
}

// static
void RenderThreadImpl::SetRenderMessageFilterForTesting(
mojom::RenderMessageFilter* render_message_filter) {
Expand Down Expand Up @@ -687,6 +695,12 @@ void RenderThreadImpl::Init(
db_message_filter_ = new DBMessageFilter();
AddFilter(db_message_filter_.get());

thread_safe_associated_interface_ptr_provider_ =
base::MakeUnique<ThreadSafeAssociatedInterfacePtrProvider>(channel());
thread_safe_render_message_filter_ =
thread_safe_associated_interface_ptr_provider_
->CreateInterfacePtr<mojom::RenderMessageFilter>();

vc_manager_.reset(new VideoCaptureImplManager());

browser_plugin_manager_.reset(new BrowserPluginManager());
Expand Down Expand Up @@ -2110,6 +2124,11 @@ mojom::RenderMessageFilter* RenderThreadImpl::render_message_filter() {
return render_message_filter_.get();
}

const scoped_refptr<mojom::ThreadSafeRenderMessageFilterAssociatedPtr>&
RenderThreadImpl::thread_safe_render_message_filter() {
return thread_safe_render_message_filter_;
}

gpu::GpuChannelHost* RenderThreadImpl::GetGpuChannel() {
if (!gpu_channel_)
return nullptr;
Expand Down
10 changes: 10 additions & 0 deletions content/renderer/render_thread_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "media/media_features.h"
#include "mojo/public/cpp/bindings/associated_binding.h"
#include "mojo/public/cpp/bindings/binding.h"
#include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h"
#include "net/base/network_change_notifier.h"
#include "third_party/WebKit/public/platform/WebConnectionType.h"
#include "third_party/WebKit/public/platform/scheduler/renderer/renderer_scheduler.h"
Expand Down Expand Up @@ -126,6 +127,7 @@ class RenderThreadObserver;
class RendererBlinkPlatformImpl;
class RendererGpuVideoAcceleratorFactories;
class ResourceDispatchThrottler;
class ThreadSafeAssociatedInterfacePtrProvider;
class VideoCaptureImplManager;

#if defined(OS_ANDROID)
Expand Down Expand Up @@ -164,6 +166,8 @@ class CONTENT_EXPORT RenderThreadImpl
std::unique_ptr<blink::scheduler::RendererScheduler> renderer_scheduler);
static RenderThreadImpl* current();
static mojom::RenderMessageFilter* current_render_message_filter();
static const scoped_refptr<mojom::ThreadSafeRenderMessageFilterAssociatedPtr>&
current_thread_safe_render_message_filter();

static void SetRenderMessageFilterForTesting(
mojom::RenderMessageFilter* render_message_filter);
Expand Down Expand Up @@ -343,6 +347,8 @@ class CONTENT_EXPORT RenderThreadImpl

mojom::RenderFrameMessageFilter* render_frame_message_filter();
mojom::RenderMessageFilter* render_message_filter();
const scoped_refptr<mojom::ThreadSafeRenderMessageFilterAssociatedPtr>&
thread_safe_render_message_filter();

// Get the GPU channel. Returns NULL if the channel is not established or
// has been lost.
Expand Down Expand Up @@ -590,6 +596,8 @@ class CONTENT_EXPORT RenderThreadImpl

// Used on the render thread.
std::unique_ptr<VideoCaptureImplManager> vc_manager_;
std::unique_ptr<ThreadSafeAssociatedInterfacePtrProvider>
thread_safe_associated_interface_ptr_provider_;

// The count of RenderWidgets running through this thread.
int widget_count_;
Expand Down Expand Up @@ -731,6 +739,8 @@ class CONTENT_EXPORT RenderThreadImpl

mojom::RenderFrameMessageFilterAssociatedPtr render_frame_message_filter_;
mojom::RenderMessageFilterAssociatedPtr render_message_filter_;
scoped_refptr<mojom::ThreadSafeRenderMessageFilterAssociatedPtr>
thread_safe_render_message_filter_;

base::CancelableClosure record_purge_suspend_metric_closure_;
bool is_renderer_suspended_;
Expand Down
22 changes: 21 additions & 1 deletion ipc/ipc_channel_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,19 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe {
}
#endif

template <typename Interface>
using AssociatedInterfaceRetrievedCallback =
base::Callback<void(mojo::AssociatedInterfacePtr<Interface>)>;
// Creates an AssociatedInterfacePtr to |Interface| on the IO thread and
// passes it to |callback|, also invoked on the IO thread.
template <typename Interface>
void RetrieveAssociatedInterfaceOnIOThread(
const AssociatedInterfaceRetrievedCallback<Interface>& callback) {
context_->ipc_task_runner()->PostTask(
FROM_HERE, base::Bind(&Context::RetrieveAssociatedInterface<Interface>,
context_, callback));
}

// Called to clear the pointer to the IPC task runner when it's going away.
void ClearIPCTaskRunner();

Expand All @@ -239,7 +252,6 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe {
// to the internal state.
explicit ChannelProxy(Context* context);


// Used internally to hold state that is referenced on the IPC thread.
class Context : public base::RefCountedThreadSafe<Context>,
public Listener {
Expand Down Expand Up @@ -304,6 +316,14 @@ class IPC_EXPORT ChannelProxy : public Sender, public base::NonThreadSafe {
void OnSendMessage(std::unique_ptr<Message> message_ptr);
void OnAddFilter();
void OnRemoveFilter(MessageFilter* filter);
template <typename Interface>
void RetrieveAssociatedInterface(
const AssociatedInterfaceRetrievedCallback<Interface>& callback) {
mojo::AssociatedInterfacePtr<Interface> interface_ptr;
channel_->GetAssociatedInterfaceSupport()->GetRemoteAssociatedInterface(
&interface_ptr);
callback.Run(std::move(interface_ptr));
}

// Methods called on the listener thread.
void AddFilter(MessageFilter* filter);
Expand Down
63 changes: 63 additions & 0 deletions mojo/public/cpp/bindings/tests/associated_interface_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,69 @@ TEST_F(AssociatedInterfaceTest, ThreadSafeAssociatedInterfacePtr) {
run_loop.Run();
}

struct ForwarderTestContext {
IntegerSenderConnectionPtr connection_ptr;
std::unique_ptr<IntegerSenderConnectionImpl> interface_impl;
};

TEST_F(AssociatedInterfaceTest, BindLaterThreadSafeAssociatedInterfacePtr) {
// Create a ThreadSafeAssociatedPtr that we'll bind from a different thread.
scoped_refptr<ThreadSafeIntegerSenderAssociatedPtr> thread_safe_ptr =
ThreadSafeIntegerSenderAssociatedPtr::CreateUnbound();

// Start the thread from where we'll bind the interface pointer.
base::Thread other_thread("service test thread");
other_thread.Start();
const scoped_refptr<base::SingleThreadTaskRunner>& other_thread_task_runner =
other_thread.message_loop()->task_runner();
ForwarderTestContext* context = new ForwarderTestContext();
{
base::RunLoop run_loop;
auto run_method = base::Bind(
[](const scoped_refptr<base::TaskRunner>& main_task_runner,
const base::Closure& quit_closure,
const scoped_refptr<ThreadSafeIntegerSenderAssociatedPtr>&
thread_safe_ptr,
ForwarderTestContext* context) {
// We are on the background thread, create the interface ptr.
context->interface_impl =
base::MakeUnique<IntegerSenderConnectionImpl>(
GetProxy(&(context->connection_ptr)));
IntegerSenderAssociatedPtr sender;
context->connection_ptr->GetSender(
GetProxy(&sender, context->connection_ptr.associated_group()));
thread_safe_ptr->Bind(std::move(sender));
main_task_runner->PostTask(FROM_HERE, quit_closure);
},
base::SequencedTaskRunnerHandle::Get(), run_loop.QuitClosure(),
thread_safe_ptr, context);

other_thread_task_runner->PostTask(FROM_HERE, run_method);
// Block until the associated pointer is bound.
run_loop.Run();
}

{
// Now we can call methods on the interface from the main thread.
auto echo_callback =
base::Bind([](const base::Closure& quit_closure, int32_t result) {
EXPECT_EQ(123, result);
quit_closure.Run();
});
base::RunLoop run_loop;
(*thread_safe_ptr)
->Echo(123, base::Bind(echo_callback, run_loop.QuitClosure()));
// Block until the method callback is called.
run_loop.Run();
}

other_thread_task_runner->DeleteSoon(FROM_HERE, context);

// Reset the pointer now so the InterfacePtr associated resources can be
// deleted before the background thread's message loop is invalidated.
thread_safe_ptr = nullptr;
}

} // namespace
} // namespace test
} // namespace mojo
52 changes: 52 additions & 0 deletions mojo/public/cpp/bindings/tests/interface_ptr_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,58 @@ TEST_F(InterfacePtrTest, ThreadSafeInterfacePointer) {
run_loop.Run();
}

TEST_F(InterfacePtrTest, BindLaterThreadSafeInterfacePointer) {
// Create a ThreadSafeInterfacePtr that we'll bind from a different thread.
scoped_refptr<math::ThreadSafeCalculatorPtr> thread_safe_ptr =
math::ThreadSafeCalculatorPtr::CreateUnbound();
ASSERT_TRUE(thread_safe_ptr);

// Create and start the thread from where we'll bind the interface pointer.
base::Thread other_thread("service test thread");
other_thread.Start();
const scoped_refptr<base::SingleThreadTaskRunner>& other_thread_task_runner =
other_thread.message_loop()->task_runner();
MathCalculatorImpl* math_calc_impl = nullptr;
{
base::RunLoop run_loop;
auto run_method = base::Bind(
[](const scoped_refptr<base::TaskRunner>& main_task_runner,
const base::Closure& quit_closure,
const scoped_refptr<math::ThreadSafeCalculatorPtr>& thread_safe_ptr,
MathCalculatorImpl** math_calc_impl) {
math::CalculatorPtr ptr;
// In real life, the implementation would have a legitimate owner.
*math_calc_impl = new MathCalculatorImpl(GetProxy(&ptr));
thread_safe_ptr->Bind(std::move(ptr));
main_task_runner->PostTask(FROM_HERE, quit_closure);
},
base::SequencedTaskRunnerHandle::Get(), run_loop.QuitClosure(),
thread_safe_ptr, &math_calc_impl);
other_thread.message_loop()->task_runner()->PostTask(FROM_HERE, run_method);
run_loop.Run();
}

{
// The interface ptr is bound, we can call methods on it.
auto calc_callback =
base::Bind([](const base::Closure& quit_closure, double result) {
EXPECT_EQ(123, result);
quit_closure.Run();
});
base::RunLoop run_loop;
(*thread_safe_ptr)
->Add(123, base::Bind(calc_callback, run_loop.QuitClosure()));
// Block until the method callback is called.
run_loop.Run();
}

other_thread_task_runner->DeleteSoon(FROM_HERE, math_calc_impl);

// Reset the pointer now so the InterfacePtr associated resources can be
// deleted before the background thread's message loop is invalidated.
thread_safe_ptr = nullptr;
}

} // namespace
} // namespace test
} // namespace mojo
Loading

0 comments on commit 315d17f

Please sign in to comment.