Skip to content

Commit

Permalink
TaskScheduler: Move TaskFactory out of scheduler_thread_pool_unittest.cc
Browse files Browse the repository at this point in the history
This will allow the factory to be reused to test TaskSchedulerImpl.

TBR=danakj@chromium.org
BUG=553459

Review URL: https://codereview.chromium.org/1911493002

Cr-Commit-Position: refs/heads/master@{#389491}
  • Loading branch information
fdoray authored and Commit bot committed Apr 25, 2016
1 parent 1d488d9 commit 44fda21
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 134 deletions.
2 changes: 2 additions & 0 deletions base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,8 @@ test("base_unittests") {
"task_scheduler/sequence_sort_key_unittest.cc",
"task_scheduler/sequence_unittest.cc",
"task_scheduler/task_tracker_unittest.cc",
"task_scheduler/test_task_factory.cc",
"task_scheduler/test_task_factory.h",
"task_scheduler/test_utils.h",
"task_scheduler/utils_unittest.cc",
"template_util_unittest.cc",
Expand Down
2 changes: 2 additions & 0 deletions base/base.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@
'task_scheduler/sequence_sort_key_unittest.cc',
'task_scheduler/sequence_unittest.cc',
'task_scheduler/task_tracker_unittest.cc',
'task_scheduler/test_task_factory.cc',
'task_scheduler/test_task_factory.h',
'task_scheduler/test_utils.h',
'task_scheduler/utils_unittest.cc',
'template_util_unittest.cc',
Expand Down
190 changes: 56 additions & 134 deletions base/task_scheduler/scheduler_thread_pool_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/sequence_sort_key.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/task_scheduler/test_task_factory.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "testing/gtest/include/gtest/gtest.h"
Expand Down Expand Up @@ -71,154 +72,78 @@ class TaskSchedulerThreadPoolTest
DISALLOW_COPY_AND_ASSIGN(TaskSchedulerThreadPoolTest);
};

class TaskFactory {
public:
// Constructs a TaskFactory that posts tasks with |execution_mode| to
// |thread_pool|.
TaskFactory(SchedulerThreadPool* thread_pool, ExecutionMode execution_mode)
: cv_(&lock_),
task_runner_(thread_pool->CreateTaskRunnerWithTraits(TaskTraits(),
execution_mode)),
execution_mode_(execution_mode) {}

// Posts a task through |task_runner_|. If |post_nested_task| is true, the
// task will post a new task when it runs. If |event| is set, the task will
// block until it is signaled.
void PostTestTask(bool post_nested_task, WaitableEvent* event) {
AutoLock auto_lock(lock_);
EXPECT_TRUE(task_runner_->PostTask(
FROM_HERE,
Bind(&TaskFactory::RunTaskCallback, Unretained(this),
num_created_tasks_++, post_nested_task, Unretained(event))));
}

// Waits for all tasks posted by PostTestTask() to start running. It is not
// guaranteed that the tasks have completed their execution when this returns.
void WaitForAllTasksToRun() const {
AutoLock auto_lock(lock_);
while (ran_tasks_.size() < num_created_tasks_)
cv_.Wait();
}

size_t NumRunTasks() const {
AutoLock auto_lock(lock_);
return ran_tasks_.size();
}

const TaskRunner* task_runner() const { return task_runner_.get(); }

private:
void RunTaskCallback(size_t task_index,
bool post_nested_task,
WaitableEvent* event) {
if (post_nested_task)
PostTestTask(false, nullptr);

EXPECT_TRUE(task_runner_->RunsTasksOnCurrentThread());

{
AutoLock auto_lock(lock_);

if (execution_mode_ == ExecutionMode::SEQUENCED &&
task_index != ran_tasks_.size()) {
ADD_FAILURE() << "A SEQUENCED task didn't run in the expected order.";
}

if (ran_tasks_.find(task_index) != ran_tasks_.end())
ADD_FAILURE() << "A task ran more than once.";
ran_tasks_.insert(task_index);

cv_.Signal();
}

if (event)
event->Wait();
}

// Synchronizes access to all members below.
mutable Lock lock_;

// Condition variable signaled when a task runs.
mutable ConditionVariable cv_;

// Task runner through which this factory posts tasks.
const scoped_refptr<TaskRunner> task_runner_;

// Execution mode of |task_runner_|.
const ExecutionMode execution_mode_;

// Number of tasks posted by PostTestTask().
size_t num_created_tasks_ = 0;

// Indexes of tasks that ran.
std::unordered_set<size_t> ran_tasks_;

DISALLOW_COPY_AND_ASSIGN(TaskFactory);
};
using PostNestedTask = test::TestTaskFactory::PostNestedTask;

class ThreadPostingTasks : public SimpleThread {
public:
enum class WaitBeforePostTask {
NO_WAIT,
WAIT_FOR_ALL_THREADS_IDLE,
};

// Constructs a thread that posts tasks to |thread_pool| through an
// |execution_mode| task runner. If |wait_for_all_threads_idle| is true, the
// thread wait until all worker threads in |thread_pool| are idle before
// posting a new task. If |post_nested_task| is true, each task posted by this
// thread posts another task when it runs.
// |execution_mode| task runner. If |wait_before_post_task| is
// WAIT_FOR_ALL_THREADS_IDLE, the thread waits until all worker threads in
// |thread_pool| are idle before posting a new task. If |post_nested_task| is
// YES, each task posted by this thread posts another task when it runs.
ThreadPostingTasks(SchedulerThreadPool* thread_pool,
ExecutionMode execution_mode,
bool wait_for_all_threads_idle,
bool post_nested_task)
WaitBeforePostTask wait_before_post_task,
PostNestedTask post_nested_task)
: SimpleThread("ThreadPostingTasks"),
thread_pool_(thread_pool),
wait_for_all_threads_idle_(wait_for_all_threads_idle),
wait_before_post_task_(wait_before_post_task),
post_nested_task_(post_nested_task),
factory_(thread_pool_, execution_mode) {
factory_(thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(),
execution_mode),
execution_mode) {
DCHECK(thread_pool_);
}

const TaskFactory* factory() const { return &factory_; }
const test::TestTaskFactory* factory() const { return &factory_; }

private:
void Run() override {
EXPECT_FALSE(factory_.task_runner()->RunsTasksOnCurrentThread());

for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
if (wait_for_all_threads_idle_)
if (wait_before_post_task_ ==
WaitBeforePostTask::WAIT_FOR_ALL_THREADS_IDLE) {
thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
factory_.PostTestTask(post_nested_task_, nullptr);
}
EXPECT_TRUE(factory_.PostTask(post_nested_task_, nullptr));
}
}

