Skip to content

Commit

Permalink
Allow earlier association of Channel interfaces
Browse files Browse the repository at this point in the history
Previously IPC Channels were always created on the IO thread, and they
carried an implicit requirement that any Channel-associated interfaces
could not be associated until after the IO thread initialization was
done. This is incompatible with the effort to move process hosts to the
UI thread.

This loosens some unnecessary constraints on how all the ChannelMojo
internals are set up, and enables off-thread construction and (limited)
early use of the ChannelMojo prior to IO thread initialization.

Bug: 904556
Change-Id: I67a72d2b191b20d3757768bb38a5a77f29b3bea2
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2910960
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Zhenyao Mo <zmo@chromium.org>
Cr-Commit-Position: refs/heads/master@{#885330}
  • Loading branch information
John Abd-El-Malek authored and Chromium LUCI CQ committed May 21, 2021
1 parent 2c454ea commit 509586f
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 38 deletions.
16 changes: 4 additions & 12 deletions gpu/ipc/client/gpu_channel_host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,9 @@ GpuChannelHost::GpuChannelHost(
static_cast<int32_t>(
GpuChannelReservedRoutes::kImageDecodeAccelerator)) {
mojo::PendingAssociatedRemote<mojom::GpuChannel> channel;
auto receiver = channel.InitWithNewEndpointAndPassReceiver();
if (io_thread_->BelongsToCurrentThread()) {
listener_->Initialize(std::move(handle), std::move(receiver), io_thread_);
} else {
io_thread_->PostTask(
FROM_HERE,
base::BindOnce(&Listener::Initialize, base::Unretained(listener_.get()),
std::move(handle), std::move(receiver), io_thread_));
}

listener_->Initialize(std::move(handle),
channel.InitWithNewEndpointAndPassReceiver(),
io_thread_);
gpu_channel_ = mojo::SharedAssociatedRemote<mojom::GpuChannel>(
std::move(channel), io_thread_);

Expand Down Expand Up @@ -297,8 +290,7 @@ void GpuChannelHost::Listener::Initialize(
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner) {
channel_ = IPC::ChannelMojo::Create(
std::move(handle), IPC::Channel::MODE_CLIENT, this, io_task_runner,
base::ThreadTaskRunnerHandle::Get(),
mojo::internal::MessageQuotaChecker::MaybeCreate());
io_task_runner, mojo::internal::MessageQuotaChecker::MaybeCreate());
DCHECK(channel_);
bool result = channel_->Connect();
DCHECK(result);
Expand Down
2 changes: 1 addition & 1 deletion gpu/ipc/client/gpu_channel_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class GPU_EXPORT GpuChannelHost
Listener();
~Listener() override;

// Called on the IO thread.
// Called on the GpuChannelHost's thread.
void Initialize(mojo::ScopedMessagePipeHandle handle,
mojo::PendingAssociatedReceiver<mojom::GpuChannel> receiver,
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner);
Expand Down
26 changes: 21 additions & 5 deletions ipc/ipc_channel_mojo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,31 @@ ChannelMojo::~ChannelMojo() {
}

bool ChannelMojo::Connect() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());

WillConnect();

mojo::AssociatedRemote<mojom::Channel> sender;
mojo::PendingAssociatedRemote<mojom::Channel> sender;
mojo::PendingAssociatedReceiver<mojom::Channel> receiver;
bootstrap_->Connect(&sender, &receiver);

DCHECK(!message_reader_);
sender->SetPeerPid(GetSelfPID());
message_reader_ = std::make_unique<internal::MessagePipeReader>(
pipe_, std::move(sender), std::move(receiver), this);
pipe_, std::move(sender), std::move(receiver), task_runner_, this);

if (task_runner_->RunsTasksInCurrentSequence()) {
FinishConnectOnIOThread();
} else {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&ChannelMojo::FinishConnectOnIOThread, weak_ptr_));
}
return true;
}

void ChannelMojo::FinishConnectOnIOThread() {
DCHECK(message_reader_);
message_reader_->FinishInitializationOnIOThread(GetSelfPID());
}

