Skip to content

Commit

Permalink
TaskScheduler [4/9] Priority Queue
Browse files Browse the repository at this point in the history
This change is a subset of https://codereview.chromium.org/1698183005/

A PriorityQueue holds Sequences of Tasks. It supports Push, Pop and
Peek operations through a Transaction object.

A SequenceSortKey must be provided to push a Sequence into a
PriorityQueue. Sequences are sorted according to their SequenceSortKey.
The SequenceSortKey of a Sequence never changes while it is in the
PriorityQueue (even if Tasks are pushed/popped from the Sequence).

BUG=553459

Review URL: https://codereview.chromium.org/1709713002

Cr-Commit-Position: refs/heads/master@{#382115}
  • Loading branch information
fdoray authored and Commit bot committed Mar 18, 2016
1 parent 85d385a commit 6477fbd
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 7 deletions.
4 changes: 4 additions & 0 deletions base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ component("base") {
"task_runner.cc",
"task_runner.h",
"task_runner_util.h",
"task_scheduler/priority_queue.cc",
"task_scheduler/priority_queue.h",
"task_scheduler/scheduler_lock.h",
"task_scheduler/scheduler_lock_impl.cc",
"task_scheduler/scheduler_lock_impl.h",
Expand Down Expand Up @@ -1815,9 +1817,11 @@ test("base_unittests") {
"system_monitor/system_monitor_unittest.cc",
"task/cancelable_task_tracker_unittest.cc",
"task_runner_util_unittest.cc",
"task_scheduler/priority_queue_unittest.cc",
"task_scheduler/scheduler_lock_unittest.cc",
"task_scheduler/sequence_sort_key_unittest.cc",
"task_scheduler/sequence_unittest.cc",
"task_scheduler/test_utils.h",
"template_util_unittest.cc",
"test/histogram_tester_unittest.cc",
"test/icu_test_util.cc",
Expand Down
2 changes: 2 additions & 0 deletions base/base.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,11 @@
'system_monitor/system_monitor_unittest.cc',
'task/cancelable_task_tracker_unittest.cc',
'task_runner_util_unittest.cc',
'task_scheduler/priority_queue_unittest.cc',
'task_scheduler/scheduler_lock_unittest.cc',
'task_scheduler/sequence_sort_key_unittest.cc',
'task_scheduler/sequence_unittest.cc',
'task_scheduler/test_utils.h',
'template_util_unittest.cc',
'test/histogram_tester_unittest.cc',
'test/test_pending_task_unittest.cc',
Expand Down
2 changes: 2 additions & 0 deletions base/base.gypi
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@
'task_runner.cc',
'task_runner.h',
'task_runner_util.h',
'task_scheduler/priority_queue.cc',
'task_scheduler/priority_queue.h',
'task_scheduler/scheduler_lock.h',
'task_scheduler/scheduler_lock_impl.cc',
'task_scheduler/scheduler_lock_impl.h',
Expand Down
89 changes: 89 additions & 0 deletions base/task_scheduler/priority_queue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task_scheduler/priority_queue.h"

#include <utility>

#include "base/logging.h"

namespace base {
namespace internal {

PriorityQueue::SequenceAndSortKey::SequenceAndSortKey()
: sort_key(TaskPriority::LOWEST, TimeTicks()) {}

PriorityQueue::SequenceAndSortKey::SequenceAndSortKey(
scoped_refptr<Sequence> sequence,
const SequenceSortKey& sort_key)
: sequence(std::move(sequence)), sort_key(sort_key) {}

PriorityQueue::SequenceAndSortKey::~SequenceAndSortKey() = default;

PriorityQueue::Transaction::Transaction(PriorityQueue* outer_queue)
: auto_lock_(new AutoSchedulerLock(outer_queue->container_lock_)),
outer_queue_(outer_queue) {
DCHECK(CalledOnValidThread());
}

PriorityQueue::Transaction::~Transaction() {
DCHECK(CalledOnValidThread());

// Run the sequence insertion callback once for each Sequence that was
// inserted in the PriorityQueue during the lifetime of this Transaction.
// Perform this outside the scope of PriorityQueue's lock to avoid imposing an
// unnecessary lock dependency on |sequence_inserted_callback_|'s destination.
auto_lock_.reset();
for (size_t i = 0; i < num_pushed_sequences_; ++i)
outer_queue_->sequence_inserted_callback_.Run();
}

void PriorityQueue::Transaction::Push(
scoped_ptr<SequenceAndSortKey> sequence_and_sort_key) {
DCHECK(CalledOnValidThread());
DCHECK(!sequence_and_sort_key->is_null());

outer_queue_->container_.push(std::move(sequence_and_sort_key));
++num_pushed_sequences_;
}

const PriorityQueue::SequenceAndSortKey& PriorityQueue::Transaction::Peek()
const {
DCHECK(CalledOnValidThread());

// TODO(fdoray): Add an IsEmpty() method to Transaction and require Peek() to
// be called on a non-empty PriorityQueue only.
if (outer_queue_->container_.empty())
return outer_queue_->empty_sequence_and_sort_key_;

return *outer_queue_->container_.top();
}

void PriorityQueue::Transaction::Pop() {
DCHECK(CalledOnValidThread());
DCHECK(!outer_queue_->container_.empty());
outer_queue_->container_.pop();
}

PriorityQueue::PriorityQueue(const Closure& sequence_inserted_callback)
: sequence_inserted_callback_(sequence_inserted_callback) {
DCHECK(!sequence_inserted_callback_.is_null());
}

PriorityQueue::PriorityQueue(const Closure& sequence_inserted_callback,
const PriorityQueue* predecessor_priority_queue)
: container_lock_(&predecessor_priority_queue->container_lock_),
sequence_inserted_callback_(sequence_inserted_callback) {
DCHECK(!sequence_inserted_callback_.is_null());
DCHECK(predecessor_priority_queue);
}

PriorityQueue::~PriorityQueue() = default;

scoped_ptr<PriorityQueue::Transaction> PriorityQueue::BeginTransaction() {
return make_scoped_ptr(new Transaction(this));
}

} // namespace internal
} // namespace base
139 changes: 139 additions & 0 deletions base/task_scheduler/priority_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SCHEDULER_PRIORITY_QUEUE_H_
#define BASE_TASK_SCHEDULER_PRIORITY_QUEUE_H_

#include <queue>
#include <vector>

#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/task_scheduler/scheduler_lock.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/sequence_sort_key.h"
#include "base/threading/non_thread_safe.h"

namespace base {
namespace internal {

// A PriorityQueue holds Sequences of Tasks. This class is thread-safe.
class BASE_EXPORT PriorityQueue {
public:
// An immutable struct combining a Sequence and the sort key that determines
// its position in a PriorityQueue.
struct BASE_EXPORT SequenceAndSortKey {
// Constructs a null SequenceAndSortKey.
SequenceAndSortKey();

// Constructs a SequenceAndSortKey with the given |sequence| and |sort_key|.
SequenceAndSortKey(scoped_refptr<Sequence> sequence,
const SequenceSortKey& sort_key);

~SequenceAndSortKey();

// Returns true if this is a null SequenceAndSortKey.
bool is_null() const { return !sequence; }

const scoped_refptr<Sequence> sequence;
const SequenceSortKey sort_key;
};

// A Transaction can perform multiple operations atomically on a
// PriorityQueue. While a Transaction is alive, it is guaranteed that nothing
// else will access the PriorityQueue.
//
// A WorkerThread needs to be able to Peek sequences from both its
// PriorityQueues (single-threaded and shared) and then Pop the sequence with
// the highest priority. If the Peek and the Pop are done through the same
// Transaction, it is guaranteed that the PriorityQueue hasn't changed between
// the 2 operations.
class BASE_EXPORT Transaction : public NonThreadSafe {
public:
~Transaction();

// Inserts |sequence_and_sort_key| in the PriorityQueue.
void Push(scoped_ptr<SequenceAndSortKey> sequence_and_sort_key);

// Returns the SequenceAndSortKey with the highest priority or a null
// SequenceAndSortKey if the PriorityQueue is empty. The reference becomes
// invalid the next time that a Sequence is popped from the PriorityQueue.
const SequenceAndSortKey& Peek() const;

// Removes the SequenceAndSortKey with the highest priority from the
// PriorityQueue. Cannot be called on an empty PriorityQueue.
void Pop();

private:
friend class PriorityQueue;

explicit Transaction(PriorityQueue* outer_queue);

// Holds the lock of |outer_queue_| for most of the lifetime of this
// Transaction. Using a scoped_ptr allows the destructor to release the lock
// before performing internal operations which have to be done outside of
// its scope.
scoped_ptr<AutoSchedulerLock> auto_lock_;

PriorityQueue* const outer_queue_;

// Number of times that Push() has been called on this Transaction.
size_t num_pushed_sequences_ = 0;

DISALLOW_COPY_AND_ASSIGN(Transaction);
};

// |sequence_inserted_callback| is a non-null callback invoked when the
// Transaction is done for each Push that was performed with the Transaction.
explicit PriorityQueue(const Closure& sequence_inserted_callback);

// |sequence_inserted_callback| is a non-null callback invoked when the
// Transaction is done for each Push that was performed with the Transaction.
// |predecessor_priority_queue| is a PriorityQueue for which a thread is
// allowed to have an active Transaction when it creates a Transaction for
// this PriorityQueue.
PriorityQueue(const Closure& sequence_inserted_callback,
const PriorityQueue* predecessor_priority_queue);

~PriorityQueue();

// Begins a Transaction. This method cannot be called on a thread which has an
// active Transaction unless the last Transaction created on the thread was
// for the allowed predecessor specified in the constructor of this
// PriorityQueue.
scoped_ptr<Transaction> BeginTransaction();

private:
struct SequenceAndSortKeyComparator {
bool operator()(const scoped_ptr<SequenceAndSortKey>& left,
const scoped_ptr<SequenceAndSortKey>& right) const {
return left->sort_key < right->sort_key;
}
};
using ContainerType =
std::priority_queue<scoped_ptr<SequenceAndSortKey>,
std::vector<scoped_ptr<SequenceAndSortKey>>,
SequenceAndSortKeyComparator>;

// Synchronizes access to |container_|.
SchedulerLock container_lock_;

ContainerType container_;

const Closure sequence_inserted_callback_;

// A null SequenceAndSortKey returned by Peek() when the PriorityQueue is
// empty.
const SequenceAndSortKey empty_sequence_and_sort_key_;

DISALLOW_COPY_AND_ASSIGN(PriorityQueue);
};

} // namespace internal
} // namespace base

#endif // BASE_TASK_SCHEDULER_PRIORITY_QUEUE_H_
Loading

0 comments on commit 6477fbd

Please sign in to comment.