Skip to content

Commit

Permalink
Add a base class for objects that want to filter messages on the IO t…
Browse files Browse the repository at this point in the history
…hread. I'll switch the filters to it in future separate changes.

I've also taken out the special case for an initial filter from the IPC classes.  The reason it existed was that there was a race condition of some messages not being filtered if a filter is added after construction but before launching the peer process.  Taking it out allows us to add more than one filter and makes things a little cleaner.
Review URL: http://codereview.chromium.org/5513001

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@68043 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
jam@chromium.org committed Dec 2, 2010
1 parent e669998 commit 4b580bf
Show file tree
Hide file tree
Showing 20 changed files with 186 additions and 65 deletions.
2 changes: 1 addition & 1 deletion chrome/browser/automation/automation_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ bool AutomationProvider::InitializeChannel(const std::string& channel_id) {
use_named_interface ? IPC::Channel::MODE_NAMED_SERVER
: IPC::Channel::MODE_CLIENT,
this,
automation_resource_message_filter_,
g_browser_process->io_thread()->message_loop(),
true, g_browser_process->shutdown_event()));
channel_->AddFilter(automation_resource_message_filter_);

TRACE_EVENT_END("AutomationProvider::InitializeChannel", 0, "");

Expand Down
36 changes: 36 additions & 0 deletions chrome/browser/browser_io_message_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/browser_io_message_filter.h"

#include "base/logging.h"
#include "base/process_util.h"

BrowserIOMessageFilter::BrowserIOMessageFilter() : channel_(NULL) {
}

BrowserIOMessageFilter::~BrowserIOMessageFilter() {
}

void BrowserIOMessageFilter::OnFilterAdded(IPC::Channel* channel) {
channel_ = channel;
}

void BrowserIOMessageFilter::OnChannelClosing() {
channel_ = NULL;
}

void BrowserIOMessageFilter::OnChannelConnected(int32 peer_pid) {
if (!base::OpenProcessHandle(peer_pid, &peer_handle_)) {
NOTREACHED();
}
}

bool BrowserIOMessageFilter::Send(IPC::Message* msg) {
if (channel_)
return channel_->Send(msg);

delete msg;
return false;
}
37 changes: 37 additions & 0 deletions chrome/browser/browser_io_message_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CHROME_BROWSER_BROWSER_IO_MESSAGE_FILTER_H_
#define CHROME_BROWSER_BROWSER_IO_MESSAGE_FILTER_H_
#pragma once

#include "base/process.h"
#include "ipc/ipc_channel_proxy.h"

// Base class for message filters in the browser process that reside on the IO
// thread.
class BrowserIOMessageFilter : public IPC::ChannelProxy::MessageFilter,
public IPC::Message::Sender {
public:
BrowserIOMessageFilter();
virtual ~BrowserIOMessageFilter();

// IPC::ChannelProxy::MessageFilter methods. If you override them, make sure
// to call them as well.
virtual void OnFilterAdded(IPC::Channel* channel);
virtual void OnChannelClosing();
virtual void OnChannelConnected(int32 peer_pid);

// IPC::Message::Sender implementation:
virtual bool Send(IPC::Message* msg);

protected:
base::ProcessHandle peer_handle() { return peer_handle_; }

private:
IPC::Channel* channel_;
base::ProcessHandle peer_handle_;
};

