Skip to content

Commit

Permalink
Overload: Reset H2 server stream only use codec level reset mechanism (
Browse files Browse the repository at this point in the history
…#18895)

Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
  • Loading branch information
KBaichoo authored Nov 29, 2021
1 parent 209b7ba commit 3b45d6c
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 51 deletions.
7 changes: 7 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {

read_callbacks_->connection().dispatcher().deferredDelete(stream.removeFromList(streams_));

// The response_encoder should never be dangling (unless we're destroying a
// stream we are recreating) as the codec level stream will either outlive the
// ActiveStream, or be alive in deferred deletion queue at this point.
if (stream.response_encoder_) {
stream.response_encoder_->getStream().removeCallbacks(stream);
}

if (connection_idle_timer_ && streams_.empty()) {
connection_idle_timer_->enableTimer(config_.idleTimeout().value());
}
Expand Down
7 changes: 5 additions & 2 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,11 @@ void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {

// If we submit a reset, nghttp2 will cancel outbound frames that have not yet been sent.
// We want these frames to go out so we defer the reset until we send all of the frames that
// end the local stream.
if (useDeferredReset() && local_end_stream_ && !local_end_stream_sent_) {
// end the local stream. However, if we're resetting the stream due to
// overload, we should reset the stream as soon as possible to free used
// resources.
if (useDeferredReset() && local_end_stream_ && !local_end_stream_sent_ &&
reason != StreamResetReason::OverloadManager) {
ASSERT(parent_.getStream(stream_id_) != nullptr);
parent_.pending_deferred_reset_streams_.emplace(stream_id_, this);
deferred_reset_ = reason;
Expand Down
87 changes: 46 additions & 41 deletions test/common/http/conn_manager_impl_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamProtocolError) {
return codecProtocolError("protocol error");
}));

EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_));
EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)).Times(2);
EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0);

// A protocol exception should result in reset of the streams followed by a remote or local close
Expand Down Expand Up @@ -232,7 +232,7 @@ TEST_F(HttpConnectionManagerImplTest, FrameFloodError) {
return bufferFloodError("too many outbound frames");
}));

EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_));
EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)).Times(2);
EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0);

// FrameFloodException should result in reset of the streams followed by abortive close.
Expand Down Expand Up @@ -1104,7 +1104,7 @@ TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) {
EXPECT_EQ(1U, stats_.named_.downstream_flow_control_resumed_reading_total_.value());

// Backup upstream once again.
EXPECT_CALL(response_encoder_, getStream()).WillOnce(ReturnRef(stream_));
EXPECT_CALL(response_encoder_, getStream()).WillRepeatedly(ReturnRef(stream_));
EXPECT_CALL(stream_, readDisable(true));
ASSERT(decoder_filters_[0]->callbacks_ != nullptr);
decoder_filters_[0]->callbacks_->onDecoderFilterAboveWriteBufferHighWatermark();
Expand Down Expand Up @@ -1334,7 +1334,7 @@ TEST_F(HttpConnectionManagerImplTest, HitFilterWatermarkLimits) {
}));

expectOnDestroy();
EXPECT_CALL(stream_, removeCallbacks(_));
EXPECT_CALL(stream_, removeCallbacks(_)).Times(2);
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
}

Expand Down Expand Up @@ -1364,45 +1364,47 @@ TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimits) {

// Return 413 from an intermediate filter and make sure we don't continue the filter chain.
TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimitsIntermediateFilter) {
InSequence s;
initial_buffer_limit_ = 10;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
decoder_ = &conn_manager_->newStream(response_encoder_);
RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder_->decodeHeaders(std::move(headers), false);
{
InSequence s;
initial_buffer_limit_ = 10;
setup(false, "");

Buffer::OwnedImpl fake_data("hello");
decoder_->decodeData(fake_data, false);
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
decoder_ = &conn_manager_->newStream(response_encoder_);
RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder_->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data2("world world");
decoder_->decodeData(fake_data2, true);
return Http::okStatus();
}));
Buffer::OwnedImpl fake_data("hello");
decoder_->decodeData(fake_data, false);

setUpBufferLimits();
setupFilterChain(2, 1);
Buffer::OwnedImpl fake_data2("world world");
decoder_->decodeData(fake_data2, true);
return Http::okStatus();
}));

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
Http::TestResponseHeaderMapImpl response_headers{
{":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}};
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::StopIterationAndWatermark));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
setUpBufferLimits();
setupFilterChain(2, 1);

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
Http::TestResponseHeaderMapImpl response_headers{
{":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}};
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::StopIterationAndWatermark));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);
}

doRemoteClose(false);
}
Expand Down Expand Up @@ -1469,17 +1471,20 @@ TEST_F(HttpConnectionManagerImplTest, HitResponseBufferLimitsAfterHeaders) {
const std::string data = "A long enough string to go over watermarks";
Buffer::OwnedImpl fake_response(data);
InSequence s;
EXPECT_CALL(stream_, removeCallbacks(_));
expectOnDestroy(false);
EXPECT_CALL(*encoder_filters_[1], encodeData(_, false))
.WillOnce(Return(FilterDataStatus::StopIterationAndBuffer));
EXPECT_CALL(stream_, resetStream(_));
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
EXPECT_LOG_CONTAINS(
"debug",
"Resetting stream due to response_payload_too_large. Prior headers have already been sent",
decoder_filters_[0]->callbacks_->encodeData(fake_response, false););
EXPECT_EQ(1U, stats_.named_.rs_too_large_.value());

// Clean up connection
EXPECT_CALL(stream_, removeCallbacks(_));
expectOnDestroy(false);
EXPECT_CALL(stream_, removeCallbacks(_));
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose);
}

