Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overload: Reset H2 server stream only use codec level reset mechanism #18895

Merged
merged 15 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ class BufferMemoryAccount {
* should trigger a reset of the corresponding upstream stream if it exists.
*/
virtual void resetDownstream() PURE;

/**
* Should be invoked when the Envoy level stream is complete.
*/
virtual void onEnvoyStreamComplete() PURE;

/**
* Whether the Envoy level stream was complete.
*/
virtual bool sawEnvoyStreamComplete() PURE;
KBaichoo marked this conversation as resolved.
Show resolved Hide resolved
};

using BufferMemoryAccountSharedPtr = std::shared_ptr<BufferMemoryAccount>;
Expand Down
5 changes: 4 additions & 1 deletion source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount {
}
}

void onEnvoyStreamComplete() override { saw_envoy_stream_complete_ = true; }
bool sawEnvoyStreamComplete() override { return saw_envoy_stream_complete_; }

// The number of memory classes the Account expects to exists. See
// *WatermarkBufferFactory* for details on the memory classes.
static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8;
Expand All @@ -143,7 +146,7 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount {
absl::optional<uint32_t> current_bucket_idx_{};

WatermarkBufferFactory* factory_ = nullptr;

bool saw_envoy_stream_complete_ = false;
OptRef<Http::StreamResetHandler> reset_handler_;
// Keep a copy of the shared_ptr pointing to this account. We opted to go this
// route rather than enable_shared_from_this to avoid wasteful atomic
Expand Down
4 changes: 4 additions & 0 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,10 @@ class FilterManager : public ScopeTrackedObject,
filter->handle_->onStreamComplete();
}
}

if (account_) {
account_->onEnvoyStreamComplete();
}
}

void destroyFilters() {
Expand Down
11 changes: 10 additions & 1 deletion source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,16 @@ void ConnectionImpl::ServerStreamImpl::resetStream(StreamResetReason reason) {
buffer_memory_account_->clearDownstream();
}

StreamImpl::resetStream(reason);
if (buffer_memory_account_ && buffer_memory_account_->sawEnvoyStreamComplete()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the underlying problem here, is that the filter manager believes the stream to be complete, and the codec does not? I think we don't want the buffer accounting to be the entity handling this disconnect because there could be other entities with this lifetime confusion.

either entities which can down-call a reset should be explicitly made aware of a stream being complete, or the filter manager should handle reset being called after it thinks that the stream is complete. Make sense?
cc @snowp for second opinion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Envoy level stream object (ConnectionManagerImpl::ActiveStream is complete) and so the filter manager also thinks the stream is complete, but the codec doesn't as you said as it has yet to call onStreamClose.

AFAIK, the only entities that'll try to reset the stream at this point is either the reset_high_memory_stream action or the OnPendingFlushTimer though in the latter case the codec will know the stream has ended.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's also cases in envoy mobile where we've had a reset-after-fin (Snow has reviewed) so again I'd rather make this a bit more resistant

// We should use a lower level reset mechanism to reset the stream.
resetStreamWorker(reason);
if (parent_.sendPendingFramesAndHandleError()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some more offline discussion I'd suggested using local_end_stream_ rather than adding yet another way to communicate a stream is done. @KBaichoo thinks that's sketchy because of interplay between local_end_stream_ and the deferred reset code.
Pulling in @yanavlasov and @antoniovicente to comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interplay between local_end_stream and runResetCallbacks(reason) I think can be sketchy (sorry if I didn't convey that well)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change feels brittle.

I think we need to dig deeper and make object lifetimes more clear. We need to eliminate all cases where an object remains registered after deletion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's my thought too. @KBaichoo if we're in a hurry to get this landed I'd be OK merging with a TODO to fix if you can help resolve the real lifetime issues right after, but if we're not in a rush I think we want to sort out the lifetime issues now in a less fragile way.
/wait

Copy link
Contributor Author

@KBaichoo KBaichoo Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pushback. I've dug deeper and have done the following:

  • Delay H1 ActiveRequest dtor until after the Envoy level stream is destroyed -- didn't have the invariant that codec stream outlived the envoy stream in local reply cases.
  • With the above, it's safe to in doDeferredStreamDestroy access the response encoder (codec stream) and remove the envoy level stream from callbacks, and notify the codec level stream it's done.
  • Removed changes to buffer account interface

// Intended to check through coverage that this error case is tested
return;
}
} else {
StreamImpl::resetStream(reason);
}
}

void ConnectionImpl::StreamImpl::resetStream(StreamResetReason reason) {
Expand Down
85 changes: 85 additions & 0 deletions test/integration/buffer_accounting_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,4 +611,89 @@ TEST_P(Http2OverloadManagerIntegrationTest,
EXPECT_EQ(smallest_response->headers().getStatusValue(), "200");
}

TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnded) {
initializeOverloadManagerInBootstrap(
TestUtility::parseYaml<envoy::config::overload::v3::OverloadAction>(R"EOF(
name: "envoy.overload_actions.reset_high_memory_stream"
triggers:
- name: "envoy.resource_monitors.testonly.fake_resource_monitor"
scaled:
scaling_threshold: 0.90
saturation_threshold: 0.98
)EOF"));
initialize();

// Set 10MiB receive window for the client.
const int downstream_window_size = 10 * 1024 * 1024;
envoy::config::core::v3::Http2ProtocolOptions http2_options =
::Envoy::Http2::Utility::initializeAndValidateOptions(
envoy::config::core::v3::Http2ProtocolOptions());
http2_options.mutable_initial_stream_window_size()->set_value(downstream_window_size);
http2_options.mutable_initial_connection_window_size()->set_value(downstream_window_size);
codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options);

// Makes us have Envoy's writes to downstream return EAGAIN
writev_matcher_->setSourcePort(lookupPort("http"));
writev_matcher_->setWritevReturnsEgain();

// Send a request
auto encoder_decoder = codec_client_->startRequest(Http::TestRequestHeaderMapImpl{
{":method", "POST"},
{":path", "/"},
{":scheme", "http"},
{":authority", "host"},
{"content-length", "10"},
});
auto& encoder = encoder_decoder.first;
const std::string data(10, 'a');
codec_client_->sendData(encoder, data, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();
FakeStreamPtr upstream_request_for_response = std::move(upstream_request_);

// Send the responses back. It is larger than the downstream's receive window
// size. Thus, the codec will not end the stream, but the Envoy level stream
// should.
upstream_request_for_response->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}},
false);
const int response_size = downstream_window_size + 1024; // Slightly over the window size.
upstream_request_for_response->encodeData(response_size, true);

