Skip to content

Commit

Permalink
ipcz: Allow node reconnection
Browse files Browse the repository at this point in the history
If an application explicitly issues a ConnectNode() call targeting a
broker to which it's already been connected, ipcz will now assume the
application intended to replace the old connection.  Prior to this
change the new connection request would effectively be ignored.

Additionally, when a non-broker loses a connection to its primary broker
it also now severs its connections to all other nodes. This resolves a
related issue that affected tests for the above changes.

Fixed: 1449062
Change-Id: I6c4818015f1182d3cda6400c486e422cb18b2073
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4574863
Reviewed-by: Alex Gough <ajgo@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1154099}
  • Loading branch information
krockot authored and Chromium LUCI CQ committed Jun 6, 2023
1 parent e55944d commit 0f15bea
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 72 deletions.
54 changes: 47 additions & 7 deletions mojo/core/ipcz_driver/driver_for_ipcz_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ class MojoIpczInProcessTestNodeController
};

MojoIpczInProcessTestNodeController(
ipcz::test::TestNode& source,
const std::string& node_name,
std::unique_ptr<ipcz::test::TestNode> test_node,
ipcz::test::TestDriver* test_driver)
: node_thread_delegate_(std::move(test_node), test_driver),
: source_(source),
is_broker_(test_node->GetDetails().is_broker),
node_thread_delegate_(std::move(test_node), test_driver),
node_thread_(&node_thread_delegate_, node_name) {
node_thread_.StartAsync();
}
Expand All @@ -71,20 +74,42 @@ class MojoIpczInProcessTestNodeController
return true;
}

ipcz::test::TransportPair CreateNewTransports() override {
ipcz::test::TransportPair transports;
if (is_broker_) {
transports = source_.CreateBrokerToBrokerTransports();
} else {
transports = source_.CreateTransports();
}

Transport::FromHandle(transports.ours)
->set_remote_process(base::Process::Current());
Transport::FromHandle(transports.theirs)
->set_remote_process(base::Process::Current());
return transports;
}

private:
~MojoIpczInProcessTestNodeController() override {
CHECK(node_thread_.HasBeenJoined());
}

ipcz::test::TestNode& source_;
const bool is_broker_;
NodeThreadDelegate node_thread_delegate_;
base::DelegateSimpleThread node_thread_;
};

class MojoIpczChildTestNodeController
: public ipcz::test::TestNode::TestNodeController {
public:
explicit MojoIpczChildTestNodeController(base::Process process)
: process_(std::move(process)) {}
MojoIpczChildTestNodeController(
ipcz::test::TestNode& source,
const ipcz::test::TestNodeDetails& child_details,
base::Process process)
: source_(source),
is_broker_(child_details.is_broker),
process_(std::move(process)) {}

// ipcz::test::TestNode::TestNodeController:
bool WaitForShutdown() override {
Expand All @@ -101,9 +126,24 @@ class MojoIpczChildTestNodeController
return *result_;
}

ipcz::test::TransportPair CreateNewTransports() override {
ipcz::test::TransportPair transports;
if (is_broker_) {
transports = source_.CreateBrokerToBrokerTransports();
} else {
transports = source_.CreateTransports();
}

Transport::FromHandle(transports.ours)
->set_remote_process(process_.Duplicate());
return transports;
}

private:
~MojoIpczChildTestNodeController() override { DCHECK(result_.has_value()); }

ipcz::test::TestNode& source_;
const bool is_broker_;
base::Process process_;
absl::optional<bool> result_;
};
Expand All @@ -127,7 +167,7 @@ class MojoIpczTestDriver : public ipcz::test::TestDriver {
return kMojoIpczMultiprocessTestDriverName;
}

