Skip to content

Commit 6df6a8c

Browse files
sebishogunclaude
andcommitted
fix: resolve multiple memory leaks in bus subscriptions, compaction, and state cleanup
- Return unsub from Bus.once() so callers can clean up - Store and call unsubscribe handles in Share, ShareNext, Format, Plugin, Bootstrap - Return unsub from subscribeSessionEvents in github command and call it in finally block - Actually clear tool output and attachments during compaction instead of just marking timestamp - Add dispose callback to FileTime state so it gets cleaned on instance disposal - Clear ShareNext queue timeouts on dispose and catch failed sync fetches Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 9b23130 commit 6df6a8c

8 files changed

Lines changed: 91 additions & 57 deletions

File tree

packages/opencode/src/bus/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export namespace Bus {
8080
const unsub = subscribe(def, (event) => {
8181
if (callback(event)) unsub()
8282
})
83+
return unsub
8384
}
8485

8586
export function subscribeAll(callback: (event: any) => void) {

packages/opencode/src/cli/cmd/github.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ export const GithubRunCommand = cmd({
468468
let session: { id: string; title: string; version: string }
469469
let shareId: string | undefined
470470
let exitCode = 0
471+
let unsubSessionEvents: (() => void) | undefined
471472
type PromptFiles = Awaited<ReturnType<typeof getUserPrompt>>["promptFiles"]
472473
const triggerCommentId = isCommentEvent
473474
? (payload as IssueCommentEvent | PullRequestReviewCommentEvent).comment.id
@@ -518,7 +519,7 @@ export const GithubRunCommand = cmd({
518519
},
519520
],
520521
})
521-
subscribeSessionEvents()
522+
unsubSessionEvents = subscribeSessionEvents()
522523
shareId = await (async () => {
523524
if (share === false) return
524525
if (!share && repoData.data.private) return
@@ -633,6 +634,7 @@ export const GithubRunCommand = cmd({
633634
// Also output the clean error message for the action to capture
634635
//core.setOutput("prepare_error", e.message);
635636
} finally {
637+
unsubSessionEvents?.()
636638
if (!useGithubToken) {
637639
await restoreGitConfig()
638640
await revokeAppToken()
@@ -829,7 +831,7 @@ export const GithubRunCommand = cmd({
829831
}
830832

831833
let text = ""
832-
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
834+
return Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
833835
if (evt.properties.part.sessionID !== session.id) return
834836
//if (evt.properties.part.messageID === messageID) return
835837
const part = evt.properties.part

packages/opencode/src/file/time.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,26 @@ export namespace FileTime {
88
// All tools that overwrite existing files should run their
99
// assert/read/write/update sequence inside withLock(filepath, ...)
1010
// so concurrent writes to the same file are serialized.
11-
export const state = Instance.state(() => {
12-
const read: {
13-
[sessionID: string]: {
14-
[path: string]: Date | undefined
11+
export const state = Instance.state(
12+
() => {
13+
const read: {
14+
[sessionID: string]: {
15+
[path: string]: Date | undefined
16+
}
17+
} = {}
18+
const locks = new Map<string, Promise<void>>()
19+
return {
20+
read,
21+
locks,
1522
}
16-
} = {}
17-
const locks = new Map<string, Promise<void>>()
18-
return {
19-
read,
20-
locks,
21-
}
22-
})
23+
},
24+
async (current) => {
25+
for (const key of Object.keys(current.read)) {
26+
delete current.read[key]
27+
}
28+
current.locks.clear()
29+
},
30+
)
2331

2432
export function read(sessionID: string, file: string) {
2533
log.info("read", { sessionID, file })

packages/opencode/src/format/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,12 @@ export namespace Format {
100100
return result
101101
}
102102

103+
let unsub: (() => void) | undefined
104+
103105
export function init() {
104106
log.info("init")
105-
Bus.subscribe(File.Event.Edited, async (payload) => {
107+
unsub?.()
108+
unsub = Bus.subscribe(File.Event.Edited, async (payload) => {
106109
const file = payload.properties.file
107110
log.info("formatting", { file })
108111
const ext = path.extname(file)

packages/opencode/src/plugin/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,17 @@ export namespace Plugin {
119119
return state().then((x) => x.hooks)
120120
}
121121

122+
let unsub: (() => void) | undefined
123+
122124
export async function init() {
123125
const hooks = await state().then((x) => x.hooks)
124126
const config = await Config.get()
125127
for (const hook of hooks) {
126128
// @ts-expect-error this is because we haven't moved plugin to sdk v2
127129
await hook.config?.(config)
128130
}
129-
Bus.subscribeAll(async (input) => {
131+
unsub?.()
132+
unsub = Bus.subscribeAll(async (input) => {
130133
const hooks = await state().then((x) => x.hooks)
131134
for (const hook of hooks) {
132135
hook["event"]?.({

packages/opencode/src/project/bootstrap.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { ShareNext } from "@/share/share-next"
1313
import { Snapshot } from "../snapshot"
1414
import { Truncate } from "../tool/truncation"
1515

16+
let unsub: (() => void) | undefined
17+
1618
export async function InstanceBootstrap() {
1719
Log.Default.info("bootstrapping", { directory: Instance.directory })
1820
await Plugin.init()
@@ -25,7 +27,8 @@ export async function InstanceBootstrap() {
2527
Snapshot.init()
2628
Truncate.init()
2729

28-
Bus.subscribe(Command.Event.Executed, async (payload) => {
30+
unsub?.()
31+
unsub = Bus.subscribe(Command.Event.Executed, async (payload) => {
2932
if (payload.properties.name === Command.Default.INIT) {
3033
await Project.setInitialized(Instance.project.id)
3134
}

packages/opencode/src/session/compaction.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ export namespace SessionCompaction {
9191
for (const part of toPrune) {
9292
if (part.state.status === "completed") {
9393
part.state.time.compacted = Date.now()
94+
part.state.output = "[Old tool result content cleared]"
95+
part.state.attachments = []
9496
await Session.updatePart(part)
9597
}
9698
}

packages/opencode/src/share/share-next.ts

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,64 @@ export namespace ShareNext {
1818

1919
const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
2020

21+
let unsubs: (() => void)[] = []
22+
2123
export async function init() {
2224
if (disabled) return
23-
Bus.subscribe(Session.Event.Updated, async (evt) => {
24-
await sync(evt.properties.info.id, [
25-
{
26-
type: "session",
27-
data: evt.properties.info,
28-
},
29-
])
30-
})
31-
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
32-
await sync(evt.properties.info.sessionID, [
33-
{
34-
type: "message",
35-
data: evt.properties.info,
36-
},
37-
])
38-
if (evt.properties.info.role === "user") {
25+
dispose()
26+
unsubs.push(
27+
Bus.subscribe(Session.Event.Updated, async (evt) => {
28+
await sync(evt.properties.info.id, [
29+
{
30+
type: "session",
31+
data: evt.properties.info,
32+
},
33+
])
34+
}),
35+
Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
3936
await sync(evt.properties.info.sessionID, [
4037
{
41-
type: "model",
42-
data: [
43-
await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then(
44-
(m) => m,
45-
),
46-
],
38+
type: "message",
39+
data: evt.properties.info,
4740
},
4841
])
49-
}
50-
})
51-
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
52-
await sync(evt.properties.part.sessionID, [
53-
{
54-
type: "part",
55-
data: evt.properties.part,
56-
},
57-
])
58-
})
59-
Bus.subscribe(Session.Event.Diff, async (evt) => {
60-
await sync(evt.properties.sessionID, [
61-
{
62-
type: "session_diff",
63-
data: evt.properties.diff,
64-
},
65-
])
66-
})
42+
if (evt.properties.info.role === "user") {
43+
await sync(evt.properties.info.sessionID, [
44+
{
45+
type: "model",
46+
data: [
47+
await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then(
48+
(m) => m,
49+
),
50+
],
51+
},
52+
])
53+
}
54+
}),
55+
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
56+
await sync(evt.properties.part.sessionID, [
57+
{
58+
type: "part",
59+
data: evt.properties.part,
60+
},
61+
])
62+
}),
63+
Bus.subscribe(Session.Event.Diff, async (evt) => {
64+
await sync(evt.properties.sessionID, [
65+
{
66+
type: "session_diff",
67+
data: evt.properties.diff,
68+
},
69+
])
70+
}),
71+
)
72+
}
73+
74+
export function dispose() {
75+
for (const unsub of unsubs) unsub()
76+
unsubs = []
77+
for (const [, entry] of queue) clearTimeout(entry.timeout)
78+
queue.clear()
6779
}
6880

6981
export async function create(sessionID: string) {
@@ -154,7 +166,7 @@ export namespace ShareNext {
154166
secret: share.secret,
155167
data: Array.from(queued.data.values()),
156168
}),
157-
})
169+
}).catch(() => {})
158170
}, 1000)
159171
queue.set(sessionID, { timeout, data: dataMap })
160172
}

0 commit comments

Comments
 (0)