// Wait for the stream to have seen complete.
if (streamBufferAccounting()) {
EXPECT_TRUE(
buffer_factory_->waitUntilExpectedNumberOfActiveAccountsThatSawEnvoyStreamComplete(1));
}

// Set the pressure so the overload action kills the response if doing stream
// accounting
updateResource(0.95);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 62);

if (streamBufferAccounting()) {
test_server_->waitForCounterGe("envoy.overload_actions.reset_high_memory_stream.count", 1);
}

// Reduce resource pressure
updateResource(0.80);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 0);

// Resume writes to downstream.
writev_matcher_->setResumeWrites();

if (streamBufferAccounting()) {
EXPECT_TRUE(response->waitForReset());
EXPECT_TRUE(response->reset());
} else {
// If we're not doing the accounting, we didn't end up resetting the
// streams.
ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());
EXPECT_EQ(response->headers().getStatusValue(), "200");
}
}

} // namespace Envoy
19 changes: 19 additions & 0 deletions test/integration/tracked_watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,25 @@ bool TrackedWatermarkBufferFactory::waitUntilExpectedNumberOfAccountsAndBoundBuf
return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::FromChrono(timeout));
}

bool TrackedWatermarkBufferFactory::
waitUntilExpectedNumberOfActiveAccountsThatSawEnvoyStreamComplete(
uint32_t expected_num_accounts, std::chrono::milliseconds timeout) {
absl::MutexLock lock(&mutex_);
auto predicate = [this, expected_num_accounts]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
mutex_.AssertHeld();
// Remove non-active accounts
removeDanglingAccounts();
uint32_t num_accounts_seen_envoy_stream_complete = 0;
for (auto& entry : account_infos_) {
if (entry.first->sawEnvoyStreamComplete()) {
++num_accounts_seen_envoy_stream_complete;
}
}
return num_accounts_seen_envoy_stream_complete == expected_num_accounts;
};
return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::FromChrono(timeout));
}

void TrackedWatermarkBufferFactory::checkIfExpectedBalancesMet() {
if (!expected_balances_ || expected_balances_met_.HasBeenNotified()) {
return;
Expand Down
3 changes: 3 additions & 0 deletions test/integration/tracked_watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class TrackedWatermarkBufferFactory : public WatermarkBufferFactory {
uint32_t num_accounts, uint32_t num_bound_buffers,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

bool waitUntilExpectedNumberOfActiveAccountsThatSawEnvoyStreamComplete(
uint32_t num_accounts, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

using AccountToBoundBuffersMap =
absl::flat_hash_map<BufferMemoryAccountSharedPtr,
absl::flat_hash_set<TrackedWatermarkBuffer*>>;
Expand Down