Skip to content

Commit

Permalink
Add WorkerThread to WorkerThreads after context is initialized
Browse files Browse the repository at this point in the history
This is a follow-up CL of crrev.com/c/1792028. The CL seemed to
fix a crash but a better and proper fix is to delay adding
WorkerThread to WorkerThreads() until the worker context is
initialized. This way we can make sure WorkerScheduler is
available for all workers in WorkerThreads().

To terminate initializing workers via TerminateAllWorkersForTesting()
we need to keep initializing workers somewhere else. This CL adds a
HashSet that keeps initializing workers.

Bug: 1000077
Change-Id: I8410ebec0a8459a779105f6d65e2bde4f3b81f20
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1792510
Commit-Queue: Kenichi Ishibashi <bashi@chromium.org>
Reviewed-by: Hiroki Nakagawa <nhiroki@chromium.org>
Cr-Commit-Position: refs/heads/master@{#695841}
  • Loading branch information
bashi authored and Commit Bot committed Sep 12, 2019
1 parent 9c0f68b commit 416faab
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
47 changes: 32 additions & 15 deletions third_party/blink/renderer/core/workers/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ namespace {
constexpr base::TimeDelta kForcibleTerminationDelay =
base::TimeDelta::FromSeconds(2);

void TerminateThreadsInSet(HashSet<WorkerThread*> threads) {
for (WorkerThread* thread : threads)
thread->TerminateForTesting();

for (WorkerThread* thread : threads)
thread->WaitForShutdownForTesting();

// Destruct base::Thread and join the underlying system threads.
for (WorkerThread* thread : threads)
thread->ClearWorkerBackingThread();
}

} // namespace

Mutex& WorkerThread::ThreadSetMutex() {
Expand Down Expand Up @@ -140,7 +152,9 @@ class WorkerThread::InterruptData {
WorkerThread::~WorkerThread() {
DCHECK_CALLED_ON_VALID_THREAD(parent_thread_checker_);
MutexLocker lock(ThreadSetMutex());
DCHECK(WorkerThreads().Contains(this));
DCHECK(InitializingWorkerThreads().Contains(this) ||
WorkerThreads().Contains(this));
InitializingWorkerThreads().erase(this);
WorkerThreads().erase(this);

DCHECK(child_threads_.IsEmpty());
Expand Down Expand Up @@ -289,18 +303,8 @@ void WorkerThread::TerminateAllWorkersForTesting() {

// Keep this lock to prevent WorkerThread instances from being destroyed.
MutexLocker lock(ThreadSetMutex());
HashSet<WorkerThread*> threads = WorkerThreads();

for (WorkerThread* thread : threads) {
thread->TerminateForTesting();
}

for (WorkerThread* thread : threads)
thread->WaitForShutdownForTesting();

// Destruct base::Thread and join the underlying system threads.
for (WorkerThread* thread : threads)
thread->ClearWorkerBackingThread();
TerminateThreadsInSet(InitializingWorkerThreads());
TerminateThreadsInSet(WorkerThreads());
}

void WorkerThread::WillProcessTask(const base::PendingTask& pending_task) {
Expand Down Expand Up @@ -374,7 +378,12 @@ WorkerInspectorController* WorkerThread::GetWorkerInspectorController() {

unsigned WorkerThread::WorkerThreadCount() {
MutexLocker lock(ThreadSetMutex());
return WorkerThreads().size();
return InitializingWorkerThreads().size() + WorkerThreads().size();
}

HashSet<WorkerThread*>& WorkerThread::InitializingWorkerThreads() {
DEFINE_THREAD_SAFE_STATIC_LOCAL(HashSet<WorkerThread*>, threads, ());
return threads;
}

HashSet<WorkerThread*>& WorkerThread::WorkerThreads() {
Expand Down Expand Up @@ -453,7 +462,7 @@ WorkerThread::WorkerThread(WorkerReportingProxy& worker_reporting_proxy,
shutdown_event_(RefCountedWaitableEvent::Create()) {
DCHECK_CALLED_ON_VALID_THREAD(parent_thread_checker_);
MutexLocker lock(ThreadSetMutex());
WorkerThreads().insert(this);
InitializingWorkerThreads().insert(this);
}

void WorkerThread::ScheduleToTerminateScriptExecution() {
Expand Down Expand Up @@ -584,6 +593,14 @@ void WorkerThread::InitializeOnWorkerThread(
return;
}

{
MutexLocker lock(ThreadSetMutex());
DCHECK(InitializingWorkerThreads().Contains(this));
DCHECK(!WorkerThreads().Contains(this));
InitializingWorkerThreads().erase(this);
WorkerThreads().insert(this);
}

// It is important that no code is run on the Isolate between
// initializing InspectorTaskRunner and pausing on start.
// Otherwise, InspectorTaskRunner might interrupt isolate execution
Expand Down
9 changes: 5 additions & 4 deletions third_party/blink/renderer/core/workers/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,6 @@ class CORE_EXPORT WorkerThread : public Thread::TaskObserver {
Parameters&&... parameters) {
MutexLocker lock(ThreadSetMutex());
for (WorkerThread* thread : WorkerThreads()) {
// Skip threads which aren't started yet. The scheduler and task runners
// are available only after Start() is called.
if (!thread->worker_scheduler_)
continue;
PostCrossThreadTask(
*thread->GetTaskRunner(task_type), FROM_HERE,
CrossThreadBindOnce(function, WTF::CrossThreadUnretained(thread),
Expand Down Expand Up @@ -232,6 +228,7 @@ class CORE_EXPORT WorkerThread : public Thread::TaskObserver {
// function can be called on both the main thread and the worker thread.
// You must not call this after Terminate() is called.
scoped_refptr<base::SingleThreadTaskRunner> GetTaskRunner(TaskType type) {
DCHECK(worker_scheduler_);
return worker_scheduler_->GetTaskRunner(type);
}

Expand Down Expand Up @@ -279,7 +276,11 @@ class CORE_EXPORT WorkerThread : public Thread::TaskObserver {
FRIEND_TEST_ALL_PREFIXES(WorkerThreadTest,
Terminate_WhileDebuggerTaskIsRunning);

// Contains threads which are created but haven't started.
static HashSet<WorkerThread*>& InitializingWorkerThreads();
// Contains threads which have started.
static HashSet<WorkerThread*>& WorkerThreads();
// This mutex guards both WorkerThreads() and InitializingWorkerThreads().
static Mutex& ThreadSetMutex();

// Represents the state of this worker thread.
Expand Down

0 comments on commit 416faab

Please sign in to comment.