@@ -1103,9 +1103,19 @@ void Environment::InitializeLibuv() {
11031103 Context::Scope context_scope (env->context ());
11041104 env->RunAndClearNativeImmediates ();
11051105 }));
1106+
1107+ CHECK_EQ (
1108+ 0 ,
1109+ uv_async_init (event_loop (), ¬ifications_async_, [](uv_async_t * async) {
1110+ Environment* env =
1111+ ContainerOf (&Environment::notifications_async_, async);
1112+ env->DispatchNotifications ();
1113+ }));
1114+
11061115 uv_unref (reinterpret_cast <uv_handle_t *>(&idle_prepare_handle_));
11071116 uv_unref (reinterpret_cast <uv_handle_t *>(&idle_check_handle_));
11081117 uv_unref (reinterpret_cast <uv_handle_t *>(&task_queues_async_));
1118+ uv_unref (reinterpret_cast <uv_handle_t *>(¬ifications_async_));
11091119
11101120 {
11111121 Mutex::ScopedLock lock (native_immediates_threadsafe_mutex_);
@@ -1215,6 +1225,7 @@ void Environment::ClosePerEnvHandles() {
12151225 close_and_finish (reinterpret_cast <uv_handle_t *>(&idle_prepare_handle_));
12161226 close_and_finish (reinterpret_cast <uv_handle_t *>(&idle_check_handle_));
12171227 close_and_finish (reinterpret_cast <uv_handle_t *>(&task_queues_async_));
1228+ close_and_finish (reinterpret_cast <uv_handle_t *>(¬ifications_async_));
12181229}
12191230
12201231void Environment::CleanupHandles () {
@@ -2260,4 +2271,97 @@ v8::CpuProfile* Environment::StopCpuProfile(v8::ProfilerId profile_id) {
22602271 return profile;
22612272}
22622273
2274+ Mutex Environment::notifications_mutex_;
2275+ uint64_t Environment::next_notification_id_ (0 );
2276+ std::unordered_map<uint64_t , Environment*> Environment::notifications_;
2277+
2278+ void Environment::RegisterNotification (
2279+ const v8::FunctionCallbackInfo<v8::Value>& args) {
2280+ CHECK (args[0 ]->IsFunction ());
2281+
2282+ Mutex::ScopedLock lock (notifications_mutex_);
2283+ Environment* env = Environment::GetCurrent (args);
2284+ Isolate* isolate = env->isolate ();
2285+
2286+ uint64_t id = ++Environment::next_notification_id_;
2287+ notifications_.emplace (id, env);
2288+
2289+ v8::Global<v8::Function> global;
2290+ global.Reset (isolate, args[0 ].As <v8::Function>());
2291+ env->notifications_callbacks_ .emplace (id, std::move (global));
2292+
2293+ args.GetReturnValue ().Set (v8::BigInt::New (isolate, id));
2294+ }
2295+
2296+ void Environment::UnregisterNotification (
2297+ const v8::FunctionCallbackInfo<v8::Value>& args) {
2298+ CHECK (args[0 ]->IsBigInt ());
2299+
2300+ bool lossless = false ;
2301+ uint64_t id = args[0 ].As <v8::BigInt>()->Uint64Value (&lossless);
2302+
2303+ if (!lossless) {
2304+ std::cout << " LOSSLESS TRUE\n " ;
2305+ return THROW_ERR_INVALID_NOTIFICATION (
2306+ args.GetIsolate (), " Invalid notification %un" , id);
2307+ }
2308+
2309+ Mutex::ScopedLock lock (notifications_mutex_);
2310+ Environment* env = Environment::GetCurrent (args);
2311+
2312+ if (env->notifications_callbacks_ .erase (id) == 0 ) {
2313+ return THROW_ERR_INVALID_NOTIFICATION (
2314+ args.GetIsolate (), " Invalid notification %llun" , id);
2315+ }
2316+
2317+ env->notifications_queue_ .erase (id);
2318+ notifications_.erase (id);
2319+ }
2320+
2321+ void Environment::GetNotifications (
2322+ const v8::FunctionCallbackInfo<v8::Value>& args) {
2323+ Mutex::ScopedLock lock (notifications_mutex_);
2324+ Environment* env = Environment::GetCurrent (args);
2325+ Isolate* isolate = env->isolate ();
2326+
2327+ v8::LocalVector<Value> ids (isolate);
2328+ for (auto & pairs : env->notifications_callbacks_ ) {
2329+ ids.push_back (v8::BigInt::New (isolate, pairs.first ));
2330+ }
2331+
2332+ args.GetReturnValue ().Set (Array::New (isolate, ids.data (), ids.size ()));
2333+ }
2334+
2335+ void Environment::SendNotification (
2336+ const v8::FunctionCallbackInfo<v8::Value>& args) {
2337+ CHECK (args[0 ]->IsBigInt ());
2338+
2339+ uint64_t id = args[0 ].As <v8::BigInt>()->Uint64Value ();
2340+ Environment* env = notifications_[id];
2341+
2342+ if (env == nullptr ) {
2343+ return THROW_ERR_INVALID_NOTIFICATION (
2344+ args.GetIsolate (), " Invalid notification %llun" , id);
2345+ }
2346+
2347+ Mutex::ScopedLock lock (notifications_mutex_);
2348+ env->notifications_queue_ .insert (id);
2349+ uv_async_send (&env->notifications_async_ );
2350+ }
2351+
2352+ void Environment::DispatchNotifications () {
2353+ Isolate* isolate = this ->isolate ();
2354+ v8::Local<v8::Object> process = process_object ();
2355+ Mutex::ScopedLock lock (notifications_mutex_);
2356+ HandleScope handle_scope (isolate);
2357+
2358+ for (auto id : notifications_queue_) {
2359+ v8::Local<v8::Function> callback =
2360+ notifications_callbacks_[id].Get (isolate);
2361+ MakeCallback (isolate, process, callback, 0 , nullptr , {0 , 0 })
2362+ .ToLocalChecked ();
2363+ }
2364+
2365+ notifications_queue_.clear ();
2366+ }
22632367} // namespace node
0 commit comments