SchedulerThreadPool* const thread_pool_;
const scoped_refptr<TaskRunner> task_runner_;
const bool wait_for_all_threads_idle_;
const bool post_nested_task_;
TaskFactory factory_;
const WaitBeforePostTask wait_before_post_task_;
const PostNestedTask post_nested_task_;
test::TestTaskFactory factory_;

DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
};

using WaitBeforePostTask = ThreadPostingTasks::WaitBeforePostTask;

TEST_P(TaskSchedulerThreadPoolTest, PostTasks) {
// Create threads to post tasks.
std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
const bool kWaitForAllThreadIdle = false;
const bool kPostNestedTasks = false;
threads_posting_tasks.push_back(WrapUnique(
new ThreadPostingTasks(thread_pool_.get(), GetParam(),
kWaitForAllThreadIdle, kPostNestedTasks)));
threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks(
thread_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT,
PostNestedTask::NO)));
threads_posting_tasks.back()->Start();
}

// Wait for all tasks to run.
for (const auto& thread_posting_tasks : threads_posting_tasks) {
thread_posting_tasks->Join();
thread_posting_tasks->factory()->WaitForAllTasksToRun();
EXPECT_EQ(kNumTasksPostedPerThread,
thread_posting_tasks->factory()->NumRunTasks());
}

// Wait until all worker threads are idle to be sure that no task accesses
// its TaskFactory after |thread_posting_tasks| is destroyed.
// its TestTaskFactory after |thread_posting_tasks| is destroyed.
thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
}

Expand All @@ -228,24 +153,20 @@ TEST_P(TaskSchedulerThreadPoolTest, PostTasksWaitAllThreadsIdle) {
// before posting a new task.
std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
const bool kWaitForAllThreadIdle = true;
const bool kPostNestedTasks = false;
threads_posting_tasks.push_back(WrapUnique(
new ThreadPostingTasks(thread_pool_.get(), GetParam(),
kWaitForAllThreadIdle, kPostNestedTasks)));
threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks(
thread_pool_.get(), GetParam(),
WaitBeforePostTask::WAIT_FOR_ALL_THREADS_IDLE, PostNestedTask::NO)));
threads_posting_tasks.back()->Start();
}

