Skip to content

Commit

Permalink
TaskScheduler: Remove SchedulerWorker::Thread.
Browse files Browse the repository at this point in the history
This CL moves all functionnality from SchedulerWorker::Thread to
SchedulerWorker, and removes SchedulerWorker::Thread.

We needed SchedulerWorker::Thread when a SchedulerWorker could
re-create its native thread. Now, it's just adding complexity to
our codebase.

Change-Id: I5db4fa4fb0419b27641251cb26c9ed373bae7b74
Reviewed-on: https://chromium-review.googlesource.com/1018167
Reviewed-by: Robert Liao <robliao@chromium.org>
Commit-Queue: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#552710}
  • Loading branch information
fdoray authored and Commit Bot committed Apr 23, 2018
1 parent 7c47545 commit 6e1b000
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 195 deletions.
317 changes: 132 additions & 185 deletions base/task_scheduler/scheduler_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <utility>

#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/trace_event/trace_event.h"

Expand All @@ -23,160 +22,6 @@
namespace base {
namespace internal {

class SchedulerWorker::Thread : public PlatformThread::Delegate {
public:
~Thread() override = default;

static std::unique_ptr<Thread> Create(scoped_refptr<SchedulerWorker> outer) {
std::unique_ptr<Thread> thread(new Thread(std::move(outer)));
thread->Initialize();
if (thread->thread_handle_.is_null())
return nullptr;
return thread;
}

// PlatformThread::Delegate.
void ThreadMain() override {
TRACE_EVENT_BEGIN0("task_scheduler", "SchedulerWorkerThread active");

outer_->delegate_->OnMainEntry(outer_.get());

// A SchedulerWorker starts out waiting for work.
{
TRACE_EVENT_END0("task_scheduler", "SchedulerWorkerThread active");
outer_->delegate_->WaitForWork(&wake_up_event_);
TRACE_EVENT_BEGIN0("task_scheduler", "SchedulerWorkerThread active");
}

// When defined(COM_INIT_CHECK_HOOK_ENABLED), ignore
// SchedulerBackwardCompatibility::INIT_COM_STA to find incorrect uses of
// COM that should be running in a COM STA Task Runner.
#if defined(OS_WIN) && !defined(COM_INIT_CHECK_HOOK_ENABLED)
std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
if (outer_->backward_compatibility_ ==
SchedulerBackwardCompatibility::INIT_COM_STA) {
com_initializer = std::make_unique<win::ScopedCOMInitializer>();
}
#endif

while (!outer_->ShouldExit()) {
DCHECK(outer_);

#if defined(OS_MACOSX)
mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

UpdateThreadPriority(GetDesiredThreadPriority());

// Get the sequence containing the next task to execute.
scoped_refptr<Sequence> sequence =
outer_->delegate_->GetWork(outer_.get());
if (!sequence) {
// Exit immediately if GetWork() resulted in detaching this worker.
if (outer_->ShouldExit())
break;

TRACE_EVENT_END0("task_scheduler", "SchedulerWorkerThread active");
outer_->delegate_->WaitForWork(&wake_up_event_);
TRACE_EVENT_BEGIN0("task_scheduler", "SchedulerWorkerThread active");
continue;
}

sequence = outer_->task_tracker_->RunAndPopNextTask(
std::move(sequence), outer_->delegate_.get());

outer_->delegate_->DidRunTask();

// Re-enqueue |sequence| if allowed by RunNextTask().
if (sequence)
outer_->delegate_->ReEnqueueSequence(std::move(sequence));

// Calling WakeUp() guarantees that this SchedulerWorker will run
// Tasks from Sequences returned by the GetWork() method of |delegate_|
// until it returns nullptr. Resetting |wake_up_event_| here doesn't break
// this invariant and avoids a useless loop iteration before going to
// sleep if WakeUp() is called while this SchedulerWorker is awake.
wake_up_event_.Reset();
}

// Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
// after invoking OnMainExit().

outer_->delegate_->OnMainExit(outer_.get());

// Break the ownership circle between SchedulerWorker and Thread.
// This can result in deleting |this| and as such no more member accesses
// should be made after this point.
outer_ = nullptr;

TRACE_EVENT_END0("task_scheduler", "SchedulerWorkerThread active");
}

void Join() { PlatformThread::Join(thread_handle_); }

void Detach() { PlatformThread::Detach(thread_handle_); }

void WakeUp() { wake_up_event_.Signal(); }

private:
Thread(scoped_refptr<SchedulerWorker> outer)
: outer_(std::move(outer)),
current_thread_priority_(GetDesiredThreadPriority()) {
DCHECK(outer_);
}

void Initialize() {
constexpr size_t kDefaultStackSize = 0;
PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
current_thread_priority_);
}

// Returns the priority for which the thread should be set based on the
// priority hint, current shutdown state, and platform capabilities.
ThreadPriority GetDesiredThreadPriority() const {
DCHECK(outer_);

// All threads have a NORMAL priority when Lock doesn't handle multiple
// thread priorities.
if (!Lock::HandlesMultipleThreadPriorities())
return ThreadPriority::NORMAL;

// To avoid shutdown hangs, disallow a priority below NORMAL during
// shutdown. If thread priority cannot be increased, never allow a priority
// below NORMAL.
if (static_cast<int>(outer_->priority_hint_) <
static_cast<int>(ThreadPriority::NORMAL) &&
(outer_->task_tracker_->HasShutdownStarted() ||
!PlatformThread::CanIncreaseCurrentThreadPriority())) {
return ThreadPriority::NORMAL;
}

return outer_->priority_hint_;
}

void UpdateThreadPriority(ThreadPriority desired_thread_priority) {
if (desired_thread_priority == current_thread_priority_)
return;

PlatformThread::SetCurrentThreadPriority(desired_thread_priority);
current_thread_priority_ = desired_thread_priority;
}

PlatformThreadHandle thread_handle_;

scoped_refptr<SchedulerWorker> outer_;

// Event signaled to wake up this thread.
WaitableEvent wake_up_event_{WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED};

// Current priority of this thread. May be different from
// |outer_->priority_hint_|.
ThreadPriority current_thread_priority_;

DISALLOW_COPY_AND_ASSIGN(Thread);
};

void SchedulerWorker::Delegate::WaitForWork(WaitableEvent* wake_up_event) {
DCHECK(wake_up_event);
const TimeDelta sleep_time = GetSleepTimeout();
Expand All @@ -196,9 +41,10 @@ SchedulerWorker::SchedulerWorker(
const SchedulerLock* predecessor_lock,
SchedulerBackwardCompatibility backward_compatibility)
: thread_lock_(predecessor_lock),
priority_hint_(priority_hint),
delegate_(std::move(delegate)),
task_tracker_(std::move(task_tracker))
task_tracker_(std::move(task_tracker)),
priority_hint_(priority_hint),
current_thread_priority_(GetDesiredThreadPriority())
#if defined(OS_WIN) && !defined(COM_INIT_CHECK_HOOK_ENABLED)
,
backward_compatibility_(backward_compatibility)
Expand All @@ -210,68 +56,70 @@ SchedulerWorker::SchedulerWorker(

bool SchedulerWorker::Start() {
AutoSchedulerLock auto_lock(thread_lock_);
DCHECK(!thread_);
DCHECK(thread_handle_.is_null());

if (should_exit_.IsSet())
return true;

thread_ = Thread::Create(WrapRefCounted(this));
return !!thread_;
self_ = this;

constexpr size_t kDefaultStackSize = 0;
PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
current_thread_priority_);

if (thread_handle_.is_null()) {
self_ = nullptr;
return false;
}

return true;
}

void SchedulerWorker::WakeUp() {
AutoSchedulerLock auto_lock(thread_lock_);

// Calling WakeUp() after Cleanup() or Join() is wrong because the
// SchedulerWorker cannot run more tasks.
DCHECK(!join_called_for_testing_.IsSet());
// Calling WakeUp() after Cleanup() is wrong because the SchedulerWorker
// cannot run more tasks.
DCHECK(!should_exit_.IsSet());
if (thread_)
thread_->WakeUp();
wake_up_event_.Signal();
}

void SchedulerWorker::JoinForTesting() {
DCHECK(!join_called_for_testing_.IsSet());
join_called_for_testing_.Set();
wake_up_event_.Signal();

std::unique_ptr<Thread> thread;
PlatformThreadHandle thread_handle;

{
AutoSchedulerLock auto_lock(thread_lock_);

if (thread_) {
// Make sure the thread is awake. It will see that
// |join_called_for_testing_| is set and exit shortly after.
thread_->WakeUp();
thread = std::move(thread_);
}
DCHECK(!thread_handle_.is_null());
thread_handle = thread_handle_;
// Reset |thread_handle_| so it isn't joined by the destructor.
thread_handle_ = PlatformThreadHandle();
}

if (thread)
thread->Join();
PlatformThread::Join(thread_handle);
}

bool SchedulerWorker::ThreadAliveForTesting() const {
AutoSchedulerLock auto_lock(thread_lock_);
return !!thread_;
return !thread_handle_.is_null();
}

SchedulerWorker::~SchedulerWorker() {
if (thread_) {
if (join_called_for_testing_.IsSet())
return;
AutoSchedulerLock auto_lock(thread_lock_);

DCHECK(should_exit_.IsSet());
thread_->Detach();
// If |thread_handle_| wasn't joined, detach it.
if (!thread_handle_.is_null()) {
DCHECK(!join_called_for_testing_.IsSet());
PlatformThread::Detach(thread_handle_);
}
}

void SchedulerWorker::Cleanup() {
AutoSchedulerLock auto_lock(thread_lock_);
DCHECK(!should_exit_.IsSet());
should_exit_.Set();
if (thread_)
thread_->WakeUp();
wake_up_event_.Signal();
}

bool SchedulerWorker::ShouldExit() const {
Expand All @@ -283,5 +131,104 @@ bool SchedulerWorker::ShouldExit() const {
task_tracker_->IsShutdownComplete();
}

ThreadPriority SchedulerWorker::GetDesiredThreadPriority() const {
// All threads have a NORMAL priority when Lock doesn't handle multiple thread
// priorities.
if (!Lock::HandlesMultipleThreadPriorities())
return ThreadPriority::NORMAL;

// To avoid shutdown hangs, disallow a priority below NORMAL during shutdown.
// If thread priority cannot be increased, never allow a priority below
// NORMAL.
if (static_cast<int>(priority_hint_) <
static_cast<int>(ThreadPriority::NORMAL) &&
(task_tracker_->HasShutdownStarted() ||
!PlatformThread::CanIncreaseCurrentThreadPriority())) {
return ThreadPriority::NORMAL;
}

return priority_hint_;
}

void SchedulerWorker::UpdateThreadPriority(
ThreadPriority desired_thread_priority) {
if (desired_thread_priority == current_thread_priority_)
return;

PlatformThread::SetCurrentThreadPriority(desired_thread_priority);
current_thread_priority_ = desired_thread_priority;
}

void SchedulerWorker::ThreadMain() {
DCHECK_EQ(self_, this);
TRACE_EVENT_BEGIN0("task_scheduler", "SchedulerWorkerThread active");

delegate_->OnMainEntry(this);

// A SchedulerWorker starts out waiting for work.
{
TRACE_EVENT_END0("task_scheduler", "SchedulerWorkerThread active");
delegate_->WaitForWork(&wake_up_event_);
TRACE_EVENT_BEGIN0("task_scheduler", "SchedulerWorkerThread active");
}

// When defined(COM_INIT_CHECK_HOOK_ENABLED), ignore
// SchedulerBackwardCompatibility::INIT_COM_STA to find incorrect uses of
// COM that should be running in a COM STA Task Runner.
#if defined(OS_WIN) && !defined(COM_INIT_CHECK_HOOK_ENABLED)
std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
if (backward_compatibility_ == SchedulerBackwardCompatibility::INIT_COM_STA)
com_initializer = std::make_unique<win::ScopedCOMInitializer>();
#endif

while (!ShouldExit()) {
#if defined(OS_MACOSX)
mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

UpdateThreadPriority(GetDesiredThreadPriority());

// Get the sequence containing the next task to execute.
scoped_refptr<Sequence> sequence = delegate_->GetWork(this);
if (!sequence) {
// Exit immediately if GetWork() resulted in detaching this worker.
if (ShouldExit())
break;

TRACE_EVENT_END0("task_scheduler", "SchedulerWorkerThread active");
delegate_->WaitForWork(&wake_up_event_);
TRACE_EVENT_BEGIN0("task_scheduler", "SchedulerWorkerThread active");
continue;
}

sequence =
task_tracker_->RunAndPopNextTask(std::move(sequence), delegate_.get());

delegate_->DidRunTask();

// Re-enqueue |sequence| if allowed by RunNextTask().
if (sequence)
delegate_->ReEnqueueSequence(std::move(sequence));

// Calling WakeUp() guarantees that this SchedulerWorker will run Tasks from
// Sequences returned by the GetWork() method of |delegate_| until it
// returns nullptr. Resetting |wake_up_event_| here doesn't break this
// invariant and avoids a useless loop iteration before going to sleep if
// WakeUp() is called while this SchedulerWorker is awake.
wake_up_event_.Reset();
}

// Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
// after invoking OnMainExit().

delegate_->OnMainExit(this);

// Release the self-reference to |this|. This can result in deleting |this|
// and as such no more member accesses should be made after this point.
self_ = nullptr;

TRACE_EVENT_END0("task_scheduler", "SchedulerWorkerThread active");
}

} // namespace internal
} // namespace base
Loading

0 comments on commit 6e1b000

Please sign in to comment.