Skip to content

Commit

Permalink
event: Remove a source of non-determinism by always running deferred …
Browse files Browse the repository at this point in the history
…deletion before post callbacks (#14293)

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente authored Dec 8, 2020
1 parent 3b15582 commit 5932dfd
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 12 deletions.
5 changes: 5 additions & 0 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
}

void DispatcherImpl::runPostCallbacks() {
// Clear the deferred delete list before running post callbacks to reduce non-determinism in
// callback processing, and more easily detect if a scheduled post callback refers to one of the
// objects that is being deferred deleted.
clearDeferredDeleteList();

while (true) {
// It is important that this declaration is inside the body of the loop so that the callback is
// destructed while post_lock_ is not held. If callback is declared outside the loop and reused
Expand Down
18 changes: 18 additions & 0 deletions test/common/event/dispatcher_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,24 @@ TEST(DeferredTaskTest, DeferredTask) {
dispatcher->clearDeferredDeleteList();
}

TEST(DeferredDeleteTest, DeferredDeleteAndPostOrdering) {
InSequence s;

Api::ApiPtr api = Api::createApiForTest();
DispatcherPtr dispatcher(api->allocateDispatcher("test_thread"));
ReadyWatcher post_watcher;
ReadyWatcher delete_watcher;

// DeferredDelete should always run before post callbacks.
EXPECT_CALL(delete_watcher, ready());
EXPECT_CALL(post_watcher, ready());

dispatcher->post([&]() { post_watcher.ready(); });
dispatcher->deferredDelete(
std::make_unique<TestDeferredDeletable>([&]() -> void { delete_watcher.ready(); }));
dispatcher->run(Dispatcher::RunType::NonBlock);
}

class DispatcherImplTest : public testing::Test {
protected:
DispatcherImplTest()
Expand Down
93 changes: 81 additions & 12 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,16 @@ void FakeStream::postToConnectionThread(std::function<void()> cb) {
void FakeStream::encode100ContinueHeaders(const Http::ResponseHeaderMap& headers) {
std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
parent_.connection().dispatcher().post(
[this, headers_copy]() -> void { encoder_.encode100ContinueHeaders(*headers_copy); });
parent_.connection().dispatcher().post([this, headers_copy]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.encode100ContinueHeaders(*headers_copy);
});
}

void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) {
Expand All @@ -82,50 +90,111 @@ void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream)
}

parent_.connection().dispatcher().post([this, headers_copy, end_stream]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.encodeHeaders(*headers_copy, end_stream);
});
}

void FakeStream::encodeData(absl::string_view data, bool end_stream) {
parent_.connection().dispatcher().post([this, data, end_stream]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
Buffer::OwnedImpl fake_data(data.data(), data.size());
encoder_.encodeData(fake_data, end_stream);
});
}

void FakeStream::encodeData(uint64_t size, bool end_stream) {
parent_.connection().dispatcher().post([this, size, end_stream]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
Buffer::OwnedImpl data(std::string(size, 'a'));
encoder_.encodeData(data, end_stream);
});
}

void FakeStream::encodeData(Buffer::Instance& data, bool end_stream) {
std::shared_ptr<Buffer::Instance> data_copy = std::make_shared<Buffer::OwnedImpl>(data);
parent_.connection().dispatcher().post(
[this, data_copy, end_stream]() -> void { encoder_.encodeData(*data_copy, end_stream); });
parent_.connection().dispatcher().post([this, data_copy, end_stream]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.encodeData(*data_copy, end_stream);
});
}

void FakeStream::encodeTrailers(const Http::HeaderMap& trailers) {
std::shared_ptr<Http::ResponseTrailerMap> trailers_copy(
Http::createHeaderMap<Http::ResponseTrailerMapImpl>(trailers));
parent_.connection().dispatcher().post(
[this, trailers_copy]() -> void { encoder_.encodeTrailers(*trailers_copy); });
parent_.connection().dispatcher().post([this, trailers_copy]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.encodeTrailers(*trailers_copy);
});
}

void FakeStream::encodeResetStream() {
parent_.connection().dispatcher().post(
[this]() -> void { encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset); });
parent_.connection().dispatcher().post([this]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset);
});
}

void FakeStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) {
parent_.connection().dispatcher().post(
[this, &metadata_map_vector]() -> void { encoder_.encodeMetadata(metadata_map_vector); });
parent_.connection().dispatcher().post([this, &metadata_map_vector]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.encodeMetadata(metadata_map_vector);
});
}

void FakeStream::readDisable(bool disable) {
parent_.connection().dispatcher().post(
[this, disable]() -> void { encoder_.getStream().readDisable(disable); });
parent_.connection().dispatcher().post([this, disable]() -> void {
{
absl::MutexLock lock(&lock_);
if (saw_reset_) {
// Encoded already deleted.
return;
}
}
encoder_.getStream().readDisable(disable);
});
}

void FakeStream::onResetStream(Http::StreamResetReason, absl::string_view) {
Expand Down

0 comments on commit 5932dfd

Please sign in to comment.