Skip to content

Commit

Permalink
[ThreadPool]: Implement standalone JobTaskSource.
Browse files Browse the repository at this point in the history
This CL Creates a new derived JobTaskSource that tracks the number
of concurrent worker.
It's a precursor of https://chromium-review.googlesource.com/c/chromium/src/+/1582427
which integrates JobTaskSource in ThreadGroups.

Bug: 839091
Change-Id: Ie407823bdd6d4c84fd8b3db7d6d6a58a33846045
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1693182
Commit-Queue: François Doray <fdoray@chromium.org>
Reviewed-by: Gabriel Charette <gab@chromium.org>
Reviewed-by: François Doray <fdoray@chromium.org>
Cr-Commit-Position: refs/heads/master@{#677058}
  • Loading branch information
Etienne Pierre-doray authored and Commit Bot committed Jul 12, 2019
1 parent 665a2b9 commit 36afade
Show file tree
Hide file tree
Showing 15 changed files with 519 additions and 85 deletions.
3 changes: 3 additions & 0 deletions base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,8 @@ jumbo_component("base") {
"task/thread_pool/environment_config.h",
"task/thread_pool/initialization_util.cc",
"task/thread_pool/initialization_util.h",
"task/thread_pool/job_task_source.cc",
"task/thread_pool/job_task_source.h",
"task/thread_pool/pooled_parallel_task_runner.cc",
"task/thread_pool/pooled_parallel_task_runner.h",
"task/thread_pool/pooled_sequenced_task_runner.cc",
Expand Down Expand Up @@ -2686,6 +2688,7 @@ test("base_unittests") {
"task/thread_pool/can_run_policy_test.h",
"task/thread_pool/delayed_task_manager_unittest.cc",
"task/thread_pool/environment_config_unittest.cc",
"task/thread_pool/job_task_source_unittest.cc",
"task/thread_pool/pooled_single_thread_task_runner_manager_unittest.cc",
"task/thread_pool/priority_queue_unittest.cc",
"task/thread_pool/sequence_sort_key_unittest.cc",
Expand Down
90 changes: 90 additions & 0 deletions base/task/thread_pool/job_task_source.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/job_task_source.h"

#include <utility>

#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/thread_pool_clock.h"
#include "base/time/time.h"

namespace base {
namespace internal {

JobTaskSource::JobTaskSource(const Location& from_here,
base::RepeatingClosure worker_task,
const TaskTraits& traits)
: TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob),
from_here_(from_here),
worker_task_(std::move(worker_task)),
queue_time_(ThreadPoolClock::Now()) {}

JobTaskSource::~JobTaskSource() = default;

ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
return {SequenceToken::Create(), nullptr};
}

TaskSource::RunIntent JobTaskSource::WillRunTask() {
const size_t max_concurrency = GetMaxConcurrency();
const size_t worker_count_initial =
worker_count_.load(std::memory_order_relaxed);
// Don't allow this worker to run the task if either:
// A) |worker_count_| is already at |max_concurrency|.
// B) |max_concurrency| was lowered below or to |worker_count_|.
if (worker_count_initial >= max_concurrency) {
// The caller receives an invalid RunIntent and should skip this TaskSource.
return RunIntent();
}
const size_t worker_count_before_add =
worker_count_.fetch_add(1, std::memory_order_relaxed);
// WillRunTask() has external synchronization to prevent concurrent calls and
// it is the only place where |worker_count_| is incremented. Therefore, the
// second reading of |worker_count_| from WillRunTask() cannot be greater than
// the first reading. However, since DidProcessTask() can decrement
// |worker_count_| concurrently with WillRunTask(), the second reading can be
// lower than the first reading.
DCHECK_LE(worker_count_before_add, worker_count_initial);
DCHECK_LT(worker_count_before_add, max_concurrency);
return MakeRunIntent(max_concurrency == worker_count_before_add + 1
? Saturated::kYes
: Saturated::kNo);
}

size_t JobTaskSource::GetRemainingConcurrency() const {
return GetMaxConcurrency() - worker_count_.load(std::memory_order_relaxed);
}

Optional<Task> JobTaskSource::TakeTask() {
DCHECK_GT(worker_count_.load(std::memory_order_relaxed), 0U);
DCHECK(worker_task_);
return base::make_optional<Task>(from_here_, worker_task_, TimeDelta());
}

bool JobTaskSource::DidProcessTask(RunResult run_result) {
size_t worker_count_before_sub =
worker_count_.fetch_sub(1, std::memory_order_relaxed);
DCHECK_GT(worker_count_before_sub, 0U);
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
const bool must_be_queued =
run_result == RunResult::kSkippedAtShutdown
? false
: worker_count_before_sub <= GetMaxConcurrency();
return must_be_queued;
}

SequenceSortKey JobTaskSource::GetSortKey() const {
return SequenceSortKey(traits_.priority(), queue_time_);
}

void JobTaskSource::Clear() {
worker_task_.Reset();
}

} // namespace internal
} // namespace base
69 changes: 69 additions & 0 deletions base/task/thread_pool/job_task_source.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
#define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_

#include <stddef.h>

#include <atomic>

#include "base/base_export.h"
#include "base/macros.h"
#include "base/optional.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/sequence_sort_key.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"

namespace base {
namespace internal {

// A JobTaskSource generates many Tasks from a single RepeatingClosure.
//
// Derived classes control the intended concurrency with GetMaxConcurrency().
// Increase in concurrency is not supported and should never happen.
// TODO(etiennep): Support concurrency increase.
class BASE_EXPORT JobTaskSource : public TaskSource {
public:
JobTaskSource(const Location& from_here,
base::RepeatingClosure task,
const TaskTraits& traits);

// TaskSource:
RunIntent WillRunTask() override;
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;

protected:
~JobTaskSource() override;

// Returns the maximum number of tasks from this TaskSource that can run
// concurrently. The implementation can only return values lower than or equal
// to previously returned values.
virtual size_t GetMaxConcurrency() const = 0;

private:
// TaskSource:
Optional<Task> TakeTask() override;
bool DidProcessTask(RunResult run_result) override;
SequenceSortKey GetSortKey() const override;
void Clear() override;

// The current number of workers concurrently running tasks from this
// TaskSource. "memory_order_relaxed" is sufficient to access this variable as
// no other state is synchronized with it.
std::atomic_size_t worker_count_{0U};

const Location from_here_;
base::RepeatingClosure worker_task_;
const TimeTicks queue_time_;

DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
};

} // namespace internal
} // namespace base

#endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
146 changes: 146 additions & 0 deletions base/task/thread_pool/job_task_source_unittest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/job_task_source.h"

#include <utility>

#include "base/bind_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/task/thread_pool/test_utils.h"
#include "base/test/gtest_util.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace base {
namespace internal {

// Verifies the normal flow of running 2 tasks in series.
TEST(ThreadPoolJobTaskSourceTest, RunTasks) {
scoped_refptr<test::MockJobTaskSource> task_source =
MakeRefCounted<test::MockJobTaskSource>(
FROM_HERE, DoNothing(), TaskTraits(TaskPriority::BEST_EFFORT),
/* num_tasks_to_run */ 2, /* max_concurrency */ 1);

TaskSource::Transaction task_source_transaction(
task_source->BeginTransaction());

{
auto run_intent = task_source->WillRunTask();
EXPECT_TRUE(run_intent);
EXPECT_TRUE(run_intent.IsSaturated());

// An attempt to run an additional task is not allowed until this task
// is processed.
EXPECT_FALSE(task_source->WillRunTask());

auto task = task_source_transaction.TakeTask(&run_intent);

EXPECT_FALSE(task_source->WillRunTask());

std::move(task->task).Run();
EXPECT_TRUE(task_source_transaction.DidProcessTask(std::move(run_intent)));
}
{
auto run_intent = task_source->WillRunTask();
EXPECT_TRUE(run_intent);
EXPECT_TRUE(run_intent.IsSaturated());
auto task = task_source_transaction.TakeTask(&run_intent);
std::move(task->task).Run();
EXPECT_FALSE(task_source_transaction.DidProcessTask(std::move(run_intent)));
}
}

// Verifies that a job task source doesn't get reenqueued when a task is not
// run.
TEST(ThreadPoolJobTaskSourceTest, SkipTask) {
scoped_refptr<test::MockJobTaskSource> task_source =
MakeRefCounted<test::MockJobTaskSource>(
FROM_HERE, DoNothing(), TaskTraits(TaskPriority::BEST_EFFORT),
/* num_tasks_to_run */ 2, /* max_concurrency */ 1);

TaskSource::Transaction task_source_transaction(
task_source->BeginTransaction());

auto run_intent = task_source->WillRunTask();
EXPECT_TRUE(run_intent);
EXPECT_TRUE(run_intent.IsSaturated());
auto task = task_source_transaction.TakeTask(&run_intent);
EXPECT_FALSE(task_source_transaction.DidProcessTask(
std::move(run_intent), TaskSource::RunResult::kSkippedAtShutdown));
}

// Verifies that multiple tasks can run in parallel up to |max_concurrency|.
TEST(ThreadPoolJobTaskSourceTest, RunTasksInParallel) {
scoped_refptr<test::MockJobTaskSource> task_source =
MakeRefCounted<test::MockJobTaskSource>(
FROM_HERE, DoNothing(), TaskTraits(TaskPriority::BEST_EFFORT),
/* num_tasks_to_run */ 3, /* max_concurrency */ 2);

TaskSource::Transaction task_source_transaction(
task_source->BeginTransaction());

auto run_intent_a = task_source->WillRunTask();
EXPECT_TRUE(run_intent_a);
EXPECT_FALSE(run_intent_a.IsSaturated());
auto task_a = task_source_transaction.TakeTask(&run_intent_a);

auto run_intent_b = task_source->WillRunTask();
EXPECT_TRUE(run_intent_b);
EXPECT_TRUE(run_intent_b.IsSaturated());
auto task_b = task_source_transaction.TakeTask(&run_intent_b);

// WillRunTask() should return a null RunIntent once the max concurrency is
// reached.
EXPECT_FALSE(task_source->WillRunTask());

std::move(task_a->task).Run();
EXPECT_TRUE(task_source_transaction.DidProcessTask(std::move(run_intent_a)));

std::move(task_b->task).Run();
EXPECT_TRUE(task_source_transaction.DidProcessTask(std::move(run_intent_b)));

auto run_intent_c = task_source->WillRunTask();
EXPECT_TRUE(run_intent_c);
EXPECT_TRUE(run_intent_c.IsSaturated());
auto task_c = task_source_transaction.TakeTask(&run_intent_c);

std::move(task_c->task).Run();
EXPECT_FALSE(task_source_transaction.DidProcessTask(std::move(run_intent_c)));
}

TEST(ThreadPoolJobTaskSourceTest, InvalidTakeTask) {
scoped_refptr<test::MockJobTaskSource> task_source =
MakeRefCounted<test::MockJobTaskSource>(
FROM_HERE, DoNothing(), TaskTraits(TaskPriority::BEST_EFFORT),
/* num_tasks_to_run */ 1, /* max_concurrency */ 1);
TaskSource::Transaction task_source_transaction(
task_source->BeginTransaction());

auto run_intent_a = task_source->WillRunTask();
auto run_intent_b = task_source->WillRunTask();
EXPECT_FALSE(run_intent_b);
// Can not be called with an invalid RunIntent.
EXPECT_DCHECK_DEATH(
{ auto task = task_source_transaction.TakeTask(&run_intent_b); });
run_intent_a.ReleaseForTesting();
}

TEST(ThreadPoolJobTaskSourceTest, InvalidDidProcessTask) {
scoped_refptr<test::MockJobTaskSource> task_source =
MakeRefCounted<test::MockJobTaskSource>(
FROM_HERE, DoNothing(), TaskTraits(TaskPriority::BEST_EFFORT),
/* num_tasks_to_run */ 1, /* max_concurrency */ 1);
TaskSource::Transaction task_source_transaction(
task_source->BeginTransaction());

auto run_intent = task_source->WillRunTask();
EXPECT_TRUE(run_intent);
// Can not be called before TakeTask().
EXPECT_DCHECK_DEATH(
task_source_transaction.DidProcessTask(std::move(run_intent)));
run_intent.ReleaseForTesting();
}

} // namespace internal
} // namespace base
9 changes: 6 additions & 3 deletions base/task/thread_pool/sequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ TaskSource::RunIntent Sequence::WillRunTask() {
// TakeTask() and DidProcessTask() and only called if |!queue_.empty()|, which
// means it won't race with WillPushTask()/PushTask().
has_worker_ = true;
return MakeRunIntent(ConcurrencyStatus::kSaturated);
return MakeRunIntent(Saturated::kYes);
}

