diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index d9c654a59633..d39aa9bda4b3 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -260,6 +260,13 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) { read_callbacks_->connection().dispatcher().deferredDelete(stream.removeFromList(streams_)); + // The response_encoder should never be dangling (unless we're destroying a + // stream we are recreating) as the codec level stream will either outlive the + // ActiveStream, or be alive in deferred deletion queue at this point. + if (stream.response_encoder_) { + stream.response_encoder_->getStream().removeCallbacks(stream); + } + if (connection_idle_timer_ && streams_.empty()) { connection_idle_timer_->enableTimer(config_.idleTimeout().value()); } diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 1e19b3c75290..c681acb6525d 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -639,8 +639,11 @@ void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) { // If we submit a reset, nghttp2 will cancel outbound frames that have not yet been sent. // We want these frames to go out so we defer the reset until we send all of the frames that - // end the local stream. - if (useDeferredReset() && local_end_stream_ && !local_end_stream_sent_) { + // end the local stream. However, if we're resetting the stream due to + // overload, we should reset the stream as soon as possible to free used + // resources. + if (useDeferredReset() && local_end_stream_ && !local_end_stream_sent_ && + reason != StreamResetReason::OverloadManager) { ASSERT(parent_.getStream(stream_id_) != nullptr); parent_.pending_deferred_reset_streams_.emplace(stream_id_, this); deferred_reset_ = reason; diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc index 44bd0db57dde..42f279490a7d 100644 --- a/test/common/http/conn_manager_impl_test_2.cc +++ b/test/common/http/conn_manager_impl_test_2.cc @@ -150,7 +150,7 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamProtocolError) { return codecProtocolError("protocol error"); })); - EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)); + EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)).Times(2); EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0); // A protocol exception should result in reset of the streams followed by a remote or local close @@ -232,7 +232,7 @@ TEST_F(HttpConnectionManagerImplTest, FrameFloodError) { return bufferFloodError("too many outbound frames"); })); - EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)); + EXPECT_CALL(response_encoder_.stream_, removeCallbacks(_)).Times(2); EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0); // FrameFloodException should result in reset of the streams followed by abortive close. @@ -1104,7 +1104,7 @@ TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) { EXPECT_EQ(1U, stats_.named_.downstream_flow_control_resumed_reading_total_.value()); // Backup upstream once again. - EXPECT_CALL(response_encoder_, getStream()).WillOnce(ReturnRef(stream_)); + EXPECT_CALL(response_encoder_, getStream()).WillRepeatedly(ReturnRef(stream_)); EXPECT_CALL(stream_, readDisable(true)); ASSERT(decoder_filters_[0]->callbacks_ != nullptr); decoder_filters_[0]->callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); @@ -1334,7 +1334,7 @@ TEST_F(HttpConnectionManagerImplTest, HitFilterWatermarkLimits) { })); expectOnDestroy(); - EXPECT_CALL(stream_, removeCallbacks(_)); + EXPECT_CALL(stream_, removeCallbacks(_)).Times(2); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); } @@ -1364,45 +1364,47 @@ TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimits) { // Return 413 from an intermediate filter and make sure we don't continue the filter chain. TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimitsIntermediateFilter) { - InSequence s; - initial_buffer_limit_ = 10; - setup(false, ""); - - EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status { - decoder_ = &conn_manager_->newStream(response_encoder_); - RequestHeaderMapPtr headers{ - new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; - decoder_->decodeHeaders(std::move(headers), false); + { + InSequence s; + initial_buffer_limit_ = 10; + setup(false, ""); - Buffer::OwnedImpl fake_data("hello"); - decoder_->decodeData(fake_data, false); + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status { + decoder_ = &conn_manager_->newStream(response_encoder_); + RequestHeaderMapPtr headers{ + new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + decoder_->decodeHeaders(std::move(headers), false); - Buffer::OwnedImpl fake_data2("world world"); - decoder_->decodeData(fake_data2, true); - return Http::okStatus(); - })); + Buffer::OwnedImpl fake_data("hello"); + decoder_->decodeData(fake_data, false); - setUpBufferLimits(); - setupFilterChain(2, 1); + Buffer::OwnedImpl fake_data2("world world"); + decoder_->decodeData(fake_data2, true); + return Http::okStatus(); + })); - EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) - .WillOnce(Return(FilterHeadersStatus::StopIteration)); - EXPECT_CALL(*decoder_filters_[0], decodeData(_, false)) - .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); - EXPECT_CALL(*decoder_filters_[0], decodeData(_, true)) - .WillOnce(Return(FilterDataStatus::Continue)); - EXPECT_CALL(*decoder_filters_[0], decodeComplete()); - Http::TestResponseHeaderMapImpl response_headers{ - {":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}}; - EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false)) - .WillOnce(Return(FilterHeadersStatus::StopIteration)); - EXPECT_CALL(*encoder_filters_[0], encodeData(_, true)) - .WillOnce(Return(FilterDataStatus::StopIterationAndWatermark)); - EXPECT_CALL(*encoder_filters_[0], encodeComplete()); + setUpBufferLimits(); + setupFilterChain(2, 1); - // Kick off the incoming data. - Buffer::OwnedImpl fake_input("1234"); - conn_manager_->onData(fake_input, false); + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + EXPECT_CALL(*decoder_filters_[0], decodeData(_, false)) + .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); + EXPECT_CALL(*decoder_filters_[0], decodeData(_, true)) + .WillOnce(Return(FilterDataStatus::Continue)); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); + Http::TestResponseHeaderMapImpl response_headers{ + {":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}}; + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + EXPECT_CALL(*encoder_filters_[0], encodeData(_, true)) + .WillOnce(Return(FilterDataStatus::StopIterationAndWatermark)); + EXPECT_CALL(*encoder_filters_[0], encodeComplete()); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + } doRemoteClose(false); } @@ -1469,17 +1471,20 @@ TEST_F(HttpConnectionManagerImplTest, HitResponseBufferLimitsAfterHeaders) { const std::string data = "A long enough string to go over watermarks"; Buffer::OwnedImpl fake_response(data); InSequence s; - EXPECT_CALL(stream_, removeCallbacks(_)); - expectOnDestroy(false); EXPECT_CALL(*encoder_filters_[1], encodeData(_, false)) .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); EXPECT_CALL(stream_, resetStream(_)); - filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); EXPECT_LOG_CONTAINS( "debug", "Resetting stream due to response_payload_too_large. Prior headers have already been sent", decoder_filters_[0]->callbacks_->encodeData(fake_response, false);); EXPECT_EQ(1U, stats_.named_.rs_too_large_.value()); + + // Clean up connection + EXPECT_CALL(stream_, removeCallbacks(_)); + expectOnDestroy(false); + EXPECT_CALL(stream_, removeCallbacks(_)); + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); } TEST_F(HttpConnectionManagerImplTest, FilterHeadReply) { diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc index 0ecfafee44fd..14ba94dd1a03 100644 --- a/test/common/http/conn_manager_impl_test_base.cc +++ b/test/common/http/conn_manager_impl_test_base.cc @@ -264,7 +264,9 @@ void HttpConnectionManagerImplTest::expectOnDestroy(bool deferred) { } void HttpConnectionManagerImplTest::doRemoteClose(bool deferred) { - EXPECT_CALL(stream_, removeCallbacks(_)); + // We will call removeCallbacks twice. + // Once in resetAllStreams, and once in doDeferredStreamDestroy. + EXPECT_CALL(stream_, removeCallbacks(_)).Times(2); expectOnDestroy(deferred); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); } diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 391cf60ec861..13f68cc08a40 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -23,6 +23,8 @@ namespace Envoy { namespace { +using testing::HasSubstr; + std::string protocolTestParamsAndBoolToString( const ::testing::TestParamInfo>& params) { return fmt::format("{}_{}_{}", @@ -617,4 +619,89 @@ TEST_P(Http2OverloadManagerIntegrationTest, EXPECT_EQ(smallest_response->headers().getStatusValue(), "200"); } +TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnded) { + useAccessLog("%RESPONSE_CODE%"); + initializeOverloadManagerInBootstrap( + TestUtility::parseYaml(R"EOF( + name: "envoy.overload_actions.reset_high_memory_stream" + triggers: + - name: "envoy.resource_monitors.testonly.fake_resource_monitor" + scaled: + scaling_threshold: 0.90 + saturation_threshold: 0.98 + )EOF")); + initialize(); + + // Set 10MiB receive window for the client. + const int downstream_window_size = 10 * 1024 * 1024; + envoy::config::core::v3::Http2ProtocolOptions http2_options = + ::Envoy::Http2::Utility::initializeAndValidateOptions( + envoy::config::core::v3::Http2ProtocolOptions()); + http2_options.mutable_initial_stream_window_size()->set_value(downstream_window_size); + http2_options.mutable_initial_connection_window_size()->set_value(downstream_window_size); + codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options); + + // Makes us have Envoy's writes to downstream return EAGAIN + writev_matcher_->setSourcePort(lookupPort("http")); + writev_matcher_->setWritevReturnsEgain(); + + // Send a request + auto encoder_decoder = codec_client_->startRequest(Http::TestRequestHeaderMapImpl{ + {":method", "POST"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-length", "10"}, + }); + auto& encoder = encoder_decoder.first; + const std::string data(10, 'a'); + codec_client_->sendData(encoder, data, true); + auto response = std::move(encoder_decoder.second); + + waitForNextUpstreamRequest(); + FakeStreamPtr upstream_request_for_response = std::move(upstream_request_); + + // Send the responses back. It is larger than the downstream's receive window + // size. Thus, the codec will not end the stream, but the Envoy level stream + // should. + upstream_request_for_response->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, + false); + const int response_size = downstream_window_size + 1024; // Slightly over the window size. + upstream_request_for_response->encodeData(response_size, true); + + if (streamBufferAccounting()) { + // Wait for access log to know the Envoy level stream has been deleted. + EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("200")); + } + + // Set the pressure so the overload action kills the response if doing stream + // accounting + updateResource(0.95); + test_server_->waitForGaugeEq( + "overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 62); + + if (streamBufferAccounting()) { + test_server_->waitForCounterGe("envoy.overload_actions.reset_high_memory_stream.count", 1); + } + + // Reduce resource pressure + updateResource(0.80); + test_server_->waitForGaugeEq( + "overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 0); + + // Resume writes to downstream. + writev_matcher_->setResumeWrites(); + + if (streamBufferAccounting()) { + EXPECT_TRUE(response->waitForReset()); + EXPECT_TRUE(response->reset()); + } else { + // If we're not doing the accounting, we didn't end up resetting the + // streams. + ASSERT_TRUE(response->waitForEndStream()); + ASSERT_TRUE(response->complete()); + EXPECT_EQ(response->headers().getStatusValue(), "200"); + } +} + } // namespace Envoy diff --git a/test/mocks/http/stream.cc b/test/mocks/http/stream.cc index 19181d8c26ed..9410afdf2774 100644 --- a/test/mocks/http/stream.cc +++ b/test/mocks/http/stream.cc @@ -13,12 +13,21 @@ MockStream::MockStream() { })); ON_CALL(*this, removeCallbacks(_)) - .WillByDefault( - Invoke([this](StreamCallbacks& callbacks) -> void { callbacks_.remove(&callbacks); })); + .WillByDefault(Invoke([this](StreamCallbacks& callbacks) -> void { + for (auto& callback : callbacks_) { + if (callback == &callbacks) { + callback = nullptr; + return; + } + } + })); ON_CALL(*this, resetStream(_)).WillByDefault(Invoke([this](StreamResetReason reason) -> void { - for (StreamCallbacks* callbacks : callbacks_) { - callbacks->onResetStream(reason, absl::string_view()); + for (auto& callback : callbacks_) { + if (callback) { + callback->onResetStream(reason, absl::string_view()); + callback = nullptr; + } } })); diff --git a/test/mocks/http/stream.h b/test/mocks/http/stream.h index 9a0abc4c58df..a9d27e58ddef 100644 --- a/test/mocks/http/stream.h +++ b/test/mocks/http/stream.h @@ -23,19 +23,25 @@ class MockStream : public Stream { MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout)); MOCK_METHOD(void, setAccount, (Buffer::BufferMemoryAccountSharedPtr)); - std::list callbacks_{}; + // Use the same underlying structure as StreamCallbackHelper to insure iteration stability + // if we remove callbacks during iteration. + absl::InlinedVector callbacks_; Network::Address::InstanceConstSharedPtr connection_local_address_; Buffer::BufferMemoryAccountSharedPtr account_; void runHighWatermarkCallbacks() { for (auto* callback : callbacks_) { - callback->onAboveWriteBufferHighWatermark(); + if (callback) { + callback->onAboveWriteBufferHighWatermark(); + } } } void runLowWatermarkCallbacks() { for (auto* callback : callbacks_) { - callback->onBelowWriteBufferLowWatermark(); + if (callback) { + callback->onBelowWriteBufferLowWatermark(); + } } }