Skip to content

Commit

Permalink
Reimplement base::WaitableEvent with a kqueue on Mac.
Browse files Browse the repository at this point in the history
For a single WaitableEvent, a custom EVFILT_USER kevent is used to wait and
signal. This replaces the default POSIX implementation that uses a
pthread_cond_t and a boolean flag.

To implement WaitMany, a new kqueue is created to wait on all the individual
WaitableEvent's kqueue descriptor. This replaces a complex locking algorithm
used in the default POSIX implementation.

For the asynchronous WaitableEventWatcher, a TYPE_READ dispatch_source_t is
used to watch the WaitableEvent's kqueue. This replaces the POSIX
implementation of a reference-counted list of async watchers guarded by a lock.

Microbenchmarks show that the kqueue implementation is significantly faster in
most cases. The one potential drawback is hitting the low RLIMIT_NOFILE on
macOS, since each WaitableEvent and WaitableEventWatcher requires a new
descriptor.

Bug: 681167
Change-Id: I135012fdd25e547ffb911fc7adc97c203df38241
Reviewed-on: https://chromium-review.googlesource.com/553497
Reviewed-by: Robert Liao <robliao@chromium.org>
Reviewed-by: Mark Mentovai <mark@chromium.org>
Commit-Queue: Robert Sesek <rsesek@chromium.org>
Cr-Commit-Position: refs/heads/master@{#485788}
  • Loading branch information
rsesek authored and Commit Bot committed Jul 12, 2017
1 parent 14e88d1 commit 2096391
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 4 deletions.
6 changes: 6 additions & 0 deletions base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,10 @@ component("base") {
"synchronization/read_write_lock_win.cc",
"synchronization/spin_wait.h",
"synchronization/waitable_event.h",
"synchronization/waitable_event_mac.cc",
"synchronization/waitable_event_posix.cc",
"synchronization/waitable_event_watcher.h",
"synchronization/waitable_event_watcher_mac.cc",
"synchronization/waitable_event_watcher_posix.cc",
"synchronization/waitable_event_watcher_win.cc",
"synchronization/waitable_event_win.cc",
Expand Down Expand Up @@ -1527,6 +1529,8 @@ component("base") {
"memory/shared_memory_posix.cc",
"native_library_posix.cc",
"strings/sys_string_conversions_posix.cc",
"synchronization/waitable_event_posix.cc",
"synchronization/waitable_event_watcher_posix.cc",
"threading/platform_thread_internal_posix.cc",
]

Expand Down Expand Up @@ -1653,6 +1657,8 @@ component("base") {
"power_monitor/power_monitor_device_source_ios.mm",
"process/memory_stubs.cc",
"strings/sys_string_conversions_mac.mm",
"synchronization/waitable_event_mac.cc",
"synchronization/waitable_event_watcher_mac.cc",
"threading/platform_thread_mac.mm",
"time/time_conversion_posix.cc",
"time/time_mac.cc",
Expand Down
15 changes: 12 additions & 3 deletions base/synchronization/waitable_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

#if defined(OS_WIN)
#include "base/win/scoped_handle.h"
#endif

#if defined(OS_POSIX)
#elif defined(OS_MACOSX)
#include "base/files/scoped_file.h"
#elif defined(OS_POSIX)
#include <list>
#include <utility>

#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#endif
Expand Down Expand Up @@ -154,6 +155,14 @@ class BASE_EXPORT WaitableEvent {

#if defined(OS_WIN)
win::ScopedHandle handle_;
#elif defined(OS_MACOSX)
// The kqueue used to signal and wait on a custom user event.
ScopedFD kqueue_;

// Creates a kevent64_s, filling in the values using EV_SET64() with the
// specified flags and filter flags, and then submits it as a change to the
// |kqueue_|.
void PostEvent(uint16_t flags, uint32_t fflags);
#else
// On Windows, you must not close a HANDLE which is currently being waited on.
// The MSDN documentation says that the resulting behaviour is 'undefined'.
Expand Down
140 changes: 140 additions & 0 deletions base/synchronization/waitable_event_mac.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/synchronization/waitable_event.h"

#include <sys/event.h>

#include <vector>

#include "base/debug/activity_tracker.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/threading/thread_restrictions.h"

namespace base {

WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
InitialState initial_state)
: kqueue_(kqueue()) {
PCHECK(kqueue_.is_valid()) << "kqueue";
uint16_t flags = EV_ADD;

if (reset_policy == ResetPolicy::AUTOMATIC)
flags |= EV_CLEAR;

// The initial event registration.
PostEvent(flags, 0);

if (initial_state == InitialState::SIGNALED)
Signal();
}

WaitableEvent::~WaitableEvent() = default;

void WaitableEvent::Reset() {
PostEvent(EV_DISABLE, 0);
}

void WaitableEvent::Signal() {
PostEvent(EV_ENABLE, NOTE_TRIGGER);
}

bool WaitableEvent::IsSignaled() {
// TODO(rsesek): Use KEVENT_FLAG_IMMEDIATE rather than an empty timeout.
timespec ts{};
kevent64_s event;
int rv = kevent64(kqueue_.get(), nullptr, 0, &event, 1, 0, &ts);
PCHECK(rv >= 0) << "kevent64 IsSignaled";
return rv > 0;
}

void WaitableEvent::Wait() {
bool result = TimedWaitUntil(TimeTicks::Max());
DCHECK(result) << "TimedWait() should never fail with infinite timeout";
}

bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) {
return TimedWaitUntil(TimeTicks::Now() + wait_delta);
}

bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) {
ThreadRestrictions::AssertWaitAllowed();
// Record the event that this thread is blocking upon (for hang diagnosis).
debug::ScopedEventWaitActivity event_activity(this);

bool indefinite = end_time.is_max();

int rv = 0;

do {
TimeDelta wait_time = end_time - TimeTicks::Now();
if (wait_time < TimeDelta()) {
// A negative delta would be treated by the system as indefinite, but
// it needs to be treated as a poll instead.
wait_time = TimeDelta();
}

timespec timeout = wait_time.ToTimeSpec();

// This does not use HANDLE_EINTR, since retrying the syscall requires
// adjusting the timeout to account for time already waited.
kevent64_s event;
rv = kevent64(kqueue_.get(), nullptr, 0, &event, 1, 0,
indefinite ? nullptr : &timeout);
} while (rv < 0 && errno == EINTR);

PCHECK(rv >= 0) << "kevent64 TimedWait";
return rv > 0;
}

// static
size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
ThreadRestrictions::AssertWaitAllowed();
DCHECK(count) << "Cannot wait on no events";

// Record an event (the first) that this thread is blocking upon.
debug::ScopedEventWaitActivity event_activity(raw_waitables[0]);

std::vector<kevent64_s> events(count);
for (size_t i = 0; i < count; ++i) {
EV_SET64(&events[i], raw_waitables[i]->kqueue_.get(), EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, i, 0, 0);
}

std::vector<kevent64_s> out_events(count);

ScopedFD wait_many(kqueue());
PCHECK(wait_many.is_valid()) << "kqueue WaitMany";

int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count,
out_events.data(), count, 0, nullptr));
PCHECK(rv > 0) << "kevent64: WaitMany";