ipcz::test::TestNode::TransportPair CreateTransports(
ipcz::test::TransportPair CreateTransports(
ipcz::test::TestNode& source,
bool for_broker_target) const override {
std::pair<scoped_refptr<Transport>, scoped_refptr<Transport>> transports;
Expand Down Expand Up @@ -199,8 +239,8 @@ class MojoIpczTestDriver : public ipcz::test::TestDriver {
std::unique_ptr<ipcz::test::TestNode> node = details.factory();
node->SetTransport(their_transport);
return ipcz::MakeRefCounted<MojoIpczInProcessTestNodeController>(
std::string(details.name.begin(), details.name.end()), std::move(node),
this);
source, std::string(details.name.begin(), details.name.end()),
std::move(node), this);
}

ipcz::Ref<ipcz::test::TestNode::TestNodeController> SpawnTestNodeProcess(
Expand Down Expand Up @@ -259,7 +299,7 @@ class MojoIpczTestDriver : public ipcz::test::TestDriver {
endpoint.ProcessLaunchAttempted();
Transport::FromHandle(our_transport)->set_remote_process(child.Duplicate());
return ipcz::MakeRefCounted<MojoIpczChildTestNodeController>(
std::move(child));
source, details, std::move(child));
}

const Mode mode_;
Expand Down
174 changes: 174 additions & 0 deletions third_party/ipcz/src/connect_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,5 +352,179 @@ MULTINODE_TEST(ConnectTest, MultiBrokerIntroductions) {
CloseAll({other_broker, client});
}

class ReconnectTestNode : public ConnectTestNode {
public:
void SendTransport(IpczHandle portal, IpczDriverHandle transport) {
IpczBoxContents contents{
.size = sizeof(contents),
.type = IPCZ_BOX_TYPE_DRIVER_OBJECT,
.object = {.driver_object = transport},
};
IpczHandle box;
ASSERT_EQ(IPCZ_RESULT_OK,
ipcz().Box(node(), &contents, IPCZ_NO_FLAGS, nullptr, &box));
EXPECT_EQ(IPCZ_RESULT_OK,
ipcz().Put(portal, nullptr, 0, &box, 1, IPCZ_NO_FLAGS, nullptr));
}

IpczDriverHandle ReceiveTransport(IpczHandle portal) {
IpczDriverHandle handle;
ReceiveTransport(portal, handle);
return handle;
}

IpczHandle Reconnect(IpczDriverHandle transport, IpczConnectNodeFlags flags) {
IpczHandle portal;
Reconnect(transport, flags, portal);
return portal;
}

private:
void ReceiveTransport(IpczHandle portal, IpczDriverHandle& handle) {
IpczHandle box;
EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(portal, nullptr, {&box, 1}));

IpczBoxContents contents{.size = sizeof(contents)};
EXPECT_EQ(IPCZ_RESULT_OK,
ipcz().Unbox(box, IPCZ_NO_FLAGS, nullptr, &contents));
ASSERT_EQ(IPCZ_BOX_TYPE_DRIVER_OBJECT, contents.type);
handle = contents.object.driver_object;
}

void Reconnect(IpczDriverHandle transport,
IpczConnectNodeFlags flags,
IpczHandle& portal) {
flags |= GetTestDriver()->GetExtraClientConnectNodeFlags();
ASSERT_EQ(IPCZ_RESULT_OK, ipcz().ConnectNode(node(), transport, 1, flags,
nullptr, &portal));
}
};

using ReconnectTest = test::MultinodeTest<ReconnectTestNode>;

MULTINODE_TEST_NODE(ReconnectTestNode, ReconnectionTestNonBroker) {
IpczHandle broker = ConnectToBroker();
VerifyEndToEnd(broker);
IpczDriverHandle new_transport = ReceiveTransport(broker);
IpczHandle new_portal = Reconnect(new_transport, IPCZ_CONNECT_NODE_TO_BROKER);
VerifyEndToEnd(new_portal);
WaitForConditionFlags(broker, IPCZ_TRAP_PEER_CLOSED);
CloseAll({broker, new_portal});
}

MULTINODE_TEST(ReconnectTest, BrokerNonBroker) {
IpczHandle non_broker;
auto controller = SpawnTestNode<ReconnectionTestNonBroker>({&non_broker, 1});
TransportPair transports = controller->CreateNewTransports();
VerifyEndToEnd(non_broker);
SendTransport(non_broker, transports.theirs);
IpczHandle new_portal = Reconnect(transports.ours, IPCZ_NO_FLAGS);
VerifyEndToEnd(new_portal);
WaitForConditionFlags(non_broker, IPCZ_TRAP_PEER_CLOSED);
CloseAll({non_broker, new_portal});
}

