Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: implement v8::TaskRunner API in NodePlatform #17134

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions deps/v8/include/v8-platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,51 @@ class IdleTask {
virtual void Run(double deadline_in_seconds) = 0;
};

/**
* A TaskRunner allows scheduling of tasks. The TaskRunner may still be used to
* post tasks after the isolate gets destructed, but these tasks may not get
* executed anymore. All tasks posted to a given TaskRunner will be invoked in
* sequence. Tasks can be posted from any thread.
*/
class TaskRunner {
public:
/**
* Schedules a task to be invoked by this TaskRunner. The TaskRunner
* implementation takes ownership of |task|.
*/
virtual void PostTask(std::unique_ptr<Task> task) = 0;

/**
* Schedules a task to be invoked by this TaskRunner. The task is scheduled
* after the given number of seconds |delay_in_seconds|. The TaskRunner
* implementation takes ownership of |task|.
*/
virtual void PostDelayedTask(std::unique_ptr<Task> task,
double delay_in_seconds) = 0;

/**
* Schedules an idle task to be invoked by this TaskRunner. The task is
* scheduled when the embedder is idle. Requires that
* TaskRunner::SupportsIdleTasks(isolate) is true. Idle tasks may be reordered
* relative to other task types and may be starved for an arbitrarily long
* time if no idle time is available. The TaskRunner implementation takes
* ownership of |task|.
*/
virtual void PostIdleTask(std::unique_ptr<IdleTask> task) = 0;

/**
* Returns true if idle tasks are enabled for this TaskRunner.
*/
virtual bool IdleTasksEnabled() = 0;

TaskRunner() = default;
virtual ~TaskRunner() = default;

private:
TaskRunner(const TaskRunner&) = delete;
TaskRunner& operator=(const TaskRunner&) = delete;
};

/**
* The interface represents complex arguments to trace events.
*/
Expand Down Expand Up @@ -150,6 +195,28 @@ class Platform {
*/
virtual size_t NumberOfAvailableBackgroundThreads() { return 0; }

/**
* Returns a TaskRunner which can be used to post a task on the foreground.
* This function should only be called from a foreground thread.
*/
virtual std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
Isolate* isolate) {
// TODO(ahaas): Make this function abstract after it got implemented on all
// platforms.
return {};
}

/**
* Returns a TaskRunner which can be used to post a task on a background.
* This function should only be called from a foreground thread.
*/
virtual std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner(
Isolate* isolate) {
// TODO(ahaas): Make this function abstract after it got implemented on all
// platforms.
return {};
}