size_t triggered = -1;
for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
// WaitMany should return the lowest index in |raw_waitables| that was
// triggered.
size_t index = static_cast<size_t>(out_events[i].udata);
triggered = std::min(triggered, index);
}

// The WaitMany kevent has identified which kqueue was signaled. Trigger
// a Wait on it to clear the event within WaitableEvent's kqueue. This
// will not block, since it has been triggered.
raw_waitables[triggered]->Wait();

return triggered;
}

void WaitableEvent::PostEvent(uint16_t flags, uint32_t fflags) {
kevent64_s event;
EV_SET64(&event, reinterpret_cast<uint64_t>(this), EVFILT_USER, flags, fflags,
0, 0, 0, 0);
int rv =
HANDLE_EINTR(kevent64(kqueue_.get(), &event, 1, nullptr, 0, 0, nullptr));
PCHECK(rv == 0) << "kevent64";
}

} // namespace base
27 changes: 26 additions & 1 deletion base/synchronization/waitable_event_watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@
#if defined(OS_WIN)
#include "base/win/object_watcher.h"
#include "base/win/scoped_handle.h"
#elif defined(OS_MACOSX)
#include <dispatch/dispatch.h>

#include "base/files/scoped_file.h"
#include "base/mac/scoped_dispatch_object.h"
#else
#include "base/callback.h"
#include "base/sequence_checker.h"
#include "base/synchronization/waitable_event.h"
#endif

