Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: minor code clean up to the http filter manager #36027

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 35 additions & 61 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,9 @@ class LocalReplyOwnerObject : public StreamInfo::FilterState::Object {
*/
struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
Logger::Loggable<Logger::Id::http> {
ActiveStreamFilterBase(FilterManager& parent, bool is_encoder_decoder_filter,
FilterContext filter_context)
ActiveStreamFilterBase(FilterManager& parent, FilterContext filter_context)
: parent_(parent), iteration_state_(IterationState::Continue),
filter_context_(std::move(filter_context)), iterate_from_current_filter_(false),
headers_continued_(false), continued_1xx_headers_(false), end_stream_(false),
is_encoder_decoder_filter_(is_encoder_decoder_filter), processed_headers_(false) {}
filter_context_(std::move(filter_context)) {}

// Functions in the following block are called after the filter finishes processing
// corresponding data. Those functions handle state updates and data storage (if needed)
Expand Down Expand Up @@ -185,14 +182,13 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
// hasn't parsed data and trailers. As a result, the filter iteration should start with the
// current filter instead of the next one. If true, filter iteration starts with the current
// filter. Otherwise, starts with the next filter in the chain.
bool iterate_from_current_filter_ : 1;
bool headers_continued_ : 1;
bool continued_1xx_headers_ : 1;
bool iterate_from_current_filter_{};
bool headers_continued_{};
bool continued_1xx_headers_{};
// If true, end_stream is called for this filter.
bool end_stream_ : 1;
const bool is_encoder_decoder_filter_ : 1;
bool end_stream_{};
// If true, the filter has processed headers.
bool processed_headers_ : 1;
bool processed_headers_{};
};

/**
Expand All @@ -202,9 +198,8 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
public StreamDecoderFilterCallbacks,
LinkedObject<ActiveStreamDecoderFilter> {
ActiveStreamDecoderFilter(FilterManager& parent, StreamDecoderFilterSharedPtr filter,
bool is_encoder_decoder_filter, FilterContext filter_context)
: ActiveStreamFilterBase(parent, is_encoder_decoder_filter, std::move(filter_context)),
handle_(std::move(filter)) {
FilterContext filter_context)
: ActiveStreamFilterBase(parent, std::move(filter_context)), handle_(std::move(filter)) {
handle_->setDecoderFilterCallbacks(*this);
}

Expand Down Expand Up @@ -298,9 +293,8 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase,
public StreamEncoderFilterCallbacks,
LinkedObject<ActiveStreamEncoderFilter> {
ActiveStreamEncoderFilter(FilterManager& parent, StreamEncoderFilterSharedPtr filter,
bool is_encoder_decoder_filter, FilterContext filter_context)
: ActiveStreamFilterBase(parent, is_encoder_decoder_filter, std::move(filter_context)),
handle_(std::move(filter)) {
FilterContext filter_context)
: ActiveStreamFilterBase(parent, std::move(filter_context)), handle_(std::move(filter)) {
handle_->setEncoderFilterCallbacks(*this);
}

Expand Down Expand Up @@ -720,30 +714,16 @@ class FilterManager : public ScopeTrackedObject,
std::list<AccessLog::InstanceSharedPtr> accessLogHandlers() { return access_log_handlers_; }

void onStreamComplete() {
for (auto& filter : decoder_filters_) {
filter->handle_->onStreamComplete();
}

for (auto& filter : encoder_filters_) {
// Do not call onStreamComplete twice for dual registered filters.
if (!filter->is_encoder_decoder_filter_) {
filter->handle_->onStreamComplete();
}
for (auto filter : filters_) {
filter->onStreamComplete();
}
}

void destroyFilters() {
state_.destroyed_ = true;

for (auto& filter : decoder_filters_) {
filter->handle_->onDestroy();
}

for (auto& filter : encoder_filters_) {
// Do not call on destroy twice for dual registered filters.
if (!filter->is_encoder_decoder_filter_) {
filter->handle_->onDestroy();
}
for (auto filter : filters_) {
filter->onDestroy();
}
}

Expand Down Expand Up @@ -884,50 +864,44 @@ class FilterManager : public ScopeTrackedObject,

protected:
struct State {
State()
: decoder_filter_chain_complete_(false), encoder_filter_chain_complete_(false),
observed_decode_end_stream_(false), observed_encode_end_stream_(false),
has_1xx_headers_(false), created_filter_chain_(false), is_head_request_(false),
is_grpc_request_(false), non_100_response_headers_encoded_(false),
under_on_local_reply_(false), decoder_filter_chain_aborted_(false),
encoder_filter_chain_aborted_(false), saw_downstream_reset_(false) {}
State() = default;
uint32_t filter_call_state_{0};

// Set after decoder filter chain has completed iteration. Prevents further calls to decoder
// filters. This flag is used to determine stream completion when the independent half-close is
// enabled.
bool decoder_filter_chain_complete_ : 1;
bool decoder_filter_chain_complete_{};

// Set after encoder filter chain has completed iteration. Prevents further calls to encoder
// filters. This flag is used to determine stream completion when the independent half-close is
// enabled.
bool encoder_filter_chain_complete_ : 1;
bool encoder_filter_chain_complete_{};

// Set `true` when the filter manager observes end stream on the decoder path (from downstream
// client) before iteration of the decoder filter chain begins. This flag is used for setting
// end_stream value when resuming decoder filter chain iteration.
bool observed_decode_end_stream_ : 1;
bool observed_decode_end_stream_{};
// Set `true` when the filter manager observes end stream on the encoder path (from upstream
// server or Envoy's local reply) before iteration of the encoder filter chain begins. This flag
// is used for setting end_stream value when resuming encoder filter chain iteration.
bool observed_encode_end_stream_ : 1;
bool observed_encode_end_stream_{};

// By default, we will assume there are no 1xx. If encode1xxHeaders
// is ever called, this is set to true so commonContinue resumes processing the 1xx.
bool has_1xx_headers_ : 1;
bool created_filter_chain_ : 1;
bool has_1xx_headers_{};
bool created_filter_chain_{};
// These two are latched on initial header read, to determine if the original headers
// constituted a HEAD or gRPC request, respectively.
bool is_head_request_ : 1;
bool is_grpc_request_ : 1;
bool is_head_request_{};
bool is_grpc_request_{};
// Tracks if headers other than 100-Continue have been encoded to the codec.
bool non_100_response_headers_encoded_ : 1;
bool non_100_response_headers_encoded_{};
// True under the stack of onLocalReply, false otherwise.
bool under_on_local_reply_ : 1;
bool under_on_local_reply_{};
// True when the filter chain iteration was aborted with local reply.
bool decoder_filter_chain_aborted_ : 1;
bool encoder_filter_chain_aborted_ : 1;
bool saw_downstream_reset_ : 1;
bool decoder_filter_chain_aborted_{};
bool encoder_filter_chain_aborted_{};
bool saw_downstream_reset_{};

// The following 3 members are booleans rather than part of the space-saving bitfield as they
// are passed as arguments to functions expecting bools. Extend State using the bitfield
Expand All @@ -952,24 +926,24 @@ class FilterManager : public ScopeTrackedObject,

void addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr filter) override {
manager_.addStreamFilterBase(filter.get());
manager_.addStreamDecoderFilter(std::make_unique<ActiveStreamDecoderFilter>(
manager_, std::move(filter), false, context_));
manager_.addStreamDecoderFilter(
std::make_unique<ActiveStreamDecoderFilter>(manager_, std::move(filter), context_));
}

void addStreamEncoderFilter(Http::StreamEncoderFilterSharedPtr filter) override {
manager_.addStreamFilterBase(filter.get());
manager_.addStreamEncoderFilter(std::make_unique<ActiveStreamEncoderFilter>(
manager_, std::move(filter), false, context_));
manager_.addStreamEncoderFilter(
std::make_unique<ActiveStreamEncoderFilter>(manager_, std::move(filter), context_));
}

void addStreamFilter(Http::StreamFilterSharedPtr filter) override {
StreamDecoderFilter* decoder_filter = filter.get();
manager_.addStreamFilterBase(decoder_filter);

manager_.addStreamDecoderFilter(
std::make_unique<ActiveStreamDecoderFilter>(manager_, filter, true, context_));
std::make_unique<ActiveStreamDecoderFilter>(manager_, filter, context_));
manager_.addStreamEncoderFilter(
std::make_unique<ActiveStreamEncoderFilter>(manager_, std::move(filter), true, context_));
std::make_unique<ActiveStreamEncoderFilter>(manager_, std::move(filter), context_));
}

void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) override {
Expand Down
14 changes: 4 additions & 10 deletions test/common/http/conn_manager_impl_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,21 +394,15 @@ void HttpConnectionManagerImplMixin::expectOnDestroy(bool deferred) {
for (auto filter : decoder_filters_) {
EXPECT_CALL(*filter, onStreamComplete());
}
{
auto setup_filter_expect = [](MockStreamEncoderFilter* filter) {
EXPECT_CALL(*filter, onStreamComplete());
};
std::for_each(encoder_filters_.rbegin(), encoder_filters_.rend(), setup_filter_expect);
for (auto filter : encoder_filters_) {
EXPECT_CALL(*filter, onStreamComplete());
}

for (auto filter : decoder_filters_) {
EXPECT_CALL(*filter, onDestroy());
}
{
auto setup_filter_expect = [](MockStreamEncoderFilter* filter) {
EXPECT_CALL(*filter, onDestroy());
};
std::for_each(encoder_filters_.rbegin(), encoder_filters_.rend(), setup_filter_expect);
for (auto filter : encoder_filters_) {
EXPECT_CALL(*filter, onDestroy());
}

if (deferred) {
Expand Down
Loading