// Wait for all tasks to run.
for (const auto& thread_posting_tasks : threads_posting_tasks) {
thread_posting_tasks->Join();
thread_posting_tasks->factory()->WaitForAllTasksToRun();
EXPECT_EQ(kNumTasksPostedPerThread,
thread_posting_tasks->factory()->NumRunTasks());
}

// Wait until all worker threads are idle to be sure that no task accesses
// its TaskFactory after |thread_posting_tasks| is destroyed.
// its TestTaskFactory after |thread_posting_tasks| is destroyed.
thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
}

Expand All @@ -254,24 +175,20 @@ TEST_P(TaskSchedulerThreadPoolTest, NestedPostTasks) {
// another task when it runs.
std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
const bool kWaitForAllThreadIdle = false;
const bool kPostNestedTasks = true;
threads_posting_tasks.push_back(WrapUnique(
new ThreadPostingTasks(thread_pool_.get(), GetParam(),
kWaitForAllThreadIdle, kPostNestedTasks)));
threads_posting_tasks.push_back(WrapUnique(new ThreadPostingTasks(
thread_pool_.get(), GetParam(), WaitBeforePostTask::NO_WAIT,
PostNestedTask::YES)));
threads_posting_tasks.back()->Start();
}

// Wait for all tasks to run.
for (const auto& thread_posting_tasks : threads_posting_tasks) {
thread_posting_tasks->Join();
thread_posting_tasks->factory()->WaitForAllTasksToRun();
EXPECT_EQ(2 * kNumTasksPostedPerThread,
thread_posting_tasks->factory()->NumRunTasks());
}

// Wait until all worker threads are idle to be sure that no task accesses
// its TaskFactory after |thread_posting_tasks| is destroyed.
// its TestTaskFactory after |thread_posting_tasks| is destroyed.
thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
}

Expand All @@ -280,26 +197,30 @@ TEST_P(TaskSchedulerThreadPoolTest, PostTasksWithOneAvailableThread) {
// Use different factories so that tasks are added to different sequences and
// can run simultaneously when the execution mode is SEQUENCED.
WaitableEvent event(true, false);
std::vector<std::unique_ptr<TaskFactory>> blocked_task_factories;
std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
for (size_t i = 0; i < (kNumThreadsInThreadPool - 1); ++i) {
blocked_task_factories.push_back(
WrapUnique(new TaskFactory(thread_pool_.get(), GetParam())));
blocked_task_factories.back()->PostTestTask(false, &event);
blocked_task_factories.push_back(WrapUnique(new test::TestTaskFactory(
thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()),
GetParam())));
EXPECT_TRUE(
blocked_task_factories.back()->PostTask(PostNestedTask::NO, &event));
blocked_task_factories.back()->WaitForAllTasksToRun();
}

// Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
// that only one thread in |thread_pool_| isn't busy.
TaskFactory short_task_factory(thread_pool_.get(), GetParam());
test::TestTaskFactory short_task_factory(
thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()),
GetParam());
for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
short_task_factory.PostTestTask(false, nullptr);
EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, nullptr));
short_task_factory.WaitForAllTasksToRun();

// Release tasks waiting on |event|.
event.Signal();

// Wait until all worker threads are idle to be sure that no task accesses
// its TaskFactory after it is destroyed.
// its TestTaskFactory after it is destroyed.
thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
}

Expand All @@ -309,19 +230,20 @@ TEST_P(TaskSchedulerThreadPoolTest, Saturate) {
// tasks are added to different sequences and can run simultaneously when the
// execution mode is SEQUENCED.
WaitableEvent event(true, false);
std::vector<std::unique_ptr<TaskFactory>> factories;
std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
for (size_t i = 0; i < kNumThreadsInThreadPool; ++i) {
factories.push_back(
WrapUnique(new TaskFactory(thread_pool_.get(), GetParam())));
factories.back()->PostTestTask(false, &event);
factories.push_back(WrapUnique(new test::TestTaskFactory(
thread_pool_->CreateTaskRunnerWithTraits(TaskTraits(), GetParam()),
GetParam())));
EXPECT_TRUE(factories.back()->PostTask(PostNestedTask::NO, &event));
factories.back()->WaitForAllTasksToRun();
}

// Release tasks waiting on |event|.
event.Signal();

// Wait until all worker threads are idle to be sure that no task accesses
// its TaskFactory after it is destroyed.
// its TestTaskFactory after it is destroyed.
thread_pool_->WaitForAllWorkerThreadsIdleForTesting();
}

Expand Down
Loading

0 comments on commit 44fda21

Please sign in to comment.