Skip to content

Commit

Permalink
HTTP1: Refactor HTTP1 Active Request to be defer deletable. (#19062)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
  • Loading branch information
KBaichoo authored Nov 19, 2021
1 parent eda5458 commit 7dec99d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 55 deletions.
90 changes: 43 additions & 47 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -996,18 +996,17 @@ ServerConnectionImpl::ServerConnectionImpl(

uint32_t ServerConnectionImpl::getHeadersSize() {
// Add in the size of the request URL if processing request headers.
const uint32_t url_size = (!processing_trailers_ && active_request_.has_value())
? active_request_.value().request_url_.size()
: 0;
const uint32_t url_size =
(!processing_trailers_ && active_request_) ? active_request_->request_url_.size() : 0;
return url_size + ConnectionImpl::getHeadersSize();
}

void ServerConnectionImpl::onEncodeComplete() {
if (active_request_.value().remote_complete_) {
if (active_request_->remote_complete_) {
// Only do this if remote is complete. If we are replying before the request is complete the
// only logical thing to do is for higher level code to reset() / close the connection so we
// leave the request around so that it can fire reset callbacks.
active_request_.reset();
connection_.dispatcher().deferredDelete(std::move(active_request_));
}
}

Expand All @@ -1018,12 +1017,11 @@ Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_
bool is_connect = (method == header_values.MethodValues.Connect);

// The url is relative or a wildcard when the method is OPTIONS. Nothing to do here.
auto& active_request = active_request_.value();
if (!is_connect && !active_request.request_url_.getStringView().empty() &&
(active_request.request_url_.getStringView()[0] == '/' ||
if (!is_connect && !active_request_->request_url_.getStringView().empty() &&
(active_request_->request_url_.getStringView()[0] == '/' ||
(method == header_values.MethodValues.Options &&
active_request.request_url_.getStringView()[0] == '*'))) {
headers.addViaMove(std::move(path), std::move(active_request.request_url_));
active_request_->request_url_.getStringView()[0] == '*'))) {
headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
return okStatus();
}

Expand All @@ -1032,12 +1030,12 @@ Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_
// CONNECT "urls" are actually host:port so look like absolute URLs to the above checks.
// Absolute URLS in CONNECT requests will be rejected below by the URL class validation.
if (!codec_settings_.allow_absolute_url_ && !is_connect) {
headers.addViaMove(std::move(path), std::move(active_request.request_url_));
headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
return okStatus();
}

Utility::Url absolute_url;
if (!absolute_url.initialize(active_request.request_url_.getStringView(), is_connect)) {
if (!absolute_url.initialize(active_request_->request_url_.getStringView(), is_connect)) {
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidUrl));
return codecProtocolError("http/1.1 protocol error: invalid url in request line");
}
Expand Down Expand Up @@ -1068,16 +1066,15 @@ Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_
if (!absolute_url.pathAndQueryParams().empty()) {
headers.setPath(absolute_url.pathAndQueryParams());
}
active_request.request_url_.clear();
active_request_->request_url_.clear();
return okStatus();
}

Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
// Handle the case where response happens prior to request complete. It's up to upper layer code
// to disconnect the connection but we shouldn't fire any more events since it doesn't make
// sense.
if (active_request_.has_value()) {
auto& active_request = active_request_.value();
if (active_request_) {
auto& headers = absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
ENVOY_CONN_LOG(trace, "Server: onHeadersComplete size={}", connection_, headers->size());

Expand All @@ -1097,13 +1094,13 @@ Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
// Inform the response encoder about any HEAD method, so it can set content
// length and transfer encoding headers correctly.
const Http::HeaderValues& header_values = Http::Headers::get();
active_request.response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request.response_encoder_.setIsResponseToConnectRequest(
active_request_->response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request_->response_encoder_.setIsResponseToConnectRequest(
parser_->methodName() == header_values.MethodValues.Connect);

RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
ASSERT(active_request.request_url_.empty());
ASSERT(active_request_->request_url_.empty());

headers->setMethod(parser_->methodName());

Expand All @@ -1124,7 +1121,7 @@ Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
if (parser_->isChunked() ||
(parser_->contentLength().has_value() && parser_->contentLength().value() > 0) ||
handling_upgrade_) {
active_request.request_decoder_->decodeHeaders(std::move(headers), false);
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

// If the connection has been closed (or is closing) after decoding headers, pause the parser
// so we return control to the caller.
Expand All @@ -1141,13 +1138,12 @@ Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {

Status ServerConnectionImpl::onMessageBeginBase() {
if (!resetStreamCalled()) {
ASSERT(!active_request_.has_value());
active_request_.emplace(*this, std::move(bytes_meter_before_stream_));
auto& active_request = active_request_.value();
ASSERT(active_request_ == nullptr);
active_request_ = std::make_unique<ActiveRequest>(*this, std::move(bytes_meter_before_stream_));
if (resetStreamCalled()) {
return codecClientError("cannot create new streams after calling reset");
}
active_request.request_decoder_ = &callbacks_.newStream(active_request.response_encoder_);
active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_);

// Check for pipelined request flood as we prepare to accept a new request.
// Parse errors that happen prior to onMessageBegin result in stream termination, it is not
Expand All @@ -1158,8 +1154,8 @@ Status ServerConnectionImpl::onMessageBeginBase() {
}

Status ServerConnectionImpl::onUrl(const char* data, size_t length) {
if (active_request_.has_value()) {
active_request_.value().request_url_.append(data, length);
if (active_request_) {
active_request_->request_url_.append(data, length);

RETURN_IF_ERROR(checkMaxHeadersSize());
}
Expand All @@ -1169,31 +1165,31 @@ Status ServerConnectionImpl::onUrl(const char* data, size_t length) {

void ServerConnectionImpl::onBody(Buffer::Instance& data) {
ASSERT(!deferred_end_stream_headers_);
if (active_request_.has_value()) {
if (active_request_) {
ENVOY_CONN_LOG(trace, "body size={}", connection_, data.length());
active_request_.value().request_decoder_->decodeData(data, false);
active_request_->request_decoder_->decodeData(data, false);
}
}

ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
ASSERT(!handling_upgrade_);
if (active_request_.has_value()) {
auto& active_request = active_request_.value();
if (active_request_) {

// The request_decoder should be non-null after we've called the newStream on callbacks.
ASSERT(active_request_->request_decoder_);
active_request_->response_encoder_.readDisable(true);
active_request_->remote_complete_ = true;

if (active_request.request_decoder_) {
active_request.response_encoder_.readDisable(true);
}
active_request.remote_complete_ = true;
if (deferred_end_stream_headers_) {
active_request.request_decoder_->decodeHeaders(
active_request_->request_decoder_->decodeHeaders(
std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
active_request.request_decoder_->decodeTrailers(
active_request_->request_decoder_->decodeTrailers(
std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
active_request.request_decoder_->decodeData(buffer, true);
active_request_->request_decoder_->decodeData(buffer, true);
}

// Reset to ensure no information from one requests persists to the next.
Expand All @@ -1207,33 +1203,33 @@ ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
}

void ServerConnectionImpl::onResetStream(StreamResetReason reason) {
active_request_.value().response_encoder_.runResetCallbacks(reason);
active_request_.reset();
active_request_->response_encoder_.runResetCallbacks(reason);
connection_.dispatcher().deferredDelete(std::move(active_request_));
}

Status ServerConnectionImpl::sendProtocolError(absl::string_view details) {
// We do this here because we may get a protocol error before we have a logical stream.
if (!active_request_.has_value()) {
if (active_request_ == nullptr) {
RETURN_IF_ERROR(onMessageBegin());
}
ASSERT(active_request_.has_value());
ASSERT(active_request_);

active_request_.value().response_encoder_.setDetails(details);
if (!active_request_.value().response_encoder_.startedResponse()) {
active_request_->response_encoder_.setDetails(details);
if (!active_request_->response_encoder_.startedResponse()) {
active_request_->request_decoder_->sendLocalReply(
error_code_, CodeUtility::toString(error_code_), nullptr, absl::nullopt, details);
}
return okStatus();
}

void ServerConnectionImpl::onAboveHighWatermark() {
if (active_request_.has_value()) {
active_request_.value().response_encoder_.runHighWatermarkCallbacks();
if (active_request_) {
active_request_->response_encoder_.runHighWatermarkCallbacks();
}
}
void ServerConnectionImpl::onBelowLowWatermark() {
if (active_request_.has_value()) {
active_request_.value().response_encoder_.runLowWatermarkCallbacks();
if (active_request_) {
active_request_->response_encoder_.runLowWatermarkCallbacks();
}
}

Expand Down
9 changes: 5 additions & 4 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,18 +467,19 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
/**
* An active HTTP/1.1 request.
*/
struct ActiveRequest {
struct ActiveRequest : public Event::DeferredDeletable {
ActiveRequest(ServerConnectionImpl& connection, StreamInfo::BytesMeterSharedPtr&& bytes_meter)
: response_encoder_(connection, std::move(bytes_meter),
connection.codec_settings_.stream_error_on_invalid_http_message_) {}
~ActiveRequest() override = default;

void dumpState(std::ostream& os, int indent_level) const;
HeaderString request_url_;
RequestDecoder* request_decoder_{};
ResponseEncoderImpl response_encoder_;
bool remote_complete_{};
};
absl::optional<ActiveRequest>& activeRequest() { return active_request_; }
ActiveRequest* activeRequest() { return active_request_.get(); }
// ConnectionImpl
ParserStatus onMessageCompleteBase() override;
// Add the size of the request_url to the reported header size when processing request headers.
Expand All @@ -501,7 +502,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
// ConnectionImpl
void onEncodeComplete() override;
StreamInfo::BytesMeter& getBytesMeter() override {
if (active_request_.has_value()) {
if (active_request_) {
return *(active_request_->response_encoder_.getStream().bytesMeter());
}
if (bytes_meter_before_stream_ == nullptr) {
Expand Down Expand Up @@ -550,7 +551,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
Status checkHeaderNameForUnderscores() override;

ServerConnectionCallbacks& callbacks_;
absl::optional<ActiveRequest> active_request_;
std::unique_ptr<ActiveRequest> active_request_;
const Buffer::OwnedBufferFragmentImpl::Releasor response_buffer_releasor_;
uint32_t outbound_responses_{};
// This defaults to 2, which functionally disables pipelining. If any users
Expand Down
4 changes: 4 additions & 0 deletions test/common/http/codec_impl_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi
dynamic_cast<Http2::ClientConnectionImpl&>(*client).goAway();
dynamic_cast<Http2::ServerConnectionImpl&>(*server).goAway();
}

// Run deletion as would happen on the dispatchers to avoid inversion of
// lifetimes of dispatcher and connection.
server_connection.dispatcher_.to_delete_.clear();
}

} // namespace
Expand Down
6 changes: 6 additions & 0 deletions test/common/http/http1/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class Http1ServerConnectionImplTest : public Http1CodecTestBase {
max_request_headers_count_, headers_with_underscores_action_);
}

~Http1ServerConnectionImplTest() override {
// Run deletion as would happen on the dispatchers to avoid inversion of
// lifetimes of dispatcher and connection.
connection_.dispatcher_.to_delete_.clear();
}

NiceMock<Network::MockConnection> connection_;
NiceMock<Http::MockServerConnectionCallbacks> callbacks_;
NiceMock<Http1Settings> codec_settings_;
Expand Down
8 changes: 4 additions & 4 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,17 +308,17 @@ class TestHttp1ServerConnectionImpl : public Http::Http1::ServerConnectionImpl {
Http::Http1::ParserStatus onMessageCompleteBase() override {
auto rc = ServerConnectionImpl::onMessageCompleteBase();

if (activeRequest().has_value() && activeRequest().value().request_decoder_) {
if (activeRequest() && activeRequest()->request_decoder_) {
// Undo the read disable from the base class - we have many tests which
// waitForDisconnect after a full request has been read which will not
// receive the disconnect if reading is disabled.
activeRequest().value().response_encoder_.readDisable(false);
activeRequest()->response_encoder_.readDisable(false);
}
return rc;
}
~TestHttp1ServerConnectionImpl() override {
if (activeRequest().has_value()) {
activeRequest().value().response_encoder_.clearReadDisableCallsForTests();
if (activeRequest()) {
activeRequest()->response_encoder_.clearReadDisableCallsForTests();
}
}
};
Expand Down

0 comments on commit 7dec99d

Please sign in to comment.