Skip to content

Commit

Permalink
Revert of Support early associated interface binding on ChannelMojo (…
Browse files Browse the repository at this point in the history
…patchset chromium#7 id:120001 of https://codereview.chromium.org/2163633003/ )

Reason for revert:
Probable cause of failures on Mac ASan. See crbug.com/630564 for more details.

Original issue's description:
> Support early associated interface binding on ChannelMojo
>
> Changes the associated bindings implementation for ChannelMojo
> such that remote interfaces can be acquired immediately upon
> ChannelMojo construction rather than having to wait for connection
> on the IO thread.
>
> Simplifies the Channel bootstrapping process, removing a round-trip
> Init message (and in fact the entire IPC::mojom::Boostrap interface)
> since there's no need to actually exchange associated interface handles
> over the pipe. Instead both sides can assume the other will use a fixed,
> reserved endpoint ID for their IPC::mojom::Channel interface.
>
> This also removes the restriction that associated interfaces must be
> added to a Channel after Init. Instead the same constraints apply as
> with AddFilter: an associated interface, like a filter, may be added
> at any time as long as either Init hasn't been called OR the remote
> process hasn't been launched.
>
> The result of this CL is that any place it's safe to AddFilter,
> it's also safe to AddAssociatedInterface; and any place it's safe to
> Send, it's also safe to GetRemoteAssociatedInterface and begin using
> any such remote interface immediately.
>
> Remote interface requests as well as all messages to remote interfaces
> retain FIFO with respect to any Send calls on the same thread. Local
> interface request dispatch as well as all messages on locally bound
> associated interfaces retain FIFO with respect to any OnMessageReceived
> calls on the same thread.
>
> BUG=612500,619202
>
> Committed: https://crrev.com/e1037f997da9e1d44ca3b09d4ff32f0465673091
> Committed: https://crrev.com/508da24622f957a01b076ccd058bfdccc79068a4
> Cr-Original-Commit-Position: refs/heads/master@{#406720}
> Cr-Commit-Position: refs/heads/master@{#407050}

TBR=yzshen@chromium.org,rockot@chromium.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=612500,619202

Review-Url: https://codereview.chromium.org/2173753002
Cr-Commit-Position: refs/heads/master@{#407120}
  • Loading branch information
msramek authored and Commit bot committed Jul 22, 2016
1 parent f79d797 commit 5507fee
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 338 deletions.
16 changes: 10 additions & 6 deletions ipc/ipc.mojom
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ struct SerializedHandle {
interface GenericInterface {};

interface Channel {
// Informs the remote end of this client's PID. Must be called exactly once,
// before any calls to Receive() below.
SetPeerPid(int32 pid);

// Transmits a classical Chrome IPC message.
Receive(array<uint8> data, array<SerializedHandle>? handles);

// Requests a Channel-associated interface.
GetAssociatedInterface(string name, associated GenericInterface& request);
};

// An interface for connecting a pair of Channel interfaces representing a
// bidirectional IPC channel.
interface Bootstrap {
// Initializes a Chrome IPC channel over |to_client_channel| and
// |to_server_channel|. Each side also sends its PID to the other side.
Init(associated Channel& to_client_channel,
associated Channel to_server_channel,
int32 pid) => (int32 pid);
};
10 changes: 7 additions & 3 deletions ipc/ipc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,10 @@ class IPC_EXPORT Channel : public Endpoint {
virtual ~AssociatedInterfaceSupport() {}

// Accesses the AssociatedGroup used to associate new interface endpoints
// with this Channel. Must be safe to call from any thread.
// with this Channel.
virtual mojo::AssociatedGroup* GetAssociatedGroup() = 0;

// Adds an interface factory to this channel for interface |name|. Must be
// safe to call from any thread.
// Adds an interface factory to this channel for interface |name|.
virtual void AddGenericAssociatedInterface(
const std::string& name,
const GenericAssociatedInterfaceFactory& factory) = 0;
Expand All @@ -117,6 +116,11 @@ class IPC_EXPORT Channel : public Endpoint {
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) = 0;

// Sets the TaskRunner on which to support proxied dispatch for associated
// interfaces.
virtual void SetProxyTaskRunner(
scoped_refptr<base::SingleThreadTaskRunner> task_runner) = 0;

// Template helper to add an interface factory to this channel.
template <typename Interface>
using AssociatedInterfaceFactory =
Expand Down
154 changes: 98 additions & 56 deletions ipc/ipc_channel_mojo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "base/lazy_instance.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/process/process_handle.h"
#include "base/threading/thread_task_runner_handle.h"
#include "build/build_config.h"
#include "ipc/ipc_listener.h"
Expand Down Expand Up @@ -270,11 +269,13 @@ ChannelMojo::ChannelMojo(
Mode mode,
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
: pipe_(handle.get()), listener_(listener), weak_factory_(this) {
: pipe_(handle.get()),
listener_(listener),
waiting_connect_(true),
weak_factory_(this) {
// Create MojoBootstrap after all members are set as it touches
// ChannelMojo from a different thread.
bootstrap_ =
MojoBootstrap::Create(std::move(handle), mode, this, ipc_task_runner);
bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, this);
}

ChannelMojo::~ChannelMojo() {
Expand All @@ -283,32 +284,89 @@ ChannelMojo::~ChannelMojo() {

bool ChannelMojo::Connect() {
WillConnect();

DCHECK(!task_runner_);
task_runner_ = base::ThreadTaskRunnerHandle::Get();
DCHECK(!message_reader_);

{
base::AutoLock lock(lock_);
DCHECK(!task_runner_);
task_runner_ = base::ThreadTaskRunnerHandle::Get();
DCHECK(!message_reader_);
}
bootstrap_->Connect();
return true;
}

void ChannelMojo::Close() {
// NOTE: The MessagePipeReader's destructor may re-enter this function. Use
// caution when changing this method.
std::unique_ptr<internal::MessagePipeReader> reader =
std::move(message_reader_);
reader.reset();
std::unique_ptr<internal::MessagePipeReader, ReaderDeleter> reader;
{
base::AutoLock lock(lock_);
if (!message_reader_)
return;
// The reader's destructor may re-enter Close, so we swap it out first to
// avoid deadlock when freeing it below.
std::swap(message_reader_, reader);

// We might Close() before we Connect().
waiting_connect_ = false;
}

base::AutoLock lock(associated_interface_lock_);
associated_interfaces_.clear();
reader.reset();
}

// MojoBootstrap::Delegate implementation
void ChannelMojo::OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
mojom::ChannelAssociatedRequest receiver) {
sender->SetPeerPid(GetSelfPID());
message_reader_.reset(new internal::MessagePipeReader(
pipe_, std::move(sender), std::move(receiver), this));
void ChannelMojo::OnPipesAvailable(
mojom::ChannelAssociatedPtrInfo send_channel,
mojom::ChannelAssociatedRequest receive_channel,
int32_t peer_pid) {
InitMessageReader(std::move(send_channel), std::move(receive_channel),
peer_pid);
}

void ChannelMojo::OnBootstrapError() {
listener_->OnChannelError();
}

void ChannelMojo::OnAssociatedInterfaceRequest(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) {
auto iter = associated_interfaces_.find(name);
if (iter != associated_interfaces_.end())
iter->second.Run(std::move(handle));
}

void ChannelMojo::InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
mojom::ChannelAssociatedRequest receiver,
base::ProcessId peer_pid) {
mojom::ChannelAssociatedPtr sender_ptr;
sender_ptr.Bind(std::move(sender));
std::unique_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter>
reader(new internal::MessagePipeReader(
pipe_, std::move(sender_ptr), std::move(receiver), peer_pid, this));

bool connected = true;
{
base::AutoLock lock(lock_);
for (size_t i = 0; i < pending_messages_.size(); ++i) {
if (!reader->Send(std::move(pending_messages_[i]))) {
LOG(ERROR) << "Failed to flush pending messages";
pending_messages_.clear();
connected = false;
break;
}
}

if (connected) {
// We set |message_reader_| here and won't get any |pending_messages_|
// hereafter. Although we might have some if there is an error, we don't
// care. They cannot be sent anyway.
message_reader_ = std::move(reader);
pending_messages_.clear();
waiting_connect_ = false;
}
}

if (connected)
listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
else
OnPipeError();
}

void ChannelMojo::OnPipeError() {
Expand All @@ -322,25 +380,14 @@ void ChannelMojo::OnPipeError() {
}
}

void ChannelMojo::OnAssociatedInterfaceRequest(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) {
GenericAssociatedInterfaceFactory factory;
{
base::AutoLock locker(associated_interface_lock_);
auto iter = associated_interfaces_.find(name);
if (iter != associated_interfaces_.end())
factory = iter->second;
}

if (!factory.is_null())
factory.Run(std::move(handle));
}

bool ChannelMojo::Send(Message* message) {
std::unique_ptr<Message> scoped_message = base::WrapUnique(message);
if (!message_reader_)
return false;
base::AutoLock lock(lock_);
if (!message_reader_) {
pending_messages_.push_back(base::WrapUnique(message));
// Counts as OK before the connection is established, but it's an
// error otherwise.
return waiting_connect_;
}

// Comment copied from ipc_channel_posix.cc:
// We can't close the pipe here, because calling OnChannelError may destroy
Expand All @@ -351,38 +398,28 @@ bool ChannelMojo::Send(Message* message) {
//
// With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the
// pipe's connection error handler will be invoked in its place.
return message_reader_->Send(std::move(scoped_message));
return message_reader_->Send(base::WrapUnique(message));
}

bool ChannelMojo::IsSendThreadSafe() const {
return false;
}

base::ProcessId ChannelMojo::GetPeerPID() const {
base::AutoLock lock(lock_);
if (!message_reader_)
return base::kNullProcessId;

return message_reader_->GetPeerPid();
}

base::ProcessId ChannelMojo::GetSelfPID() const {
#if defined(OS_LINUX)
if (int global_pid = GetGlobalPid())
return global_pid;
#endif // OS_LINUX
#if defined(OS_NACL)
return -1;
#else
return base::GetCurrentProcId();
#endif // defined(OS_NACL)
return bootstrap_->GetSelfPID();
}

Channel::AssociatedInterfaceSupport*
ChannelMojo::GetAssociatedInterfaceSupport() { return this; }

void ChannelMojo::OnPeerPidReceived() {
listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
}

void ChannelMojo::OnMessageReceived(const Message& message) {
TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
"class", IPC_MESSAGE_ID_CLASS(message.type()),
Expand Down Expand Up @@ -467,16 +504,21 @@ mojo::AssociatedGroup* ChannelMojo::GetAssociatedGroup() {
void ChannelMojo::AddGenericAssociatedInterface(
const std::string& name,
const GenericAssociatedInterfaceFactory& factory) {
base::AutoLock locker(associated_interface_lock_);
auto result = associated_interfaces_.insert({ name, factory });
DCHECK(result.second);
}

void ChannelMojo::GetGenericRemoteAssociatedInterface(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) {
if (message_reader_)
message_reader_->GetRemoteInterface(name, std::move(handle));
DCHECK(message_reader_);
message_reader_->GetRemoteInterface(name, std::move(handle));
}

void ChannelMojo::SetProxyTaskRunner(
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
DCHECK(bootstrap_);
bootstrap_->SetProxyTaskRunner(task_runner);
}

} // namespace IPC
35 changes: 25 additions & 10 deletions ipc/ipc_channel_mojo.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace IPC {
class IPC_EXPORT ChannelMojo
: public Channel,
public Channel::AssociatedInterfaceSupport,
public NON_EXPORTED_BASE(MojoBootstrap::Delegate),
public MojoBootstrap::Delegate,
public NON_EXPORTED_BASE(internal::MessagePipeReader::Delegate) {
public:
// Creates a ChannelMojo.
Expand Down Expand Up @@ -90,16 +90,17 @@ class IPC_EXPORT ChannelMojo
mojo::Array<mojom::SerializedHandlePtr>* handles);

// MojoBootstrapDelegate implementation
void OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
mojom::ChannelAssociatedRequest receiver) override;
void OnPipesAvailable(mojom::ChannelAssociatedPtrInfo send_channel,
mojom::ChannelAssociatedRequest receive_channel,
int32_t peer_pid) override;
void OnBootstrapError() override;
void OnAssociatedInterfaceRequest(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) override;

// MessagePipeReader::Delegate
void OnPeerPidReceived() override;
void OnMessageReceived(const Message& message) override;
void OnPipeError() override;
void OnAssociatedInterfaceRequest(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) override;

private:
ChannelMojo(
Expand All @@ -108,6 +109,10 @@ class IPC_EXPORT ChannelMojo
Listener* listener,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner);

void InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
mojom::ChannelAssociatedRequest receiver,
base::ProcessId peer_pid);

// Channel::AssociatedInterfaceSupport:
mojo::AssociatedGroup* GetAssociatedGroup() override;
void AddGenericAssociatedInterface(
Expand All @@ -116,6 +121,13 @@ class IPC_EXPORT ChannelMojo
void GetGenericRemoteAssociatedInterface(
const std::string& name,
mojo::ScopedInterfaceEndpointHandle handle) override;
void SetProxyTaskRunner(
scoped_refptr<base::SingleThreadTaskRunner> task_runner) override;

// ChannelMojo needs to kill its MessagePipeReader in delayed manner
// because the channel wants to kill these readers during the
// notifications invoked by them.
typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;

// A TaskRunner which runs tasks on the ChannelMojo's owning thread.
scoped_refptr<base::TaskRunner> task_runner_;
Expand All @@ -124,12 +136,15 @@ class IPC_EXPORT ChannelMojo
std::unique_ptr<MojoBootstrap> bootstrap_;
Listener* listener_;

std::unique_ptr<internal::MessagePipeReader> message_reader_;

base::Lock associated_interface_lock_;
std::map<std::string, GenericAssociatedInterfaceFactory>
associated_interfaces_;

// Guards access to the fields below.
mutable base::Lock lock_;
std::unique_ptr<internal::MessagePipeReader, ReaderDeleter> message_reader_;
std::vector<std::unique_ptr<Message>> pending_messages_;
bool waiting_connect_;

base::WeakPtrFactory<ChannelMojo> weak_factory_;

DISALLOW_COPY_AND_ASSIGN(ChannelMojo);
Expand Down
Loading

0 comments on commit 5507fee

Please sign in to comment.