Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #35827

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
7 changes: 7 additions & 0 deletions envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ class SidestreamWatermarkCallbacks {
* watermarks have gone below.
*/
virtual void onSidestreamBelowLowWatermark() PURE;

/**
Sidestream subscribes to downstream watermark events.
*/
virtual void addDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks& callbacks) PURE;
virtual void
removeDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks& callbacks) PURE;
};

/**
Expand Down
13 changes: 11 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,17 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) override {
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().addDownstreamWatermarkCallbacks(callbacks);
}
}
void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) override {
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().removeDownstreamWatermarkCallbacks(callbacks);
}
}

void setDecoderBufferLimit(uint32_t) override {
IS_ENVOY_BUG("decoder buffer limits should not be overridden on async streams.");
}
Expand Down
14 changes: 14 additions & 0 deletions source/common/http/sidestream_watermark.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ class StreamFilterSidestreamWatermarkCallbacks : public Http::SidestreamWatermar
}
}

void addDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks& callbacks) final {
if (decode_callback_ != nullptr) {
// Sidestream subscribes to downstream watermark events.
decode_callback_->addDownstreamWatermarkCallbacks(callbacks);
}
}

void removeDownstreamWatermarkCallbacks(Http::DownstreamWatermarkCallbacks& callbacks) final {
if (decode_callback_ != nullptr) {
// Sidestream stops subscribing to downstream watermark events.
decode_callback_->removeDownstreamWatermarkCallbacks(callbacks);
}
}

/**
* The set function needs to be called by stream decoder filter before side stream connection is
* established, to apply the backpressure to downstream when it is above watermark,
Expand Down
6 changes: 4 additions & 2 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ TEST_F(AsyncClientImplTest, OngoingRequestWithWatermarking) {
const Buffer::OwnedImpl data_copy(data.toString());

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
.WillOnce(Invoke(
.WillRepeatedly(Invoke(
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
Expand All @@ -472,6 +472,8 @@ TEST_F(AsyncClientImplTest, OngoingRequestWithWatermarking) {
EXPECT_NE(request, nullptr);

StrictMock<MockSidestreamWatermarkCallbacks> watermark_callbacks;
EXPECT_CALL(watermark_callbacks, removeDownstreamWatermarkCallbacks(_));

// Registering a new watermark callback should note that the high watermark has already been hit.
EXPECT_CALL(watermark_callbacks, onSidestreamAboveHighWatermark());
request->setWatermarkCallbacks(watermark_callbacks);
Expand Down Expand Up @@ -529,9 +531,9 @@ TEST_F(AsyncClientImplTest, OngoingRequestWithWatermarkingAndReset) {
client_.startRequest(std::move(headers), callbacks_, AsyncClient::RequestOptions());
EXPECT_NE(request, nullptr);

// StrictMock<MockStreamDecoderFilterCallbacks> watermark_callbacks;
StrictMock<MockSidestreamWatermarkCallbacks> watermark_callbacks;
request->setWatermarkCallbacks(watermark_callbacks);
EXPECT_CALL(watermark_callbacks, removeDownstreamWatermarkCallbacks(_));

EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data_copy), false));
request->sendData(data, false);
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ class MockSidestreamWatermarkCallbacks : public SidestreamWatermarkCallbacks {

MOCK_METHOD(void, onSidestreamAboveHighWatermark, ());
MOCK_METHOD(void, onSidestreamBelowLowWatermark, ());
MOCK_METHOD(void, addDownstreamWatermarkCallbacks, (DownstreamWatermarkCallbacks&));
MOCK_METHOD(void, removeDownstreamWatermarkCallbacks, (DownstreamWatermarkCallbacks&));
};

class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
Expand Down