Skip to content

Commit

Permalink
Roll back filter changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kpayson64 committed Jun 26, 2018
1 parent 2069bf5 commit a9e27ae
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 87 deletions.
14 changes: 1 addition & 13 deletions src/core/ext/filters/http/client/http_client_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ struct call_data {
grpc_linked_mdelem user_agent;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch* recv_initial_metadata;
grpc_error* recv_initial_metadata_error;
grpc_closure* original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
Expand Down Expand Up @@ -148,7 +147,6 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata);
calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
} else {
GRPC_ERROR_REF(error);
}
Expand All @@ -164,13 +162,6 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
if (calld->recv_initial_metadata_error != GRPC_ERROR_NONE) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_REF(calld->recv_initial_metadata_error);
} else if (error != calld->recv_initial_metadata_error) {
error = grpc_error_add_child(error, calld->recv_initial_metadata_error);
}
}
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}

Expand Down Expand Up @@ -443,10 +434,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
}
grpc_closure* ignored) {}

static grpc_mdelem scheme_from_args(const grpc_channel_args* args) {
unsigned i;
Expand Down
31 changes: 0 additions & 31 deletions src/core/ext/filters/http/server/http_server_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ struct call_data {

// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
grpc_error* recv_initial_metadata_ready_error;
grpc_closure* original_recv_initial_metadata_ready;
grpc_metadata_batch* recv_initial_metadata;
uint32_t* recv_initial_metadata_flags;
Expand All @@ -61,9 +60,6 @@ struct call_data {
grpc_closure recv_message_ready;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
bool seen_recv_message_ready;

grpc_closure recv_trailing_metadata_ready;
grpc_closure* original_recv_trailing_metadata_ready;
};

} // namespace
Expand Down Expand Up @@ -271,7 +267,6 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
calld->seen_recv_initial_metadata_ready = true;
if (err == GRPC_ERROR_NONE) {
err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata);
calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err);
if (calld->seen_recv_message_ready) {
// We've already seen the recv_message callback, but we previously
// deferred it, so we need to return it here.
Expand Down Expand Up @@ -318,23 +313,6 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
}
}

static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->recv_initial_metadata_ready_error != GRPC_ERROR_NONE) {
if (err == GRPC_ERROR_NONE) {
err = GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error);
} else if (err != calld->recv_initial_metadata_ready_error) {
err = grpc_error_add_child(err, calld->recv_initial_metadata_ready_error);
} else {
err = GRPC_ERROR_REF(err);
}
} else {
err = GRPC_ERROR_REF(err);
}
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
}

static grpc_error* hs_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
/* grab pointers to our data from the call element */
Expand Down Expand Up @@ -379,11 +357,6 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem,
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}

if (op->recv_trailing_metadata) {
calld->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
}

if (op->send_trailing_metadata) {
grpc_error* error = hs_filter_outgoing_metadata(
elem, op->payload->send_trailing_metadata.send_trailing_metadata);
Expand Down Expand Up @@ -416,9 +389,6 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
hs_recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}

Expand All @@ -427,7 +397,6 @@ static void hs_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error);
if (calld->have_read_stream) {
calld->read_stream->Orphan();
}
Expand Down
45 changes: 2 additions & 43 deletions src/core/ext/filters/message_size/message_size_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,10 @@ struct call_data {
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
grpc_closure recv_trailing_metadata_ready;
// The error caused by a message that is too large, or GRPC_ERROR_NONE
grpc_error* error;
// Used by recv_message_ready.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
// Original recv_trailing_metadata callback, invoked after our own.
grpc_closure* next_recv_trailing_metadata_ready;
};

struct channel_data {
Expand Down Expand Up @@ -135,13 +130,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_ERROR_UNREF(calld->error);
if (error == GRPC_ERROR_NONE) {
error = new_error;
} else {
error = grpc_error_add_child(error, new_error);
GRPC_ERROR_UNREF(new_error);
}
calld->error = GRPC_ERROR_REF(error);
gpr_free(message_string);
} else {
GRPC_ERROR_REF(error);
Expand All @@ -150,26 +144,6 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
}

// Callback invoked on completion of recv_trailing_metadata
// Notifies the recv_trailing_metadata batch of any message size failures
static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->error != GRPC_ERROR_NONE) {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_REF(calld->error);
} else if (error != calld->error) {
error = grpc_error_add_child(error, GRPC_ERROR_REF(calld->error));
} else {
error = GRPC_ERROR_REF(error);
}
} else {
error = GRPC_ERROR_REF(error);
}
// Invoke the next callback.
GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata_ready, error);
}

// Start transport stream op.
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
Expand Down Expand Up @@ -198,13 +172,6 @@ static void start_transport_stream_op_batch(
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
// Inject callback for receiving trailing metadata.
if (op->recv_trailing_metadata) {
calld->next_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
// Chain to the next filter.
grpc_call_next_op(elem, op);
}
Expand All @@ -216,13 +183,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->call_combiner = args->call_combiner;
calld->next_recv_message_ready = nullptr;
calld->next_recv_trailing_metadata_ready = nullptr;
calld->error = GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
Expand Down Expand Up @@ -251,10 +213,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
// Destructor for call_data.
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = (call_data*)elem->call_data;
GRPC_ERROR_UNREF(calld->error);
}
grpc_closure* ignored) {}

static int default_size(const grpc_channel_args* args,
int without_minimal_stack) {
Expand Down

0 comments on commit a9e27ae

Please sign in to comment.