Skip to content

Commit

Permalink
Allocate retry payload fields with subchannel call instead of with ea…
Browse files Browse the repository at this point in the history
…ch batch.
  • Loading branch information
markdroth committed Jun 19, 2018
1 parent 547bb5e commit 5bacf2e
Showing 1 changed file with 72 additions and 57 deletions.
129 changes: 72 additions & 57 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,15 @@ typedef struct {
// The batch to use in the subchannel call.
// Its payload field points to subchannel_call_retry_state.batch_payload.
grpc_transport_stream_op_batch batch;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
// subchannel_batch_data.batch.payload points to this.
grpc_transport_stream_op_batch_payload batch_payload;
// For send_initial_metadata.
// Note that we need to make a copy of the initial metadata for each
// subchannel call instead of just referring to the copy in call_data,
Expand All @@ -818,15 +827,6 @@ typedef struct {
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
grpc_closure recv_trailing_metadata_ready;
// For intercepting on_complete.
grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
// subchannel_batch_data.batch.payload points to this.
grpc_transport_stream_op_batch_payload batch_payload;
// These fields indicate which ops have been started and completed on
// this subchannel call.
size_t started_send_message_count;
Expand Down Expand Up @@ -1524,17 +1524,21 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,

static void batch_data_unref(subchannel_batch_data* batch_data) {
if (gpr_unref(&batch_data->refs)) {
if (batch_data->send_initial_metadata_storage != nullptr) {
grpc_metadata_batch_destroy(&batch_data->send_initial_metadata);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
if (batch_data->batch.send_initial_metadata) {
grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
}
if (batch_data->send_trailing_metadata_storage != nullptr) {
grpc_metadata_batch_destroy(&batch_data->send_trailing_metadata);
if (batch_data->batch.send_trailing_metadata) {
grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
}
if (batch_data->batch.recv_initial_metadata) {
grpc_metadata_batch_destroy(&batch_data->recv_initial_metadata);
grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
}
if (batch_data->batch.recv_trailing_metadata) {
grpc_metadata_batch_destroy(&batch_data->recv_trailing_metadata);
grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
}
GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
Expand All @@ -1560,8 +1564,12 @@ static void invoke_recv_initial_metadata_callback(void* arg,
});
GPR_ASSERT(pending != nullptr);
// Return metadata.
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
grpc_metadata_batch_move(
&batch_data->recv_initial_metadata,
&retry_state->recv_initial_metadata,
pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
Expand Down Expand Up @@ -1606,7 +1614,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// the recv_trailing_metadata_ready callback, then defer propagating this
// callback back to the surface. We can evaluate whether to retry when
// recv_trailing_metadata comes back.
if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
if (grpc_client_channel_trace.enabled()) {
Expand Down Expand Up @@ -1651,8 +1659,12 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
});
GPR_ASSERT(pending != nullptr);
// Return payload.
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
*pending->batch->payload->recv_message.recv_message =
std::move(batch_data->recv_message);
std::move(retry_state->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
Expand Down Expand Up @@ -1693,7 +1705,7 @@ static void recv_message_ready(void* arg, grpc_error* error) {
// callback back to the surface. We can evaluate whether to retry when
// recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
(retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
Expand Down Expand Up @@ -1766,8 +1778,12 @@ static void add_closure_for_recv_trailing_metadata_ready(
return;
}
// Return metadata.
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
grpc_metadata_batch_move(
&batch_data->recv_trailing_metadata,
&retry_state->recv_trailing_metadata,
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
// Add closure.
closures->Add(pending->batch->payload->recv_trailing_metadata
Expand All @@ -1788,23 +1804,23 @@ static void add_closures_for_deferred_recv_callbacks(
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
invoke_recv_initial_metadata_callback,
retry_state->recv_initial_metadata_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
closures->Add(&batch_data->recv_initial_metadata_ready,
closures->Add(&retry_state->recv_initial_metadata_ready,
retry_state->recv_initial_metadata_error,
"resuming recv_initial_metadata_ready");
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
invoke_recv_message_callback,
retry_state->recv_message_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
closures->Add(&batch_data->recv_message_ready,
closures->Add(&retry_state->recv_message_ready,
retry_state->recv_message_error,
"resuming recv_message_ready");
retry_state->recv_message_ready_deferred_batch = nullptr;
Expand Down Expand Up @@ -2120,28 +2136,28 @@ static void add_retriable_send_initial_metadata_op(
//
// If we've already completed one or more attempts, add the
// grpc-retry-attempts header.
batch_data->send_initial_metadata_storage =
retry_state->send_initial_metadata_storage =
static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
calld->arena, sizeof(grpc_linked_mdelem) *
(calld->send_initial_metadata.list.count +
(calld->num_attempts_completed > 0))));
grpc_metadata_batch_copy(&calld->send_initial_metadata,
&batch_data->send_initial_metadata,
batch_data->send_initial_metadata_storage);
if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named
&retry_state->send_initial_metadata,
retry_state->send_initial_metadata_storage);
if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
.grpc_previous_rpc_attempts != nullptr)) {
grpc_metadata_batch_remove(
&batch_data->send_initial_metadata,
batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts);
grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
retry_state->send_initial_metadata.idx.named
.grpc_previous_rpc_attempts);
}
if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
grpc_mdelem retry_md = grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
*retry_count_strings[calld->num_attempts_completed - 1]);
grpc_error* error = grpc_metadata_batch_add_tail(
&batch_data->send_initial_metadata,
&batch_data->send_initial_metadata_storage[calld->send_initial_metadata
.list.count],
&retry_state->send_initial_metadata,
&retry_state->send_initial_metadata_storage[calld->send_initial_metadata
.list.count],
retry_md);
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
gpr_log(GPR_ERROR, "error adding retry metadata: %s",
Expand All @@ -2152,7 +2168,7 @@ static void add_retriable_send_initial_metadata_op(
retry_state->started_send_initial_metadata = true;
batch_data->batch.send_initial_metadata = true;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
&batch_data->send_initial_metadata;
&retry_state->send_initial_metadata;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
calld->send_initial_metadata_flags;
batch_data->batch.payload->send_initial_metadata.peer_string =
Expand All @@ -2173,10 +2189,10 @@ static void add_retriable_send_message_op(
grpc_core::ByteStreamCache* cache =
(*calld->send_messages)[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
batch_data->send_message.Init(cache);
retry_state->send_message.Init(cache);
batch_data->batch.send_message = true;
batch_data->batch.payload->send_message.send_message.reset(
batch_data->send_message.get());
retry_state->send_message.get());
}

// Adds retriable send_trailing_metadata op to batch_data.
Expand All @@ -2186,17 +2202,17 @@ static void add_retriable_send_trailing_metadata_op(
// We need to make a copy of the metadata batch for each attempt, since
// the filters in the subchannel stack may modify this batch, and we don't
// want those modifications to be passed forward to subsequent attempts.
batch_data->send_trailing_metadata_storage =
retry_state->send_trailing_metadata_storage =
static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
calld->arena, sizeof(grpc_linked_mdelem) *
calld->send_trailing_metadata.list.count));
grpc_metadata_batch_copy(&calld->send_trailing_metadata,
&batch_data->send_trailing_metadata,
batch_data->send_trailing_metadata_storage);
&retry_state->send_trailing_metadata,
retry_state->send_trailing_metadata_storage);
retry_state->started_send_trailing_metadata = true;
batch_data->batch.send_trailing_metadata = true;
batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
&batch_data->send_trailing_metadata;
&retry_state->send_trailing_metadata;
}

// Adds retriable recv_initial_metadata op to batch_data.
Expand All @@ -2205,16 +2221,16 @@ static void add_retriable_recv_initial_metadata_op(
subchannel_batch_data* batch_data) {
retry_state->started_recv_initial_metadata = true;
batch_data->batch.recv_initial_metadata = true;
grpc_metadata_batch_init(&batch_data->recv_initial_metadata);
grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
&batch_data->recv_initial_metadata;
&retry_state->recv_initial_metadata;
batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
&batch_data->trailing_metadata_available;
GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready,
&retry_state->trailing_metadata_available;
GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
recv_initial_metadata_ready, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
&batch_data->recv_initial_metadata_ready;
&retry_state->recv_initial_metadata_ready;
}

// Adds retriable recv_message op to batch_data.
Expand All @@ -2224,11 +2240,11 @@ static void add_retriable_recv_message_op(
++retry_state->started_recv_message_count;
batch_data->batch.recv_message = true;
batch_data->batch.payload->recv_message.recv_message =
&batch_data->recv_message;
GRPC_CLOSURE_INIT(&batch_data->recv_message_ready, recv_message_ready,
&retry_state->recv_message;
GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
batch_data, grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_message.recv_message_ready =
&batch_data->recv_message_ready;
&retry_state->recv_message_ready;
}

// Adds retriable recv_trailing_metadata op to batch_data.
Expand All @@ -2237,16 +2253,17 @@ static void add_retriable_recv_trailing_metadata_op(
subchannel_batch_data* batch_data) {
retry_state->started_recv_trailing_metadata = true;
batch_data->batch.recv_trailing_metadata = true;
grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&batch_data->recv_trailing_metadata;
&retry_state->recv_trailing_metadata;
batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&batch_data->collect_stats;
GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
&retry_state->collect_stats;
GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
.recv_trailing_metadata_ready =
&retry_state->recv_trailing_metadata_ready;
}

// Helper function used to start a recv_trailing_metadata batch. This
Expand Down Expand Up @@ -2400,11 +2417,9 @@ static void add_subchannel_batches_for_pending_batches(
// started subchannel batch, since we'll propagate the
// completion when it completes.
if (retry_state->completed_recv_trailing_metadata) {
subchannel_batch_data* batch_data =
retry_state->recv_trailing_metadata_internal_batch;
// Batches containing recv_trailing_metadata always succeed.
closures->Add(
&batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
&retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
"re-executing recv_trailing_metadata_ready to propagate "
"internally triggered result");
} else {
Expand Down

0 comments on commit 5bacf2e

Please sign in to comment.