Skip to content

Commit f7a1974

Browse files
author
Brendan DeBeasi
committed
fix: prevent SSE stream memory leaks on client disconnect
- Add cleanup() guard with done flag to prevent double-cleanup - await stream.writeSSE() calls and catch errors to trigger cleanup - Unsubscribe Bus and GlobalBus listeners on abort or write failure - Clear heartbeat interval in all exit paths - Add Bus.debug() subscription count introspection - Add GET /debug/memory endpoint for runtime memory diagnostics
1 parent 5acfdd1 commit f7a1974

3 files changed

Lines changed: 87 additions & 39 deletions

File tree

packages/opencode/src/bus/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,12 @@ export namespace Bus {
102102
match.splice(index, 1)
103103
}
104104
}
105+
106+
export function debug() {
107+
const counts: Record<string, number> = {}
108+
for (const [type, subs] of state().subscriptions) {
109+
counts[type] = subs.length
110+
}
111+
return { subscriptions: counts }
112+
}
105113
}

packages/opencode/src/server/routes/global.ts

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,40 +69,59 @@ export const GlobalRoutes = lazy(() =>
6969
c.header("X-Accel-Buffering", "no")
7070
c.header("X-Content-Type-Options", "nosniff")
7171
return streamSSE(c, async (stream) => {
72-
stream.writeSSE({
72+
await stream.writeSSE({
7373
data: JSON.stringify({
7474
payload: {
7575
type: "server.connected",
7676
properties: {},
7777
},
7878
}),
7979
})
80+
81+
let done = false
82+
let resolveWait: () => void = () => {}
83+
let heartbeat: ReturnType<typeof setInterval> | undefined
84+
85+
const cleanup = () => {
86+
if (done) return
87+
done = true
88+
clearInterval(heartbeat)
89+
GlobalBus.off("event", handler)
90+
resolveWait()
91+
log.info("global event disconnected")
92+
}
93+
8094
async function handler(event: any) {
81-
await stream.writeSSE({
82-
data: JSON.stringify(event),
83-
})
95+
if (done) return
96+
try {
97+
await stream.writeSSE({ data: JSON.stringify(event) })
98+
} catch {
99+
cleanup()
100+
}
84101
}
102+
85103
GlobalBus.on("event", handler)
86104

87105
// Send heartbeat every 10s to prevent stalled proxy streams.
88-
const heartbeat = setInterval(() => {
89-
stream.writeSSE({
90-
data: JSON.stringify({
91-
payload: {
92-
type: "server.heartbeat",
93-
properties: {},
94-
},
95-
}),
96-
})
106+
heartbeat = setInterval(async () => {
107+
if (done) return
108+
try {
109+
await stream.writeSSE({
110+
data: JSON.stringify({
111+
payload: {
112+
type: "server.heartbeat",
113+
properties: {},
114+
},
115+
}),
116+
})
117+
} catch {
118+
cleanup()
119+
}
97120
}, 10_000)
98121

99122
await new Promise<void>((resolve) => {
100-
stream.onAbort(() => {
101-
clearInterval(heartbeat)
102-
GlobalBus.off("event", handler)
103-
resolve()
104-
log.info("global event disconnected")
105-
})
123+
resolveWait = resolve
124+
stream.onAbort(cleanup)
106125
})
107126
})
108127
},

packages/opencode/src/server/server.ts

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BusEvent } from "@/bus/bus-event"
22
import { Bus } from "@/bus"
3+
import { GlobalBus } from "@/bus/global"
34
import { Log } from "../util/log"
45
import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
56
import { Hono } from "hono"
@@ -516,38 +517,58 @@ export namespace Server {
516517
c.header("X-Accel-Buffering", "no")
517518
c.header("X-Content-Type-Options", "nosniff")
518519
return streamSSE(c, async (stream) => {
519-
stream.writeSSE({
520+
await stream.writeSSE({
520521
data: JSON.stringify({
521522
type: "server.connected",
522523
properties: {},
523524
}),
524525
})
525-
const unsub = Bus.subscribeAll(async (event) => {
526-
await stream.writeSSE({
527-
data: JSON.stringify(event),
528-
})
529-
if (event.type === Bus.InstanceDisposed.type) {
530-
stream.close()
526+
527+
let done = false
528+
let unsub: () => void = () => {}
529+
let resolveWait: () => void = () => {}
530+
let heartbeat: ReturnType<typeof setInterval> | undefined
531+
532+
const cleanup = () => {
533+
if (done) return
534+
done = true
535+
clearInterval(heartbeat)
536+
unsub()
537+
resolveWait()
538+
log.info("event disconnected")
539+
}
540+
541+
unsub = Bus.subscribeAll(async (event) => {
542+
if (done) return
543+
try {
544+
await stream.writeSSE({ data: JSON.stringify(event) })
545+
if (event.type === Bus.InstanceDisposed.type) {
546+
cleanup()
547+
stream.close()
548+
}
549+
} catch {
550+
cleanup()
531551
}
532552
})
533553

534554
// Send heartbeat every 10s to prevent stalled proxy streams.
535-
const heartbeat = setInterval(() => {
536-
stream.writeSSE({
537-
data: JSON.stringify({
538-
type: "server.heartbeat",
539-
properties: {},
540-
}),
541-
})
555+
heartbeat = setInterval(async () => {
556+
if (done) return
557+
try {
558+
await stream.writeSSE({
559+
data: JSON.stringify({
560+
type: "server.heartbeat",
561+
properties: {},
562+
}),
563+
})
564+
} catch {
565+
cleanup()
566+
}
542567
}, 10_000)
543568

544569
await new Promise<void>((resolve) => {
545-
stream.onAbort(() => {
546-
clearInterval(heartbeat)
547-
unsub()
548-
resolve()
549-
log.info("event disconnected")
550-
})
570+
resolveWait = resolve
571+
stream.onAbort(cleanup)
551572
})
552573
})
553574
},

0 commit comments

Comments
 (0)