From 39816efa3b98341941980a7ff5ebd8cc042e28e4 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 19 Apr 2019 12:57:08 -0700 Subject: [PATCH 01/25] Test invoking a callback-based RPC under lock --- .../end2end/client_callback_end2end_test.cc | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 821fcc2da6d5b..da2ce85470308 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -374,6 +374,34 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { SendRpcs(1, false); } +TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { + MAYBE_SKIP_TEST; + ResetStub(); + std::mutex mu; + std::condition_variable cv; + bool done; + EchoRequest request; + request.set_message("Hello locked world."); + EchoResponse response; + ClientContext cli_ctx; + { + std::lock_guard l(mu); + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&mu, &cv, &done, &request, &response](Status s) { + std::lock_guard l(mu); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(request.message(), response.message()); + done = true; + cv.notify_one(); + }); + } + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } +} + TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { MAYBE_SKIP_TEST; ResetStub(); From 72312daadf571adaef351a482a30f6afc1b94e2c Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 19 Apr 2019 12:57:32 -0700 Subject: [PATCH 02/25] Move callback-based API to use Executors over ApplicationExecutionCtx --- src/core/lib/surface/completion_queue.cc | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 7d679204bacaf..9f2a6b4783a6a 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -814,6 +814,11 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, GRPC_ERROR_UNREF(error); } +static void functor_callback(void* arg, grpc_error* error) { + auto* functor = static_cast(arg); + (*functor).functor_run(callback, error == GRPC_ERROR_NONE ? true : false); +} + /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -822,7 +827,6 @@ static void cq_end_op_for_callback( GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); - bool is_success = (error == GRPC_ERROR_NONE); if (grpc_api_trace.enabled() || (grpc_trace_operation_failures.enabled() && error != GRPC_ERROR_NONE)) { @@ -847,10 +851,13 @@ static void cq_end_op_for_callback( cq_finish_shutdown_callback(cq); } - GRPC_ERROR_UNREF(error); - auto* functor = static_cast(tag); - grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_REF(error)); + + GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1334,7 +1341,11 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(functor_callback, callback, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_NONE); + } static void cq_shutdown_callback(grpc_completion_queue* cq) { From 56e34854caeefbbbbae0cd4c9e3649a01d466153 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 19 Apr 2019 13:52:55 -0700 Subject: [PATCH 03/25] Fix review comments and clang fixes. --- src/core/lib/surface/completion_queue.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 9f2a6b4783a6a..29dd2101e10b4 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -34,6 +34,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/atomic.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -816,7 +817,7 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, static void functor_callback(void* arg, grpc_error* error) { auto* functor = static_cast(arg); - (*functor).functor_run(callback, error == GRPC_ERROR_NONE ? true : false); + functor->functor_run(functor, error == GRPC_ERROR_NONE); } /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */ @@ -853,8 +854,9 @@ static void cq_end_op_for_callback( auto* functor = static_cast(tag); GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_CLOSURE_CREATE( + functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -1342,10 +1344,10 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(functor_callback, callback, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_CLOSURE_CREATE( + functor_callback, callback, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_NONE); - } static void cq_shutdown_callback(grpc_completion_queue* cq) { From f3911e7ff17e58d57b01132ac6351e837de877ab Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Mon, 22 Apr 2019 15:37:54 -0700 Subject: [PATCH 04/25] Fix the completion queue tests for the new behavior --- test/core/surface/completion_queue_test.cc | 46 +++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index 7c3630eaf18c8..214d673cdf650 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -23,6 +23,7 @@ #include #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/iomgr.h" #include "test/core/util/test_config.h" @@ -365,10 +366,19 @@ static void test_callback(void) { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; grpc_completion_queue_attributes attr; unsigned i; + static gpr_mu mu, shutdown_mu; + static gpr_cv cv, shutdown_cv; + static int cb_counter; + gpr_mu_init(&mu); + gpr_mu_init(&shutdown_mu); + gpr_cv_init(&cv); + gpr_cv_init(&shutdown_cv); LOG_TEST("test_callback"); + gpr_mu_lock(&shutdown_mu); bool got_shutdown = false; + gpr_mu_unlock(&shutdown_mu); class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: ShutdownCallback(bool* done) : done_(done) { @@ -376,7 +386,11 @@ static void test_callback(void) { } ~ShutdownCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); + gpr_mu_unlock(&shutdown_mu); + // Signal when the shutdown callback is completed. + gpr_cv_signal(&shutdown_cv); } private: @@ -391,9 +405,11 @@ static void test_callback(void) { for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { int sumtags = 0; int counter = 0; + gpr_mu_lock(&mu); + cb_counter = 0; + gpr_mu_unlock(&mu); { // reset exec_ctx types - grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; attr.cq_polling_type = polling_types[pidx]; cc = grpc_completion_queue_create( @@ -409,7 +425,13 @@ static void test_callback(void) { int ok) { GPR_ASSERT(static_cast(ok)); auto* callback = static_cast(cb); + gpr_mu_lock(&mu); + cb_counter++; *callback->counter_ += callback->tag_; + if (cb_counter == GPR_ARRAY_SIZE(tags)) { + gpr_cv_signal(&cv); + } + gpr_mu_unlock(&mu); grpc_core::Delete(callback); }; @@ -429,12 +451,34 @@ static void test_callback(void) { nullptr, &completions[i]); } + gpr_mu_lock(&mu); + while (cb_counter != GPR_ARRAY_SIZE(tags)) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + shutdown_and_destroy(cc); + + gpr_mu_lock(&shutdown_mu); + while (!got_shutdown) { + // Wait for the shutdown callback to complete. + gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&shutdown_mu); } + + gpr_mu_lock(&mu); + // Run the assertions to check if the test ran successfully. GPR_ASSERT(sumtags == counter); GPR_ASSERT(got_shutdown); + gpr_mu_unlock(&mu); got_shutdown = false; } + gpr_mu_destroy(&mu); + gpr_mu_destroy(&shutdown_mu); + gpr_cv_destroy(&cv); + gpr_cv_destroy(&shutdown_cv); } struct thread_state { From 4f4476e00f61b14a8f1141a0ba743215e39d88b2 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 23 Apr 2019 10:17:13 -0700 Subject: [PATCH 05/25] Make the exector to LONG from SHORT --- src/core/lib/surface/completion_queue.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 29dd2101e10b4..afd046596efad 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -856,7 +856,7 @@ static void cq_end_op_for_callback( GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE( functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)), GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -1346,7 +1346,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE( functor_callback, callback, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)), GRPC_ERROR_NONE); } From e68316dda5c607118c9a20e79d46a2b99c72fccb Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 14 May 2019 12:07:05 -0700 Subject: [PATCH 06/25] Fix errors from clang format script --- src/core/lib/surface/completion_queue.cc | 18 ++++++++---------- test/core/surface/completion_queue_test.cc | 3 ++- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 18cc15d8b7a51..2b60033c42c3a 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -862,11 +862,10 @@ static void cq_end_op_for_callback( } auto* functor = static_cast(tag); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)), - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(functor_callback, functor, + grpc_core::Executor::Scheduler( + grpc_core::ExecutorJobType::LONG)), + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); } @@ -1352,11 +1351,10 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, callback, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(functor_callback, callback, + grpc_core::Executor::Scheduler( + grpc_core::ExecutorJobType::LONG)), + GRPC_ERROR_NONE); } static void cq_shutdown_callback(grpc_completion_queue* cq) { diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index 214d673cdf650..35a67387335ba 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -463,7 +463,8 @@ static void test_callback(void) { gpr_mu_lock(&shutdown_mu); while (!got_shutdown) { // Wait for the shutdown callback to complete. - gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&shutdown_cv, &shutdown_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&shutdown_mu); } From a73f22bd73d72223136658d8ce1cd3da46d6457f Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 14 May 2019 12:10:17 -0700 Subject: [PATCH 07/25] Make the executor SHORT instead of LONG. --- src/core/lib/surface/completion_queue.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 2b60033c42c3a..8dce0e0114115 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -864,7 +864,7 @@ static void cq_end_op_for_callback( auto* functor = static_cast(tag); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(functor_callback, functor, grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::LONG)), + grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -1353,7 +1353,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(functor_callback, callback, grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::LONG)), + grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_NONE); } From 091c12ad79049558c80bc3831c0198ea216c6673 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 14 May 2019 12:55:20 -0700 Subject: [PATCH 08/25] Fix clang errors --- src/core/lib/surface/completion_queue.cc | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 8dce0e0114115..cdf1020051f91 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -862,10 +862,11 @@ static void cq_end_op_for_callback( } auto* functor = static_cast(tag); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(functor_callback, functor, - grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE( + functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); } @@ -1351,10 +1352,11 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(functor_callback, callback, - grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE( + functor_callback, callback, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_NONE); } static void cq_shutdown_callback(grpc_completion_queue* cq) { From 172bb1b30f15b1d656603190c5bad2f426ac1354 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 14 May 2019 15:04:39 -0700 Subject: [PATCH 09/25] Whitelist internal code path to use ApplicationExecCtx --- src/core/lib/surface/completion_queue.cc | 27 +++++++++++++++--------- src/core/lib/surface/completion_queue.h | 2 +- src/core/lib/surface/server.cc | 2 +- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index cdf1020051f91..d040e91b79696 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -201,7 +201,7 @@ struct cq_vtable { bool (*begin_op)(grpc_completion_queue* cq, void* tag); void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, @@ -359,19 +359,19 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal = false); static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal = false); static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal = false); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -679,7 +679,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { + void* done_arg, grpc_cq_completion* storage, + bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -759,7 +760,8 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { + void* done_arg, grpc_cq_completion* storage, + bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -831,7 +833,8 @@ static void functor_callback(void* arg, grpc_error* error) { static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage) { + grpc_cq_completion* storage, + bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -862,19 +865,23 @@ static void cq_end_op_for_callback( } auto* functor = static_cast(tag); - GRPC_CLOSURE_SCHED( + if (internal) { + grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error == GRPC_ERROR_NONE)); + } else { + GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE( functor_callback, functor, grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage); + void* done_arg, grpc_cq_completion* storage, bool internal) { + cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } typedef struct { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index d60fe6d6efe3f..f5b0822fcb4bb 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,7 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 2377c4d8f23d8..19f61c548d6b1 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, } grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion); + rc, &rc->completion, true); } static void publish_new_rpc(void* arg, grpc_error* error) { From 4b89514919cbf1bb62294400587f9d98af323284 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 15 May 2019 12:04:13 -0700 Subject: [PATCH 10/25] Fix default value compile issues --- src/core/lib/surface/completion_queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index f5b0822fcb4bb..16550ac61ebe1 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,7 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal); + void* done_arg, grpc_cq_completion* storage, bool internal = false); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); From 26ba981deeda821fae3a185b21308d16adcbbfe4 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Thu, 16 May 2019 09:34:20 -0700 Subject: [PATCH 11/25] Fix clang errors --- src/core/lib/surface/completion_queue.cc | 68 +++++++++++------------- src/core/lib/surface/completion_queue.h | 3 +- 2 files changed, 33 insertions(+), 38 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index d040e91b79696..f1b31e9cbba3a 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -355,23 +355,20 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); // queue. The done argument is a callback that will be invoked when it is // safe to free up that storage. The storage MUST NOT be freed until the // done callback is invoked. -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal = false); - -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal = false); - -static void cq_end_op_for_callback(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal = false); +static void cq_end_op_for_next( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal = false); + +static void cq_end_op_for_pluck( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal = false); + +static void cq_end_op_for_callback( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal = false); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -675,12 +672,10 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_NEXT) */ -static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, - bool internal) { +static void cq_end_op_for_next( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -756,12 +751,10 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a * completion * type of GRPC_CQ_PLUCK) */ -static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag, - grpc_error* error, - void (*done)(void* done_arg, - grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, - bool internal) { +static void cq_end_op_for_pluck( + grpc_completion_queue* cq, void* tag, grpc_error* error, + void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -833,8 +826,7 @@ static void functor_callback(void* arg, grpc_error* error) { static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, - bool internal) { + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -866,13 +858,14 @@ static void cq_end_op_for_callback( auto* functor = static_cast(tag); if (internal) { - grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error == GRPC_ERROR_NONE)); + grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, + (error == GRPC_ERROR_NONE)); } else { GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_CREATE( + functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); @@ -880,7 +873,8 @@ static void cq_end_op_for_callback( void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal) { + void* done_arg, grpc_cq_completion* storage, + bool internal) { cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 16550ac61ebe1..3ba9fbb87657d 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,7 +77,8 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal = false); + void* done_arg, grpc_cq_completion* storage, + bool internal = false); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); From fe85756a8afd6e257b378b73cea1681785efb58d Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 17 May 2019 10:15:41 -0700 Subject: [PATCH 12/25] Fix end2end tests --- test/cpp/end2end/client_callback_end2end_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index ed7482b4be58b..8cf6def1073bd 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -379,7 +379,7 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { ResetStub(); std::mutex mu; std::condition_variable cv; - bool done; + bool done = false; EchoRequest request; request.set_message("Hello locked world."); EchoResponse response; From 92aa0530fa5c1c4a0419413223cc6016529a661a Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 17 May 2019 12:17:16 -0700 Subject: [PATCH 13/25] Changes to locking order --- test/core/surface/completion_queue_test.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index 35a67387335ba..32f4debef133b 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -388,9 +388,9 @@ static void test_callback(void) { static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); - gpr_mu_unlock(&shutdown_mu); // Signal when the shutdown callback is completed. gpr_cv_signal(&shutdown_cv); + gpr_mu_unlock(&shutdown_mu); } private: @@ -469,17 +469,18 @@ static void test_callback(void) { gpr_mu_unlock(&shutdown_mu); } - gpr_mu_lock(&mu); // Run the assertions to check if the test ran successfully. GPR_ASSERT(sumtags == counter); GPR_ASSERT(got_shutdown); - gpr_mu_unlock(&mu); + gpr_mu_lock(&shutdown_mu); got_shutdown = false; + gpr_mu_unlock(&shutdown_mu); } - gpr_mu_destroy(&mu); - gpr_mu_destroy(&shutdown_mu); + gpr_cv_destroy(&cv); gpr_cv_destroy(&shutdown_cv); + gpr_mu_destroy(&mu); + gpr_mu_destroy(&shutdown_mu); } struct thread_state { From a89b1763afa69d8a943a129971e36e2b0129751f Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Fri, 17 May 2019 15:35:12 -0700 Subject: [PATCH 14/25] changes --- src/core/lib/surface/completion_queue.cc | 37 ++++++++++-------------- src/core/lib/surface/completion_queue.h | 3 +- src/core/lib/surface/server.cc | 2 +- 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index f1b31e9cbba3a..b62f3fa6add68 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -201,7 +201,7 @@ struct cq_vtable { bool (*begin_op)(grpc_completion_queue* cq, void* tag); void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal); + void* done_arg, grpc_cq_completion* storage); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, @@ -358,17 +358,17 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); static void cq_end_op_for_next( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal = false); + grpc_cq_completion* storage); static void cq_end_op_for_pluck( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal = false); + grpc_cq_completion* storage); static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal = false); + grpc_cq_completion* storage); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -675,7 +675,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { static void cq_end_op_for_next( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { + grpc_cq_completion* storage) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -754,7 +754,7 @@ static void cq_end_op_for_next( static void cq_end_op_for_pluck( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { + grpc_cq_completion* storage) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -826,7 +826,7 @@ static void functor_callback(void* arg, grpc_error* error) { static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage, bool internal) { + grpc_cq_completion* storage) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -857,25 +857,18 @@ static void cq_end_op_for_callback( } auto* functor = static_cast(tag); - if (internal) { - grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, - (error == GRPC_ERROR_NONE)); - } else { - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE( - functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_REF(error)); - } - + GRPC_CLOSURE_RUN( + GRPC_CLOSURE_CREATE( + functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, - bool internal) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); + void* done_arg, grpc_cq_completion* storage) { + cq->vtable->end_op(cq, tag, error, done, done_arg, storage); } typedef struct { @@ -1353,7 +1346,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_RUN( GRPC_CLOSURE_CREATE( functor_callback, callback, grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 3ba9fbb87657d..d60fe6d6efe3f 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,8 +77,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, - bool internal = false); + void* done_arg, grpc_cq_completion* storage); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 19f61c548d6b1..2377c4d8f23d8 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, } grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion, true); + rc, &rc->completion); } static void publish_new_rpc(void* arg, grpc_error* error) { From 56ff5a918f114372e7dfd9f130d4411b474bc466 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Mon, 20 May 2019 11:35:45 -0700 Subject: [PATCH 15/25] Address review comments - 1 --- test/core/surface/completion_queue_test.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index 32f4debef133b..ff6722ffb1758 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -376,9 +376,7 @@ static void test_callback(void) { LOG_TEST("test_callback"); - gpr_mu_lock(&shutdown_mu); bool got_shutdown = false; - gpr_mu_unlock(&shutdown_mu); class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: ShutdownCallback(bool* done) : done_(done) { @@ -405,9 +403,7 @@ static void test_callback(void) { for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { int sumtags = 0; int counter = 0; - gpr_mu_lock(&mu); cb_counter = 0; - gpr_mu_unlock(&mu); { // reset exec_ctx types grpc_core::ExecCtx exec_ctx; @@ -472,9 +468,7 @@ static void test_callback(void) { // Run the assertions to check if the test ran successfully. GPR_ASSERT(sumtags == counter); GPR_ASSERT(got_shutdown); - gpr_mu_lock(&shutdown_mu); got_shutdown = false; - gpr_mu_unlock(&shutdown_mu); } gpr_cv_destroy(&cv); From 061dfc911f54c8a641ff7126dec77ac0c27430e7 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Mon, 20 May 2019 13:51:27 -0700 Subject: [PATCH 16/25] Bring back the internalization --- src/core/lib/surface/completion_queue.cc | 23 ++++++++++++++--------- src/core/lib/surface/completion_queue.h | 3 ++- src/core/lib/surface/server.cc | 2 +- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index b62f3fa6add68..64ceb45cf24a5 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -201,7 +201,7 @@ struct cq_vtable { bool (*begin_op)(grpc_completion_queue* cq, void* tag); void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, bool internal); grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); grpc_event (*pluck)(grpc_completion_queue* cq, void* tag, @@ -358,17 +358,17 @@ static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); static void cq_end_op_for_next( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage); + grpc_cq_completion* storage, bool internal); static void cq_end_op_for_pluck( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage); + grpc_cq_completion* storage, bool internal); static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage); + grpc_cq_completion* storage, bool internal); static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, void* reserved); @@ -675,7 +675,7 @@ bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) { static void cq_end_op_for_next( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage) { + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_next", 0); if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) || @@ -754,7 +754,7 @@ static void cq_end_op_for_next( static void cq_end_op_for_pluck( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage) { + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0); cq_pluck_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -826,7 +826,7 @@ static void functor_callback(void* arg, grpc_error* error) { static void cq_end_op_for_callback( grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg, - grpc_cq_completion* storage) { + grpc_cq_completion* storage, bool internal) { GPR_TIMER_SCOPE("cq_end_op_for_callback", 0); cq_callback_data* cqd = static_cast DATA_FROM_CQ(cq); @@ -857,18 +857,23 @@ static void cq_end_op_for_callback( } auto* functor = static_cast(tag); + if (internal) { + grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, + (error == GRPC_ERROR_NONE)); + } else { GRPC_CLOSURE_RUN( GRPC_CLOSURE_CREATE( functor_callback, functor, grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage) { - cq->vtable->end_op(cq, tag, error, done, done_arg, storage); + void* done_arg, grpc_cq_completion* storage, bool internal) { + cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } typedef struct { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index d60fe6d6efe3f..3ba9fbb87657d 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -77,7 +77,8 @@ bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag); grpc_cq_begin_op */ void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage); + void* done_arg, grpc_cq_completion* storage, + bool internal = false); grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 2377c4d8f23d8..19f61c548d6b1 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -513,7 +513,7 @@ static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx, } grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, - rc, &rc->completion); + rc, &rc->completion, true); } static void publish_new_rpc(void* arg, grpc_error* error) { From 035bf8eb14cc119dafec0aebd19e953e65142cb9 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 21 May 2019 10:50:18 -0700 Subject: [PATCH 17/25] Fix clang errors --- src/core/lib/surface/completion_queue.cc | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 64ceb45cf24a5..5fa36392da527 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -861,18 +861,19 @@ static void cq_end_op_for_callback( grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error == GRPC_ERROR_NONE)); } else { - GRPC_CLOSURE_RUN( - GRPC_CLOSURE_CREATE( - functor_callback, functor, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN( + GRPC_CLOSURE_CREATE( + functor_callback, functor, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, void (*done)(void* done_arg, grpc_cq_completion* storage), - void* done_arg, grpc_cq_completion* storage, bool internal) { + void* done_arg, grpc_cq_completion* storage, + bool internal) { cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal); } @@ -1351,11 +1352,10 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - GRPC_CLOSURE_RUN( - GRPC_CLOSURE_CREATE( - functor_callback, callback, - grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(GRPC_CLOSURE_CREATE(functor_callback, callback, + grpc_core::Executor::Scheduler( + grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_NONE); } static void cq_shutdown_callback(grpc_completion_queue* cq) { From ccc105f3fdc0e017b8510f694f2ebb9c67c90324 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Tue, 21 May 2019 11:07:43 -0700 Subject: [PATCH 18/25] Move GRPC_CLOSURE_RUN to GRPC_CLOSURE_SCHED - As here we want it to be scheduled for execution later. --- src/core/lib/surface/completion_queue.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 5fa36392da527..d0ed1a9f673b5 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -861,7 +861,7 @@ static void cq_end_op_for_callback( grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error == GRPC_ERROR_NONE)); } else { - GRPC_CLOSURE_RUN( + GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE( functor_callback, functor, grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), @@ -1352,10 +1352,11 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - GRPC_CLOSURE_RUN(GRPC_CLOSURE_CREATE(functor_callback, callback, - grpc_core::Executor::Scheduler( - grpc_core::ExecutorJobType::SHORT)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE( + functor_callback, callback, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), + GRPC_ERROR_NONE); } static void cq_shutdown_callback(grpc_completion_queue* cq) { From 4f7f561564e52b0f2dd97ac661594d2449886df4 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 22 May 2019 10:42:50 -0700 Subject: [PATCH 19/25] Add synchronization to bm test - since we made the callback run on another thread, add synchronization in bm tests as well --- test/cpp/microbenchmarks/bm_cq.cc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 50eb9454fbe5d..1f0b9621b21dd 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -166,6 +166,9 @@ class TagCallback : public grpc_experimental_completion_queue_functor { int* iter_; }; +static gpr_mu shutdown_mu; +static gpr_cv shutdown_cv; + // Check if completion queue is shut down class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: @@ -174,7 +177,10 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { } ~ShutdownCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&shutdown_mu); *static_cast(cb)->done_ = static_cast(ok); + gpr_cv_signal(&shutdown_cv); + gpr_mu_unlock(&shutdown_mu); } private: @@ -185,6 +191,8 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { TrackCounters track_counters; int iteration = 0; TagCallback tag_cb(&iteration); + gpr_mu_init(&shutdown_mu); + gpr_cv_init(&shutdown_cv); bool got_shutdown = false; ShutdownCallback shutdown_cb(&got_shutdown); grpc_completion_queue* cc = @@ -198,9 +206,19 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { nullptr, &completion); } shutdown_and_destroy(cc); + + gpr_mu_lock(&shutdown_mu); + while (!got_shutdown) { + // Wait for the shutdown callback to complete. + gpr_cv_wait(&shutdown_cv, &shutdown_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&shutdown_mu); GPR_ASSERT(got_shutdown); GPR_ASSERT(iteration == static_cast(state.iterations())); track_counters.Finish(state); + gpr_cv_destroy(&shutdown_cv); + gpr_mu_destroy(&shutdown_mu); } BENCHMARK(BM_Callback_CQ_Pass1Core); From 4a208f0071b0ece3a36bf3be4fc3c7602cce2cd8 Mon Sep 17 00:00:00 2001 From: Can Guler Date: Wed, 22 May 2019 10:48:54 -0700 Subject: [PATCH 20/25] Add v1.21.0 releases of grpc-go to interop matrix --- tools/interop_matrix/client_matrix.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index 234c71295b356..f9c069f638934 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -124,6 +124,7 @@ def __init__(self, patch=[], runtime_subset=[], testcases_file=None): ('v1.18.0', ReleaseInfo(runtime_subset=['go1.11'])), ('v1.19.0', ReleaseInfo(runtime_subset=['go1.11'])), ('v1.20.0', ReleaseInfo(runtime_subset=['go1.11'])), + ('v1.21.0', ReleaseInfo(runtime_subset=['go1.11'])), ]), 'java': OrderedDict([ From e1f62278e316d8b20dc5a2d7f91f76b117d6199b Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 22 May 2019 13:53:10 -0700 Subject: [PATCH 21/25] Fix clang error --- test/cpp/microbenchmarks/bm_cq.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 1f0b9621b21dd..6ab4b083c13cf 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -210,8 +210,7 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { gpr_mu_lock(&shutdown_mu); while (!got_shutdown) { // Wait for the shutdown callback to complete. - gpr_cv_wait(&shutdown_cv, &shutdown_mu, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&shutdown_mu); GPR_ASSERT(got_shutdown); From 3d258e89aec313c2b3687a38eb06cc61559a1cda Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Wed, 22 May 2019 14:59:33 -0700 Subject: [PATCH 22/25] Fix windows compiler errors --- test/core/surface/completion_queue_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index ff6722ffb1758..4a33b934f436d 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -360,7 +360,7 @@ static void test_pluck_after_shutdown(void) { static void test_callback(void) { grpc_completion_queue* cc; - void* tags[128]; + static void* tags[128]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; grpc_cq_polling_type polling_types[] = { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; From d2c8eb94c954914e14f979229e963964211c80f8 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Thu, 23 May 2019 18:09:12 -0700 Subject: [PATCH 23/25] Fix microbenchmark failures --- test/cpp/microbenchmarks/callback_streaming_ping_pong.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h index 9fb86bd829915..0d27e0efa50c6 100644 --- a/test/cpp/microbenchmarks/callback_streaming_ping_pong.h +++ b/test/cpp/microbenchmarks/callback_streaming_ping_pong.h @@ -115,7 +115,7 @@ class BidiClient int msgs_size_; std::mutex mu; std::condition_variable cv; - bool done; + bool done = false; }; template From b18faa6c95bee22f03730ad4bc9b192440b5403b Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Thu, 23 May 2019 20:08:45 -0700 Subject: [PATCH 24/25] Fix tsan error --- test/cpp/microbenchmarks/bm_cq.cc | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 6ab4b083c13cf..edbff9c2be350 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -150,6 +150,9 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) { grpc_completion_queue_destroy(cc); } +static gpr_mu shutdown_mu, mu; +static gpr_cv shutdown_cv, cv; + // Tag completion queue iterate times class TagCallback : public grpc_experimental_completion_queue_functor { public: @@ -158,17 +161,17 @@ class TagCallback : public grpc_experimental_completion_queue_functor { } ~TagCallback() {} static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + gpr_mu_lock(&mu); GPR_ASSERT(static_cast(ok)); *static_cast(cb)->iter_ += 1; + gpr_cv_signal(&cv); + gpr_mu_unlock(&mu); }; private: int* iter_; }; -static gpr_mu shutdown_mu; -static gpr_cv shutdown_cv; - // Check if completion queue is shut down class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: @@ -189,8 +192,10 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { TrackCounters track_counters; - int iteration = 0; + int iteration = 0, current_iterations = 0; TagCallback tag_cb(&iteration); + gpr_mu_init(&mu); + gpr_cv_init(&cv); gpr_mu_init(&shutdown_mu); gpr_cv_init(&shutdown_cv); bool got_shutdown = false; @@ -207,15 +212,26 @@ static void BM_Callback_CQ_Pass1Core(benchmark::State& state) { } shutdown_and_destroy(cc); + gpr_mu_lock(&mu); + current_iterations = static_cast(state.iterations()); + while (current_iterations != iteration) { + // Wait for all the callbacks to complete. + gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&mu); + gpr_mu_lock(&shutdown_mu); while (!got_shutdown) { // Wait for the shutdown callback to complete. gpr_cv_wait(&shutdown_cv, &shutdown_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&shutdown_mu); + GPR_ASSERT(got_shutdown); GPR_ASSERT(iteration == static_cast(state.iterations())); track_counters.Finish(state); + gpr_cv_destroy(&cv); + gpr_mu_destroy(&mu); gpr_cv_destroy(&shutdown_cv); gpr_mu_destroy(&shutdown_mu); } From 088319bc40f2d0b38c69a7b3c557749131451810 Mon Sep 17 00:00:00 2001 From: Karthik Ravi Shankar Date: Thu, 23 May 2019 22:10:53 -0700 Subject: [PATCH 25/25] IWYU in core_codegen_interface core_codegen_interface requires ByteBuffer in generated code and needs to include byte_buffer.h NO_BUG=Cleanup --- include/grpcpp/impl/codegen/core_codegen_interface.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/grpcpp/impl/codegen/core_codegen_interface.h b/include/grpcpp/impl/codegen/core_codegen_interface.h index 3792c3d4693a8..02b5033c51fb1 100644 --- a/include/grpcpp/impl/codegen/core_codegen_interface.h +++ b/include/grpcpp/impl/codegen/core_codegen_interface.h @@ -19,6 +19,7 @@ #ifndef GRPCPP_IMPL_CODEGEN_CORE_CODEGEN_INTERFACE_H #define GRPCPP_IMPL_CODEGEN_CORE_CODEGEN_INTERFACE_H +#include #include #include #include