Skip to content

Commit

Permalink
Change threading semantics in P2PSocketClient.
Browse files Browse the repository at this point in the history
BUG=None
TEST=None

Review URL: http://codereview.chromium.org/6611024

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@76831 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
sergeyu@chromium.org committed Mar 3, 2011
1 parent 94611e1 commit 184759d
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 52 deletions.
88 changes: 56 additions & 32 deletions chrome/renderer/p2p/socket_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

P2PSocketClient::P2PSocketClient(P2PSocketDispatcher* dispatcher)
: dispatcher_(dispatcher),
message_loop_(dispatcher->message_loop()),
ipc_message_loop_(dispatcher->message_loop()),
delegate_message_loop_(NULL),
socket_id_(0), delegate_(NULL),
state_(STATE_UNINITIALIZED) {
}
Expand All @@ -20,26 +21,30 @@ P2PSocketClient::~P2PSocketClient() {
state_ == STATE_ERROR);
}

void P2PSocketClient::Init(P2PSocketType type, P2PSocketAddress address,
P2PSocketClient::Delegate* delegate) {
if (!message_loop_->BelongsToCurrentThread()) {
message_loop_->PostTask(
void P2PSocketClient::Init(
P2PSocketType type, const P2PSocketAddress& address,
P2PSocketClient::Delegate* delegate,
scoped_refptr<base::MessageLoopProxy> delegate_loop) {
if (!ipc_message_loop_->BelongsToCurrentThread()) {
ipc_message_loop_->PostTask(
FROM_HERE, NewRunnableMethod(this, &P2PSocketClient::Init,
type, address, delegate));
type, address, delegate, delegate_loop));
return;
}

DCHECK_EQ(state_, STATE_UNINITIALIZED);
state_ = STATE_OPENING;
delegate_ = delegate;
delegate_message_loop_ = delegate_loop;
socket_id_ = dispatcher_->RegisterClient(this);
dispatcher_->SendP2PMessage(
new P2PHostMsg_CreateSocket(0, type, socket_id_, address));
state_ = STATE_OPENING;
}

void P2PSocketClient::Send(P2PSocketAddress address,
void P2PSocketClient::Send(const P2PSocketAddress& address,
const std::vector<char>& data) {
if (!message_loop_->BelongsToCurrentThread()) {
message_loop_->PostTask(
if (!ipc_message_loop_->BelongsToCurrentThread()) {
ipc_message_loop_->PostTask(
FROM_HERE, NewRunnableMethod(this, &P2PSocketClient::Send, address,
data));
return;
Expand All @@ -51,14 +56,16 @@ void P2PSocketClient::Send(P2PSocketAddress address,
new P2PHostMsg_Send(0, socket_id_, address, data));
}

void P2PSocketClient::Close(Task* closed_task) {
if (!message_loop_->BelongsToCurrentThread()) {
message_loop_->PostTask(
FROM_HERE, NewRunnableMethod(this, &P2PSocketClient::Close,
closed_task));
return;
}
void P2PSocketClient::Close() {
DCHECK(delegate_message_loop_->BelongsToCurrentThread());

delegate_ = NULL;

ipc_message_loop_->PostTask(
FROM_HERE, NewRunnableMethod(this, &P2PSocketClient::DoClose));
}

void P2PSocketClient::DoClose() {
if (dispatcher_) {
if (state_ == STATE_OPEN || state_ == STATE_OPENING) {
dispatcher_->SendP2PMessage(new P2PHostMsg_DestroySocket(0, socket_id_));
Expand All @@ -67,34 +74,51 @@ void P2PSocketClient::Close(Task* closed_task) {
}

state_ = STATE_CLOSED;

closed_task->Run();
delete closed_task;
}

void P2PSocketClient::OnSocketCreated(P2PSocketAddress address) {
DCHECK(message_loop_->BelongsToCurrentThread());
void P2PSocketClient::OnSocketCreated(const P2PSocketAddress& address) {
DCHECK(ipc_message_loop_->BelongsToCurrentThread());
DCHECK_EQ(state_, STATE_OPENING);
state_ = STATE_OPEN;
delegate_->OnOpen(address);

delegate_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &P2PSocketClient::DeliverOnSocketCreated, address));
}

void P2PSocketClient::DeliverOnSocketCreated(const P2PSocketAddress& address) {
if (delegate_)
delegate_->OnOpen(address);
}

void P2PSocketClient::OnError() {
DCHECK(message_loop_->BelongsToCurrentThread());
DCHECK(ipc_message_loop_->BelongsToCurrentThread());
state_ = STATE_ERROR;
delegate_->OnError();

delegate_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &P2PSocketClient::DeliverOnError));
}

void P2PSocketClient::DeliverOnError() {
if (delegate_)
delegate_->OnError();
}

void P2PSocketClient::OnDataReceived(P2PSocketAddress address,
void P2PSocketClient::OnDataReceived(const P2PSocketAddress& address,
const std::vector<char>& data) {
DCHECK(message_loop_->BelongsToCurrentThread());
DCHECK(ipc_message_loop_->BelongsToCurrentThread());
DCHECK_EQ(STATE_OPEN, state_);
delegate_->OnDataReceived(address, data);
delegate_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
this, &P2PSocketClient::DeliverOnDataReceived, address, data));
}

void P2PSocketClient::DeliverOnDataReceived(const P2PSocketAddress& address,
const std::vector<char>& data) {
if (delegate_)
delegate_->OnDataReceived(address, data);
}

void P2PSocketClient::Detach() {
DCHECK(message_loop_->BelongsToCurrentThread());
delegate_ = NULL;
state_ = STATE_ERROR;
delegate_->OnError();
DCHECK(ipc_message_loop_->BelongsToCurrentThread());
dispatcher_ = NULL;
OnError();
}
41 changes: 29 additions & 12 deletions chrome/renderer/p2p/socket_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@ class MessageLoopProxy;
} // namespace base

class P2PSocketDispatcher;
class Task;

// P2P socket that rountes all calls over IPC. It can be created and
// used from any thread.
// P2P socket that rountes all calls over IPC.
//
// The object runs on two threads: IPC thread and delegate thread. The
// IPC thread is used to interact with P2PSocketDispatcher. All
// callbacks to the user of this class are called on the delegate
// thread which is specified in Init().
class P2PSocketClient : public base::RefCountedThreadSafe<P2PSocketClient> {
public:
// Delegate is called on the IPC thread.
// Delegate is called on the the same thread on the delegate thread.
class Delegate {
public:
virtual ~Delegate() { }

virtual void OnOpen(P2PSocketAddress address) = 0;
virtual void OnOpen(const P2PSocketAddress& address) = 0;
virtual void OnError() = 0;
virtual void OnDataReceived(P2PSocketAddress address,
virtual void OnDataReceived(const P2PSocketAddress& address,
const std::vector<char>& data) = 0;
};

Expand All @@ -37,14 +40,16 @@ class P2PSocketClient : public base::RefCountedThreadSafe<P2PSocketClient> {
// Initialize socket of the specified |type| and connected to the
// specified |address|. |address| matters only when |type| is set to
// P2P_SOCKET_TCP_CLIENT.
void Init(P2PSocketType type, P2PSocketAddress address, Delegate* delegate);
void Init(P2PSocketType type, const P2PSocketAddress& address,
Delegate* delegate,
scoped_refptr<base::MessageLoopProxy> delegate_loop);

// Send the |data| to the |address|.
void Send(P2PSocketAddress address, const std::vector<char>& data);
void Send(const P2PSocketAddress& address, const std::vector<char>& data);

// Must be called before the socket is destroyed. The delegate may
// not be called after |closed_task| is executed.
void Close(Task* closed_task);
void Close();

int socket_id() const { return socket_id_; }

Expand All @@ -64,16 +69,28 @@ class P2PSocketClient : public base::RefCountedThreadSafe<P2PSocketClient> {

virtual ~P2PSocketClient();

void OnSocketCreated(P2PSocketAddress address);
// Message handlers that run on IPC thread.
void OnSocketCreated(const P2PSocketAddress& address);
void OnError();
void OnDataReceived(P2PSocketAddress address,
void OnDataReceived(const P2PSocketAddress& address,
const std::vector<char>& data);

// Proxy methods that deliver messages to the delegate thread.
void DeliverOnSocketCreated(const P2PSocketAddress& address);
void DeliverOnError();
void DeliverOnDataReceived(const P2PSocketAddress& address,
const std::vector<char>& data);

// Scheduled on the IPC thread to finish closing the connection.
void DoClose();


// Called by the dispatcher when it is destroyed.
void Detach();

P2PSocketDispatcher* dispatcher_;
scoped_refptr<base::MessageLoopProxy> message_loop_;
scoped_refptr<base::MessageLoopProxy> ipc_message_loop_;
scoped_refptr<base::MessageLoopProxy> delegate_message_loop_;
int socket_id_;
Delegate* delegate_;
State state_;
Expand Down
8 changes: 0 additions & 8 deletions chrome/renderer/p2p/socket_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ bool P2PSocketDispatcher::OnMessageReceived(const IPC::Message& message) {
return handled;
}

P2PSocketClient* P2PSocketDispatcher::CreateSocket(
P2PSocketType type, P2PSocketAddress address,
P2PSocketClient::Delegate* delegate) {
P2PSocketClient* socket = new P2PSocketClient(this);
socket->Init(type, address, delegate);
return socket;
}

int P2PSocketDispatcher::RegisterClient(P2PSocketClient* client) {
return clients_.Add(client);
}
Expand Down

0 comments on commit 184759d

Please sign in to comment.