Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overload: Reset H2 server stream only use codec level reset mechanism #18895

Merged
merged 15 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {

read_callbacks_->connection().dispatcher().deferredDelete(stream.removeFromList(streams_));

// The response_encoder should never be dangling (unless we're destroying a
// stream we are recreating) as the codec level stream will either outlive the
// ActiveStream, or be alive in deferred deletion queue at this point.
if (stream.response_encoder_) {
stream.response_encoder_->getStream().removeCallbacks(stream);
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
}

if (connection_idle_timer_ && streams_.empty()) {
connection_idle_timer_->enableTimer(config_.idleTimeout().value());
}
Expand Down
7 changes: 5 additions & 2 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,11 @@ void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {

// If we submit a reset, nghttp2 will cancel outbound frames that have not yet been sent.
// We want these frames to go out so we defer the reset until we send all of the frames that
// end the local stream.
if (useDeferredReset() && local_end_stream_ && !local_end_stream_sent_) {
// end the local stream. However, if we're resetting the stream due to
// overload, we should reset the stream as soon as possible to free used
// resources.
if (useDeferredReset() && local_end_stream_ && !local_end_stream_sent_ &&
reason != StreamResetReason::OverloadManager) {
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
ASSERT(parent_.getStream(stream_id_) != nullptr);
parent_.pending_deferred_reset_streams_.emplace(stream_id_, this);
deferred_reset_ = reason;
Expand Down
87 changes: 46 additions & 41 deletions test/common/http/conn_manager_impl_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamProtocolError) {
return codecProtocolError("protocol error");
}));

EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_));
EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)).Times(2);
EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0);

// A protocol exception should result in reset of the streams followed by a remote or local close
Expand Down Expand Up @@ -232,7 +232,7 @@ TEST_F(HttpConnectionManagerImplTest, FrameFloodError) {
return bufferFloodError("too many outbound frames");
}));

EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_));
EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)).Times(2);
EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0);

// FrameFloodException should result in reset of the streams followed by abortive close.
Expand Down Expand Up @@ -1104,7 +1104,7 @@ TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) {
EXPECT_EQ(1U, stats_.named_.downstream_flow_control_resumed_reading_total_.value());

// Backup upstream once again.
EXPECT_CALL(response_encoder_, getStream()).WillOnce(ReturnRef(stream_));
EXPECT_CALL(response_encoder_, getStream()).WillRepeatedly(ReturnRef(stream_));
EXPECT_CALL(stream_, readDisable(true));
ASSERT(decoder_filters_[0]->callbacks_ != nullptr);
decoder_filters_[0]->callbacks_->onDecoderFilterAboveWriteBufferHighWatermark();
Expand Down Expand Up @@ -1334,7 +1334,7 @@ TEST_F(HttpConnectionManagerImplTest, HitFilterWatermarkLimits) {
}));

expectOnDestroy();
EXPECT_CALL(stream_, removeCallbacks(_));
EXPECT_CALL(stream_, removeCallbacks(_)).Times(2);
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
}

Expand Down Expand Up @@ -1364,45 +1364,47 @@ TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimits) {

// Return 413 from an intermediate filter and make sure we don't continue the filter chain.
TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimitsIntermediateFilter) {
InSequence s;
initial_buffer_limit_ = 10;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
decoder_ = &conn_manager_->newStream(response_encoder_);
RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder_->decodeHeaders(std::move(headers), false);
{
InSequence s;
initial_buffer_limit_ = 10;
setup(false, "");

Buffer::OwnedImpl fake_data("hello");
decoder_->decodeData(fake_data, false);
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
decoder_ = &conn_manager_->newStream(response_encoder_);
RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder_->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data2("world world");
decoder_->decodeData(fake_data2, true);
return Http::okStatus();
}));
Buffer::OwnedImpl fake_data("hello");
decoder_->decodeData(fake_data, false);

setUpBufferLimits();
setupFilterChain(2, 1);
Buffer::OwnedImpl fake_data2("world world");
decoder_->decodeData(fake_data2, true);
return Http::okStatus();
}));

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
Http::TestResponseHeaderMapImpl response_headers{
{":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}};
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::StopIterationAndWatermark));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
setUpBufferLimits();
setupFilterChain(2, 1);

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
Http::TestResponseHeaderMapImpl response_headers{
{":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}};
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::StopIterationAndWatermark));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);
}

