forked from nodejs/node
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode_platform.h
More file actions
292 lines (241 loc) Β· 9.92 KB
/
node_platform.h
File metadata and controls
292 lines (241 loc) Β· 9.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#ifndef SRC_NODE_PLATFORM_H_
#define SRC_NODE_PLATFORM_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include <functional>
#include <queue>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include "libplatform/libplatform.h"
#include "node.h"
#include "node_mutex.h"
#include "uv.h"
namespace node {
class NodePlatform;
class IsolateData;
class PerIsolatePlatformData;
template <typename, typename = void>
struct has_priority : std::false_type {};
template <typename T>
struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>>
: std::true_type {};
template <class T>
class TaskQueue {
public:
// If the entry type has a priority member, order the priority queue by
// that - higher priority first. Otherwise, maintain insertion order.
struct EntryCompare {
bool operator()(const std::unique_ptr<T>& a,
const std::unique_ptr<T>& b) const {
if constexpr (has_priority<T>::value) {
return a->priority < b->priority;
} else {
return false;
}
}
};
using PriorityQueue = std::priority_queue<std::unique_ptr<T>,
std::vector<std::unique_ptr<T>>,
EntryCompare>;
class Locked {
public:
void Push(std::unique_ptr<T> task, bool outstanding = false);
std::unique_ptr<T> Pop();
std::unique_ptr<T> BlockingPop();
void NotifyOfOutstandingCompletion();
void BlockingDrain();
// Returns true if all outstanding tasks completed before the timeout,
// false if timed out. timeout_ns is in nanoseconds.
bool TimedBlockingDrain(uint64_t timeout_ns);
void Stop();
PriorityQueue PopAll();
private:
friend class TaskQueue;
explicit Locked(TaskQueue* queue);
TaskQueue* queue_;
Mutex::ScopedLock lock_;
};
TaskQueue();
~TaskQueue() = default;
Locked Lock() { return Locked(this); }
private:
Mutex lock_;
ConditionVariable tasks_available_;
ConditionVariable outstanding_tasks_drained_;
int outstanding_tasks_;
bool stopped_;
PriorityQueue task_queue_;
};
struct TaskQueueEntry {
std::unique_ptr<v8::Task> task;
v8::TaskPriority priority;
TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p)
: task(std::move(t)), priority(p) {}
inline bool is_outstanding() const {
return priority == v8::TaskPriority::kUserBlocking;
}
};
struct DelayedTask {
std::unique_ptr<v8::Task> task;
v8::TaskPriority priority;
uv_timer_t timer;
double timeout;
std::shared_ptr<PerIsolatePlatformData> platform_data;
};
enum class PlatformDebugLogLevel {
kNone = 0,
kMinimal = 1,
kVerbose = 2,
};
// This acts as the foreground task runner for a given Isolate.
class PerIsolatePlatformData
: public IsolatePlatformDelegate,
public v8::TaskRunner,
public std::enable_shared_from_this<PerIsolatePlatformData> {
public:
PerIsolatePlatformData(
v8::Isolate* isolate,
uv_loop_t* loop,
PlatformDebugLogLevel debug_log_level = PlatformDebugLogLevel::kNone);
~PerIsolatePlatformData() override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner() override;
bool IdleTasksEnabled() override { return false; }
// Non-nestable tasks are treated like regular tasks.
bool NonNestableTasksEnabled() const override { return true; }
bool NonNestableDelayedTasksEnabled() const override { return true; }
void AddShutdownCallback(void (*callback)(void*), void* data);
void Shutdown();
// Returns true if work was dispatched or executed. New tasks that are
// posted during flushing of the queue are postponed until the next
// flushing.
bool FlushForegroundTasksInternal();
const uv_loop_t* event_loop() const { return loop_; }
private:
// v8::TaskRunner implementation.
void PostTaskImpl(std::unique_ptr<v8::Task> task,
const v8::SourceLocation& location) override;
void PostDelayedTaskImpl(std::unique_ptr<v8::Task> task,
double delay_in_seconds,
const v8::SourceLocation& location) override;
void PostIdleTaskImpl(std::unique_ptr<v8::IdleTask> task,
const v8::SourceLocation& location) override;
void PostNonNestableTaskImpl(std::unique_ptr<v8::Task> task,
const v8::SourceLocation& location) override;
void PostNonNestableDelayedTaskImpl(
std::unique_ptr<v8::Task> task,
double delay_in_seconds,
const v8::SourceLocation& location) override;
void DeleteFromScheduledTasks(DelayedTask* task);
void DecreaseHandleCount();
static void FlushTasks(uv_async_t* handle);
void RunForegroundTask(std::unique_ptr<v8::Task> task);
static void RunForegroundTask(uv_timer_t* timer);
uv_async_t* flush_tasks_ = nullptr;
struct ShutdownCallback {
void (*cb)(void*);
void* data;
};
typedef std::vector<ShutdownCallback> ShutdownCbList;
ShutdownCbList shutdown_callbacks_;
// shared_ptr to self to keep this object alive during shutdown.
std::shared_ptr<PerIsolatePlatformData> self_reference_;
uint32_t uv_handle_count_ = 1; // 1 = flush_tasks_
v8::Isolate* const isolate_;
uv_loop_t* const loop_;
// When acquiring locks for both task queues, lock foreground_tasks_
// first then foreground_delayed_tasks_ to avoid deadlocks.
TaskQueue<TaskQueueEntry> foreground_tasks_;
TaskQueue<DelayedTask> foreground_delayed_tasks_;
// Use a custom deleter because libuv needs to close the handle first.
typedef std::unique_ptr<DelayedTask, void (*)(DelayedTask*)>
DelayedTaskPointer;
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone;
};
// This acts as the single worker thread task runner for all Isolates.
class WorkerThreadsTaskRunner {
public:
explicit WorkerThreadsTaskRunner(int thread_pool_size,
PlatformDebugLogLevel debug_log_level);
void PostTask(v8::TaskPriority priority,
std::unique_ptr<v8::Task> task,
const v8::SourceLocation& location);
void PostDelayedTask(v8::TaskPriority priority,
std::unique_ptr<v8::Task> task,
const v8::SourceLocation& location,
double delay_in_seconds);
void BlockingDrain();
// Returns true if all outstanding tasks completed before the timeout,
// false if timed out. timeout_ns is in nanoseconds.
bool TimedBlockingDrain(uint64_t timeout_ns);
void Shutdown();
int NumberOfWorkerThreads() const;
private:
// A queue shared by all threads. The consumers are the worker threads which
// take tasks from it to run in PlatformWorkerThread(). The producers can be
// any thread. Both the foreground thread and the worker threads can push
// tasks into the queue via v8::Platform::PostTaskOnWorkerThread() which
// eventually calls PostTask() on this class. When any thread calls
// v8::Platform::PostDelayedTaskOnWorkerThread(), the DelayedTaskScheduler
// thread will schedule a timer that pushes the delayed tasks back into this
// queue when the timer expires.
TaskQueue<TaskQueueEntry> pending_worker_tasks_;
class DelayedTaskScheduler;
std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone;
};
class NodePlatform : public MultiIsolatePlatform {
public:
NodePlatform(int thread_pool_size,
v8::TracingController* tracing_controller,
v8::PageAllocator* page_allocator = nullptr);
~NodePlatform() override;
void DrainTasks(v8::Isolate* isolate) override;
void Shutdown();
// v8::Platform implementation.
int NumberOfWorkerThreads() override;
void PostTaskOnWorkerThreadImpl(v8::TaskPriority priority,
std::unique_ptr<v8::Task> task,
const v8::SourceLocation& location) override;
void PostDelayedTaskOnWorkerThreadImpl(
v8::TaskPriority priority,
std::unique_ptr<v8::Task> task,
double delay_in_seconds,
const v8::SourceLocation& location) override;
bool IdleTasksEnabled(v8::Isolate* isolate) override;
double MonotonicallyIncreasingTime() override;
double CurrentClockTimeMillis() override;
v8::TracingController* GetTracingController() override;
bool FlushForegroundTasks(v8::Isolate* isolate) override;
std::unique_ptr<v8::JobHandle> CreateJobImpl(
v8::TaskPriority priority,
std::unique_ptr<v8::JobTask> job_task,
const v8::SourceLocation& location) override;
void RegisterIsolate(v8::Isolate* isolate, uv_loop_t* loop) override;
void RegisterIsolate(v8::Isolate* isolate,
IsolatePlatformDelegate* delegate) override;
void UnregisterIsolate(v8::Isolate* isolate) override;
void AddIsolateFinishedCallback(v8::Isolate* isolate,
void (*callback)(void*),
void* data) override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
v8::Isolate* isolate, v8::TaskPriority priority) override;
Platform::StackTracePrinter GetStackTracePrinter() override;
v8::PageAllocator* GetPageAllocator() override;
private:
IsolatePlatformDelegate* ForIsolate(v8::Isolate* isolate);
std::shared_ptr<PerIsolatePlatformData> ForNodeIsolate(v8::Isolate* isolate);
Mutex per_isolate_mutex_;
using DelegatePair = std::pair<IsolatePlatformDelegate*,
std::shared_ptr<PerIsolatePlatformData>>;
std::unordered_map<v8::Isolate*, DelegatePair> per_isolate_;
v8::TracingController* tracing_controller_;
v8::PageAllocator* page_allocator_;
std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_;
bool has_shut_down_ = false;
PlatformDebugLogLevel debug_log_level_ = PlatformDebugLogLevel::kNone;
};
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_NODE_PLATFORM_H_