diff --git a/gpu/ipc/client/gpu_channel_host.cc b/gpu/ipc/client/gpu_channel_host.cc index 105c328bebb05f..a1e981a8a6c2cc 100644 --- a/gpu/ipc/client/gpu_channel_host.cc +++ b/gpu/ipc/client/gpu_channel_host.cc @@ -52,16 +52,9 @@ GpuChannelHost::GpuChannelHost( static_cast( GpuChannelReservedRoutes::kImageDecodeAccelerator)) { mojo::PendingAssociatedRemote channel; - auto receiver = channel.InitWithNewEndpointAndPassReceiver(); - if (io_thread_->BelongsToCurrentThread()) { - listener_->Initialize(std::move(handle), std::move(receiver), io_thread_); - } else { - io_thread_->PostTask( - FROM_HERE, - base::BindOnce(&Listener::Initialize, base::Unretained(listener_.get()), - std::move(handle), std::move(receiver), io_thread_)); - } - + listener_->Initialize(std::move(handle), + channel.InitWithNewEndpointAndPassReceiver(), + io_thread_); gpu_channel_ = mojo::SharedAssociatedRemote( std::move(channel), io_thread_); @@ -297,8 +290,7 @@ void GpuChannelHost::Listener::Initialize( scoped_refptr io_task_runner) { channel_ = IPC::ChannelMojo::Create( std::move(handle), IPC::Channel::MODE_CLIENT, this, io_task_runner, - base::ThreadTaskRunnerHandle::Get(), - mojo::internal::MessageQuotaChecker::MaybeCreate()); + io_task_runner, mojo::internal::MessageQuotaChecker::MaybeCreate()); DCHECK(channel_); bool result = channel_->Connect(); DCHECK(result); diff --git a/gpu/ipc/client/gpu_channel_host.h b/gpu/ipc/client/gpu_channel_host.h index d42478d3909d77..89b3b5a040428a 100644 --- a/gpu/ipc/client/gpu_channel_host.h +++ b/gpu/ipc/client/gpu_channel_host.h @@ -166,7 +166,7 @@ class GPU_EXPORT GpuChannelHost Listener(); ~Listener() override; - // Called on the IO thread. + // Called on the GpuChannelHost's thread. void Initialize(mojo::ScopedMessagePipeHandle handle, mojo::PendingAssociatedReceiver receiver, scoped_refptr io_task_runner); diff --git a/ipc/ipc_channel_mojo.cc b/ipc/ipc_channel_mojo.cc index bdb19d6a456f83..16f70588bf65b6 100644 --- a/ipc/ipc_channel_mojo.cc +++ b/ipc/ipc_channel_mojo.cc @@ -186,21 +186,31 @@ ChannelMojo::~ChannelMojo() { } bool ChannelMojo::Connect() { - DCHECK(task_runner_->RunsTasksInCurrentSequence()); - WillConnect(); - mojo::AssociatedRemote sender; + mojo::PendingAssociatedRemote sender; mojo::PendingAssociatedReceiver receiver; bootstrap_->Connect(&sender, &receiver); DCHECK(!message_reader_); - sender->SetPeerPid(GetSelfPID()); message_reader_ = std::make_unique( - pipe_, std::move(sender), std::move(receiver), this); + pipe_, std::move(sender), std::move(receiver), task_runner_, this); + + if (task_runner_->RunsTasksInCurrentSequence()) { + FinishConnectOnIOThread(); + } else { + task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&ChannelMojo::FinishConnectOnIOThread, weak_ptr_)); + } return true; } +void ChannelMojo::FinishConnectOnIOThread() { + DCHECK(message_reader_); + message_reader_->FinishInitializationOnIOThread(GetSelfPID()); +} + void ChannelMojo::Pause() { bootstrap_->Pause(); } @@ -375,6 +385,12 @@ void ChannelMojo::GetGenericRemoteAssociatedInterface( const std::string& name, mojo::ScopedInterfaceEndpointHandle handle) { if (message_reader_) { + if (!task_runner_->RunsTasksInCurrentSequence()) { + message_reader_->thread_safe_sender().GetAssociatedInterface( + name, mojo::PendingAssociatedReceiver( + std::move(handle))); + return; + } message_reader_->GetRemoteInterface(name, std::move(handle)); } else { // Attach the associated interface to a disconnected pipe, so that the diff --git a/ipc/ipc_channel_mojo.h b/ipc/ipc_channel_mojo.h index d99bb2f19f51a7..3379466834bd5c 100644 --- a/ipc/ipc_channel_mojo.h +++ b/ipc/ipc_channel_mojo.h @@ -116,6 +116,8 @@ class COMPONENT_EXPORT(IPC) ChannelMojo const std::string& name, mojo::ScopedInterfaceEndpointHandle handle) override; + void FinishConnectOnIOThread(); + base::WeakPtr weak_ptr_; // A TaskRunner which runs tasks on the ChannelMojo's owning thread. diff --git a/ipc/ipc_message_pipe_reader.cc b/ipc/ipc_message_pipe_reader.cc index eb290abd98135d..8deecd3078aa84 100644 --- a/ipc/ipc_message_pipe_reader.cc +++ b/ipc/ipc_message_pipe_reader.cc @@ -19,29 +19,83 @@ #include "base/trace_event/trace_event.h" #include "ipc/ipc_channel_mojo.h" #include "mojo/public/cpp/bindings/message.h" +#include "mojo/public/cpp/bindings/thread_safe_proxy.h" namespace IPC { namespace internal { +namespace { + +class ThreadSafeProxy : public mojo::ThreadSafeProxy { + public: + using Forwarder = base::RepeatingCallback; + + ThreadSafeProxy(scoped_refptr task_runner, + Forwarder forwarder, + mojo::AssociatedGroupController& group_controller) + : task_runner_(std::move(task_runner)), + forwarder_(std::move(forwarder)), + group_controller_(group_controller) {} + + // mojo::ThreadSafeProxy: + void SendMessage(mojo::Message& message) override { + message.SerializeHandles(&group_controller_); + task_runner_->PostTask(FROM_HERE, + base::BindOnce(forwarder_, std::move(message))); + } + + void SendMessageWithResponder( + mojo::Message& message, + std::unique_ptr responder) override { + // We don't bother supporting this because it's not used in practice. + NOTREACHED(); + } + + private: + ~ThreadSafeProxy() override = default; + + const scoped_refptr task_runner_; + const Forwarder forwarder_; + mojo::AssociatedGroupController& group_controller_; +}; + +} // namespace + MessagePipeReader::MessagePipeReader( mojo::MessagePipeHandle pipe, - mojo::AssociatedRemote sender, + mojo::PendingAssociatedRemote sender, mojo::PendingAssociatedReceiver receiver, + scoped_refptr task_runner, MessagePipeReader::Delegate* delegate) : delegate_(delegate), - sender_(std::move(sender)), - receiver_(this, std::move(receiver)) { + sender_(std::move(sender), task_runner), + receiver_(this, std::move(receiver), task_runner) { + thread_safe_sender_ = + std::make_unique>( + base::MakeRefCounted( + task_runner, + base::BindRepeating(&MessagePipeReader::ForwardMessage, + weak_ptr_factory_.GetWeakPtr()), + *sender_.internal_state()->associated_group()->GetController())); + + thread_checker_.DetachFromThread(); +} + +MessagePipeReader::~MessagePipeReader() { + DCHECK(thread_checker_.CalledOnValidThread()); + // The pipe should be closed before deletion. +} + +void MessagePipeReader::FinishInitializationOnIOThread( + base::ProcessId self_pid) { sender_.set_disconnect_handler( base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this), MOJO_RESULT_FAILED_PRECONDITION)); receiver_.set_disconnect_handler( base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this), MOJO_RESULT_FAILED_PRECONDITION)); -} -MessagePipeReader::~MessagePipeReader() { - DCHECK(thread_checker_.CalledOnValidThread()); - // The pipe should be closed before deletion. + sender_->SetPeerPid(self_pid); } void MessagePipeReader::Close() { @@ -128,5 +182,9 @@ void MessagePipeReader::OnPipeError(MojoResult error) { delegate_->OnPipeError(); } +void MessagePipeReader::ForwardMessage(mojo::Message message) { + sender_.internal_state()->ForwardMessage(std::move(message)); +} + } // namespace internal } // namespace IPC diff --git a/ipc/ipc_message_pipe_reader.h b/ipc/ipc_message_pipe_reader.h index b7f73d2a9aeefc..1894bf332eef9e 100644 --- a/ipc/ipc_message_pipe_reader.h +++ b/ipc/ipc_message_pipe_reader.h @@ -14,6 +14,7 @@ #include "base/compiler_specific.h" #include "base/component_export.h" #include "base/macros.h" +#include "base/memory/weak_ptr.h" #include "base/process/process_handle.h" #include "base/threading/thread_checker.h" #include "ipc/ipc.mojom.h" @@ -22,6 +23,7 @@ #include "mojo/public/cpp/bindings/associated_remote.h" #include "mojo/public/cpp/bindings/pending_associated_receiver.h" #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" +#include "mojo/public/cpp/bindings/shared_remote.h" #include "mojo/public/cpp/system/core.h" #include "mojo/public/cpp/system/message_pipe.h" @@ -68,11 +70,14 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel { // // Note that MessagePipeReader doesn't delete |delegate|. MessagePipeReader(mojo::MessagePipeHandle pipe, - mojo::AssociatedRemote sender, + mojo::PendingAssociatedRemote sender, mojo::PendingAssociatedReceiver receiver, + scoped_refptr task_runner, Delegate* delegate); ~MessagePipeReader() override; + void FinishInitializationOnIOThread(base::ProcessId self_pid); + // Close and destroy the MessagePipe. void Close(); @@ -88,6 +93,7 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel { mojo::ScopedInterfaceEndpointHandle handle); mojo::AssociatedRemote& sender() { return sender_; } + mojom::Channel& thread_safe_sender() { return thread_safe_sender_->proxy(); } protected: void OnPipeClosed(); @@ -102,11 +108,16 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel { mojo::PendingAssociatedReceiver receiver) override; + void ForwardMessage(mojo::Message message); + // |delegate_| is null once the message pipe is closed. Delegate* delegate_; mojo::AssociatedRemote sender_; + std::unique_ptr> + thread_safe_sender_; mojo::AssociatedReceiver receiver_; base::ThreadChecker thread_checker_; + base::WeakPtrFactory weak_ptr_factory_{this}; DISALLOW_COPY_AND_ASSIGN(MessagePipeReader); }; diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc index 4c84885085552a..a6d60a9541410e 100644 --- a/ipc/ipc_mojo_bootstrap.cc +++ b/ipc/ipc_mojo_bootstrap.cc @@ -164,9 +164,6 @@ class ChannelAssociatedGroupController } void Bind(mojo::ScopedMessagePipeHandle handle) { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(task_runner_->BelongsToCurrentThread()); - connector_ = std::make_unique( std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, task_runner_, "IPC Channel"); @@ -211,7 +208,7 @@ class ChannelAssociatedGroupController } void CreateChannelEndpoints( - mojo::AssociatedRemote* sender, + mojo::PendingAssociatedRemote* sender, mojo::PendingAssociatedReceiver* receiver) { mojo::InterfaceId sender_id, receiver_id; if (set_interface_id_namespace_bit_) { @@ -237,8 +234,8 @@ class ChannelAssociatedGroupController mojo::ScopedInterfaceEndpointHandle receiver_handle = CreateScopedInterfaceEndpointHandle(receiver_id); - sender->Bind(mojo::PendingAssociatedRemote( - std::move(sender_handle), 0)); + *sender = mojo::PendingAssociatedRemote( + std::move(sender_handle), 0); *receiver = mojo::PendingAssociatedReceiver( std::move(receiver_handle)); } @@ -1104,7 +1101,7 @@ class MojoBootstrapImpl : public MojoBootstrap { private: void Connect( - mojo::AssociatedRemote* sender, + mojo::PendingAssociatedRemote* sender, mojo::PendingAssociatedReceiver* receiver) override { controller_->Bind(std::move(handle_)); controller_->CreateChannelEndpoints(sender, receiver); diff --git a/ipc/ipc_mojo_bootstrap.h b/ipc/ipc_mojo_bootstrap.h index d231ab2cb84a2a..8d9b574512bfc8 100644 --- a/ipc/ipc_mojo_bootstrap.h +++ b/ipc/ipc_mojo_bootstrap.h @@ -48,7 +48,7 @@ class COMPONENT_EXPORT(IPC) MojoBootstrap { // Start the handshake over the underlying message pipe. virtual void Connect( - mojo::AssociatedRemote* sender, + mojo::PendingAssociatedRemote* sender, mojo::PendingAssociatedReceiver* receiver) = 0; // Stop transmitting messages and start queueing them instead. diff --git a/ipc/ipc_mojo_bootstrap_unittest.cc b/ipc/ipc_mojo_bootstrap_unittest.cc index eddcde20d247c0..ddf160447b8be1 100644 --- a/ipc/ipc_mojo_bootstrap_unittest.cc +++ b/ipc/ipc_mojo_bootstrap_unittest.cc @@ -26,7 +26,9 @@ class Connection { explicit Connection(std::unique_ptr bootstrap, int32_t sender_id) : bootstrap_(std::move(bootstrap)) { - bootstrap_->Connect(&sender_, &receiver_); + mojo::PendingAssociatedRemote sender; + bootstrap_->Connect(&sender, &receiver_); + sender_.Bind(std::move(sender)); sender_->SetPeerPid(sender_id); } diff --git a/mojo/public/cpp/bindings/connector.h b/mojo/public/cpp/bindings/connector.h index 86ff9a7e474f6e..488f8d51a62cea 100644 --- a/mojo/public/cpp/bindings/connector.h +++ b/mojo/public/cpp/bindings/connector.h @@ -98,7 +98,6 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver { // Connector will read messages from the pipe regardless of whether or not an // incoming receiver has been set. void set_incoming_receiver(MessageReceiver* receiver) { - DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); incoming_receiver_ = receiver; } @@ -106,7 +105,6 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver { // state, where no more messages will be processed. This method is used // during testing to prevent that from happening. void set_enforce_errors_from_incoming_receiver(bool enforce) { - DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); enforce_errors_from_incoming_receiver_ = enforce; } @@ -120,7 +118,6 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver { // Sets the error handler to receive notifications when an error is // encountered while reading from the pipe or waiting to read from the pipe. void set_connection_error_handler(base::OnceClosure error_handler) { - DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); connection_error_handler_ = std::move(error_handler); }