void ChannelMojo::Pause() {
bootstrap_->Pause();
}
Expand Down Expand Up @@ -375,6 +385,12 @@ void ChannelMojo::GetGenericRemoteAssociatedInterface(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) {
if (message_reader_) {
if (!task_runner_->RunsTasksInCurrentSequence()) {
message_reader_->thread_safe_sender().GetAssociatedInterface(
name, mojo::PendingAssociatedReceiver<mojom::GenericInterface>(
std::move(handle)));
return;
}
message_reader_->GetRemoteInterface(name, std::move(handle));
} else {
// Attach the associated interface to a disconnected pipe, so that the
Expand Down
2 changes: 2 additions & 0 deletions ipc/ipc_channel_mojo.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class COMPONENT_EXPORT(IPC) ChannelMojo
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) override;

void FinishConnectOnIOThread();

base::WeakPtr<ChannelMojo> weak_ptr_;

// A TaskRunner which runs tasks on the ChannelMojo's owning thread.
Expand Down
72 changes: 65 additions & 7 deletions ipc/ipc_message_pipe_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,83 @@
#include "base/trace_event/trace_event.h"
#include "ipc/ipc_channel_mojo.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/thread_safe_proxy.h"

namespace IPC {
namespace internal {

namespace {

class ThreadSafeProxy : public mojo::ThreadSafeProxy {
public:
using Forwarder = base::RepeatingCallback<void(mojo::Message)>;

ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,
Forwarder forwarder,
mojo::AssociatedGroupController& group_controller)
: task_runner_(std::move(task_runner)),
forwarder_(std::move(forwarder)),
group_controller_(group_controller) {}

// mojo::ThreadSafeProxy:
void SendMessage(mojo::Message& message) override {
message.SerializeHandles(&group_controller_);
task_runner_->PostTask(FROM_HERE,
base::BindOnce(forwarder_, std::move(message)));
}

void SendMessageWithResponder(
mojo::Message& message,
std::unique_ptr<mojo::MessageReceiver> responder) override {
// We don't bother supporting this because it's not used in practice.
NOTREACHED();
}

private:
~ThreadSafeProxy() override = default;

const scoped_refptr<base::SequencedTaskRunner> task_runner_;
const Forwarder forwarder_;
mojo::AssociatedGroupController& group_controller_;
};

} // namespace

MessagePipeReader::MessagePipeReader(
mojo::MessagePipeHandle pipe,
mojo::AssociatedRemote<mojom::Channel> sender,
mojo::PendingAssociatedRemote<mojom::Channel> sender,
mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
scoped_refptr<base::SequencedTaskRunner> task_runner,
MessagePipeReader::Delegate* delegate)
: delegate_(delegate),
sender_(std::move(sender)),
receiver_(this, std::move(receiver)) {
sender_(std::move(sender), task_runner),
receiver_(this, std::move(receiver), task_runner) {
thread_safe_sender_ =
std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
base::MakeRefCounted<ThreadSafeProxy>(
task_runner,
base::BindRepeating(&MessagePipeReader::ForwardMessage,
weak_ptr_factory_.GetWeakPtr()),
*sender_.internal_state()->associated_group()->GetController()));

thread_checker_.DetachFromThread();
}

MessagePipeReader::~MessagePipeReader() {
DCHECK(thread_checker_.CalledOnValidThread());
// The pipe should be closed before deletion.
}

void MessagePipeReader::FinishInitializationOnIOThread(
base::ProcessId self_pid) {
sender_.set_disconnect_handler(
base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
MOJO_RESULT_FAILED_PRECONDITION));
receiver_.set_disconnect_handler(
base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
MOJO_RESULT_FAILED_PRECONDITION));
}

MessagePipeReader::~MessagePipeReader() {
DCHECK(thread_checker_.CalledOnValidThread());
// The pipe should be closed before deletion.
sender_->SetPeerPid(self_pid);
}

void MessagePipeReader::Close() {
Expand Down Expand Up @@ -128,5 +182,9 @@ void MessagePipeReader::OnPipeError(MojoResult error) {
delegate_->OnPipeError();
}

void MessagePipeReader::ForwardMessage(mojo::Message message) {
sender_.internal_state()->ForwardMessage(std::move(message));
}

} // namespace internal
} // namespace IPC
13 changes: 12 additions & 1 deletion ipc/ipc_message_pipe_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "base/compiler_specific.h"
#include "base/component_export.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/process/process_handle.h"
#include "base/threading/thread_checker.h"
#include "ipc/ipc.mojom.h"
Expand All @@ -22,6 +23,7 @@
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/pending_associated_receiver.h"
#include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
#include "mojo/public/cpp/bindings/shared_remote.h"
#include "mojo/public/cpp/system/core.h"
#include "mojo/public/cpp/system/message_pipe.h"

Expand Down Expand Up @@ -68,11 +70,14 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
//
// Note that MessagePipeReader doesn't delete |delegate|.
MessagePipeReader(mojo::MessagePipeHandle pipe,
mojo::AssociatedRemote<mojom::Channel> sender,
mojo::PendingAssociatedRemote<mojom::Channel> sender,
mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
scoped_refptr<base::SequencedTaskRunner> task_runner,
Delegate* delegate);
~MessagePipeReader() override;

