Skip to content

Commit

Permalink
thrift_proxy: fix crash on invalid transport/protocol (#8143)
Browse files Browse the repository at this point in the history
Transport/protocol decoder errors that occur before the connection manager
initializes an ActiveRPC to track the request caused a crash. Modifies the
connection manager to handle this case, terminating the downstream the
connection.

Risk Level: low
Testing: test case that triggers crash
Docs Changes: n/a
Release Notes: added

Signed-off-by: Stephan Zuercher <zuercher@gmail.com>
  • Loading branch information
zuercher authored Sep 5, 2019
1 parent d21d92c commit 22e4827
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Version history
* router check tool: add coverage reporting & enforcement.
* router check tool: add comprehensive coverage reporting.
* router check tool: add deprecated field check.
* thrift_proxy: fix crashing bug on invalid transport/protocol framing
* tls: added verification of IP address SAN fields in certificates against configured SANs in the
* tracing: added support to the Zipkin reporter for sending list of spans as Zipkin JSON v2 and protobuf message over HTTP.
certificate validation context.
Expand Down
10 changes: 8 additions & 2 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,14 @@ void ConnectionManager::dispatch() {
} catch (const EnvoyException& ex) {
ENVOY_CONN_LOG(error, "thrift error: {}", read_callbacks_->connection(), ex.what());

// Use the current rpc to send an error downstream, if possible.
rpcs_.front()->onError(ex.what());
if (rpcs_.empty()) {
// Transport/protocol mismatch (including errors in automatic detection). Just hang up
// since we don't know how to encode a response.
read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
} else {
// Use the current rpc's transport/protocol to send an error downstream.
rpcs_.front()->onError(ex.what());
}
}

stats_.request_decoding_error_.inc();
Expand Down
46 changes: 46 additions & 0 deletions test/extensions/filters/network/thrift_proxy/conn_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,17 @@ TEST_F(ThriftConnectionManagerTest, OnDataHandlesTransportApplicationException)
EXPECT_EQ(0U, stats_.request_active_.value());
}

// Tests that OnData handles non-thrift input. Regression test for crash on invalid input.
TEST_F(ThriftConnectionManagerTest, OnDataHandlesGarbageRequest) {
initializeFilter();
addRepeated(buffer_, 8, 0);
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite));

EXPECT_EQ(filter_->onData(buffer_, false), Network::FilterStatus::StopIteration);
EXPECT_EQ(1U, store_.counter("test.request_decoding_error").value());
EXPECT_EQ(0U, stats_.request_active_.value());
}

TEST_F(ThriftConnectionManagerTest, OnEvent) {
// No active calls
{
Expand Down Expand Up @@ -895,6 +906,41 @@ TEST_F(ThriftConnectionManagerTest, RequestAndTransportApplicationException) {
EXPECT_EQ(1U, store_.counter("test.response_decoding_error").value());
}

// Tests that a request is routed and a non-thrift response is handled.
TEST_F(ThriftConnectionManagerTest, RequestAndGarbageResponse) {
initializeFilter();
writeFramedBinaryMessage(buffer_, MessageType::Call, 0x0F);

ThriftFilters::DecoderFilterCallbacks* callbacks{};
EXPECT_CALL(*decoder_filter_, setDecoderFilterCallbacks(_))
.WillOnce(
Invoke([&](ThriftFilters::DecoderFilterCallbacks& cb) -> void { callbacks = &cb; }));

EXPECT_EQ(filter_->onData(buffer_, false), Network::FilterStatus::StopIteration);
EXPECT_EQ(1U, store_.counter("test.request_call").value());

addRepeated(write_buffer_, 8, 0);

FramedTransportImpl transport;
BinaryProtocolImpl proto;
callbacks->startUpstreamResponse(transport, proto);

EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(1);
EXPECT_EQ(ThriftFilters::ResponseStatus::Reset, callbacks->upstreamData(write_buffer_));

filter_callbacks_.connection_.dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(1U, store_.counter("test.request").value());
EXPECT_EQ(1U, store_.counter("test.request_call").value());
EXPECT_EQ(0U, stats_.request_active_.value());
EXPECT_EQ(0U, store_.counter("test.response").value());
EXPECT_EQ(0U, store_.counter("test.response_reply").value());
EXPECT_EQ(1U, store_.counter("test.response_exception").value());
EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value());
EXPECT_EQ(0U, store_.counter("test.response_success").value());
EXPECT_EQ(0U, store_.counter("test.response_error").value());
}

TEST_F(ThriftConnectionManagerTest, PipelinedRequestAndResponse) {
initializeFilter();

Expand Down

0 comments on commit 22e4827

Please sign in to comment.