#if !defined(OS_WIN)
#include "base/callback.h"
#endif

namespace base {

class Flag;
Expand Down Expand Up @@ -101,6 +109,23 @@ class BASE_EXPORT WaitableEventWatcher

EventCallback callback_;
WaitableEvent* event_ = nullptr;
#elif defined(OS_MACOSX)
// Invokes the callback and resets the source. Must be called on the task
// runner on which StartWatching() was called.
void InvokeCallback();

// Closure bound to the event being watched. This will be is_null() if
// nothing is being watched.
OnceClosure callback_;

// A dup()'d descriptor of the kqueue used by the WaitableEvent being
// watched, or the invalid value if nothing is.
ScopedFD kqueue_;

// A TYPE_READ dispatch source on |kqueue_|. When a read event is delivered,
// the kqueue has an event pending, and the bound |callback_| will be
// invoked. This will be null if nothing is currently being watched.
ScopedDispatchObject<dispatch_source_t> source_;
#else
// Instantiated in StartWatching(). Set before the callback runs. Reset in
// StopWatching() or StartWatching().
Expand Down
72 changes: 72 additions & 0 deletions base/synchronization/waitable_event_watcher_mac.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/synchronization/waitable_event_watcher.h"

#include "base/bind.h"
#include "base/callback.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/sequenced_task_runner_handle.h"

namespace base {

WaitableEventWatcher::WaitableEventWatcher() {}

WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching();
}

bool WaitableEventWatcher::StartWatching(WaitableEvent* event,
EventCallback callback) {
DCHECK(!source_ || dispatch_source_testcancel(source_));

kqueue_.reset(dup(event->kqueue_.get()));
if (!kqueue_.is_valid()) {
PLOG(ERROR) << "dup kqueue";
return false;
}

// Use the global concurrent queue here, since it is only used to thunk
// to the real callback on the target task runner.
source_.reset(dispatch_source_create(
DISPATCH_SOURCE_TYPE_READ, kqueue_.get(), 0,
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)));

callback_ = BindOnce(std::move(callback), event);

scoped_refptr<SequencedTaskRunner> task_runner =
SequencedTaskRunnerHandle::Get();
dispatch_source_set_event_handler(source_, ^{
// Cancel the source immediately, since libdispatch will continue to send
// events until the kqueue is drained.
dispatch_source_cancel(source_);

task_runner->PostTask(
FROM_HERE,
BindOnce(&WaitableEventWatcher::InvokeCallback, Unretained(this)));
});
dispatch_resume(source_);

return true;
}

void WaitableEventWatcher::StopWatching() {
callback_.Reset();
if (source_) {
dispatch_source_cancel(source_);
source_.reset();
}
kqueue_.reset();
}

void WaitableEventWatcher::InvokeCallback() {
// The callback can be null if StopWatching() is called between signaling
// and the |callback_| getting run on the target task runner.
if (callback_.is_null())
return;
source_.reset();
std::move(callback_).Run();
}

} // namespace base
2 changes: 2 additions & 0 deletions base/task_scheduler/task_tracker_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ TEST_F(TaskSchedulerTaskTrackerTest,

for (const auto& thread : post_threads)
thread->Join();
// Clean up unused Thread objects to avoid running out of system resources.
post_threads.clear();

// Call Shutdown() asynchronously.
CallShutdownAsync();
Expand Down

0 comments on commit 2096391

Please sign in to comment.