#endif // CHROME_BROWSER_BROWSER_IO_MESSAGE_FILTER_H_
36 changes: 20 additions & 16 deletions chrome/browser/renderer_host/browser_render_process_host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,18 +296,6 @@ bool BrowserRenderProcessHost::Init(
// run the IPC channel on the shared IO thread.
base::Thread* io_thread = g_browser_process->io_thread();

// Construct the AudioRendererHost with the IO thread.
audio_renderer_host_ = new AudioRendererHost();

scoped_refptr<ResourceMessageFilter> resource_message_filter(
new ResourceMessageFilter(g_browser_process->resource_dispatcher_host(),
id(),
audio_renderer_host_.get(),
PluginService::GetInstance(),
g_browser_process->print_job_manager(),
profile(),
widget_helper_));

CommandLine::StringType renderer_prefix;
#if defined(OS_POSIX)
// A command prefix is something prepended to the command line of the spawned
Expand All @@ -329,17 +317,14 @@ bool BrowserRenderProcessHost::Init(
ChildProcessInfo::GenerateRandomChannelID(this);
channel_.reset(
new IPC::SyncChannel(channel_id, IPC::Channel::MODE_SERVER, this,
resource_message_filter,
io_thread->message_loop(), true,
g_browser_process->shutdown_event()));
// As a preventive mesure, we DCHECK if someone sends a synchronous message
// with no time-out, which in the context of the browser process we should not
// be doing.
channel_->set_sync_messages_with_no_timeout_allowed(false);

scoped_refptr<PepperFileMessageFilter> pepper_file_message_filter(
new PepperFileMessageFilter(id(), profile()));
channel_->AddFilter(pepper_file_message_filter);
CreateMessageFilters();

if (run_renderer_in_process()) {
// Crank up a thread and run the initialization there. With the way that
Expand Down Expand Up @@ -391,6 +376,25 @@ bool BrowserRenderProcessHost::Init(
return true;
}

void BrowserRenderProcessHost::CreateMessageFilters() {
// Construct the AudioRendererHost with the IO thread.
audio_renderer_host_ = new AudioRendererHost();

scoped_refptr<ResourceMessageFilter> resource_message_filter(
new ResourceMessageFilter(g_browser_process->resource_dispatcher_host(),
id(),
audio_renderer_host_.get(),
PluginService::GetInstance(),
g_browser_process->print_job_manager(),
profile(),
widget_helper_));
channel_->AddFilter(resource_message_filter);

scoped_refptr<PepperFileMessageFilter> pepper_file_message_filter(
new PepperFileMessageFilter(id(), profile()));
channel_->AddFilter(pepper_file_message_filter);
}

int BrowserRenderProcessHost::GetNextRoutingID() {
return widget_helper_->GetNextRoutingID();
}
Expand Down
3 changes: 3 additions & 0 deletions chrome/browser/renderer_host/browser_render_process_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class BrowserRenderProcessHost : public RenderProcessHost,
private:
friend class VisitRelayingRenderProcessHost;

// Creates and adds the IO thread message filters.
void CreateMessageFilters();

// Control message handlers.
void OnUpdatedCacheStats(const WebKit::WebCache::UsageStats& stats);
void SuddenTerminationChanged(bool enabled);
Expand Down
2 changes: 1 addition & 1 deletion chrome/browser/service/service_process_control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void ServiceProcessControl::ConnectInternal() {
// TODO(hclam): Handle error connecting to channel.
const std::string channel_id = GetServiceProcessChannelName();
channel_.reset(
new IPC::SyncChannel(channel_id, IPC::Channel::MODE_CLIENT, this, NULL,
new IPC::SyncChannel(channel_id, IPC::Channel::MODE_CLIENT, this,
io_thread->message_loop(), true,
g_browser_process->shutdown_event()));
channel_->set_sync_messages_with_no_timeout_allowed(false);
Expand Down
2 changes: 2 additions & 0 deletions chrome/chrome_browser.gypi
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@
'browser/browser_about_handler.h',
'browser/browser_child_process_host.cc',
'browser/browser_child_process_host.h',
'browser/browser_io_message_filter.cc',
'browser/browser_io_message_filter.h',
'browser/browser_main.cc',
'browser/browser_main_gtk.cc',
'browser/browser_main_gtk.h',
Expand Down
2 changes: 1 addition & 1 deletion chrome/common/child_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void ChildThread::Init() {
}

channel_.reset(new IPC::SyncChannel(channel_name_,
IPC::Channel::MODE_CLIENT, this, NULL,
IPC::Channel::MODE_CLIENT, this,
ChildProcess::current()->io_message_loop(), true,
ChildProcess::current()->GetShutDownEvent()));
#ifdef IPC_MESSAGE_LOG_ENABLED
Expand Down
2 changes: 1 addition & 1 deletion chrome/gpu/gpu_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ bool GpuChannel::Init() {
IPC::AddChannelSocket(channel_name, gpu_fd);
#endif
channel_.reset(new IPC::SyncChannel(
channel_name, IPC::Channel::MODE_SERVER, this, NULL,
channel_name, IPC::Channel::MODE_SERVER, this,
ChildProcess::current()->io_message_loop(), false,
ChildProcess::current()->GetShutDownEvent()));

Expand Down
2 changes: 1 addition & 1 deletion chrome/plugin/plugin_channel_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ NPObjectBase* PluginChannelBase::GetNPObjectListenerForRoute(int route_id) {
bool PluginChannelBase::Init(MessageLoop* ipc_message_loop,
bool create_pipe_now) {
channel_.reset(new IPC::SyncChannel(
channel_name_, mode_, this, NULL, ipc_message_loop, create_pipe_now,
channel_name_, mode_, this, ipc_message_loop, create_pipe_now,
ChildProcess::current()->GetShutDownEvent()));
channel_valid_ = true;
return true;
Expand Down
2 changes: 1 addition & 1 deletion chrome/renderer/gpu_channel_host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ GpuChannelHost::~GpuChannelHost() {
void GpuChannelHost::Connect(const std::string& channel_name) {
// Open a channel to the GPU process.
channel_.reset(new IPC::SyncChannel(
channel_name, IPC::Channel::MODE_CLIENT, this, NULL,
channel_name, IPC::Channel::MODE_CLIENT, this,
ChildProcess::current()->io_message_loop(), true,
ChildProcess::current()->GetShutDownEvent()));

Expand Down
2 changes: 1 addition & 1 deletion chrome/service/service_ipc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bool ServiceIPCServer::Init() {

void ServiceIPCServer::CreateChannel() {
channel_.reset(new IPC::SyncChannel(channel_name_,
IPC::Channel::MODE_SERVER, this, NULL,
IPC::Channel::MODE_SERVER, this,
g_service_process->io_thread()->message_loop(), true,
g_service_process->shutdown_event()));
DCHECK(sync_message_filter_.get());
Expand Down
2 changes: 1 addition & 1 deletion chrome/test/automation/automation_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ void AutomationProxy::InitializeChannel(const std::string& channel_id,
use_named_interface ? IPC::Channel::MODE_NAMED_CLIENT
: IPC::Channel::MODE_SERVER,
this, // we are the listener
new AutomationMessageFilter(this),
thread_->message_loop(),
true,
shutdown_event_.get()));
channel_->AddFilter(new AutomationMessageFilter(this));
}

void AutomationProxy::InitializeHandleTracker() {
Expand Down
61 changes: 41 additions & 20 deletions ipc/ipc_channel_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,13 @@ void ChannelProxy::MessageFilter::OnDestruct() const {
//------------------------------------------------------------------------------

ChannelProxy::Context::Context(Channel::Listener* listener,
MessageFilter* filter,
MessageLoop* ipc_message_loop)
: listener_message_loop_(MessageLoop::current()),
listener_(listener),
ipc_message_loop_(ipc_message_loop),
channel_(NULL),
peer_pid_(0),
channel_connected_called_(false) {
if (filter)
filters_.push_back(make_scoped_refptr(filter));
}

void ChannelProxy::Context::CreateChannel(const std::string& id,
Expand Down Expand Up @@ -118,6 +115,12 @@ void ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {

// Called on the IPC::Channel thread
void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
// Add any pending filters. This avoids a race condition where someone
// creates a ChannelProxy, calls AddFilter, and then right after starts the
// peer process. The IO thread could receive a message before the task to add
// the filter is run on the IO thread.
OnAddFilter();

peer_pid_ = peer_pid;
for (size_t i = 0; i < filters_.size(); ++i)
filters_[i]->OnChannelConnected(peer_pid);
Expand Down Expand Up @@ -189,13 +192,24 @@ void ChannelProxy::Context::OnSendMessage(Message* message) {
}

// Called on the IPC::Channel thread
void ChannelProxy::Context::OnAddFilter(MessageFilter* filter) {
filters_.push_back(make_scoped_refptr(filter));
void ChannelProxy::Context::OnAddFilter() {
std::vector<scoped_refptr<MessageFilter> > filters;
{
AutoLock auto_lock(pending_filters_lock_);
filters.swap(pending_filters_);
}

for (size_t i = 0; i < filters.size(); ++i) {
filters_.push_back(filters[i]);

// If the channel has already been created, then we need to send this message
// so that the filter gets access to the Channel.
if (channel_)
filter->OnFilterAdded(channel_);
// If the channel has already been created, then we need to send this
// message so that the filter gets access to the Channel.
if (channel_)
filters[i]->OnFilterAdded(channel_);
// Ditto for the peer process id.
if (peer_pid_)
filters[i]->OnChannelConnected(peer_pid_);
}
}

// Called on the IPC::Channel thread
Expand All @@ -211,6 +225,15 @@ void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
NOTREACHED() << "filter to be removed not found";
}

// Called on the listener's thread
void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
AutoLock auto_lock(pending_filters_lock_);
pending_filters_.push_back(make_scoped_refptr(filter));
ipc_message_loop_->PostTask(
FROM_HERE,
NewRunnableMethod(this, &Context::OnAddFilter));
}

// Called on the listener's thread
void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
if (!listener_)
Expand Down Expand Up @@ -255,15 +278,18 @@ void ChannelProxy::Context::OnDispatchError() {

//-----------------------------------------------------------------------------

ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode,
Channel::Listener* listener, MessageFilter* filter,
ChannelProxy::ChannelProxy(const std::string& channel_id,
Channel::Mode mode,
Channel::Listener* listener,
MessageLoop* ipc_thread)
: context_(new Context(listener, filter, ipc_thread)) {
: context_(new Context(listener, ipc_thread)) {
Init(channel_id, mode, ipc_thread, true);
}

ChannelProxy::ChannelProxy(const std::string& channel_id, Channel::Mode mode,
MessageLoop* ipc_thread, Context* context,
ChannelProxy::ChannelProxy(const std::string& channel_id,
Channel::Mode mode,
MessageLoop* ipc_thread,
Context* context,
bool create_pipe_now)
: context_(context) {
Init(channel_id, mode, ipc_thread, create_pipe_now);
Expand Down Expand Up @@ -314,12 +340,7 @@ bool ChannelProxy::Send(Message* message) {
}

void ChannelProxy::AddFilter(MessageFilter* filter) {
context_->ipc_message_loop()->PostTask(
FROM_HERE,
NewRunnableMethod(
context_.get(),
&Context::OnAddFilter,
make_scoped_refptr(filter)));
context_->AddFilter(filter);
}

void ChannelProxy::RemoveFilter(MessageFilter* filter) {
Expand Down
Loading

0 comments on commit 4b580bf

Please sign in to comment.