Skip to content

Commit 362ff61

Browse files
AgentEnderclaude
andauthored
feat(core): add logging and progress message types to daemon (#35342)
Co-authored-by: Claude <[email protected]>
1 parent c19a8e8 commit 362ff61

15 files changed

Lines changed: 471 additions & 111 deletions

packages/nx/src/daemon/client/client.ts

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ import {
9999
POST_TASKS_EXECUTION,
100100
PRE_TASKS_EXECUTION,
101101
} from '../message-types/run-tasks-execution-hooks';
102+
import {
103+
isEmitLogMessage,
104+
isUpdateProgressMessage,
105+
} from '../message-types/streaming-messages';
102106
import {
103107
GET_ESTIMATED_TASK_TIMINGS,
104108
GET_FLAKY_TASKS,
@@ -159,6 +163,11 @@ export class DaemonClient {
159163
private currentMessage;
160164
private currentResolve;
161165
private currentReject;
166+
// Tracks the spinner owned by the in-flight request so streamed
167+
// progress updates are routed to the caller's spinner instead of
168+
// mutating the process-wide globalSpinner (which may belong to an
169+
// unrelated command).
170+
private currentSpinner: DelayedSpinner | null = null;
162171

163172
private _enabled: boolean | undefined;
164173
private _daemonStatus: DaemonStatus = DaemonStatus.DISCONNECTED;
@@ -307,6 +316,7 @@ export class DaemonClient {
307316
'Calculating the project graph on the Nx Daemon is taking longer than expected. Re-run with NX_DAEMON=false to see more details.',
308317
{ ciDelay: 60_000, delay: 30_000 }
309318
);
319+
this.currentSpinner = spinner;
310320
try {
311321
const response = await this.sendToDaemonViaQueue({
312322
type: 'REQUEST_PROJECT_GRAPH',
@@ -323,6 +333,7 @@ export class DaemonClient {
323333
}
324334
} finally {
325335
spinner?.cleanup();
336+
this.currentSpinner = null;
326337
}
327338
}
328339

@@ -760,9 +771,9 @@ export class DaemonClient {
760771
type: 'PROCESS_IN_BACKGROUND',
761772
requirePath,
762773
data,
763-
// This method is sometimes passed data that cannot be serialized with v8
764-
// so we force JSON serialization here
765774
},
775+
// This method is sometimes passed data that cannot be serialized with v8
776+
// so we force JSON serialization here
766777
'json'
767778
);
768779
}
@@ -1032,11 +1043,15 @@ export class DaemonClient {
10321043

10331044
private async sendToDaemonViaQueue<T extends DaemonMessage>(
10341045
messageToDaemon: T,
1035-
force?: 'v8' | 'json'
1046+
parser?: 'v8' | 'json'
10361047
): Promise<any> {
1037-
return this.queue.sendToQueue(() =>
1038-
this.sendMessageToDaemon(messageToDaemon, force)
1039-
);
1048+
return this.queue.sendToQueue(async () => {
1049+
// Set currentSpinner inside the queued function so it's only
1050+
// active while this specific message is in flight — preventing
1051+
// concurrent callers from overwriting each other's spinner
1052+
// reference before their turn arrives.
1053+
return await this.sendMessageToDaemon(messageToDaemon, parser);
1054+
});
10401055
}
10411056

10421057
private setUpConnection() {
@@ -1259,6 +1274,19 @@ export class DaemonClient {
12591274
'result-parse-start-' + this.currentMessage.type,
12601275
'result-parse-end-' + this.currentMessage.type
12611276
);
1277+
// Streaming messages fire side-effects on the client but do not
1278+
// resolve the pending request promise — the daemon can push several
1279+
// of these before finally sending the real response. Progress
1280+
// updates route through the in-flight request's own spinner so
1281+
// we don't stomp on unrelated commands' spinner text.
1282+
if (isUpdateProgressMessage(parsedResult)) {
1283+
this.currentSpinner?.setMessage(parsedResult.message);
1284+
return;
1285+
}
1286+
if (isEmitLogMessage(parsedResult)) {
1287+
console[parsedResult.level](parsedResult.message);
1288+
return;
1289+
}
12621290
if (parsedResult.error) {
12631291
this.currentReject(parsedResult.error);
12641292
} else {

packages/nx/src/daemon/logger.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
*/
1212

1313
import { appendFileSync, existsSync, mkdirSync } from 'fs';
14+
import { ProgressTopic } from '../utils/progress-topics';
15+
import { nxVersion } from '../utils/versions';
16+
import { EmitLogLevel } from './message-types/streaming-messages';
17+
import { sendEmitLogMessageToTopic } from './server/client-socket-context';
1418
import {
1519
DAEMON_DIR_FOR_CURRENT_WORKSPACE,
1620
DAEMON_OUTPUT_LOG_FILE,
1721
} from './tmp-dir';
18-
import { nxVersion } from '../utils/versions';
1922

2023
type LogSource = 'Server' | 'Client';
2124

@@ -51,6 +54,25 @@ class DaemonLogger {
5154
this.log(`[WATCHER]: ${s.join(' ')}`);
5255
}
5356

57+
/**
58+
* Broadcasts a log line to every client currently subscribed to the
59+
* given topic. Useful for warnings raised inside daemon-executed code
60+
* that we want the user to see in their terminal rather than lose to
61+
* the daemon log file.
62+
*
63+
* Falls back to writing into the daemon log when no clients are
64+
* subscribed to the topic.
65+
*
66+
* Must only be invoked from inside the Nx daemon process.
67+
*/
68+
logToClient(
69+
topic: ProgressTopic,
70+
message: string,
71+
level: EmitLogLevel = 'log'
72+
) {
73+
sendEmitLogMessageToTopic(topic, message, level);
74+
}
75+
5476
private writeToFile(message: string) {
5577
try {
5678
if (!existsSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE)) {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
export const UPDATE_PROGRESS_MESSAGE = 'UPDATE_PROGRESS_MESSAGE' as const;
2+
3+
export type UpdateProgressMessage = {
4+
type: typeof UPDATE_PROGRESS_MESSAGE;
5+
message: string;
6+
};
7+
8+
export function isUpdateProgressMessage(
9+
message: unknown
10+
): message is UpdateProgressMessage {
11+
return (
12+
typeof message === 'object' &&
13+
message !== null &&
14+
'type' in message &&
15+
message['type'] === UPDATE_PROGRESS_MESSAGE
16+
);
17+
}
18+
19+
export const EMIT_LOG = 'EMIT_LOG' as const;
20+
21+
export type EmitLogLevel = 'log' | 'warn' | 'error';
22+
23+
export type EmitLogMessage = {
24+
type: typeof EMIT_LOG;
25+
level: EmitLogLevel;
26+
message: string;
27+
};
28+
29+
export function isEmitLogMessage(message: unknown): message is EmitLogMessage {
30+
return (
31+
typeof message === 'object' &&
32+
message !== null &&
33+
'type' in message &&
34+
message['type'] === EMIT_LOG
35+
);
36+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import type { Socket } from 'net';
2+
import { MESSAGE_END_SEQ } from '../../utils/consume-messages-from-socket';
3+
import { ProgressTopic } from '../../utils/progress-topics';
4+
import { isOnDaemon } from '../is-on-daemon';
5+
import { serverLogger } from '../logger';
6+
import {
7+
EMIT_LOG,
8+
EmitLogLevel,
9+
EmitLogMessage,
10+
UPDATE_PROGRESS_MESSAGE,
11+
} from '../message-types/streaming-messages';
12+
import { serialize } from '../socket-utils';
13+
14+
const topicSubscribers = new Map<ProgressTopic, Set<Socket>>();
15+
16+
export function subscribeClientToTopic(
17+
socket: Socket,
18+
topic: ProgressTopic
19+
): void {
20+
let subscribers = getTopicSubscribers(topic);
21+
if (!subscribers) {
22+
subscribers = new Set();
23+
topicSubscribers.set(topic, subscribers);
24+
}
25+
subscribers.add(socket);
26+
}
27+
28+
export function unsubscribeClientFromTopic(
29+
socket: Socket,
30+
topic: ProgressTopic
31+
): void {
32+
const subscribers = getTopicSubscribers(topic);
33+
if (!subscribers) return;
34+
subscribers.delete(socket);
35+
}
36+
37+
export function getTopicSubscribers(topic: ProgressTopic): Set<Socket> {
38+
const subscribers = topicSubscribers.get(topic);
39+
if (!subscribers) {
40+
const set = new Set<Socket>();
41+
topicSubscribers.set(topic, set);
42+
return set;
43+
}
44+
return subscribers;
45+
}
46+
47+
export function assertOnDaemon(helperName: string) {
48+
if (!isOnDaemon()) {
49+
throw new Error(
50+
`${helperName} can only be called from the Nx daemon process.`
51+
);
52+
}
53+
}
54+
55+
/**
56+
* Writes a streaming message over the given socket using the daemon's
57+
* configured serialization format and terminated with MESSAGE_END_SEQ.
58+
* Errors are logged to the daemon's stdout (redirected to the daemon
59+
* log) rather than propagated — a disconnected client shouldn't tear
60+
* down the current request handler or other subscribers.
61+
*/
62+
export function writeStreamingMessage(
63+
socket: Socket,
64+
payload: unknown,
65+
description: string
66+
) {
67+
try {
68+
serverLogger.log('Streaming message to client:', description);
69+
socket.write(serialize(payload) + MESSAGE_END_SEQ, (err) => {
70+
if (err) {
71+
console.log(
72+
`Streaming message write error (client likely disconnected): ${err.message}`
73+
);
74+
}
75+
});
76+
} catch (e) {
77+
console.log(
78+
`Failed to send streaming message to client: ${
79+
e instanceof Error ? e.message : String(e)
80+
}`
81+
);
82+
}
83+
}
84+
85+
/**
86+
* Broadcasts a progress message to every client currently subscribed to
87+
* the given topic. No-op when there are no subscribers.
88+
*
89+
* Must only be invoked from inside the Nx daemon process.
90+
*/
91+
export function sendProgressMessageToTopic(
92+
topic: ProgressTopic,
93+
message: string
94+
): void {
95+
assertOnDaemon('sendProgressMessageToTopic');
96+
const subscribers = getTopicSubscribers(topic);
97+
if (!subscribers?.size) return;
98+
const payload = { type: UPDATE_PROGRESS_MESSAGE, message };
99+
for (const socket of subscribers) {
100+
writeStreamingMessage(
101+
socket,
102+
payload,
103+
'progress update for topic ' + topic
104+
);
105+
}
106+
}
107+
108+
export function sendEmitLogMessageToTopic(
109+
topic: ProgressTopic,
110+
message: string,
111+
level: EmitLogLevel
112+
): void {
113+
assertOnDaemon('sendEmitLogMessageToTopic');
114+
const subscribers = getTopicSubscribers(topic);
115+
if (!subscribers?.size) return;
116+
const payload: EmitLogMessage = { type: EMIT_LOG, message, level };
117+
for (const socket of subscribers) {
118+
writeStreamingMessage(socket, payload, 'emit log message to ' + topic);
119+
}
120+
}

packages/nx/src/daemon/server/handle-request-project-graph.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1+
import { Socket } from 'net';
12
import { performance } from 'perf_hooks';
23
import { serializeResult } from '../socket-utils';
34
import { serverLogger } from '../logger';
45
import { getCachedSerializedProjectGraphPromise } from './project-graph-incremental-recomputation';
56
import { HandlerResult } from './server';
67

7-
export async function handleRequestProjectGraph(): Promise<HandlerResult> {
8+
export async function handleRequestProjectGraph(
9+
socket: Socket
10+
): Promise<HandlerResult> {
811
try {
912
performance.mark('server-connection');
1013
serverLogger.requestLog('Client Request for Project Graph Received');
1114

12-
const result = await getCachedSerializedProjectGraphPromise();
15+
const result = await getCachedSerializedProjectGraphPromise(socket);
1316
if (result.error) {
1417
return {
1518
description: `Error when preparing serialized project graph.`,

packages/nx/src/daemon/server/project-graph-incremental-recomputation.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Socket } from 'net';
12
import { performance } from 'perf_hooks';
23
import { readNxJson } from '../../config/nx-json';
34
import {
@@ -38,6 +39,11 @@ import {
3839
} from '../../utils/workspace-context';
3940
import { workspaceRoot } from '../../utils/workspace-root';
4041
import { serverLogger } from '../logger';
42+
import { ProgressTopics } from '../../utils/progress-topics';
43+
import {
44+
subscribeClientToTopic,
45+
unsubscribeClientFromTopic,
46+
} from './client-socket-context';
4147
import { notifyFileChangeListeners } from './file-watching/file-change-events';
4248
import { notifyFileWatcherSockets } from './file-watching/file-watcher-sockets';
4349
import { notifyProjectGraphListenerSockets } from './project-graph-listener-sockets';
@@ -84,7 +90,16 @@ let knownExternalNodes: Record<string, ProjectGraphExternalNode> = {};
8490
let fileChangeCounter = 0;
8591
let recomputationGeneration = 0;
8692

87-
export async function getCachedSerializedProjectGraphPromise(): Promise<SerializedProjectGraph> {
93+
export async function getCachedSerializedProjectGraphPromise(
94+
socket?: Socket
95+
): Promise<SerializedProjectGraph> {
96+
// Subscribe the requesting client to the graph-construction topic
97+
// for the duration of the await, so in-flight progress/log messages
98+
// — including those produced by a recomputation that was already
99+
// started before this caller arrived — are broadcast to them.
100+
if (socket) {
101+
subscribeClientToTopic(socket, ProgressTopics.GraphConstruction);
102+
}
88103
try {
89104
let wasScheduled = false;
90105
// recomputing it now on demand. we can ignore the scheduled timeout
@@ -180,6 +195,10 @@ export async function getCachedSerializedProjectGraphPromise(): Promise<Serializ
180195
allWorkspaceFiles: null,
181196
rustReferences: null,
182197
};
198+
} finally {
199+
if (socket) {
200+
unsubscribeClientFromTopic(socket, ProgressTopics.GraphConstruction);
201+
}
183202
}
184203
}
185204

packages/nx/src/daemon/server/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ async function handleMessage(socket: Socket, data: string) {
282282
await handleResult(
283283
socket,
284284
'REQUEST_PROJECT_GRAPH',
285-
() => handleRequestProjectGraph(),
285+
() => handleRequestProjectGraph(socket),
286286
mode
287287
);
288288
} else if (payload.type === 'HASH_TASKS') {

0 commit comments

Comments
 (0)