diff --git a/BUILD b/BUILD index a566057e92613..c8c49ff4a4184 100644 --- a/BUILD +++ b/BUILD @@ -63,6 +63,11 @@ config_setting( values = {"cpu": "x64_windows_msvc"}, ) +config_setting( + name = "mac_x86_64", + values = {"cpu": "darwin"}, +) + # This should be updated along with build.yaml g_stands_for = "godric" @@ -981,6 +986,7 @@ grpc_cc_library( ], language = "c++", public_hdrs = GRPC_PUBLIC_HDRS, + use_cfstream = True, deps = [ "gpr_base", "grpc_codegen", @@ -1044,6 +1050,7 @@ grpc_cc_library( "src/core/lib/iomgr/endpoint_cfstream.h", "src/core/lib/iomgr/error_cfstream.h", ], + use_cfstream = True, deps = [ ":gpr_base", ":grpc_base", diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index be85bc873244e..3ea8e305ca567 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -35,6 +35,12 @@ def if_not_windows(a): "//conditions:default": a, }) +def if_mac(a): + return select({ + "//:mac_x86_64": a, + "//conditions:default": [], + }) + def _get_external_deps(external_deps): ret = [] for dep in external_deps: @@ -73,10 +79,16 @@ def grpc_cc_library( testonly = False, visibility = None, alwayslink = 0, - data = []): + data = [], + use_cfstream = False): copts = [] + if use_cfstream: + copts = if_mac(["-DGRPC_CFSTREAM"]) if language.upper() == "C": - copts = if_not_windows(["-std=c99"]) + copts = copts + if_not_windows(["-std=c99"]) + linkopts = if_not_windows(["-pthread"]) + if use_cfstream: + linkopts = linkopts + if_mac(["-framework CoreFoundation"]) native.cc_library( name = name, srcs = srcs, @@ -98,7 +110,7 @@ def grpc_cc_library( copts = copts, visibility = visibility, testonly = testonly, - linkopts = if_not_windows(["-pthread"]), + linkopts = linkopts, includes = [ "include", ], @@ -113,7 +125,6 @@ def grpc_proto_plugin(name, srcs = [], deps = []): deps = deps, ) - def grpc_proto_library( name, srcs = [], @@ -133,9 +144,9 @@ def grpc_proto_library( ) def grpc_cc_test(name, srcs = [], deps = [], external_deps = [], args = [], data = [], uses_polling = True, language = "C++", size = "medium", timeout = None, tags = [], exec_compatible_with = []): - copts = [] + copts = if_mac(["-DGRPC_CFSTREAM"]) if language.upper() == "C": - copts = if_not_windows(["-std=c99"]) + copts = copts + if_not_windows(["-std=c99"]) args = { "name": name, "srcs": srcs, diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index 7c4bc1ace2a36..25146e7861c22 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -182,7 +182,7 @@ static void ReadAction(void* arg, grpc_error* error) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); EP_UNREF(ep, "read"); } else { - if (read_size < len) { + if (read_size < static_cast(len)) { grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); } CallReadCb(ep, GRPC_ERROR_NONE); @@ -217,7 +217,7 @@ static void WriteAction(void* arg, grpc_error* error) { CallWriteCb(ep, error); EP_UNREF(ep, "write"); } else { - if (write_size < GRPC_SLICE_LENGTH(slice)) { + if (write_size < static_cast(GRPC_SLICE_LENGTH(slice))) { grpc_slice_buffer_undo_take_first( ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); } diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 64b3eae60daba..1970f3693cb6a 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -625,3 +625,24 @@ grpc_cc_test( "//test/cpp/util:test_util", ], ) + +grpc_cc_test( + name = "cfstream_test", + srcs = ["cfstream_test.cc"], + external_deps = [ + "gtest", + ], + tags = ["manual"], # test requires root, won't work with bazel RBE + deps = [ + ":test_service_impl", + "//:gpr", + "//:grpc", + "//:grpc++", + "//:grpc_cfstream", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//src/proto/grpc/testing:simple_messages_proto", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) diff --git a/test/cpp/end2end/cfstream_test.cc b/test/cpp/end2end/cfstream_test.cc new file mode 100644 index 0000000000000..9039329d81575 --- /dev/null +++ b/test/cpp/end2end/cfstream_test.cc @@ -0,0 +1,278 @@ +/* + * + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/gpr/env.h" + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" + +#ifdef GRPC_CFSTREAM +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +class CFStreamTest : public ::testing::Test { + protected: + CFStreamTest() + : server_host_("grpctest"), + interface_("lo0"), + ipv4_address_("10.0.0.1"), + netmask_("/32"), + kRequestMessage_("🖖") {} + + void DNSUp() { + std::ostringstream cmd; + // Add DNS entry for server_host_ in /etc/hosts + cmd << "echo '" << ipv4_address_ << " " << server_host_ + << " ' | sudo tee -a /etc/hosts"; + std::system(cmd.str().c_str()); + } + + void DNSDown() { + std::ostringstream cmd; + // Remove DNS entry for server_host_ in /etc/hosts + cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts"; + std::system(cmd.str().c_str()); + } + + void InterfaceUp() { + std::ostringstream cmd; + cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_; + std::system(cmd.str().c_str()); + } + + void InterfaceDown() { + std::ostringstream cmd; + cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_; + std::system(cmd.str().c_str()); + } + + void NetworkUp() { + InterfaceUp(); + DNSUp(); + } + + void NetworkDown() { + InterfaceDown(); + DNSDown(); + } + + void SetUp() override { + NetworkUp(); + grpc_init(); + StartServer(); + } + + void TearDown() override { + NetworkDown(); + StopServer(); + grpc_shutdown(); + } + + void StartServer() { + port_ = grpc_pick_unused_port_or_die(); + server_.reset(new ServerData(port_)); + server_->Start(server_host_); + } + void StopServer() { server_->Shutdown(); } + + std::unique_ptr BuildStub( + const std::shared_ptr& channel) { + return grpc::testing::EchoTestService::NewStub(channel); + } + + std::shared_ptr BuildChannel() { + std::ostringstream server_address; + server_address << server_host_ << ":" << port_; + return CreateCustomChannel( + server_address.str(), InsecureChannelCredentials(), ChannelArguments()); + } + + void SendRpc( + const std::unique_ptr& stub, + bool expect_success = false) { + auto response = std::unique_ptr(new EchoResponse()); + EchoRequest request; + request.set_message(kRequestMessage_); + ClientContext context; + Status status = stub->Echo(&context, request, response.get()); + if (status.ok()) { + gpr_log(GPR_DEBUG, "RPC returned %s\n", response->message().c_str()); + } else { + gpr_log(GPR_DEBUG, "RPC failed: %s", status.error_message().c_str()); + } + if (expect_success) { + EXPECT_TRUE(status.ok()); + } + } + + bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(false /* try_to_connect */)) == + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + + bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(true /* try_to_connect */)) != + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + + private: + struct ServerData { + int port_; + std::unique_ptr server_; + TestServiceImpl service_; + std::unique_ptr thread_; + bool server_ready_ = false; + + explicit ServerData(int port) { port_ = port; } + + void Start(const grpc::string& server_host) { + gpr_log(GPR_INFO, "starting server on port %d", port_); + std::mutex mu; + std::unique_lock lock(mu); + std::condition_variable cond; + thread_.reset(new std::thread( + std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); + cond.wait(lock, [this] { return server_ready_; }); + server_ready_ = false; + gpr_log(GPR_INFO, "server startup complete"); + } + + void Serve(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { + std::ostringstream server_address; + server_address << server_host << ":" << port_; + ServerBuilder builder; + builder.AddListeningPort(server_address.str(), + InsecureServerCredentials()); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + std::lock_guard lock(*mu); + server_ready_ = true; + cond->notify_one(); + } + + void Shutdown(bool join = true) { + server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); + if (join) thread_->join(); + } + }; + + const grpc::string server_host_; + const grpc::string interface_; + const grpc::string ipv4_address_; + const grpc::string netmask_; + std::unique_ptr stub_; + std::unique_ptr server_; + int port_; + const grpc::string kRequestMessage_; +}; + +// gRPC should automatically detech network flaps (without enabling keepalives) +// when CFStream is enabled +TEST_F(CFStreamTest, NetworkTransition) { + auto channel = BuildChannel(); + auto stub = BuildStub(channel); + // Channel should be in READY state after we send an RPC + SendRpc(stub, /*expect_success=*/true); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + + std::atomic_bool shutdown{false}; + std::thread sender = std::thread([this, &stub, &shutdown]() { + while (true) { + if (shutdown.load()) { + return; + } + SendRpc(stub); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + }); + + // bring down network + NetworkDown(); + + // network going down should be detected by cfstream + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + + // bring network interface back up + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + NetworkUp(); + + // channel should reconnect + EXPECT_TRUE(WaitForChannelReady(channel.get())); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + shutdown.store(true); + sender.join(); +} + +} // namespace +} // namespace testing +} // namespace grpc +#endif // GRPC_CFSTREAM + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + gpr_setenv("grpc_cfstream", "1"); + // TODO (pjaikumar): remove the line below when + // https://github.com/grpc/grpc/issues/18080 has been fixed. + gpr_setenv("GRPC_DNS_RESOLVER", "native"); + const auto result = RUN_ALL_TESTS(); + return result; +} diff --git a/tools/internal_ci/macos/grpc_cfstream.cfg b/tools/internal_ci/macos/grpc_cfstream.cfg new file mode 100644 index 0000000000000..2b1ce0a89c7d6 --- /dev/null +++ b/tools/internal_ci/macos/grpc_cfstream.cfg @@ -0,0 +1,19 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_run_bazel_tests.sh" + diff --git a/tools/internal_ci/macos/grpc_run_bazel_tests.sh b/tools/internal_ci/macos/grpc_run_bazel_tests.sh new file mode 100644 index 0000000000000..ef02a675d5b82 --- /dev/null +++ b/tools/internal_ci/macos/grpc_run_bazel_tests.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex + +# change to grpc repo root +cd $(dirname $0)/../../.. + + +./tools/run_tests/start_port_server.py + +# run cfstream_test separately because it messes with the network +bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=all //test/cpp/end2end:cfstream_test + +# kill port_server.py to prevent the build from hanging +ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9 +