/**
* Schedules a task to be invoked on a background thread. |expected_runtime|
* indicates that the task will run a long time. The Platform implementation
Expand Down
115 changes: 80 additions & 35 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,43 @@ static void BackgroundRunner(void* data) {
}
}

BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) {
for (int i = 0; i < thread_pool_size; i++) {
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0)
break;
threads_.push_back(std::move(t));
}
}

void BackgroundTaskRunner::PostTask(std::unique_ptr<Task> task) {
background_tasks_.Push(std::move(task));
}

void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
UNREACHABLE();
}

void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
UNREACHABLE();
}

void BackgroundTaskRunner::BlockingDrain() {
background_tasks_.BlockingDrain();
}

void BackgroundTaskRunner::Shutdown() {
background_tasks_.Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
}
}

size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const {
return threads_.size();
}

PerIsolatePlatformData::PerIsolatePlatformData(
v8::Isolate* isolate, uv_loop_t* loop)
: isolate_(isolate), loop_(loop) {
Expand All @@ -38,17 +75,20 @@ void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
platform_data->FlushForegroundTasksInternal();
}

void PerIsolatePlatformData::CallOnForegroundThread(
std::unique_ptr<Task> task) {
void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
UNREACHABLE();
}

void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
foreground_tasks_.Push(std::move(task));
uv_async_send(flush_tasks_);
}

void PerIsolatePlatformData::CallDelayedOnForegroundThread(
std::unique_ptr<Task> task, double delay_in_seconds) {
void PerIsolatePlatformData::PostDelayedTask(
std::unique_ptr<Task> task, double delay_in_seconds) {
std::unique_ptr<DelayedTask> delayed(new DelayedTask());
delayed->task = std::move(task);
delayed->platform_data = this;
delayed->platform_data = shared_from_this();
delayed->timeout = delay_in_seconds;
foreground_delayed_tasks_.Push(std::move(delayed));
uv_async_send(flush_tasks_);
Expand Down Expand Up @@ -80,49 +120,43 @@ NodePlatform::NodePlatform(int thread_pool_size,
TracingController* controller = new TracingController();
tracing_controller_.reset(controller);
}
for (int i = 0; i < thread_pool_size; i++) {
uv_thread_t* t = new uv_thread_t();
if (uv_thread_create(t, BackgroundRunner, &background_tasks_) != 0) {
delete t;
break;
}
threads_.push_back(std::unique_ptr<uv_thread_t>(t));
}
background_task_runner_ =
std::make_shared<BackgroundTaskRunner>(thread_pool_size);
}

void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) {
Isolate* isolate = isolate_data->isolate();
Mutex::ScopedLock lock(per_isolate_mutex_);
PerIsolatePlatformData* existing = per_isolate_[isolate];
if (existing != nullptr)
std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
if (existing) {
existing->ref();
else
per_isolate_[isolate] = new PerIsolatePlatformData(isolate, loop);
} else {
per_isolate_[isolate] =
std::make_shared<PerIsolatePlatformData>(isolate, loop);
}
}

void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) {
Isolate* isolate = isolate_data->isolate();
Mutex::ScopedLock lock(per_isolate_mutex_);
PerIsolatePlatformData* existing = per_isolate_[isolate];
CHECK_NE(existing, nullptr);
std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
CHECK(existing);
if (existing->unref() == 0) {
delete existing;
per_isolate_.erase(isolate);
}
}

void NodePlatform::Shutdown() {
background_tasks_.Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
background_task_runner_->Shutdown();

{
Mutex::ScopedLock lock(per_isolate_mutex_);
per_isolate_.clear();
}
Mutex::ScopedLock lock(per_isolate_mutex_);
for (const auto& pair : per_isolate_)
delete pair.second;
}

size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
return threads_.size();
return background_task_runner_->NumberOfAvailableBackgroundThreads();
}

void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
Expand Down Expand Up @@ -155,14 +189,14 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() {
}

void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {
PerIsolatePlatformData* per_isolate = ForIsolate(isolate);
std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);

do {
// Right now, there is no way to drain only background tasks associated
// with a specific isolate, so this sometimes does more work than
// necessary. In the long run, that functionality is probably going to
// be available anyway, though.
background_tasks_.BlockingDrain();
background_task_runner_->BlockingDrain();
} while (per_isolate->FlushForegroundTasksInternal());
}

Expand Down Expand Up @@ -198,24 +232,25 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {

void NodePlatform::CallOnBackgroundThread(Task* task,
ExpectedRuntime expected_runtime) {
background_tasks_.Push(std::unique_ptr<Task>(task));
background_task_runner_->PostTask(std::unique_ptr<Task>(task));
}

PerIsolatePlatformData* NodePlatform::ForIsolate(Isolate* isolate) {
std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForIsolate(Isolate* isolate) {
Mutex::ScopedLock lock(per_isolate_mutex_);
PerIsolatePlatformData* data = per_isolate_[isolate];
CHECK_NE(data, nullptr);
std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
CHECK(data);
return data;
}

void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
ForIsolate(isolate)->CallOnForegroundThread(std::unique_ptr<Task>(task));
ForIsolate(isolate)->PostTask(std::unique_ptr<Task>(task));
}

void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
Task* task,
double delay_in_seconds) {
ForIsolate(isolate)->CallDelayedOnForegroundThread(
ForIsolate(isolate)->PostDelayedTask(
std::unique_ptr<Task>(task), delay_in_seconds);
}

Expand All @@ -229,6 +264,16 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {

bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) {
return background_task_runner_;
}

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
return ForIsolate(isolate);
}

double NodePlatform::MonotonicallyIncreasingTime() {
// Convert nanos to seconds.
return uv_hrtime() / 1e9;
Expand Down
48 changes: 39 additions & 9 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,22 @@ struct DelayedTask {
std::unique_ptr<v8::Task> task;
uv_timer_t timer;
double timeout;
PerIsolatePlatformData* platform_data;
std::shared_ptr<PerIsolatePlatformData> platform_data;
};

class PerIsolatePlatformData {
// This acts as the foreground task runner for a given Isolate.
class PerIsolatePlatformData :
public v8::TaskRunner,
public std::enable_shared_from_this<PerIsolatePlatformData> {
public:
PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop);
~PerIsolatePlatformData();

void CallOnForegroundThread(std::unique_ptr<v8::Task> task);
void CallDelayedOnForegroundThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds);
void PostTask(std::unique_ptr<v8::Task> task) override;
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled() override { return false; };

void Shutdown();

Expand Down Expand Up @@ -84,6 +89,26 @@ class PerIsolatePlatformData {
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
};

// This acts as the single background task runner for all Isolates.
class BackgroundTaskRunner : public v8::TaskRunner {
public:
explicit BackgroundTaskRunner(int thread_pool_size);

void PostTask(std::unique_ptr<v8::Task> task) override;
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled() override { return false; };

void BlockingDrain();
void Shutdown();

size_t NumberOfAvailableBackgroundThreads() const;
private:
TaskQueue<v8::Task> background_tasks_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
};

class NodePlatform : public MultiIsolatePlatform {
public:
NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller);
Expand All @@ -109,15 +134,20 @@ class NodePlatform : public MultiIsolatePlatform {
void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override;
void UnregisterIsolate(IsolateData* isolate_data) override;

std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner(
v8::Isolate* isolate) override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
v8::Isolate* isolate) override;

private:
PerIsolatePlatformData* ForIsolate(v8::Isolate* isolate);
std::shared_ptr<PerIsolatePlatformData> ForIsolate(v8::Isolate* isolate);

Mutex per_isolate_mutex_;
std::unordered_map<v8::Isolate*, PerIsolatePlatformData*> per_isolate_;
TaskQueue<v8::Task> background_tasks_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
std::unordered_map<v8::Isolate*,
std::shared_ptr<PerIsolatePlatformData>> per_isolate_;

std::unique_ptr<v8::TracingController> tracing_controller_;
std::shared_ptr<BackgroundTaskRunner> background_task_runner_;
};

} // namespace node
Expand Down