MULTINODE_TEST_BROKER_NODE(ReconnectTestNode, ReconnectionTestBroker) {
IpczHandle other_broker = ConnectToBroker();
VerifyEndToEnd(other_broker);
IpczDriverHandle new_transport = ReceiveTransport(other_broker);
IpczHandle new_portal = Reconnect(new_transport, IPCZ_CONNECT_NODE_TO_BROKER);
VerifyEndToEnd(new_portal);
WaitForConditionFlags(other_broker, IPCZ_TRAP_PEER_CLOSED);
CloseAll({other_broker, new_portal});
}

MULTINODE_TEST(ReconnectTest, BrokerBroker) {
IpczHandle other_broker;
auto controller = SpawnTestNode<ReconnectionTestBroker>({&other_broker, 1});
TransportPair transports = controller->CreateNewTransports();
VerifyEndToEnd(other_broker);
SendTransport(other_broker, transports.theirs);
IpczHandle new_portal =
Reconnect(transports.ours, IPCZ_CONNECT_NODE_TO_BROKER);
VerifyEndToEnd(new_portal);
WaitForConditionFlags(other_broker, IPCZ_TRAP_PEER_CLOSED);
CloseAll({other_broker, new_portal});
}

MULTINODE_TEST_NODE(ReconnectTestNode, TransitiveReconnectClientA) {
IpczHandle broker = ConnectToBroker();

IpczHandle client_b;
EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(broker, nullptr, {&client_b, 1}));
WaitForDirectRemoteLink(client_b);
VerifyEndToEnd(client_b);

IpczDriverHandle new_transport = ReceiveTransport(broker);
IpczHandle new_broker = Reconnect(new_transport, IPCZ_CONNECT_NODE_TO_BROKER);
VerifyEndToEnd(new_broker);
EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(broker, IPCZ_TRAP_PEER_CLOSED));
EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(client_b, IPCZ_TRAP_PEER_CLOSED));

// Pass a portal to the broker which it will forward to client B. Then verify
// that our portal ends up with a working direct link to it, implying that
// the two client nodes have been automatically re-introduced.
auto [new_client_a, new_client_b] = OpenPortals();
Put(new_broker, "", {&new_client_a, 1});
WaitForDirectRemoteLink(new_client_b);
VerifyEndToEnd(new_client_b);

Put(new_broker, "bye");
CloseAll({broker, new_broker, client_b, new_client_b});
}

MULTINODE_TEST_NODE(ReconnectTestNode, TransitiveReconnectClientB) {
IpczHandle broker = ConnectToBroker();

IpczHandle client_a;
EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(broker, nullptr, {&client_a, 1}));
WaitForDirectRemoteLink(client_a);
VerifyEndToEnd(client_a);

IpczHandle new_client_a;
EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(broker, nullptr, {&new_client_a, 1}));
EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(client_a, IPCZ_TRAP_PEER_CLOSED));
WaitForDirectRemoteLink(new_client_a);
VerifyEndToEnd(new_client_a);

Put(broker, "bye");
CloseAll({broker, client_a, new_client_a});
}

MULTINODE_TEST(ReconnectTest, TransitiveReconnection) {
// Tests that when a non-broker reconnects to a broker, it can also get
// reconnected to other nodes via that broker.
IpczHandle client_a;
auto a_controller = SpawnTestNode<TransitiveReconnectClientA>({&client_a, 1});
IpczHandle client_b = SpawnTestNode<TransitiveReconnectClientB>();

// Establish a pair of portals between clients A and B.
auto [a, b] = OpenPortals();
Put(client_a, "", {&a, 1});
Put(client_b, "", {&b, 1});

// Send client A a transport with which to re-connect to us. Verify that it's
// reconnected and wait for its previous portal to be closed.
TransportPair transports = a_controller->CreateNewTransports();
SendTransport(client_a, transports.theirs);
IpczHandle new_client_a = Reconnect(transports.ours, IPCZ_NO_FLAGS);
VerifyEndToEnd(new_client_a);
EXPECT_EQ(IPCZ_RESULT_OK,
WaitForConditionFlags(client_a, IPCZ_TRAP_PEER_CLOSED));

// Accept a new portal from A and forward it to B.
IpczHandle new_b_to_a;
EXPECT_EQ(IPCZ_RESULT_OK, WaitToGet(new_client_a, nullptr, {&new_b_to_a, 1}));
Put(client_b, "", {&new_b_to_a, 1});

// Wait for A and B to confirm thier reconnection.
EXPECT_EQ("bye", WaitToGetString(new_client_a));
EXPECT_EQ("bye", WaitToGetString(client_b));
CloseAll({client_a, client_b, new_client_a});
}

} // namespace
} // namespace ipcz
51 changes: 33 additions & 18 deletions third_party/ipcz/src/ipcz/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,26 +149,34 @@ Ref<NodeLink> Node::GetBrokerLink() {
return broker_link_;
}