size_t Sequence::GetMaxConcurrency() const {
size_t Sequence::GetRemainingConcurrency() const {
return 1;
}

Expand All @@ -83,7 +83,7 @@ Optional<Task> Sequence::TakeTask() {
return std::move(next_task);
}

bool Sequence::DidProcessTask(bool /* was_run */) {
bool Sequence::DidProcessTask(RunResult run_result) {
// There should never be a call to DidProcessTask without an associated
// WillRunTask().
DCHECK(has_worker_);
Expand All @@ -92,6 +92,9 @@ bool Sequence::DidProcessTask(bool /* was_run */) {
ReleaseTaskRunner();
return false;
}
// Let the caller re-enqueue this non-empty Sequence regardless of
// |run_result| so it can continue churning through this Sequence's tasks and
// skip/delete them in the proper scope.
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions base/task/thread_pool/sequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class BASE_EXPORT Sequence : public TaskSource {
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
RunIntent WillRunTask() override;
size_t GetMaxConcurrency() const override;
size_t GetRemainingConcurrency() const override;

// Returns a token that uniquely identifies this Sequence.
const SequenceToken& token() const { return token_; }
Expand All @@ -100,7 +100,7 @@ class BASE_EXPORT Sequence : public TaskSource {

// TaskSource:
Optional<Task> TakeTask() override WARN_UNUSED_RESULT;
bool DidProcessTask(bool can_keep_running) override;
bool DidProcessTask(RunResult run_result) override;
SequenceSortKey GetSortKey() const override;
void Clear() override;

Expand Down
Loading

0 comments on commit 36afade

Please sign in to comment.