void FinishInitializationOnIOThread(base::ProcessId self_pid);

// Close and destroy the MessagePipe.
void Close();

Expand All @@ -88,6 +93,7 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
mojo::ScopedInterfaceEndpointHandle handle);

mojo::AssociatedRemote<mojom::Channel>& sender() { return sender_; }
mojom::Channel& thread_safe_sender() { return thread_safe_sender_->proxy(); }

protected:
void OnPipeClosed();
Expand All @@ -102,11 +108,16 @@ class COMPONENT_EXPORT(IPC) MessagePipeReader : public mojom::Channel {
mojo::PendingAssociatedReceiver<mojom::GenericInterface> receiver)
override;

void ForwardMessage(mojo::Message message);

// |delegate_| is null once the message pipe is closed.
Delegate* delegate_;
mojo::AssociatedRemote<mojom::Channel> sender_;
std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
thread_safe_sender_;
mojo::AssociatedReceiver<mojom::Channel> receiver_;
base::ThreadChecker thread_checker_;
base::WeakPtrFactory<MessagePipeReader> weak_ptr_factory_{this};

DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
};
Expand Down
11 changes: 4 additions & 7 deletions ipc/ipc_mojo_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ class ChannelAssociatedGroupController
}

void Bind(mojo::ScopedMessagePipeHandle handle) {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(task_runner_->BelongsToCurrentThread());

connector_ = std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, task_runner_,
"IPC Channel");
Expand Down Expand Up @@ -211,7 +208,7 @@ class ChannelAssociatedGroupController
}

void CreateChannelEndpoints(
mojo::AssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
mojo::InterfaceId sender_id, receiver_id;
if (set_interface_id_namespace_bit_) {
Expand All @@ -237,8 +234,8 @@ class ChannelAssociatedGroupController
mojo::ScopedInterfaceEndpointHandle receiver_handle =
CreateScopedInterfaceEndpointHandle(receiver_id);

sender->Bind(mojo::PendingAssociatedRemote<mojom::Channel>(
std::move(sender_handle), 0));
*sender = mojo::PendingAssociatedRemote<mojom::Channel>(
std::move(sender_handle), 0);
*receiver = mojo::PendingAssociatedReceiver<mojom::Channel>(
std::move(receiver_handle));
}
Expand Down Expand Up @@ -1104,7 +1101,7 @@ class MojoBootstrapImpl : public MojoBootstrap {

private:
void Connect(
mojo::AssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
controller_->Bind(std::move(handle_));
controller_->CreateChannelEndpoints(sender, receiver);
Expand Down
2 changes: 1 addition & 1 deletion ipc/ipc_mojo_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class COMPONENT_EXPORT(IPC) MojoBootstrap {

// Start the handshake over the underlying message pipe.
virtual void Connect(
mojo::AssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) = 0;

// Stop transmitting messages and start queueing them instead.
Expand Down
4 changes: 3 additions & 1 deletion ipc/ipc_mojo_bootstrap_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class Connection {
explicit Connection(std::unique_ptr<IPC::MojoBootstrap> bootstrap,
int32_t sender_id)
: bootstrap_(std::move(bootstrap)) {
bootstrap_->Connect(&sender_, &receiver_);
mojo::PendingAssociatedRemote<IPC::mojom::Channel> sender;
bootstrap_->Connect(&sender, &receiver_);
sender_.Bind(std::move(sender));
sender_->SetPeerPid(sender_id);
}

Expand Down
3 changes: 0 additions & 3 deletions mojo/public/cpp/bindings/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,13 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// Connector will read messages from the pipe regardless of whether or not an
// incoming receiver has been set.
void set_incoming_receiver(MessageReceiver* receiver) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
incoming_receiver_ = receiver;
}

// Errors from incoming receivers will force the connector into an error
// state, where no more messages will be processed. This method is used
// during testing to prevent that from happening.
void set_enforce_errors_from_incoming_receiver(bool enforce) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
enforce_errors_from_incoming_receiver_ = enforce;
}

Expand All @@ -120,7 +118,6 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) Connector : public MessageReceiver {
// Sets the error handler to receive notifications when an error is
// encountered while reading from the pipe or waiting to read from the pipe.
void set_connection_error_handler(base::OnceClosure error_handler) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connection_error_handler_ = std::move(error_handler);
}

Expand Down

0 comments on commit 509586f

Please sign in to comment.