From a20a2ad41bfb1c30b0442578fa9fa1754bc60d53 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 19 May 2016 10:53:44 -0700 Subject: [PATCH 01/20] moved core_codegen from src/cpp/... to include/grpc++/... --- BUILD | 5 +++-- Makefile | 2 ++ build.yaml | 4 ++-- .../grpc++/impl/codegen}/core_codegen.h | 0 include/grpc++/impl/grpc_library.h | 3 +-- src/cpp/common/core_codegen.cc | 2 +- tools/doxygen/Doxyfile.c++ | 1 + tools/doxygen/Doxyfile.c++.internal | 3 ++- tools/run_tests/sources_and_headers.json | 8 ++++---- vsprojects/vcxproj/grpc++/grpc++.vcxproj | 3 ++- vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters | 9 ++++++--- .../vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj | 2 +- .../grpc++_unsecure/grpc++_unsecure.vcxproj.filters | 6 +++--- 13 files changed, 28 insertions(+), 20 deletions(-) rename {src/cpp/common => include/grpc++/impl/codegen}/core_codegen.h (100%) diff --git a/BUILD b/BUILD index 793c1c714de5e..c7fe631265101 100644 --- a/BUILD +++ b/BUILD @@ -844,8 +844,8 @@ cc_library( cc_library( name = "grpc++", srcs = [ + "include/grpc++/impl/codegen/core_codegen.h", "src/cpp/client/secure_credentials.h", - "src/cpp/common/core_codegen.h", "src/cpp/common/secure_auth_context.h", "src/cpp/server/secure_server_credentials.h", "src/cpp/client/create_channel_internal.h", @@ -894,6 +894,7 @@ cc_library( "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/core_codegen.h", "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/method_handler_impl.h", "include/grpc++/impl/proto_utils.h", @@ -1001,7 +1002,6 @@ cc_library( name = "grpc++_unsecure", srcs = [ "src/cpp/client/create_channel_internal.h", - "src/cpp/common/core_codegen.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", "src/cpp/common/insecure_create_auth_context.cc", @@ -1042,6 +1042,7 @@ cc_library( "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/core_codegen.h", "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/method_handler_impl.h", "include/grpc++/impl/proto_utils.h", diff --git a/Makefile b/Makefile index 42cedf51c3a86..d42e672cd5d3c 100644 --- a/Makefile +++ b/Makefile @@ -3216,6 +3216,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/grpc++.h \ include/grpc++/impl/call.h \ include/grpc++/impl/client_unary_call.h \ + include/grpc++/impl/codegen/core_codegen.h \ include/grpc++/impl/grpc_library.h \ include/grpc++/impl/method_handler_impl.h \ include/grpc++/impl/proto_utils.h \ @@ -3522,6 +3523,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/grpc++.h \ include/grpc++/impl/call.h \ include/grpc++/impl/client_unary_call.h \ + include/grpc++/impl/codegen/core_codegen.h \ include/grpc++/impl/grpc_library.h \ include/grpc++/impl/method_handler_impl.h \ include/grpc++/impl/proto_utils.h \ diff --git a/build.yaml b/build.yaml index ac61612da4074..fff4a49c2d06c 100644 --- a/build.yaml +++ b/build.yaml @@ -597,6 +597,7 @@ filegroups: - include/grpc++/grpc++.h - include/grpc++/impl/call.h - include/grpc++/impl/client_unary_call.h + - include/grpc++/impl/codegen/core_codegen.h - include/grpc++/impl/grpc_library.h - include/grpc++/impl/method_handler_impl.h - include/grpc++/impl/proto_utils.h @@ -633,7 +634,6 @@ filegroups: - include/grpc++/support/time.h headers: - src/cpp/client/create_channel_internal.h - - src/cpp/common/core_codegen.h - src/cpp/server/dynamic_thread_pool.h - src/cpp/server/thread_pool_interface.h src: @@ -880,8 +880,8 @@ libs: build: all language: c++ headers: + - include/grpc++/impl/codegen/core_codegen.h - src/cpp/client/secure_credentials.h - - src/cpp/common/core_codegen.h - src/cpp/common/secure_auth_context.h - src/cpp/server/secure_server_credentials.h src: diff --git a/src/cpp/common/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h similarity index 100% rename from src/cpp/common/core_codegen.h rename to include/grpc++/impl/codegen/core_codegen.h diff --git a/include/grpc++/impl/grpc_library.h b/include/grpc++/impl/grpc_library.h index 175cf99a82bae..aaa9e4c8a5479 100644 --- a/include/grpc++/impl/grpc_library.h +++ b/include/grpc++/impl/grpc_library.h @@ -38,10 +38,9 @@ #include #include +#include #include -#include "src/cpp/common/core_codegen.h" - namespace grpc { namespace internal { diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 8e8d42eb294ea..cc35aa69bab12 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -31,7 +31,7 @@ * */ -#include "src/cpp/common/core_codegen.h" +#include #include diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 798d68b0181e8..c92259f991b4c 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -770,6 +770,7 @@ include/grpc++/generic/generic_stub.h \ include/grpc++/grpc++.h \ include/grpc++/impl/call.h \ include/grpc++/impl/client_unary_call.h \ +include/grpc++/impl/codegen/core_codegen.h \ include/grpc++/impl/grpc_library.h \ include/grpc++/impl/method_handler_impl.h \ include/grpc++/impl/proto_utils.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index ce1d6ac3c1b61..bdc4534c90d76 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -770,6 +770,7 @@ include/grpc++/generic/generic_stub.h \ include/grpc++/grpc++.h \ include/grpc++/impl/call.h \ include/grpc++/impl/client_unary_call.h \ +include/grpc++/impl/codegen/core_codegen.h \ include/grpc++/impl/grpc_library.h \ include/grpc++/impl/method_handler_impl.h \ include/grpc++/impl/proto_utils.h \ @@ -859,8 +860,8 @@ include/grpc++/impl/codegen/config.h \ include/grpc++/impl/codegen/config_protobuf.h \ include/grpc++/support/config.h \ include/grpc++/support/config_protobuf.h \ +include/grpc++/impl/codegen/core_codegen.h \ src/cpp/client/secure_credentials.h \ -src/cpp/common/core_codegen.h \ src/cpp/common/secure_auth_context.h \ src/cpp/server/secure_server_credentials.h \ src/cpp/client/create_channel_internal.h \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 3866ebb0e55e4..840fc3be4b655 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -4325,18 +4325,18 @@ "grpc++_codegen" ], "headers": [ + "include/grpc++/impl/codegen/core_codegen.h", "src/cpp/client/secure_credentials.h", - "src/cpp/common/core_codegen.h", "src/cpp/common/secure_auth_context.h", "src/cpp/server/secure_server_credentials.h" ], "language": "c++", "name": "grpc++", "src": [ + "include/grpc++/impl/codegen/core_codegen.h", "src/cpp/client/secure_credentials.cc", "src/cpp/client/secure_credentials.h", "src/cpp/common/auth_property_iterator.cc", - "src/cpp/common/core_codegen.h", "src/cpp/common/secure_auth_context.cc", "src/cpp/common/secure_auth_context.h", "src/cpp/common/secure_channel_arguments.cc", @@ -6380,6 +6380,7 @@ "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/core_codegen.h", "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/method_handler_impl.h", "include/grpc++/impl/proto_utils.h", @@ -6415,7 +6416,6 @@ "include/grpc++/support/sync_stream.h", "include/grpc++/support/time.h", "src/cpp/client/create_channel_internal.h", - "src/cpp/common/core_codegen.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h" ], @@ -6432,6 +6432,7 @@ "include/grpc++/grpc++.h", "include/grpc++/impl/call.h", "include/grpc++/impl/client_unary_call.h", + "include/grpc++/impl/codegen/core_codegen.h", "include/grpc++/impl/grpc_library.h", "include/grpc++/impl/method_handler_impl.h", "include/grpc++/impl/proto_utils.h", @@ -6477,7 +6478,6 @@ "src/cpp/common/channel_arguments.cc", "src/cpp/common/completion_queue.cc", "src/cpp/common/core_codegen.cc", - "src/cpp/common/core_codegen.h", "src/cpp/common/rpc_method.cc", "src/cpp/server/async_generic_service.cc", "src/cpp/server/create_default_thread_pool.cc", diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index 65de5e97175a0..6a1ae52924ad0 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -268,6 +268,7 @@ + @@ -359,8 +360,8 @@ + - diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index ce50bd9de8d86..2116d6a6558de 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -126,6 +126,9 @@ include\grpc++\impl + + include\grpc++\impl\codegen + include\grpc++\impl @@ -395,12 +398,12 @@ + + include\grpc++\impl\codegen + src\cpp\client - - src\cpp\common - src\cpp\common diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index 895e2233518fa..82240a79c04b2 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -268,6 +268,7 @@ + @@ -360,7 +361,6 @@ - diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index ab305fa92972d..60f5d4182ea2f 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -111,6 +111,9 @@ include\grpc++\impl + + include\grpc++\impl\codegen + include\grpc++\impl @@ -383,9 +386,6 @@ src\cpp\client - - src\cpp\common - src\cpp\server From 8ba60db6ed050a53a5e97d5da176363ef5e4b2d4 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 20 May 2016 13:53:14 -0700 Subject: [PATCH 02/20] Check content type on the client response path --- src/core/lib/channel/http_client_filter.c | 20 ++++++++++++++++++++ src/core/lib/channel/http_server_filter.c | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 516e708d1f039..d56e3ab67281e 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -39,6 +39,9 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" +#define EXPECTED_CONTENT_TYPE "application/grpc" +#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 + typedef struct call_data { grpc_linked_mdelem method; grpc_linked_mdelem scheme; @@ -74,7 +77,24 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { } else if (md->key == GRPC_MDSTR_STATUS) { grpc_call_element_send_cancel(a->exec_ctx, a->elem); return NULL; + } else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { + return NULL; } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) { + const char *value_str = grpc_mdstr_as_c_string(md->value); + if (strncmp(value_str, EXPECTED_CONTENT_TYPE, + EXPECTED_CONTENT_TYPE_LENGTH) == 0 && + (value_str[EXPECTED_CONTENT_TYPE_LENGTH] == '+' || + value_str[EXPECTED_CONTENT_TYPE_LENGTH] == ';')) { + /* Although the C implementation doesn't (currently) generate them, + any custom +-suffix is explicitly valid. */ + /* TODO(klempner): We should consider preallocating common values such + as +proto or +json, or at least stashing them if we see them. */ + /* TODO(klempner): Should we be surfacing this to application code? */ + } else { + /* TODO(klempner): We're currently allowing this, but we shouldn't + see it without a proxy so log for now. */ + gpr_log(GPR_INFO, "Unexpected content-type '%s'", value_str); + } return NULL; } return md; diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index ba865416decdc..cbc8b189b4bb6 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -108,7 +108,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } else { /* TODO(klempner): We're currently allowing this, but we shouldn't see it without a proxy so log for now. */ - gpr_log(GPR_INFO, "Unexpected content-type %s", value_str); + gpr_log(GPR_INFO, "Unexpected content-type '%s'", value_str); } return NULL; } else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD || From 8d8f9a891baebde4146f85f6c82d0372469f3b33 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 3 Jun 2016 08:49:11 -0700 Subject: [PATCH 03/20] Fix refcounting algorithm --- src/core/lib/transport/metadata.c | 39 +++++++++++++++---------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 82c8e239f6870..751305e5d7c9c 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -129,7 +129,10 @@ typedef struct mdtab_shard { internal_metadata **elems; size_t count; size_t capacity; - size_t free; + /** Estimate of the number of unreferenced mdelems in the hash table. + This will eventually converge to the exact number, but it's instantaneous + accuracy is not guaranteed */ + gpr_atm free_estimate; } mdtab_shard; #define LOG2_STRTAB_SHARD_COUNT 5 @@ -217,7 +220,7 @@ void grpc_mdctx_global_init(void) { mdtab_shard *shard = &g_mdtab_shard[i]; gpr_mu_init(&shard->mu); shard->count = 0; - shard->free = 0; + gpr_atm_no_barrier_store(&shard->free_estimate, 0); shard->capacity = INITIAL_MDTAB_CAPACITY; shard->elems = gpr_malloc(sizeof(*shard->elems) * shard->capacity); memset(shard->elems, 0, sizeof(*shard->elems) * shard->capacity); @@ -281,10 +284,8 @@ static void ref_md_locked(mdtab_shard *shard, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); #endif - if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 2)) { - shard->free--; - } else { - GPR_ASSERT(1 != gpr_atm_no_barrier_fetch_add(&md->refcnt, -1)); + if (0 == gpr_atm_no_barrier_fetch_add(&md->refcnt, 1)) { + gpr_atm_no_barrier_fetch_add(&shard->free_estimate, -1); } } @@ -447,6 +448,7 @@ static void gc_mdtab(mdtab_shard *shard) { size_t i; internal_metadata **prev_next; internal_metadata *md, *next; + gpr_atm num_freed = 0; GPR_TIMER_BEGIN("gc_mdtab", 0); for (i = 0; i < shard->capacity; i++) { @@ -463,13 +465,14 @@ static void gc_mdtab(mdtab_shard *shard) { } gpr_free(md); *prev_next = next; - shard->free--; + num_freed++; shard->count--; } else { prev_next = &md->bucket_next; } } } + gpr_atm_no_barrier_fetch_add(&shard->free_estimate, -num_freed); GPR_TIMER_END("gc_mdtab", 0); } @@ -504,7 +507,8 @@ static void grow_mdtab(mdtab_shard *shard) { } static void rehash_mdtab(mdtab_shard *shard) { - if (shard->free > shard->capacity / 4) { + if ((size_t)gpr_atm_no_barrier_load(&shard->free_estimate) > + shard->capacity / 4) { gc_mdtab(shard); } else { grow_mdtab(shard); @@ -553,7 +557,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdstr *mkey, /* not found: create a new pair */ md = gpr_malloc(sizeof(internal_metadata)); - gpr_atm_rel_store(&md->refcnt, 2); + gpr_atm_rel_store(&md->refcnt, 1); md->key = key; md->value = value; md->user_data = 0; @@ -645,7 +649,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { this function - meaning that no adjustment to mdtab_free is necessary, simplifying the logic here to be just an atomic increment */ /* use C assert to have this removed in opt builds */ - assert(gpr_atm_no_barrier_load(&md->refcnt) >= 2); + assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); gpr_atm_no_barrier_fetch_add(&md->refcnt, 1); return gmd; } @@ -662,18 +666,13 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); #endif - if (2 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { - uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash); + uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash); + if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { + /* once the refcount hits zero, some other thread can come along and + free md at any time: it's unsafe from this point on to access it */ mdtab_shard *shard = &g_mdtab_shard[SHARD_IDX(hash, LOG2_MDTAB_SHARD_COUNT)]; - GPR_TIMER_BEGIN("grpc_mdelem_unref.to_zero", 0); - gpr_mu_lock(&shard->mu); - if (1 == gpr_atm_no_barrier_load(&md->refcnt)) { - shard->free++; - gpr_atm_no_barrier_store(&md->refcnt, 0); - } - gpr_mu_unlock(&shard->mu); - GPR_TIMER_END("grpc_mdelem_unref.to_zero", 0); + gpr_atm_no_barrier_fetch_add(&shard->free_estimate, 1); } } From 139098c1882b03bd4571af03a5a7a6694f87e0dd Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 6 Jun 2016 08:10:08 -0700 Subject: [PATCH 04/20] Fix cast to avoid potential wraparound --- src/core/lib/transport/metadata.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 751305e5d7c9c..79de54beb595c 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -507,8 +507,8 @@ static void grow_mdtab(mdtab_shard *shard) { } static void rehash_mdtab(mdtab_shard *shard) { - if ((size_t)gpr_atm_no_barrier_load(&shard->free_estimate) > - shard->capacity / 4) { + if (gpr_atm_no_barrier_load(&shard->free_estimate) > + (gpr_atm)(shard->capacity / 4)) { gc_mdtab(shard); } else { grow_mdtab(shard); From 571c12ef18ab9021bfaadac3434c5321d6297c38 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Mon, 6 Jun 2016 23:13:27 -0700 Subject: [PATCH 05/20] Remove AsyncStreaming client from python qps tests --- .../grpcio/tests/qps/benchmark_client.py | 45 ------------------- src/python/grpcio/tests/qps/qps_worker.py | 4 +- src/python/grpcio/tests/qps/worker_server.py | 5 +-- .../run_tests/performance/scenario_config.py | 8 ++-- 4 files changed, 7 insertions(+), 55 deletions(-) diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index b372ea01ade53..aac218ed81953 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -185,48 +185,3 @@ def _request_generator(self): yield request except queue.Empty: pass - - -class AsyncReceiver(face.ResponseReceiver): - """Receiver for async stream responses.""" - - def __init__(self, send_time_queue, response_handler): - self._send_time_queue = send_time_queue - self._response_handler = response_handler - - def initial_metadata(self, initial_mdetadata): - pass - - def response(self, response): - end_time = time.time() - self._response_handler(end_time - self._send_time_queue.get_nowait()) - - def complete(self, terminal_metadata, code, details): - pass - - -class StreamingAsyncBenchmarkClient(BenchmarkClient): - - def __init__(self, server, config, hist): - super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) - self._send_time_queue = queue.Queue() - self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) - self._rendezvous = None - - def send_request(self): - if self._rendezvous is not None: - self._send_time_queue.put(time.time()) - self._rendezvous.consume(self._request) - - def start(self): - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - self._rendezvous = stream_callable.event( - self._receiver, lambda *args: None, _TIMEOUT) - - def stop(self): - self._rendezvous.terminate() - self._rendezvous = None diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py index 3dda718638e21..16926379a5bca 100644 --- a/src/python/grpcio/tests/qps/qps_worker.py +++ b/src/python/grpcio/tests/qps/qps_worker.py @@ -43,9 +43,7 @@ def run_worker_server(port): server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - # Drain outstanding requests for clean exit - time.sleep(2) - server.stop(0) + server.stop(2) if __name__ == '__main__': diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py index 1f9af5482ccc4..d41f8377c2a11 100644 --- a/src/python/grpcio/tests/qps/worker_server.py +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -153,9 +153,8 @@ def _create_client_runner(self, server, config, qps_data): if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnaryAsyncBenchmarkClient( server, config, qps_data) - elif config.rpc_type == control_pb2.STREAMING: - client = benchmark_client.StreamingAsyncBenchmarkClient( - server, config, qps_data) + else: + raise Exception('Async streaming client not supported') else: raise Exception('Unsupported client type {}'.format(config.client_type)) diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index b55d728d840fa..81569e8b7a467 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -395,8 +395,8 @@ def scenarios(self): # categories=[SMOKETEST]) yield _ping_pong_scenario( - 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER') + 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER') yield _ping_pong_scenario( 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', @@ -413,8 +413,8 @@ def scenarios(self): unconstrained_client='sync') yield _ping_pong_scenario( - 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='SYNC_SERVER', + 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', unconstrained_client='async') yield _ping_pong_scenario( From 5cdc9b9181b990a29cffaf797e3396d25017cd1e Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Tue, 7 Jun 2016 13:24:35 -0700 Subject: [PATCH 06/20] added comment about host parameter format (issue 4147) --- src/objective-c/GRPCClient/GRPCCall.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index 7a77ae60b6a99..b9e741dfa8fe3 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -220,6 +220,8 @@ extern id const kGRPCTrailersKey; * messages to the response side of the call indefinitely (depending on the semantics of the * specific remote method called). * To finish a call right away, invoke cancel. + * host parameter should not contain the scheme (http:// or https://), only the name or IP addr + * and the port number, for example @"localhost:5050". */ - (instancetype)initWithHost:(NSString *)host path:(NSString *)path From b43bda43ec86607449e446de37c4d357a437589c Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Tue, 7 Jun 2016 13:40:36 -0700 Subject: [PATCH 07/20] added comment about host parameter format (issue 4147) --- src/objective-c/ProtoRPC/ProtoRPC.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index bd926b732873e..642b8890ffbd4 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -38,6 +38,10 @@ @interface ProtoRPC : GRPCCall +/* + * host parameter should not contain the scheme (http:// or https://), only the name or IP addr + * and the port number, for example @"localhost:5050". + */ - (instancetype)initWithHost:(NSString *)host method:(ProtoMethod *)method requestsWriter:(GRXWriter *)requestsWriter From 150e5025cb6535eb1387bdf7482f5f61243cdd71 Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Tue, 7 Jun 2016 15:00:20 -0700 Subject: [PATCH 08/20] added ** for doc autogen. --- src/objective-c/ProtoRPC/ProtoRPC.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index 642b8890ffbd4..170d8b9c689ae 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -38,7 +38,7 @@ @interface ProtoRPC : GRPCCall -/* +/** * host parameter should not contain the scheme (http:// or https://), only the name or IP addr * and the port number, for example @"localhost:5050". */ From 6f9501098ec0714f620b0913ffdc6f824f8aca14 Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Tue, 7 Jun 2016 16:52:19 -0700 Subject: [PATCH 09/20] fix for issue 5548. Deprecated old classes and created subclasses. --- src/compiler/objective_c_generator.cc | 4 ++-- src/objective-c/ProtoRPC/ProtoMethod.h | 9 +++++++++ src/objective-c/ProtoRPC/ProtoMethod.m | 4 ++++ src/objective-c/ProtoRPC/ProtoRPC.h | 11 ++++++++++- src/objective-c/ProtoRPC/ProtoRPC.m | 6 +++++- src/objective-c/ProtoRPC/ProtoService.h | 15 +++++++++++++-- src/objective-c/ProtoRPC/ProtoService.m | 4 ++++ 7 files changed, 47 insertions(+), 6 deletions(-) diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index 465491e385f4b..75665cd5c0d46 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -94,7 +94,7 @@ void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method, void PrintAdvancedSignature(Printer *printer, const MethodDescriptor *method, map< ::grpc::string, ::grpc::string> vars) { vars["method_name"] = "RPCTo" + vars["method_name"]; - vars["return_type"] = "ProtoRPC *"; + vars["return_type"] = "GRPCProtoRPC *"; PrintMethodSignature(printer, method, vars); } @@ -199,7 +199,7 @@ ::grpc::string GetHeader(const ServiceDescriptor *service) { " marshalling and parsing.\n"); printer.Print(vars, "@interface $service_class$ :" - " ProtoService<$service_class$>\n"); + " GRPCProtoService<$service_class$>\n"); printer.Print( "- (instancetype)initWithHost:(NSString *)host" " NS_DESIGNATED_INITIALIZER;\n"); diff --git a/src/objective-c/ProtoRPC/ProtoMethod.h b/src/objective-c/ProtoRPC/ProtoMethod.h index a0ed2cf98a5e2..bd1a848c19abb 100644 --- a/src/objective-c/ProtoRPC/ProtoMethod.h +++ b/src/objective-c/ProtoRPC/ProtoMethod.h @@ -37,6 +37,7 @@ * A fully-qualified proto service method name. Full qualification is needed because a gRPC endpoint * can implement multiple services. */ +__attribute__((deprecated("Please use GRPCProtoMethod."))) @interface ProtoMethod : NSObject @property(nonatomic, readonly) NSString *package; @property(nonatomic, readonly) NSString *service; @@ -48,3 +49,11 @@ service:(NSString *)service method:(NSString *)method; @end + +/** + * This subclass is empty now. Eventually we'll remove ProtoService class + * to avoid potential naming conflict + */ +@interface GRPCProtoMethod : ProtoMethod + +@end diff --git a/src/objective-c/ProtoRPC/ProtoMethod.m b/src/objective-c/ProtoRPC/ProtoMethod.m index 4b7ed63123c01..e9978f38afe00 100644 --- a/src/objective-c/ProtoRPC/ProtoMethod.m +++ b/src/objective-c/ProtoRPC/ProtoMethod.m @@ -53,3 +53,7 @@ - (NSString *)HTTPPath { } } @end + +@implementation GRPCProtoMethod + +@end diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index bd926b732873e..55fefb0aa8857 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -36,13 +36,22 @@ #import "ProtoMethod.h" +__attribute__((deprecated("Please use GRPCProtoRPC."))) @interface ProtoRPC : GRPCCall - (instancetype)initWithHost:(NSString *)host - method:(ProtoMethod *)method + method:(GRPCProtoMethod *)method requestsWriter:(GRXWriter *)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id)responsesWriteable NS_DESIGNATED_INITIALIZER; - (void)start; @end + +/** + * This subclass is empty now. Eventually we'll remove ProtoService class + * to avoid potential naming conflict + */ +@interface GRPCProtoRPC : ProtoRPC + +@end diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index 9bf66f347ac17..27c8b0eff7ec0 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -70,7 +70,7 @@ - (instancetype)initWithHost:(NSString *)host // Designated initializer - (instancetype)initWithHost:(NSString *)host - method:(ProtoMethod *)method + method:(GRPCProtoMethod *)method requestsWriter:(GRXWriter *)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id)responsesWriteable { @@ -117,3 +117,7 @@ - (void)startWithWriteable:(id)writeable { _responseWriteable = nil; } @end + +@implementation GRPCProtoRPC + +@end diff --git a/src/objective-c/ProtoRPC/ProtoService.h b/src/objective-c/ProtoRPC/ProtoService.h index 2e8cb33696b99..edec8a4770f2c 100644 --- a/src/objective-c/ProtoRPC/ProtoService.h +++ b/src/objective-c/ProtoRPC/ProtoService.h @@ -33,17 +33,28 @@ #import -@class ProtoRPC; +@class GRPCProtoRPC; @protocol GRXWriteable; @class GRXWriter; + +__attribute__((deprecated("Please use GRPCProtoService."))) @interface ProtoService : NSObject - (instancetype)initWithHost:(NSString *)host packageName:(NSString *)packageName serviceName:(NSString *)serviceName NS_DESIGNATED_INITIALIZER; -- (ProtoRPC *)RPCToMethod:(NSString *)method +- (GRPCProtoRPC *)RPCToMethod:(NSString *)method requestsWriter:(GRXWriter *)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id)responsesWriteable; @end + + +/** + * This subclass is empty now. Eventually we'll remove ProtoService class + * to avoid potential naming conflict + */ +@interface GRPCProtoService : ProtoService + +@end diff --git a/src/objective-c/ProtoRPC/ProtoService.m b/src/objective-c/ProtoRPC/ProtoService.m index fccc6aadc9cc8..597c3cf0fed22 100644 --- a/src/objective-c/ProtoRPC/ProtoService.m +++ b/src/objective-c/ProtoRPC/ProtoService.m @@ -79,3 +79,7 @@ - (ProtoRPC *)RPCToMethod:(NSString *)method responsesWriteable:responsesWriteable]; } @end + +@implementation GRPCProtoService + +@end From c524ec0121ae70c354fa507e11880a62511d7e20 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 7 Jun 2016 16:52:53 -0700 Subject: [PATCH 10/20] make Metadata.Entry a class --- src/csharp/Grpc.Core/Metadata.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index e982fa0c486ee..f73f720094a39 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -95,6 +95,7 @@ public int IndexOf(Metadata.Entry item) public void Insert(int index, Metadata.Entry item) { + GrpcPreconditions.CheckNotNull(item); CheckWriteable(); entries.Insert(index, item); } @@ -114,6 +115,7 @@ public Metadata.Entry this[int index] set { + GrpcPreconditions.CheckNotNull(value); CheckWriteable(); entries[index] = value; } @@ -121,6 +123,7 @@ public Metadata.Entry this[int index] public void Add(Metadata.Entry item) { + GrpcPreconditions.CheckNotNull(item); CheckWriteable(); entries.Add(item); } @@ -187,7 +190,7 @@ private void CheckWriteable() /// /// Metadata entry /// - public struct Entry + public class Entry { private static readonly Encoding Encoding = Encoding.ASCII; private static readonly Regex ValidKeyRegex = new Regex("^[a-z0-9_-]+$"); From 9a36e6c7cd4daca929d5d3457edd0060a93030ca Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 7 Jun 2016 17:49:03 -0700 Subject: [PATCH 11/20] Changed Python Sync streaming qps to follow spec --- .../grpcio/tests/qps/benchmark_client.py | 63 +++++++++++++------ src/python/grpcio/tests/qps/client_runner.py | 5 +- .../run_tests/performance/scenario_config.py | 2 +- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index aac218ed81953..e2922347f98bf 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -82,6 +82,7 @@ def __init__(self, server, config, hist): self._response_callbacks = [] def add_response_callback(self, callback): + """callback will be invoked as callback(client, query_time)""" self._response_callbacks.append(callback) @abc.abstractmethod @@ -95,10 +96,10 @@ def start(self): def stop(self): pass - def _handle_response(self, query_time): + def _handle_response(self, client, query_time): self._hist.add(query_time * 1e9) # Report times in nanoseconds for callback in self._response_callbacks: - callback(query_time) + callback(client, query_time) class UnarySyncBenchmarkClient(BenchmarkClient): @@ -121,7 +122,7 @@ def _dispatch_request(self): start_time = time.time() self._stub.UnaryCall(self._request, _TIMEOUT) end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) class UnaryAsyncBenchmarkClient(BenchmarkClient): @@ -136,19 +137,20 @@ def send_request(self): def _response_received(self, start_time, resp): resp.result() end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) def stop(self): self._stub = None -class StreamingSyncBenchmarkClient(BenchmarkClient): +class _SyncStream(object): - def __init__(self, server, config, hist): - super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + def __init__(self, stub, generic, request, handle_response): + self._stub = stub + self._generic = generic + self._request = request + self._handle_response = handle_response self._is_streaming = False - self._pool = futures.ThreadPoolExecutor(max_workers=1) - # Use a thread-safe queue to put requests on the stream self._request_queue = queue.Queue() self._send_time_queue = queue.Queue() @@ -157,15 +159,6 @@ def send_request(self): self._request_queue.put(self._request) def start(self): - self._is_streaming = True - self._pool.submit(self._request_stream) - - def stop(self): - self._is_streaming = False - self._pool.shutdown(wait=True) - self._stub = None - - def _request_stream(self): self._is_streaming = True if self._generic: stream_callable = self._stub.stream_stream( @@ -175,8 +168,11 @@ def _request_stream(self): response_stream = stream_callable(self._request_generator(), _TIMEOUT) for _ in response_stream: - end_time = time.time() - self._handle_response(end_time - self._send_time_queue.get_nowait()) + self._handle_response( + self, time.time() - self._send_time_queue.get_nowait()) + + def stop(self): + self._is_streaming = False def _request_generator(self): while self._is_streaming: @@ -185,3 +181,30 @@ def _request_generator(self): yield request except queue.Empty: pass + + +class StreamingSyncBenchmarkClient(BenchmarkClient): + + def __init__(self, server, config, hist): + super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) + self._streams = [_SyncStream(self._stub, self._generic, + self._request, self._handle_response) + for _ in xrange(config.outstanding_rpcs_per_channel)] + self._curr_stream = 0 + + def send_request(self): + # Use a round_robin scheduler to determine what stream to send on + self._streams[self._curr_stream].send_request() + self._curr_stream = (self._curr_stream + 1) % len(self._streams) + + def start(self): + for stream in self._streams: + self._pool.submit(stream.start) + + def stop(self): + for stream in self._streams: + stream.stop() + self._pool.shutdown(wait=True) + self._stub = None diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index 1ede7d2af1b67..2d1d981733bf0 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -98,7 +98,6 @@ def stop(self): self._client.stop() self._client = None - def _send_request(self, response_time): + def _send_request(self, client, response_time): if self._is_running: - self._client.send_request() - + client.send_request() diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 81569e8b7a467..2d5130e1e8682 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -415,7 +415,7 @@ def scenarios(self): yield _ping_pong_scenario( 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - unconstrained_client='async') + unconstrained_client='sync') yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', From 5756c8fead48fb024fda5192c6c63a98a59573cf Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 7 Jun 2016 18:00:44 -0700 Subject: [PATCH 12/20] removed RANDOM and simplified req for compression --- src/proto/grpc/testing/messages.proto | 19 ++---- test/cpp/interop/interop_client.cc | 94 +++++++++++++-------------- test/cpp/interop/server_main.cc | 24 ++----- 3 files changed, 54 insertions(+), 83 deletions(-) diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index a063b470c7b2e..e1090156ab483 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -41,17 +41,6 @@ enum PayloadType { // Uncompressable binary format. UNCOMPRESSABLE = 1; - - // Randomly chosen from all other formats defined in this enum. - RANDOM = 2; -} - -// Compression algorithms -enum CompressionType { - // No compression - NONE = 0; - GZIP = 1; - DEFLATE = 2; } // A block of data, to simply increase gRPC message size. @@ -88,8 +77,8 @@ message SimpleRequest { // Whether SimpleResponse should include OAuth scope. bool fill_oauth_scope = 5; - // Compression algorithm to be used by the server for the response (stream) - CompressionType response_compression = 6; + // Whether to request the server to compress the response. + bool request_compressed_response = 6; // Whether server should return a given status EchoStatus response_status = 7; @@ -145,8 +134,8 @@ message StreamingOutputCallRequest { // Optional input payload sent along with the request. Payload payload = 3; - // Compression algorithm to be used by the server for the response (stream) - CompressionType response_compression = 6; + // Whether to request the server to compress the response. + bool request_compressed_response = 6; // Whether server should return a given status EchoStatus response_status = 7; diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 189e4a8aabeb2..90e54fd3b65e2 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -67,30 +67,26 @@ const int kReceiveDelayMilliSeconds = 20; const int kLargeRequestSize = 271828; const int kLargeResponseSize = 314159; -CompressionType GetInteropCompressionTypeFromCompressionAlgorithm( - grpc_compression_algorithm algorithm) { - switch (algorithm) { - case GRPC_COMPRESS_NONE: - return CompressionType::NONE; - case GRPC_COMPRESS_GZIP: - return CompressionType::GZIP; - case GRPC_COMPRESS_DEFLATE: - return CompressionType::DEFLATE; - default: - GPR_ASSERT(false); - } -} - void NoopChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) {} void CompressionChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) { - GPR_ASSERT(request->response_compression() == - GetInteropCompressionTypeFromCompressionAlgorithm( - inspector.GetCallCompressionAlgorithm())); - if (request->response_compression() == NONE) { + const grpc_compression_algorithm received_compression = + inspector.GetCallCompressionAlgorithm(); + if (request->request_compressed_response() && + received_compression == GRPC_COMPRESS_NONE) { + if (request->request_compressed_response() && + received_compression == GRPC_COMPRESS_NONE) { + // Requested some compression, got NONE. This is an error. + gpr_log(GPR_ERROR, + "Failure: Requested compression but got uncompressed response " + "from server."); + abort(); + } + } + if (!request->request_compressed_response()) { GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } else if (request->response_type() == PayloadType::COMPRESSABLE) { // requested compression and compressable response => results should always @@ -211,20 +207,22 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, custom_checks_fn(inspector, request, response); // Payload related checks. - if (request->response_type() != PayloadType::RANDOM) { - GPR_ASSERT(response->payload().type() == request->response_type()); - } + GPR_ASSERT(response->payload().type() == request->response_type()); switch (response->payload().type()) { case PayloadType::COMPRESSABLE: GPR_ASSERT(response->payload().body() == grpc::string(kLargeResponseSize, '\0')); break; case PayloadType::UNCOMPRESSABLE: { - std::ifstream rnd_file(kRandomFile); - GPR_ASSERT(rnd_file.good()); - for (int i = 0; i < kLargeResponseSize; i++) { - GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get()); - } + // We don't really check anything: We can't assert that the payload is + // uncompressed because it's the server's prerogative to decide on that, + // and different implementations decide differently (ie, Java always + // compresses when requested to do so, whereas C core throws away the + // compressed payload if the output is larger than the input). + // In addition, we don't compare the actual random bytes received because + // asserting that data is sent/received properly isn't the purpose of this + // test. Moreover, different implementations are also free to use + // different sets of random bytes. } break; default: GPR_ASSERT(false); @@ -341,13 +339,13 @@ bool InteropClient::DoLargeUnary() { } bool InteropClient::DoLargeCompressedUnary() { - const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; + const bool request_compression[] = {false, true}; + const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) { + for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { char* log_suffix; gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - CompressionType_Name(compression_types[j]).c_str(), + request_compression[j] ? "true" : "false", PayloadType_Name(payload_types[i]).c_str()); gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.", @@ -355,7 +353,7 @@ bool InteropClient::DoLargeCompressedUnary() { SimpleRequest request; SimpleResponse response; request.set_response_type(payload_types[i]); - request.set_response_compression(compression_types[j]); + request.set_request_compressed_response(request_compression[j]); if (!PerformLargeUnary(&request, &response, CompressionChecks)) { gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); @@ -452,23 +450,23 @@ bool InteropClient::DoResponseStreaming() { } bool InteropClient::DoResponseCompressedStreaming() { - const CompressionType compression_types[] = {NONE, GZIP, DEFLATE}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM}; + const bool request_compression[] = {false, true}; + const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) { + for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { ClientContext context; InteropClientContextInspector inspector(context); StreamingOutputCallRequest request; char* log_suffix; gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - CompressionType_Name(compression_types[j]).c_str(), + request_compression[j] ? "true" : "false", PayloadType_Name(payload_types[i]).c_str()); gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); request.set_response_type(payload_types[i]); - request.set_response_compression(compression_types[j]); + request.set_request_compressed_response(request_compression[j]); for (size_t k = 0; k < response_stream_sizes.size(); ++k) { ResponseParameters* response_parameter = @@ -483,9 +481,7 @@ bool InteropClient::DoResponseCompressedStreaming() { size_t k = 0; while (stream->Read(&response)) { // Payload related checks. - if (request.response_type() != PayloadType::RANDOM) { - GPR_ASSERT(response.payload().type() == request.response_type()); - } + GPR_ASSERT(response.payload().type() == request.response_type()); switch (response.payload().type()) { case PayloadType::COMPRESSABLE: GPR_ASSERT(response.payload().body() == @@ -503,17 +499,19 @@ bool InteropClient::DoResponseCompressedStreaming() { } // Compression related checks. - GPR_ASSERT(request.response_compression() == - GetInteropCompressionTypeFromCompressionAlgorithm( - inspector.GetCallCompressionAlgorithm())); - if (request.response_compression() == NONE) { + if (request.request_compressed_response()) { + GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > + GRPC_COMPRESS_NONE); + if (request.response_type() == PayloadType::COMPRESSABLE) { + // requested compression and compressable response => results should + // always be compressed. + GPR_ASSERT(inspector.GetMessageFlags() & + GRPC_WRITE_INTERNAL_COMPRESS); + } + } else { + // requested *no* compression. GPR_ASSERT( !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } else if (request.response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should - // always be compressed. - GPR_ASSERT(inspector.GetMessageFlags() & - GRPC_WRITE_INTERNAL_COMPRESS); } ++k; diff --git a/test/cpp/interop/server_main.cc b/test/cpp/interop/server_main.cc index 889874fe493ae..bbedda14d2562 100644 --- a/test/cpp/interop/server_main.cc +++ b/test/cpp/interop/server_main.cc @@ -110,14 +110,7 @@ void MaybeEchoMetadata(ServerContext* context) { } } -bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type; - if (type == PayloadType::RANDOM) { - response_type = - rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE; - } else { - response_type = type; - } +bool SetPayload(PayloadType response_type, int size, Payload* payload) { payload->set_type(response_type); switch (response_type) { case PayloadType::COMPRESSABLE: { @@ -141,18 +134,9 @@ bool SetPayload(PayloadType type, int size, Payload* payload) { template void SetResponseCompression(ServerContext* context, const RequestType& request) { - switch (request.response_compression()) { - case grpc::testing::NONE: - context->set_compression_algorithm(GRPC_COMPRESS_NONE); - break; - case grpc::testing::GZIP: - context->set_compression_algorithm(GRPC_COMPRESS_GZIP); - break; - case grpc::testing::DEFLATE: - context->set_compression_algorithm(GRPC_COMPRESS_DEFLATE); - break; - default: - abort(); + if (request.request_compressed_response()) { + // Any level would do, let's go for HIGH because we are overachievers. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); } } From d3ce0218d15c5b19a75fff6d6c8cc5af2066266d Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 7 Jun 2016 18:45:21 -0700 Subject: [PATCH 13/20] disabled bogus checks for streaming compressed --- test/cpp/interop/interop_client.cc | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 90e54fd3b65e2..e67cb5ac9d4a2 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -488,12 +488,7 @@ bool InteropClient::DoResponseCompressedStreaming() { grpc::string(response_stream_sizes[k], '\0')); break; case PayloadType::UNCOMPRESSABLE: { - std::ifstream rnd_file(kRandomFile); - GPR_ASSERT(rnd_file.good()); - for (int n = 0; n < response_stream_sizes[k]; n++) { - GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get()); - } - } break; + break; default: GPR_ASSERT(false); } From 393ca51923e8274c4d0a3b09c021b4cd2482ff4c Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 7 Jun 2016 18:45:36 -0700 Subject: [PATCH 14/20] updated interop spec --- doc/interop-test-descriptions.md | 127 +++++++++---------------------- 1 file changed, 35 insertions(+), 92 deletions(-) diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md index 6297b5cc3ef48..7fd21c7022ed6 100644 --- a/doc/interop-test-descriptions.md +++ b/doc/interop-test-descriptions.md @@ -93,26 +93,24 @@ Client asserts: ### large_compressed_unary This test verifies compressed unary calls succeed in sending messages. It -sends one unary request for every combination of compression algorithm and -payload type. +sends one unary request for every payload type, with and without requesting a +compressed response from the server. In all scenarios, whether compression was actually performed is determined by -the compression bit in the response's message flags. The response's compression -value indicates which algorithm was used if said compression bit is set. +the compression bit in the response's message flags. Server features: * [UnaryCall][] * [Compressable Payload][] * [Uncompressable Payload][] -* [Random Payload][] Procedure: 1. Client calls UnaryCall with: ``` { - response_compression: + request_compressed_response: bool response_type: COMPRESSABLE response_size: 314159 payload:{ @@ -123,11 +121,10 @@ Procedure: Client asserts: * call was successful * response payload type is COMPRESSABLE - * response compression is consistent with the requested one. - * if `response_compression == NONE`, the response MUST NOT have the + * if `request_compressed_response` is false, the response MUST NOT have the + compressed message flag set. + * if `request_compressed_response` is true, the response MUST have the compressed message flag set. - * if `response_compression != NONE`, the response MUST have the compressed - message flag set. * response payload body is 314159 bytes in size * clients are free to assert that the response payload body contents are zero and comparing the entire response message against a golden response @@ -136,7 +133,7 @@ Procedure: 2. Client calls UnaryCall with: ``` { - response_compression: + request_compressed_response: bool response_type: UNCOMPRESSABLE response_size: 314159 payload:{ @@ -147,29 +144,11 @@ Procedure: Client asserts: * call was successful * response payload type is UNCOMPRESSABLE - * response compression is consistent with the requested one. - * the response MUST NOT have the compressed message flag set. + * the response MAY have the compressed message flag set. Some + implementations will choose to compress the payload even when the output + size if larger than the input. * response payload body is 314159 bytes in size - * clients are free to assert that the response payload body contents are - identical to the golden uncompressable data at `test/cpp/interop/rnd.dat`. - - 3. Client calls UnaryCall with: - ``` - { - response_compression: - response_type: RANDOM - response_size: 314159 - payload:{ - body: 271828 bytes of zeros - } - } - ``` - Client asserts: - * call was successful - * response payload type is either COMPRESSABLE or UNCOMPRESSABLE - * the behavior is consistent with the randomly chosen incoming payload type, - as described in their respective sections. ### client_streaming @@ -245,7 +224,7 @@ Procedure: size: 31415 } response_parameters:{ - size: 9 + size: 59 } response_parameters:{ size: 2653 @@ -272,7 +251,6 @@ Server features: * [StreamingOutputCall][] * [Compressable Payload][] * [Uncompressable Payload][] -* [Random Payload][] Procedure: @@ -280,13 +258,13 @@ Procedure: ``` { - response_compression: + request_compressed_response: bool response_type:COMPRESSABLE response_parameters:{ size: 31415 } response_parameters:{ - size: 9 + size: 59 } response_parameters:{ size: 2653 @@ -301,12 +279,11 @@ Procedure: * call was successful * exactly four responses * response payloads are COMPRESSABLE - * response compression is consistent with the requested one. - * if `response_compression == NONE`, the response MUST NOT have the - compressed message flag set. - * if `response_compression != NONE`, the response MUST have the compressed - message flag set. - * response payload bodies are sized (in order): 31415, 9, 2653, 58979 + * if `request_compressed_response` is false, the response's messages MUST + NOT have the compressed message flag set. + * if `request_compressed_response` is true, the response's messages MUST + have the compressed message flag set. + * response payload bodies are sized (in order): 31415, 59, 2653, 58979 * clients are free to assert that the response payload body contents are zero and comparing the entire response messages against golden responses @@ -315,13 +292,13 @@ Procedure: ``` { - response_compression: + request_compressed_response: bool response_type:UNCOMPRESSABLE response_parameters:{ size: 31415 } response_parameters:{ - size: 9 + size: 59 } response_parameters:{ size: 2653 @@ -336,40 +313,14 @@ Procedure: * call was successful * exactly four responses * response payloads are UNCOMPRESSABLE - * response compressions are consistent with the requested one. - * the responses MUST NOT have the compressed message flag set. - * response payload bodies are sized (in order): 31415, 9, 2653, 58979 + * the response MAY have the compressed message flag set. Some + implementations will choose to compress the payload even when the output + size if larger than the input. + * response payload bodies are sized (in order): 31415, 59, 2653, 58979 * clients are free to assert that the body of the responses are identical to the golden uncompressable data at `test/cpp/interop/rnd.dat`. - 3. Client calls StreamingOutputCall with: - - ``` - { - response_compression: - response_type:RANDOM - response_parameters:{ - size: 31415 - } - response_parameters:{ - size: 9 - } - response_parameters:{ - size: 2653 - } - response_parameters:{ - size: 58979 - } - } - ``` - - Client asserts: - * call was successful - * response payload type is either COMPRESSABLE or UNCOMPRESSABLE - * the behavior is consistent with the randomly chosen incoming payload type, - as described in their respective sections. - ### ping_pong This test verifies that full duplex bidi is supported. @@ -399,7 +350,7 @@ Procedure: { response_type: COMPRESSABLE response_parameters:{ - size: 9 + size: 59 } payload:{ body: 8 bytes of zeros @@ -932,9 +883,9 @@ Server implements EmptyCall which immediately returns the empty message. [UnaryCall]: #unarycall Server implements UnaryCall which immediately returns a SimpleResponse with a -payload body of size SimpleRequest.response_size bytes and type as appropriate -for the SimpleRequest.response_type. If the server does not support the -response_type, then it should fail the RPC with INVALID_ARGUMENT. +payload body of size `SimpleRequest.response_size` bytes and type as appropriate +for the `SimpleRequest.response_type`. If the server does not support the +`response_type`, then it should fail the RPC with `INVALID_ARGUMENT`. ### StreamingInputCall [StreamingInputCall]: #streaminginputcall @@ -974,15 +925,7 @@ COMPRESSABLE. When the client requests UNCOMPRESSABLE payload, the response includes a payload of the size requested containing uncompressable data and the payload type is -UNCOMPRESSABLE. A 512 kB dump from /dev/urandom is the current golden data, -stored at `test/cpp/interop/rnd.dat` - -### Random Payload -[Random Payload]: #random-payload - -When the client requests RANDOM payload, the response includes either a randomly -chosen COMPRESSABLE or UNCOMPRESSABLE payload. The data and the payload type -will be consistent with this choice. +UNCOMPRESSABLE. ### Echo Status [Echo Status]: #echo-status @@ -1004,8 +947,8 @@ key and the corresponding value back to the client as trailing metadata. [Observe ResponseParameters.interval_us]: #observe-responseparametersinterval_us In StreamingOutputCall and FullDuplexCall, server delays sending a -StreamingOutputCallResponse by the ResponseParameters's interval_us for that -particular response, relative to the last response sent. That is, interval_us +StreamingOutputCallResponse by the ResponseParameters's `interval_us` for that +particular response, relative to the last response sent. That is, `interval_us` acts like a sleep *before* sending the response and accumulates from one response to the next. @@ -1027,13 +970,13 @@ an email address. #### Echo OAuth scope [Echo OAuth Scope]: #echo-oauth-scope -If a SimpleRequest has fill_oauth_scope=true and that request was successfully +If a SimpleRequest has `fill_oauth_scope=true` and that request was successfully authenticated via OAuth, then the SimpleResponse should have oauth_scope filled with the scope of the method being invoked. Although a general server-side feature, most test servers won't implement this -feature. The TLS server grpc-test.sandbox.googleapis.com:443 supports this feature. -It requires at least the OAuth scope +feature. The TLS server `grpc-test.sandbox.googleapis.com:443` supports this +feature. It requires at least the OAuth scope `https://www.googleapis.com/auth/xapi.zoo` for authentication to succeed. Discussion: From 22a65e1a2b7c1782bc288d9e8af1c0c9f0f90795 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 7 Jun 2016 19:06:05 -0700 Subject: [PATCH 15/20] Added python jwt_token_creds interop test --- setup.py | 2 +- src/python/grpcio/grpc/_auth.py | 15 ++++++++++++++- src/python/grpcio/tests/interop/client.py | 3 +++ src/python/grpcio/tests/interop/methods.py | 13 +++++++++++++ tools/run_tests/run_interop_tests.py | 2 +- 5 files changed, 32 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index f96824fa8834f..0e2646d5d2fdf 100644 --- a/setup.py +++ b/setup.py @@ -202,7 +202,7 @@ def cython_extensions(module_names, extra_sources, include_dirs, } TESTS_REQUIRE = ( - 'oauth2client>=1.4.7', + 'oauth2client>=2.1.0', 'protobuf>=3.0.0a3', 'coverage>=4.0', ) + INSTALL_REQUIRES diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index 3ae00ca23a7f5..dea3221c9d897 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -29,6 +29,7 @@ """GRPCAuthMetadataPlugins for standard authentication.""" +import inspect from concurrent import futures import grpc @@ -46,9 +47,21 @@ def __init__(self, credentials): self._credentials = credentials self._pool = futures.ThreadPoolExecutor(max_workers=1) + # Hack to determine if these are JWT creds and we need to pass + # additional_claims when getting a token + if 'additional_claims' in inspect.getargspec( + credentials.get_access_token).args: + self._is_jwt = True + else: + self._is_jwt = False + def __call__(self, context, callback): # MetadataPlugins cannot block (see grpc.beta.interfaces.py) - future = self._pool.submit(self._credentials.get_access_token) + if self._is_jwt: + future = self._pool.submit(self._credentials.get_access_token, + additional_claims={'aud': context.service_url}) + else: + future = self._pool.submit(self._credentials.get_access_token) future.add_done_callback(lambda x: self._get_token_callback(callback, x)) def _get_token_callback(self, callback, future): diff --git a/src/python/grpcio/tests/interop/client.py b/src/python/grpcio/tests/interop/client.py index e3d5545a020c5..8aa1ce30c1a82 100644 --- a/src/python/grpcio/tests/interop/client.py +++ b/src/python/grpcio/tests/interop/client.py @@ -76,6 +76,9 @@ def _stub(args): creds = oauth2client_client.GoogleCredentials.get_application_default() scoped_creds = creds.create_scoped([args.oauth_scope]) call_creds = implementations.google_call_credentials(scoped_creds) + elif args.test_case == 'jwt_token_creds': + creds = oauth2client_client.GoogleCredentials.get_application_default() + call_creds = implementations.google_call_credentials(creds) else: call_creds = None if args.use_tls: diff --git a/src/python/grpcio/tests/interop/methods.py b/src/python/grpcio/tests/interop/methods.py index d5ef0c68bb9ad..7eac5115258d2 100644 --- a/src/python/grpcio/tests/interop/methods.py +++ b/src/python/grpcio/tests/interop/methods.py @@ -310,6 +310,16 @@ def _oauth2_auth_token(stub, args): (response.oauth_scope, args.oauth_scope)) +def _jwt_token_creds(stub, args): + json_key_filename = os.environ[ + oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] + response = _large_unary_common_behavior(stub, True, False) + if wanted_email != response.username: + raise ValueError( + 'expected username %s, got %s' % (wanted_email, response.username)) + + def _per_rpc_creds(stub, args): json_key_filename = os.environ[ oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] @@ -338,6 +348,7 @@ class TestCase(enum.Enum): EMPTY_STREAM = 'empty_stream' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' + JWT_TOKEN_CREDS = 'jwt_token_creds' PER_RPC_CREDS = 'per_rpc_creds' TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' @@ -364,6 +375,8 @@ def test_interoperability(self, stub, args): _compute_engine_creds(stub, args) elif self is TestCase.OAUTH2_AUTH_TOKEN: _oauth2_auth_token(stub, args) + elif self is TestCase.JWT_TOKEN_CREDS: + _jwt_token_creds(stub, args) elif self is TestCase.PER_RPC_CREDS: _per_rpc_creds(stub, args) else: diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index 053aabc9b5b80..5aaefb1ae144d 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -317,7 +317,7 @@ def global_env(self): 'PYTHONPATH': '{}/src/python/gens'.format(DOCKER_WORKDIR_ROOT)} def unimplemented_test_cases(self): - return _SKIP_ADVANCED + _SKIP_COMPRESSION + ['jwt_token_creds'] + return _SKIP_ADVANCED + _SKIP_COMPRESSION def unimplemented_test_cases_server(self): return _SKIP_ADVANCED + _SKIP_COMPRESSION From cf41d19a9e1c7edf77fb0362824214dc9308a075 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 7 Jun 2016 21:05:43 -0700 Subject: [PATCH 16/20] clang-format --- include/grpc++/impl/grpc_library.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/grpc++/impl/grpc_library.h b/include/grpc++/impl/grpc_library.h index aaa9e4c8a5479..1184d1bf09356 100644 --- a/include/grpc++/impl/grpc_library.h +++ b/include/grpc++/impl/grpc_library.h @@ -37,8 +37,8 @@ #include #include -#include #include +#include #include namespace grpc { From 126ae106d61ca7b8d28b7c9da86ecaf5f2034748 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 7 Jun 2016 21:15:05 -0700 Subject: [PATCH 17/20] fixed silly typo --- test/cpp/interop/interop_client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index e67cb5ac9d4a2..537fa317dace7 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -487,7 +487,7 @@ bool InteropClient::DoResponseCompressedStreaming() { GPR_ASSERT(response.payload().body() == grpc::string(response_stream_sizes[k], '\0')); break; - case PayloadType::UNCOMPRESSABLE: { + case PayloadType::UNCOMPRESSABLE: break; default: GPR_ASSERT(false); From 20d802db98fc8aba78169567415b003a85b55b84 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 7 Jun 2016 22:04:10 -0700 Subject: [PATCH 18/20] removed unused vble --- test/cpp/interop/interop_client.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 537fa317dace7..a0479e8f689df 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -55,8 +55,6 @@ namespace grpc { namespace testing { -static const char* kRandomFile = "test/cpp/interop/rnd.dat"; - namespace { // The same value is defined by the Java client. const std::vector request_stream_sizes = {27182, 8, 1828, 45904}; From 080749e8ba5707d4c927baf92af4e82ce1718a7b Mon Sep 17 00:00:00 2001 From: "Matthew D. Steele" Date: Tue, 7 Jun 2016 13:55:04 -0400 Subject: [PATCH 19/20] Add interfaces for ServerAsyncReader/Writers --- include/grpc++/impl/codegen/async_stream.h | 43 ++++++++++++++----- .../grpc++/impl/codegen/impl/async_stream.h | 43 ++++++++++++++----- 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index a607a4710605f..70a8b39312043 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -298,8 +298,16 @@ class ClientAsyncReaderWriter GRPC_FINAL }; template -class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncReaderInterface { +class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, + public AsyncReaderInterface { + public: + virtual void Finish(const W& msg, const Status& status, void* tag) = 0; + + virtual void FinishWithError(const Status& status, void* tag) = 0; +}; + +template +class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface { public: explicit ServerAsyncReader(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -320,7 +328,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&read_ops_); } - void Finish(const W& msg, const Status& status, void* tag) { + void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, @@ -337,7 +345,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&finish_ops_); } - void FinishWithError(const Status& status, void* tag) { + void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE { GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { @@ -362,8 +370,14 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, }; template -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface { +class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface { + public: + virtual void Finish(const Status& status, void* tag) = 0; +}; + +template +class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface { public: explicit ServerAsyncWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -390,7 +404,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&write_ops_); } - void Finish(const Status& status, void* tag) { + void Finish(const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, @@ -413,9 +427,16 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, /// Server-side interface for asynchronous bi-directional streaming. template -class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { +class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { + public: + virtual void Finish(const Status& status, void* tag) = 0; +}; + +template +class ServerAsyncReaderWriter GRPC_FINAL + : public ServerAsyncReaderWriterInterface { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -448,7 +469,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&write_ops_); } - void Finish(const Status& status, void* tag) { + void Finish(const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, diff --git a/include/grpc++/impl/codegen/impl/async_stream.h b/include/grpc++/impl/codegen/impl/async_stream.h index 8f99e7eea43f6..7d7a9568077a3 100644 --- a/include/grpc++/impl/codegen/impl/async_stream.h +++ b/include/grpc++/impl/codegen/impl/async_stream.h @@ -295,8 +295,16 @@ class ClientAsyncReaderWriter GRPC_FINAL }; template -class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncReaderInterface { +class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface, + public AsyncReaderInterface { + public: + virtual void Finish(const W& msg, const Status& status, void* tag) = 0; + + virtual void FinishWithError(const Status& status, void* tag) = 0; +}; + +template +class ServerAsyncReader GRPC_FINAL : public ServerAsyncReaderInterface { public: explicit ServerAsyncReader(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -316,7 +324,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&read_ops_); } - void Finish(const W& msg, const Status& status, void* tag) { + void Finish(const W& msg, const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); @@ -332,7 +340,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&finish_ops_); } - void FinishWithError(const Status& status, void* tag) { + void FinishWithError(const Status& status, void* tag) GRPC_OVERRIDE { GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { @@ -356,8 +364,14 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, }; template -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface { +class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface { + public: + virtual void Finish(const Status& status, void* tag) = 0; +}; + +template +class ServerAsyncWriter GRPC_FINAL : public ServerAsyncWriterInterface { public: explicit ServerAsyncWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -382,7 +396,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&write_ops_); } - void Finish(const Status& status, void* tag) { + void Finish(const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); @@ -404,9 +418,16 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, /// Server-side interface for asynchronous bi-directional streaming. template -class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface, - public AsyncReaderInterface { +class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, + public AsyncWriterInterface, + public AsyncReaderInterface { + public: + virtual void Finish(const Status& status, void* tag) = 0; +}; + +template +class ServerAsyncReaderWriter GRPC_FINAL + : public ServerAsyncReaderWriterInterface { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} @@ -437,7 +458,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, call_.PerformOps(&write_ops_); } - void Finish(const Status& status, void* tag) { + void Finish(const Status& status, void* tag) GRPC_OVERRIDE { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); From 4f11ab1ffd20a3a0cf2c33a563fcd5437733c189 Mon Sep 17 00:00:00 2001 From: Makarand Dharmapurikar Date: Wed, 8 Jun 2016 10:40:00 -0700 Subject: [PATCH 20/20] addressed feedback. --- src/compiler/objective_c_generator.cc | 2 +- src/objective-c/ProtoRPC/ProtoMethod.h | 2 +- src/objective-c/ProtoRPC/ProtoRPC.h | 6 +++--- src/objective-c/ProtoRPC/ProtoRPC.m | 2 +- src/objective-c/ProtoRPC/ProtoService.h | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index 75665cd5c0d46..71a674174d783 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -94,7 +94,7 @@ void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method, void PrintAdvancedSignature(Printer *printer, const MethodDescriptor *method, map< ::grpc::string, ::grpc::string> vars) { vars["method_name"] = "RPCTo" + vars["method_name"]; - vars["return_type"] = "GRPCProtoRPC *"; + vars["return_type"] = "GRPCProtoCall *"; PrintMethodSignature(printer, method, vars); } diff --git a/src/objective-c/ProtoRPC/ProtoMethod.h b/src/objective-c/ProtoRPC/ProtoMethod.h index bd1a848c19abb..f9fdbb35ffdee 100644 --- a/src/objective-c/ProtoRPC/ProtoMethod.h +++ b/src/objective-c/ProtoRPC/ProtoMethod.h @@ -51,7 +51,7 @@ __attribute__((deprecated("Please use GRPCProtoMethod."))) @end /** - * This subclass is empty now. Eventually we'll remove ProtoService class + * This subclass is empty now. Eventually we'll remove ProtoMethod class * to avoid potential naming conflict */ @interface GRPCProtoMethod : ProtoMethod diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index 55fefb0aa8857..f22d731f68b27 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -36,7 +36,7 @@ #import "ProtoMethod.h" -__attribute__((deprecated("Please use GRPCProtoRPC."))) +__attribute__((deprecated("Please use GRPCProtoCall."))) @interface ProtoRPC : GRPCCall - (instancetype)initWithHost:(NSString *)host @@ -49,9 +49,9 @@ __attribute__((deprecated("Please use GRPCProtoRPC."))) @end /** - * This subclass is empty now. Eventually we'll remove ProtoService class + * This subclass is empty now. Eventually we'll remove ProtoRPC class * to avoid potential naming conflict */ -@interface GRPCProtoRPC : ProtoRPC +@interface GRPCProtoCall : ProtoRPC @end diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index 27c8b0eff7ec0..fb0b566f199eb 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -118,6 +118,6 @@ - (void)startWithWriteable:(id)writeable { } @end -@implementation GRPCProtoRPC +@implementation GRPCProtoCall @end diff --git a/src/objective-c/ProtoRPC/ProtoService.h b/src/objective-c/ProtoRPC/ProtoService.h index edec8a4770f2c..87d06e1ae5956 100644 --- a/src/objective-c/ProtoRPC/ProtoService.h +++ b/src/objective-c/ProtoRPC/ProtoService.h @@ -33,7 +33,7 @@ #import -@class GRPCProtoRPC; +@class GRPCProtoCall; @protocol GRXWriteable; @class GRXWriter; @@ -44,7 +44,7 @@ __attribute__((deprecated("Please use GRPCProtoService."))) packageName:(NSString *)packageName serviceName:(NSString *)serviceName NS_DESIGNATED_INITIALIZER; -- (GRPCProtoRPC *)RPCToMethod:(NSString *)method +- (GRPCProtoCall *)RPCToMethod:(NSString *)method requestsWriter:(GRXWriter *)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id)responsesWriteable;