Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP1: Refactor HTTP1 Active Request to be defer deletable. #19062

Merged
merged 2 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 43 additions & 47 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -996,18 +996,17 @@ ServerConnectionImpl::ServerConnectionImpl(

uint32_t ServerConnectionImpl::getHeadersSize() {
// Add in the size of the request URL if processing request headers.
const uint32_t url_size = (!processing_trailers_ && active_request_.has_value())
? active_request_.value().request_url_.size()
: 0;
const uint32_t url_size =
(!processing_trailers_ && active_request_) ? active_request_->request_url_.size() : 0;
return url_size + ConnectionImpl::getHeadersSize();
}

void ServerConnectionImpl::onEncodeComplete() {
if (active_request_.value().remote_complete_) {
if (active_request_->remote_complete_) {
// Only do this if remote is complete. If we are replying before the request is complete the
// only logical thing to do is for higher level code to reset() / close the connection so we
// leave the request around so that it can fire reset callbacks.
active_request_.reset();
connection_.dispatcher().deferredDelete(std::move(active_request_));
}
}

Expand All @@ -1018,12 +1017,11 @@ Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_
bool is_connect = (method == header_values.MethodValues.Connect);

// The url is relative or a wildcard when the method is OPTIONS. Nothing to do here.
auto& active_request = active_request_.value();
if (!is_connect && !active_request.request_url_.getStringView().empty() &&
(active_request.request_url_.getStringView()[0] == '/' ||
if (!is_connect && !active_request_->request_url_.getStringView().empty() &&
(active_request_->request_url_.getStringView()[0] == '/' ||
(method == header_values.MethodValues.Options &&
active_request.request_url_.getStringView()[0] == '*'))) {
headers.addViaMove(std::move(path), std::move(active_request.request_url_));
active_request_->request_url_.getStringView()[0] == '*'))) {
headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
return okStatus();
}

Expand All @@ -1032,12 +1030,12 @@ Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_
// CONNECT "urls" are actually host:port so look like absolute URLs to the above checks.
// Absolute URLS in CONNECT requests will be rejected below by the URL class validation.
if (!codec_settings_.allow_absolute_url_ && !is_connect) {
headers.addViaMove(std::move(path), std::move(active_request.request_url_));
headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
return okStatus();
}

Utility::Url absolute_url;
if (!absolute_url.initialize(active_request.request_url_.getStringView(), is_connect)) {
if (!absolute_url.initialize(active_request_->request_url_.getStringView(), is_connect)) {
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidUrl));
return codecProtocolError("http/1.1 protocol error: invalid url in request line");
}
Expand Down Expand Up @@ -1068,16 +1066,15 @@ Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_
if (!absolute_url.pathAndQueryParams().empty()) {
headers.setPath(absolute_url.pathAndQueryParams());
}
active_request.request_url_.clear();
active_request_->request_url_.clear();
return okStatus();
}

Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
// Handle the case where response happens prior to request complete. It's up to upper layer code
// to disconnect the connection but we shouldn't fire any more events since it doesn't make
// sense.
if (active_request_.has_value()) {
auto& active_request = active_request_.value();
if (active_request_) {
auto& headers = absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
ENVOY_CONN_LOG(trace, "Server: onHeadersComplete size={}", connection_, headers->size());

Expand All @@ -1097,13 +1094,13 @@ Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
// Inform the response encoder about any HEAD method, so it can set content
// length and transfer encoding headers correctly.
const Http::HeaderValues& header_values = Http::Headers::get();
active_request.response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request.response_encoder_.setIsResponseToConnectRequest(
active_request_->response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request_->response_encoder_.setIsResponseToConnectRequest(
parser_->methodName() == header_values.MethodValues.Connect);

RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
ASSERT(active_request.request_url_.empty());
ASSERT(active_request_->request_url_.empty());

headers->setMethod(parser_->methodName());

Expand All @@ -1124,7 +1121,7 @@ Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
if (parser_->isChunked() ||
(parser_->contentLength().has_value() && parser_->contentLength().value() > 0) ||
handling_upgrade_) {
active_request.request_decoder_->decodeHeaders(std::move(headers), false);
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

// If the connection has been closed (or is closing) after decoding headers, pause the parser
// so we return control to the caller.
Expand All @@ -1141,13 +1138,12 @@ Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {

Status ServerConnectionImpl::onMessageBeginBase() {
if (!resetStreamCalled()) {
ASSERT(!active_request_.has_value());
active_request_.emplace(*this, std::move(bytes_meter_before_stream_));
auto& active_request = active_request_.value();
ASSERT(active_request_ == nullptr);
active_request_ = std::make_unique<ActiveRequest>(*this, std::move(bytes_meter_before_stream_));
if (resetStreamCalled()) {
return codecClientError("cannot create new streams after calling reset");
}
active_request.request_decoder_ = &callbacks_.newStream(active_request.response_encoder_);
active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_);

// Check for pipelined request flood as we prepare to accept a new request.
// Parse errors that happen prior to onMessageBegin result in stream termination, it is not
Expand All @@ -1158,8 +1154,8 @@ Status ServerConnectionImpl::onMessageBeginBase() {
}

Status ServerConnectionImpl::onUrl(const char* data, size_t length) {
if (active_request_.has_value()) {
active_request_.value().request_url_.append(data, length);
if (active_request_) {
active_request_->request_url_.append(data, length);

RETURN_IF_ERROR(checkMaxHeadersSize());
}
Expand All @@ -1169,31 +1165,31 @@ Status ServerConnectionImpl::onUrl(const char* data, size_t length) {

void ServerConnectionImpl::onBody(Buffer::Instance& data) {
ASSERT(!deferred_end_stream_headers_);
if (active_request_.has_value()) {
if (active_request_) {
ENVOY_CONN_LOG(trace, "body size={}", connection_, data.length());
active_request_.value().request_decoder_->decodeData(data, false);
active_request_->request_decoder_->decodeData(data, false);
}
}

ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
ASSERT(!handling_upgrade_);
if (active_request_.has_value()) {
auto& active_request = active_request_.value();
if (active_request_) {

// The request_decoder should be non-null after we've called the newStream on callbacks.
ASSERT(active_request_->request_decoder_);
active_request_->response_encoder_.readDisable(true);
active_request_->remote_complete_ = true;

if (active_request.request_decoder_) {
active_request.response_encoder_.readDisable(true);
}
active_request.remote_complete_ = true;
if (deferred_end_stream_headers_) {
active_request.request_decoder_->decodeHeaders(
active_request_->request_decoder_->decodeHeaders(
std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
active_request.request_decoder_->decodeTrailers(
active_request_->request_decoder_->decodeTrailers(
std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
active_request.request_decoder_->decodeData(buffer, true);
active_request_->request_decoder_->decodeData(buffer, true);
}

// Reset to ensure no information from one requests persists to the next.
Expand All @@ -1207,33 +1203,33 @@ ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
}

void ServerConnectionImpl::onResetStream(StreamResetReason reason) {
active_request_.value().response_encoder_.runResetCallbacks(reason);
active_request_.reset();
active_request_->response_encoder_.runResetCallbacks(reason);
connection_.dispatcher().deferredDelete(std::move(active_request_));
}

Status ServerConnectionImpl::sendProtocolError(absl::string_view details) {
// We do this here because we may get a protocol error before we have a logical stream.
if (!active_request_.has_value()) {
if (active_request_ == nullptr) {
RETURN_IF_ERROR(onMessageBegin());
}
ASSERT(active_request_.has_value());
ASSERT(active_request_);

active_request_.value().response_encoder_.setDetails(details);
if (!active_request_.value().response_encoder_.startedResponse()) {
active_request_->response_encoder_.setDetails(details);
if (!active_request_->response_encoder_.startedResponse()) {
active_request_->request_decoder_->sendLocalReply(
error_code_, CodeUtility::toString(error_code_), nullptr, absl::nullopt, details);
}
return okStatus();
}

void ServerConnectionImpl::onAboveHighWatermark() {
if (active_request_.has_value()) {
active_request_.value().response_encoder_.runHighWatermarkCallbacks();
if (active_request_) {
active_request_->response_encoder_.runHighWatermarkCallbacks();
}
}
void ServerConnectionImpl::onBelowLowWatermark() {
if (active_request_.has_value()) {
active_request_.value().response_encoder_.runLowWatermarkCallbacks();
if (active_request_) {
active_request_->response_encoder_.runLowWatermarkCallbacks();
}
}

Expand Down
9 changes: 5 additions & 4 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,18 +467,19 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
/**
* An active HTTP/1.1 request.
*/
struct ActiveRequest {
struct ActiveRequest : public Event::DeferredDeletable {
ActiveRequest(ServerConnectionImpl& connection, StreamInfo::BytesMeterSharedPtr&& bytes_meter)
: response_encoder_(connection, std::move(bytes_meter),
connection.codec_settings_.stream_error_on_invalid_http_message_) {}
~ActiveRequest() override = default;

void dumpState(std::ostream& os, int indent_level) const;
HeaderString request_url_;
RequestDecoder* request_decoder_{};
ResponseEncoderImpl response_encoder_;
bool remote_complete_{};
};
absl::optional<ActiveRequest>& activeRequest() { return active_request_; }
ActiveRequest* activeRequest() { return active_request_.get(); }
// ConnectionImpl
ParserStatus onMessageCompleteBase() override;
// Add the size of the request_url to the reported header size when processing request headers.
Expand All @@ -501,7 +502,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
// ConnectionImpl
void onEncodeComplete() override;
StreamInfo::BytesMeter& getBytesMeter() override {
if (active_request_.has_value()) {
if (active_request_) {
return *(active_request_->response_encoder_.getStream().bytesMeter());
}
if (bytes_meter_before_stream_ == nullptr) {
Expand Down Expand Up @@ -550,7 +551,7 @@ class ServerConnectionImpl : public ServerConnection, public ConnectionImpl {
Status checkHeaderNameForUnderscores() override;

ServerConnectionCallbacks& callbacks_;
absl::optional<ActiveRequest> active_request_;
std::unique_ptr<ActiveRequest> active_request_;
const Buffer::OwnedBufferFragmentImpl::Releasor response_buffer_releasor_;
uint32_t outbound_responses_{};
// This defaults to 2, which functionally disables pipelining. If any users
Expand Down
4 changes: 4 additions & 0 deletions test/common/http/codec_impl_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi
dynamic_cast<Http2::ClientConnectionImpl&>(*client).goAway();
dynamic_cast<Http2::ServerConnectionImpl&>(*server).goAway();
}

// Run deletion as would happen on the dispatchers to avoid inversion of
// lifetimes of dispatcher and connection.
server_connection.dispatcher_.to_delete_.clear();
}

} // namespace
Expand Down
6 changes: 6 additions & 0 deletions test/common/http/http1/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class Http1ServerConnectionImplTest : public Http1CodecTestBase {
max_request_headers_count_, headers_with_underscores_action_);
}

~Http1ServerConnectionImplTest() override {
// Run deletion as would happen on the dispatchers to avoid inversion of
// lifetimes of dispatcher and connection.
connection_.dispatcher_.to_delete_.clear();
}

NiceMock<Network::MockConnection> connection_;
NiceMock<Http::MockServerConnectionCallbacks> callbacks_;
NiceMock<Http1Settings> codec_settings_;
Expand Down
8 changes: 4 additions & 4 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,17 +308,17 @@ class TestHttp1ServerConnectionImpl : public Http::Http1::ServerConnectionImpl {
Http::Http1::ParserStatus onMessageCompleteBase() override {
auto rc = ServerConnectionImpl::onMessageCompleteBase();

if (activeRequest().has_value() && activeRequest().value().request_decoder_) {
if (activeRequest() && activeRequest()->request_decoder_) {
// Undo the read disable from the base class - we have many tests which
// waitForDisconnect after a full request has been read which will not
// receive the disconnect if reading is disabled.
activeRequest().value().response_encoder_.readDisable(false);
activeRequest()->response_encoder_.readDisable(false);
}
return rc;
}
~TestHttp1ServerConnectionImpl() override {
if (activeRequest().has_value()) {
activeRequest().value().response_encoder_.clearReadDisableCallsForTests();
if (activeRequest()) {
activeRequest()->response_encoder_.clearReadDisableCallsForTests();
}
}
};
Expand Down