doRemoteClose(false);
}
Expand Down Expand Up @@ -1469,17 +1471,20 @@ TEST_F(HttpConnectionManagerImplTest, HitResponseBufferLimitsAfterHeaders) {
const std::string data = "A long enough string to go over watermarks";
Buffer::OwnedImpl fake_response(data);
InSequence s;
EXPECT_CALL(stream_, removeCallbacks(_));
expectOnDestroy(false);
EXPECT_CALL(*encoder_filters_[1], encodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(stream_, resetStream(_));
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
EXPECT_LOG_CONTAINS(
"debug",
"Resetting stream due to response_payload_too_large. Prior headers have already been sent",
decoder_filters_[0]->callbacks_->encodeData(fake_response, false););
EXPECT_EQ(1U, stats_.named_.rs_too_large_.value());

// Clean up connection
EXPECT_CALL(stream_, removeCallbacks(_));
expectOnDestroy(false);
EXPECT_CALL(stream_, removeCallbacks(_));
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
}

TEST_F(HttpConnectionManagerImplTest, FilterHeadReply) {
Expand Down
4 changes: 3 additions & 1 deletion test/common/http/conn_manager_impl_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ void HttpConnectionManagerImplTest::expectOnDestroy(bool deferred) {
}

void HttpConnectionManagerImplTest::doRemoteClose(bool deferred) {
EXPECT_CALL(stream_, removeCallbacks(_));
// We will call removeCallbacks twice.
// Once in resetAllStreams, and once in doDeferredStreamDestroy.
EXPECT_CALL(stream_, removeCallbacks(_)).Times(2);
expectOnDestroy(deferred);
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}
Expand Down
87 changes: 87 additions & 0 deletions test/integration/buffer_accounting_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
namespace Envoy {
namespace {

using testing::HasSubstr;

std::string protocolTestParamsAndBoolToString(
const ::testing::TestParamInfo<std::tuple<HttpProtocolTestParams, bool, bool>>& params) {
return fmt::format("{}_{}_{}",
Expand Down Expand Up @@ -617,4 +619,89 @@ TEST_P(Http2OverloadManagerIntegrationTest,
EXPECT_EQ(smallest_response->headers().getStatusValue(), "200");
}

TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnded) {
useAccessLog("%RESPONSE_CODE%");
initializeOverloadManagerInBootstrap(
TestUtility::parseYaml<envoy::config::overload::v3::OverloadAction>(R"EOF(
name: "envoy.overload_actions.reset_high_memory_stream"
triggers:
- name: "envoy.resource_monitors.testonly.fake_resource_monitor"
scaled:
scaling_threshold: 0.90
saturation_threshold: 0.98
)EOF"));
initialize();

// Set 10MiB receive window for the client.
const int downstream_window_size = 10 * 1024 * 1024;
envoy::config::core::v3::Http2ProtocolOptions http2_options =
::Envoy::Http2::Utility::initializeAndValidateOptions(
envoy::config::core::v3::Http2ProtocolOptions());
http2_options.mutable_initial_stream_window_size()->set_value(downstream_window_size);
http2_options.mutable_initial_connection_window_size()->set_value(downstream_window_size);
codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options);

// Makes us have Envoy's writes to downstream return EAGAIN
writev_matcher_->setSourcePort(lookupPort("http"));
writev_matcher_->setWritevReturnsEgain();

// Send a request
auto encoder_decoder = codec_client_->startRequest(Http::TestRequestHeaderMapImpl{
{":method", "POST"},
{":path", "/"},
{":scheme", "http"},
{":authority", "host"},
{"content-length", "10"},
});
auto& encoder = encoder_decoder.first;
const std::string data(10, 'a');
codec_client_->sendData(encoder, data, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();
FakeStreamPtr upstream_request_for_response = std::move(upstream_request_);

// Send the responses back. It is larger than the downstream's receive window
// size. Thus, the codec will not end the stream, but the Envoy level stream
// should.
upstream_request_for_response->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}},
false);
const int response_size = downstream_window_size + 1024; // Slightly over the window size.
upstream_request_for_response->encodeData(response_size, true);

if (streamBufferAccounting()) {
// Wait for access log to know the Envoy level stream has been deleted.
EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("200"));
}

// Set the pressure so the overload action kills the response if doing stream
// accounting
updateResource(0.95);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 62);

if (streamBufferAccounting()) {
test_server_->waitForCounterGe("envoy.overload_actions.reset_high_memory_stream.count", 1);
}

// Reduce resource pressure
updateResource(0.80);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 0);

// Resume writes to downstream.
writev_matcher_->setResumeWrites();

if (streamBufferAccounting()) {
EXPECT_TRUE(response->waitForReset());
EXPECT_TRUE(response->reset());
} else {
// If we're not doing the accounting, we didn't end up resetting the
// streams.
ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());
EXPECT_EQ(response->headers().getStatusValue(), "200");
}
}

} // namespace Envoy
17 changes: 13 additions & 4 deletions test/mocks/http/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ MockStream::MockStream() {
}));

ON_CALL(*this, removeCallbacks(_))
.WillByDefault(
Invoke([this](StreamCallbacks& callbacks) -> void { callbacks_.remove(&callbacks); }));
.WillByDefault(Invoke([this](StreamCallbacks& callbacks) -> void {
for (auto& callback : callbacks_) {
if (callback == &callbacks) {
callback = nullptr;
return;
}
}
}));

ON_CALL(*this, resetStream(_)).WillByDefault(Invoke([this](StreamResetReason reason) -> void {
for (StreamCallbacks* callbacks : callbacks_) {
callbacks->onResetStream(reason, absl::string_view());
for (auto& callback : callbacks_) {
if (callback) {
callback->onResetStream(reason, absl::string_view());
callback = nullptr;
}
}
}));

Expand Down
12 changes: 9 additions & 3 deletions test/mocks/http/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,25 @@ class MockStream : public Stream {
MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout));
MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr));

std::list<StreamCallbacks*> callbacks_{};
// Use the same underlying structure as StreamCallbackHelper to insure iteration stability
// if we remove callbacks during iteration.
absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
Network::Address::InstanceConstSharedPtr connection_local_address_;
Buffer::BufferMemoryAccountSharedPtr account_;

void runHighWatermarkCallbacks() {
for (auto* callback : callbacks_) {
callback->onAboveWriteBufferHighWatermark();
if (callback) {
callback->onAboveWriteBufferHighWatermark();
}
}
}

void runLowWatermarkCallbacks() {
for (auto* callback : callbacks_) {
callback->onBelowWriteBufferLowWatermark();
if (callback) {
callback->onBelowWriteBufferLowWatermark();
}
}
}

Expand Down