Skip to content

Commit b487456

Browse files
committed
src: add C++ support for diagnostics channels
Add a C++ API for diagnostics channels that allows native code to check for subscribers and publish messages without unnecessary JS boundary crossings. Uses a shared AliasedUint32Array buffer between C++ and JS to track subscriber counts per channel, enabling a fast inline check (HasSubscribers) that reads the buffer directly.
1 parent 27ec431 commit b487456

10 files changed

Lines changed: 546 additions & 1 deletion

lib/diagnostics_channel.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ const {
3131

3232
const { triggerUncaughtException } = internalBinding('errors');
3333

34+
const dc_binding = internalBinding('diagnostics_channel');
35+
const { subscribers: subscriberCounts } = dc_binding;
36+
3437
const { WeakReference } = require('internal/util');
3538

3639
// Can't delete when weakref count reaches 0 as it could increment again.
@@ -108,6 +111,7 @@ class ActiveChannel {
108111
this._subscribers = ArrayPrototypeSlice(this._subscribers);
109112
ArrayPrototypePush(this._subscribers, subscription);
110113
channels.incRef(this.name);
114+
if (this._index !== undefined) subscriberCounts[this._index]++;
111115
}
112116

113117
unsubscribe(subscription) {
@@ -120,14 +124,18 @@ class ActiveChannel {
120124
ArrayPrototypePushApply(this._subscribers, after);
121125

122126
channels.decRef(this.name);
127+
if (this._index !== undefined) subscriberCounts[this._index]--;
123128
maybeMarkInactive(this);
124129

125130
return true;
126131
}
127132

128133
bindStore(store, transform) {
129134
const replacing = this._stores.has(store);
130-
if (!replacing) channels.incRef(this.name);
135+
if (!replacing) {
136+
channels.incRef(this.name);
137+
if (this._index !== undefined) subscriberCounts[this._index]++;
138+
}
131139
this._stores.set(store, transform);
132140
}
133141

@@ -139,6 +147,7 @@ class ActiveChannel {
139147
this._stores.delete(store);
140148

141149
channels.decRef(this.name);
150+
if (this._index !== undefined) subscriberCounts[this._index]--;
142151
maybeMarkInactive(this);
143152

144153
return true;
@@ -183,6 +192,9 @@ class Channel {
183192
this._subscribers = undefined;
184193
this._stores = undefined;
185194
this.name = name;
195+
if (typeof name === 'string') {
196+
this._index = dc_binding.getOrCreateChannelIndex(name);
197+
}
186198

187199
channels.set(name, this);
188200
}
@@ -434,6 +446,11 @@ function tracingChannel(nameOrChannels) {
434446
return new TracingChannel(nameOrChannels);
435447
}
436448

449+
dc_binding.setPublishCallback((name, message) => {
450+
const ch = channels.get(name);
451+
if (ch) ch.publish(message);
452+
});
453+
437454
module.exports = {
438455
channel,
439456
hasSubscribers,

node.gyp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
'src/node_main_instance.cc',
134134
'src/node_messaging.cc',
135135
'src/node_metadata.cc',
136+
'src/node_diagnostics_channel.cc',
136137
'src/node_modules.cc',
137138
'src/node_options.cc',
138139
'src/node_os.cc',
@@ -270,6 +271,7 @@
270271
'src/node_messaging.h',
271272
'src/node_metadata.h',
272273
'src/node_mutex.h',
274+
'src/node_diagnostics_channel.h',
273275
'src/node_modules.h',
274276
'src/node_object_wrap.h',
275277
'src/node_options.h',

src/base_object_types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace node {
1010
// what the class passes to SET_BINDING_ID(), the second argument should match
1111
// the C++ class name.
1212
#define SERIALIZABLE_BINDING_TYPES(V) \
13+
V(diagnostics_channel_binding_data, diagnostics_channel::BindingData) \
1314
V(encoding_binding_data, encoding_binding::BindingData) \
1415
V(fs_binding_data, fs::BindingData) \
1516
V(mksnapshot_binding_data, mksnapshot::BindingData) \

src/node_binding.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
V(constants) \
4949
V(contextify) \
5050
V(credentials) \
51+
V(diagnostics_channel) \
5152
V(encoding_binding) \
5253
V(errors) \
5354
V(fs) \

src/node_binding.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ static_assert(static_cast<int>(NM_F_LINKED) ==
4949
V(blob) \
5050
V(builtins) \
5151
V(contextify) \
52+
V(diagnostics_channel) \
5253
V(encoding_binding) \
5354
V(fs) \
5455
V(fs_dir) \

src/node_diagnostics_channel.cc

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#include "node_diagnostics_channel.h"
2+
3+
#include "env-inl.h"
4+
#include "node_external_reference.h"
5+
#include "util-inl.h"
6+
#include "v8.h"
7+
8+
#include <cstdint>
9+
10+
namespace node {
11+
namespace diagnostics_channel {
12+
13+
using v8::Context;
14+
using v8::FunctionCallbackInfo;
15+
using v8::HandleScope;
16+
using v8::Isolate;
17+
using v8::Local;
18+
using v8::Object;
19+
using v8::ObjectTemplate;
20+
using v8::SnapshotCreator;
21+
using v8::String;
22+
using v8::Value;
23+
24+
BindingData::BindingData(Realm* realm,
25+
Local<Object> wrap,
26+
InternalFieldInfo* info)
27+
: SnapshotableObject(realm, wrap, type_int),
28+
subscribers_(realm->isolate(),
29+
kMaxChannels,
30+
MAYBE_FIELD_PTR(info, subscribers)) {
31+
if (info == nullptr) {
32+
wrap->Set(realm->context(),
33+
FIXED_ONE_BYTE_STRING(realm->isolate(), "subscribers"),
34+
subscribers_.GetJSArray())
35+
.Check();
36+
} else {
37+
subscribers_.Deserialize(realm->context());
38+
}
39+
subscribers_.MakeWeak();
40+
}
41+
42+
void BindingData::MemoryInfo(MemoryTracker* tracker) const {
43+
tracker->TrackField("subscribers", subscribers_);
44+
}
45+
46+
uint32_t BindingData::GetOrCreateChannelIndex(const std::string& name) {
47+
auto it = channel_indices_.find(name);
48+
if (it != channel_indices_.end()) {
49+
return it->second;
50+
}
51+
CHECK_LT(next_channel_index_, kMaxChannels);
52+
uint32_t index = next_channel_index_++;
53+
channel_indices_.emplace(name, index);
54+
return index;
55+
}
56+
57+
void BindingData::GetOrCreateChannelIndex(
58+
const FunctionCallbackInfo<Value>& args) {
59+
Realm* realm = Realm::GetCurrent(args);
60+
BindingData* binding = realm->GetBindingData<BindingData>();
61+
CHECK_NOT_NULL(binding);
62+
63+
CHECK(args[0]->IsString());
64+
Utf8Value name(realm->isolate(), args[0]);
65+
66+
uint32_t index = binding->GetOrCreateChannelIndex(*name);
67+
args.GetReturnValue().Set(index);
68+
}
69+
70+
void BindingData::SetPublishCallback(
71+
const FunctionCallbackInfo<Value>& args) {
72+
Realm* realm = Realm::GetCurrent(args);
73+
BindingData* binding = realm->GetBindingData<BindingData>();
74+
CHECK_NOT_NULL(binding);
75+
76+
CHECK(args[0]->IsFunction());
77+
binding->publish_callback_.Reset(realm->isolate(),
78+
args[0].As<v8::Function>());
79+
}
80+
81+
bool BindingData::PrepareForSerialization(Local<Context> context,
82+
SnapshotCreator* creator) {
83+
DCHECK_NULL(internal_field_info_);
84+
internal_field_info_ = InternalFieldInfoBase::New<InternalFieldInfo>(type());
85+
internal_field_info_->subscribers =
86+
subscribers_.Serialize(context, creator);
87+
publish_callback_.Reset();
88+
return true;
89+
}
90+
91+
InternalFieldInfoBase* BindingData::Serialize(int index) {
92+
DCHECK_IS_SNAPSHOT_SLOT(index);
93+
InternalFieldInfo* info = internal_field_info_;
94+
internal_field_info_ = nullptr;
95+
return info;
96+
}
97+
98+
void BindingData::Deserialize(Local<Context> context,
99+
Local<Object> holder,
100+
int index,
101+
InternalFieldInfoBase* info) {
102+
DCHECK_IS_SNAPSHOT_SLOT(index);
103+
HandleScope scope(Isolate::GetCurrent());
104+
Realm* realm = Realm::GetCurrent(context);
105+
BindingData* binding = realm->AddBindingData<BindingData>(
106+
holder, static_cast<InternalFieldInfo*>(info));
107+
CHECK_NOT_NULL(binding);
108+
}
109+
110+
void BindingData::CreatePerIsolateProperties(IsolateData* isolate_data,
111+
Local<ObjectTemplate> target) {
112+
Isolate* isolate = isolate_data->isolate();
113+
SetMethod(
114+
isolate, target, "getOrCreateChannelIndex", GetOrCreateChannelIndex);
115+
SetMethod(isolate, target, "setPublishCallback", SetPublishCallback);
116+
}
117+
118+
void BindingData::CreatePerContextProperties(Local<Object> target,
119+
Local<Value> unused,
120+
Local<Context> context,
121+
void* priv) {
122+
Realm* realm = Realm::GetCurrent(context);
123+
BindingData* const binding = realm->AddBindingData<BindingData>(target);
124+
if (binding == nullptr) return;
125+
}
126+
127+
void BindingData::RegisterExternalReferences(
128+
ExternalReferenceRegistry* registry) {
129+
registry->Register(GetOrCreateChannelIndex);
130+
registry->Register(SetPublishCallback);
131+
}
132+
133+
Channel::Channel(BindingData* binding_data, uint32_t index, const char* name)
134+
: binding_data_(binding_data), index_(index), name_(name) {}
135+
136+
Channel Channel::Get(Environment* env, const char* name) {
137+
Realm* realm = env->principal_realm();
138+
BindingData* binding = realm->GetBindingData<BindingData>();
139+
if (binding == nullptr) {
140+
return Channel(nullptr, 0, name);
141+
}
142+
uint32_t index = binding->GetOrCreateChannelIndex(std::string(name));
143+
return Channel(binding, index, name);
144+
}
145+
146+
void Channel::Publish(Environment* env, Local<Value> message) const {
147+
if (!HasSubscribers()) return;
148+
149+
Isolate* isolate = env->isolate();
150+
HandleScope handle_scope(isolate);
151+
Local<Context> context = env->context();
152+
Context::Scope context_scope(context);
153+
154+
if (binding_data_->publish_callback_.IsEmpty()) return;
155+
156+
Local<v8::Function> callback =
157+
binding_data_->publish_callback_.Get(isolate);
158+
Local<String> channel_name =
159+
String::NewFromUtf8(isolate, name_).ToLocalChecked();
160+
161+
Local<Value> argv[] = {channel_name, message};
162+
USE(callback->Call(context, v8::Undefined(isolate), 2, argv));
163+
}
164+
165+
} // namespace diagnostics_channel
166+
} // namespace node
167+
168+
NODE_BINDING_CONTEXT_AWARE_INTERNAL(
169+
diagnostics_channel,
170+
node::diagnostics_channel::BindingData::CreatePerContextProperties)
171+
NODE_BINDING_PER_ISOLATE_INIT(
172+
diagnostics_channel,
173+
node::diagnostics_channel::BindingData::CreatePerIsolateProperties)
174+
NODE_BINDING_EXTERNAL_REFERENCE(
175+
diagnostics_channel,
176+
node::diagnostics_channel::BindingData::RegisterExternalReferences)

src/node_diagnostics_channel.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#ifndef SRC_NODE_DIAGNOSTICS_CHANNEL_H_
2+
#define SRC_NODE_DIAGNOSTICS_CHANNEL_H_
3+
4+
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5+
6+
#include <cinttypes>
7+
#include <string>
8+
#include <unordered_map>
9+
#include "aliased_buffer.h"
10+
#include "node_snapshotable.h"
11+
12+
namespace node {
13+
class ExternalReferenceRegistry;
14+
15+
namespace diagnostics_channel {
16+
17+
class Channel;
18+
19+
class BindingData : public SnapshotableObject {
20+
public:
21+
static constexpr size_t kMaxChannels = 1024;
22+
23+
struct InternalFieldInfo : public node::InternalFieldInfoBase {
24+
AliasedBufferIndex subscribers;
25+
};
26+
27+
BindingData(Realm* realm,
28+
v8::Local<v8::Object> wrap,
29+
InternalFieldInfo* info = nullptr);
30+
31+
SERIALIZABLE_OBJECT_METHODS()
32+
SET_BINDING_ID(diagnostics_channel_binding_data)
33+
34+
void MemoryInfo(MemoryTracker* tracker) const override;
35+
SET_SELF_SIZE(BindingData)
36+
SET_MEMORY_INFO_NAME(BindingData)
37+
38+
AliasedUint32Array subscribers_;
39+
40+
uint32_t next_channel_index_ = 0;
41+
std::unordered_map<std::string, uint32_t> channel_indices_;
42+
43+
uint32_t GetOrCreateChannelIndex(const std::string& name);
44+
45+
v8::Global<v8::Function> publish_callback_;
46+
47+
static void GetOrCreateChannelIndex(
48+
const v8::FunctionCallbackInfo<v8::Value>& args);
49+
static void SetPublishCallback(
50+
const v8::FunctionCallbackInfo<v8::Value>& args);
51+
52+
static void CreatePerIsolateProperties(
53+
IsolateData* isolate_data,
54+
v8::Local<v8::ObjectTemplate> target);
55+
static void CreatePerContextProperties(
56+
v8::Local<v8::Object> target,
57+
v8::Local<v8::Value> unused,
58+
v8::Local<v8::Context> context,
59+
void* priv);
60+
static void RegisterExternalReferences(
61+
ExternalReferenceRegistry* registry);
62+
63+
private:
64+
InternalFieldInfo* internal_field_info_ = nullptr;
65+
};
66+
67+
class Channel {
68+
public:
69+
static Channel Get(Environment* env, const char* name);
70+
71+
inline bool HasSubscribers() const {
72+
return binding_data_ != nullptr && binding_data_->subscribers_[index_] > 0;
73+
}
74+
75+
void Publish(Environment* env, v8::Local<v8::Value> message) const;
76+
77+
private:
78+
Channel(BindingData* binding_data, uint32_t index, const char* name);
79+
80+
BindingData* binding_data_;
81+
uint32_t index_;
82+
const char* name_;
83+
};
84+
85+
} // namespace diagnostics_channel
86+
} // namespace node
87+
88+
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
89+
90+
#endif // SRC_NODE_DIAGNOSTICS_CHANNEL_H_

src/node_external_reference.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class ExternalReferenceRegistry {
7171
V(config) \
7272
V(contextify) \
7373
V(credentials) \
74+
V(diagnostics_channel) \
7475
V(encoding_binding) \
7576
V(env_var) \
7677
V(errors) \

src/node_snapshotable.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "node_blob.h"
1515
#include "node_builtins.h"
1616
#include "node_contextify.h"
17+
#include "node_diagnostics_channel.h"
1718
#include "node_errors.h"
1819
#include "node_external_reference.h"
1920
#include "node_file.h"

0 commit comments

Comments
 (0)