void Node::SetAssignedName(const NodeName& name) {
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!assigned_name_.is_valid());
assigned_name_ = name;
}

bool Node::AddConnection(const NodeName& remote_node_name,
Connection connection) {
std::vector<BrokerLinkCallback> callbacks;
{
absl::ReleasableMutexLock lock(&mutex_);
auto [it, inserted] = connections_.insert({remote_node_name, connection});
if (!inserted) {
lock.Release();
absl::MutexLock lock(&mutex_);
for (;;) {
auto it = connections_.find(remote_node_name);
if (it == connections_.end()) {
break;
}

const OperationContext context{OperationContext::kTransportNotification};
connection.link->Deactivate(context);
return false;
// Assume that if we're getting a new connection to an already-known node,
// it must be because the application has explicitly initiated a new
// connection to the same node and it expects the previous connection to
// be replaced.
//
// Note that DropConnection() may elicit trap notifications. Although we
// may be here *either* within a ConnectNode() API call *or* while
// handling an incoming NodeConnector message, we can err on the side of
// caution (i.e. less re-entrancy in event handlers) by treating every
// case like an API call.
mutex_.Unlock();
const OperationContext context{OperationContext::kAPICall};
DropConnection(context, *it->second.link);
mutex_.Lock();
}

connections_.insert({remote_node_name, connection});
const bool remote_is_broker =
connection.link->remote_node_type() == Type::kBroker;
const bool local_is_broker = type_ == Type::kBroker;
Expand All @@ -184,6 +192,7 @@ bool Node::AddConnection(const NodeName& remote_node_name,
ABSL_ASSERT(!broker_link_);
broker_link_ = connection.link;
broker_link_callbacks_.swap(callbacks);
assigned_name_ = broker_link_->local_node_name();
}
}

Expand Down Expand Up @@ -221,7 +230,6 @@ NodeName Node::GenerateRandomName() const {

void Node::SetAllocationDelegate(Ref<NodeLink> link) {
absl::MutexLock lock(&mutex_);
ABSL_ASSERT(!allocation_delegate_link_);
allocation_delegate_link_ = std::move(link);
}

Expand Down Expand Up @@ -463,14 +471,14 @@ bool Node::AcceptRelayedMessage(msg::AcceptRelayedMessage& accept) {
}

void Node::DropConnection(const OperationContext& context,
const NodeName& name) {
const NodeLink& connection_link) {
Ref<NodeLink> link;
std::vector<NodeName> pending_introductions;
bool lost_broker = false;
{
absl::MutexLock lock(&mutex_);
auto it = connections_.find(name);
if (it == connections_.end()) {
auto it = connections_.find(connection_link.remote_node_name());
if (it == connections_.end() || it->second.link != &connection_link) {
return;
}
link = std::move(it->second.link);
Expand Down Expand Up @@ -505,7 +513,13 @@ void Node::DropConnection(const OperationContext& context,
link->Deactivate(context);

if (lost_broker) {
CancelAllIntroductions();
// Break all connections if the broker is lost. In practice we should only
// need to break connections which were introduced by the lost broker, but
// there's less risk of weird future inconsistencies if we just say that as
// a rule, primary broker disconnection serves as a sort of "reset" for a
// node. The node can be re-connected to a broker and continue operating
// normally from there.
ShutDown();
} else {
for (auto& target : pending_introductions) {
NotifyIntroductionFailed(*link, target);
Expand Down Expand Up @@ -582,6 +596,7 @@ void Node::ShutDown() {
broker_link_.reset();
allocation_delegate_link_.reset();
other_brokers_.clear();
assigned_name_ = {};
}

const OperationContext context{OperationContext::kAPICall};
Expand Down
Loading

0 comments on commit 0f15bea

Please sign in to comment.