TEST_F(HttpConnectionManagerImplTest, FilterHeadReply) {
Expand Down
4 changes: 3 additions & 1 deletion test/common/http/conn_manager_impl_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ void HttpConnectionManagerImplTest::expectOnDestroy(bool deferred) {
}

void HttpConnectionManagerImplTest::doRemoteClose(bool deferred) {
EXPECT_CALL(stream_, removeCallbacks(_));
// We will call removeCallbacks twice.
// Once in resetAllStreams, and once in doDeferredStreamDestroy.
EXPECT_CALL(stream_, removeCallbacks(_)).Times(2);
expectOnDestroy(deferred);
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
}
Expand Down
87 changes: 87 additions & 0 deletions test/integration/buffer_accounting_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
namespace Envoy {
namespace {

using testing::HasSubstr;

std::string protocolTestParamsAndBoolToString(
const ::testing::TestParamInfo<std::tuple<HttpProtocolTestParams, bool, bool>>& params) {
return fmt::format("{}_{}_{}",
Expand Down Expand Up @@ -617,4 +619,89 @@ TEST_P(Http2OverloadManagerIntegrationTest,
EXPECT_EQ(smallest_response->headers().getStatusValue(), "200");
}

TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnded) {
useAccessLog("%RESPONSE_CODE%");
initializeOverloadManagerInBootstrap(
TestUtility::parseYaml<envoy::config::overload::v3::OverloadAction>(R"EOF(
name: "envoy.overload_actions.reset_high_memory_stream"
triggers:
- name: "envoy.resource_monitors.testonly.fake_resource_monitor"
scaled:
scaling_threshold: 0.90
saturation_threshold: 0.98
)EOF"));
initialize();

// Set 10MiB receive window for the client.
const int downstream_window_size = 10 * 1024 * 1024;
envoy::config::core::v3::Http2ProtocolOptions http2_options =
::Envoy::Http2::Utility::initializeAndValidateOptions(
envoy::config::core::v3::Http2ProtocolOptions());
http2_options.mutable_initial_stream_window_size()->set_value(downstream_window_size);
http2_options.mutable_initial_connection_window_size()->set_value(downstream_window_size);
codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options);

// Makes us have Envoy's writes to downstream return EAGAIN
writev_matcher_->setSourcePort(lookupPort("http"));
writev_matcher_->setWritevReturnsEgain();

// Send a request
auto encoder_decoder = codec_client_->startRequest(Http::TestRequestHeaderMapImpl{
{":method", "POST"},
{":path", "/"},
{":scheme", "http"},
{":authority", "host"},
{"content-length", "10"},
});
auto& encoder = encoder_decoder.first;
const std::string data(10, 'a');
codec_client_->sendData(encoder, data, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();
FakeStreamPtr upstream_request_for_response = std::move(upstream_request_);

// Send the responses back. It is larger than the downstream's receive window
// size. Thus, the codec will not end the stream, but the Envoy level stream
// should.
upstream_request_for_response->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}},
false);
const int response_size = downstream_window_size + 1024; // Slightly over the window size.
upstream_request_for_response->encodeData(response_size, true);

if (streamBufferAccounting()) {
// Wait for access log to know the Envoy level stream has been deleted.
EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("200"));
}

// Set the pressure so the overload action kills the response if doing stream
// accounting
updateResource(0.95);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 62);

if (streamBufferAccounting()) {
test_server_->waitForCounterGe("envoy.overload_actions.reset_high_memory_stream.count", 1);
}

// Reduce resource pressure
updateResource(0.80);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 0);

// Resume writes to downstream.
writev_matcher_->setResumeWrites();

if (streamBufferAccounting()) {
EXPECT_TRUE(response->waitForReset());
EXPECT_TRUE(response->reset());
} else {
// If we're not doing the accounting, we didn't end up resetting the
// streams.
ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());
EXPECT_EQ(response->headers().getStatusValue(), "200");
}
}

} // namespace Envoy
17 changes: 13 additions & 4 deletions test/mocks/http/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ MockStream::MockStream() {
}));

ON_CALL(*this, removeCallbacks(_))
.WillByDefault(
Invoke([this](StreamCallbacks& callbacks) -> void { callbacks_.remove(&callbacks); }));
.WillByDefault(Invoke([this](StreamCallbacks& callbacks) -> void {
for (auto& callback : callbacks_) {
if (callback == &callbacks) {
callback = nullptr;
return;
}
}
}));

ON_CALL(*this, resetStream(_)).WillByDefault(Invoke([this](StreamResetReason reason) -> void {
for (StreamCallbacks* callbacks : callbacks_) {
callbacks->onResetStream(reason, absl::string_view());
for (auto& callback : callbacks_) {
if (callback) {
callback->onResetStream(reason, absl::string_view());
callback = nullptr;
}
}
}));

Expand Down
12 changes: 9 additions & 3 deletions test/mocks/http/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,25 @@ class MockStream : public Stream {
MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout));
MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr));

std::list<StreamCallbacks*> callbacks_{};
// Use the same underlying structure as StreamCallbackHelper to insure iteration stability
// if we remove callbacks during iteration.
absl::InlinedVector<StreamCallbacks*, 8> callbacks_;
Network::Address::InstanceConstSharedPtr connection_local_address_;
Buffer::BufferMemoryAccountSharedPtr account_;

void runHighWatermarkCallbacks() {
for (auto* callback : callbacks_) {
callback->onAboveWriteBufferHighWatermark();
if (callback) {
callback->onAboveWriteBufferHighWatermark();
}
}
}

void runLowWatermarkCallbacks() {
for (auto* callback : callbacks_) {
callback->onBelowWriteBufferLowWatermark();
if (callback) {
callback->onBelowWriteBufferLowWatermark();
}
}
}

Expand Down

0 comments on commit 3b45d6c

Please sign in to comment.