Skip to content

Commit e4aedda

Browse files
committed
refactor: simplify instance store concurrency
1 parent 6eb33c8 commit e4aedda

2 files changed

Lines changed: 264 additions & 76 deletions

File tree

packages/opencode/src/project/instance-store.ts

Lines changed: 115 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ import { WorkspaceContext } from "@/control-plane/workspace-context"
33
import { disposeInstance } from "@/effect/instance-registry"
44
import { makeRuntime } from "@/effect/run-service"
55
import { AppFileSystem } from "@opencode-ai/core/filesystem"
6-
import * as Log from "@opencode-ai/core/util/log"
7-
import { Context, Effect, Layer } from "effect"
8-
import { iife } from "@/util/iife"
6+
import { Context, Deferred, Effect, Exit, Layer, Scope } from "effect"
97
import { context, type InstanceContext } from "./instance-context"
108
import * as Project from "./project"
119

@@ -25,13 +23,18 @@ export interface Interface {
2523

2624
export class Service extends Context.Service<Service, Interface>()("@opencode/InstanceStore") {}
2725

26+
interface Entry {
27+
readonly deferred: Deferred.Deferred<InstanceContext>
28+
}
29+
2830
export const layer: Layer.Layer<Service, never, Project.Service> = Layer.effect(
2931
Service,
3032
Effect.gen(function* () {
3133
const project = yield* Project.Service
32-
const cache = new Map<string, Promise<InstanceContext>>()
34+
const scope = yield* Scope.Scope
35+
const cache = new Map<string, Entry>()
3336
const disposal = {
34-
all: undefined as Promise<void> | undefined,
37+
all: undefined as Deferred.Deferred<void> | undefined,
3538
}
3639

3740
const boot = Effect.fn("InstanceStore.boot")(function* (input: LoadInput & { directory: string }) {
@@ -54,91 +57,128 @@ export const layer: Layer.Layer<Service, never, Project.Service> = Layer.effect(
5457
return ctx
5558
})
5659

57-
function track(directory: string, next: Promise<InstanceContext>) {
58-
const task = next.catch((error) => {
59-
if (cache.get(directory) === task) cache.delete(directory)
60-
throw error
60+
const removeEntry = (directory: string, entry: Entry) =>
61+
Effect.sync(() => {
62+
if (cache.get(directory) !== entry) return false
63+
cache.delete(directory)
64+
return true
6165
})
62-
cache.set(directory, task)
63-
return task
64-
}
66+
67+
const completeLoad = Effect.fnUntraced(function* (directory: string, input: LoadInput, entry: Entry) {
68+
const exit = yield* Effect.exit(boot({ ...input, directory }))
69+
if (Exit.isFailure(exit)) yield* removeEntry(directory, entry)
70+
yield* Deferred.done(entry.deferred, exit).pipe(Effect.asVoid)
71+
})
72+
73+
const emitDisposed = (input: { directory: string; project?: string }) =>
74+
Effect.sync(() =>
75+
GlobalBus.emit("event", {
76+
directory: input.directory,
77+
project: input.project,
78+
workspace: WorkspaceContext.workspaceID,
79+
payload: {
80+
type: "server.instance.disposed",
81+
properties: {
82+
directory: input.directory,
83+
},
84+
},
85+
}),
86+
)
87+
88+
const disposeContext = Effect.fn("InstanceStore.disposeContext")(function* (ctx: InstanceContext) {
89+
yield* Effect.logInfo("disposing instance", { directory: ctx.directory })
90+
yield* Effect.promise(() => disposeInstance(ctx.directory))
91+
yield* emitDisposed({ directory: ctx.directory, project: ctx.project.id })
92+
})
93+
94+
const disposeEntry = Effect.fnUntraced(function* (directory: string, entry: Entry, ctx: InstanceContext) {
95+
if (cache.get(directory) !== entry) return false
96+
yield* disposeContext(ctx)
97+
if (cache.get(directory) !== entry) return false
98+
cache.delete(directory)
99+
return true
100+
})
65101

66102
const load = Effect.fn("InstanceStore.load")(function* (input: LoadInput) {
67103
const directory = AppFileSystem.resolve(input.directory)
68-
const existing = cache.get(directory)
69-
if (existing) return yield* Effect.promise(() => existing)
70-
71-
Log.Default.info("creating instance", { directory })
72-
return yield* Effect.promise(() => track(directory, Effect.runPromise(boot({ ...input, directory }))))
104+
return yield* Effect.uninterruptibleMask((restore) =>
105+
Effect.gen(function* () {
106+
const existing = cache.get(directory)
107+
if (existing) return yield* restore(Deferred.await(existing.deferred))
108+
109+
const entry: Entry = { deferred: Deferred.makeUnsafe<InstanceContext>() }
110+
cache.set(directory, entry)
111+
yield* Effect.gen(function* () {
112+
yield* Effect.logInfo("creating instance", { directory })
113+
yield* completeLoad(directory, input, entry)
114+
}).pipe(Effect.forkIn(scope, { startImmediately: true }))
115+
return yield* restore(Deferred.await(entry.deferred))
116+
}),
117+
)
73118
})
74119

75120
const reload = Effect.fn("InstanceStore.reload")(function* (input: LoadInput) {
76121
const directory = AppFileSystem.resolve(input.directory)
77-
Log.Default.info("reloading instance", { directory })
78-
yield* Effect.promise(() => disposeInstance(directory))
79-
cache.delete(directory)
80-
const next = track(directory, Effect.runPromise(boot({ ...input, directory })))
81-
82-
GlobalBus.emit("event", {
83-
directory,
84-
project: input.project?.id,
85-
workspace: WorkspaceContext.workspaceID,
86-
payload: {
87-
type: "server.instance.disposed",
88-
properties: {
89-
directory,
90-
},
91-
},
92-
})
93-
94-
return yield* Effect.promise(() => next)
122+
return yield* Effect.uninterruptibleMask((restore) =>
123+
Effect.gen(function* () {
124+
const previous = cache.get(directory)
125+
const entry: Entry = { deferred: Deferred.makeUnsafe<InstanceContext>() }
126+
cache.set(directory, entry)
127+
yield* Effect.gen(function* () {
128+
yield* Effect.logInfo("reloading instance", { directory })
129+
if (previous) yield* Deferred.await(previous.deferred).pipe(Effect.exit, Effect.asVoid)
130+
yield* Effect.promise(() => disposeInstance(directory))
131+
yield* emitDisposed({ directory, project: input.project?.id })
132+
yield* completeLoad(directory, input, entry)
133+
}).pipe(Effect.forkIn(scope, { startImmediately: true }))
134+
return yield* restore(Deferred.await(entry.deferred))
135+
}),
136+
)
95137
})
96138

97139
const dispose = Effect.fn("InstanceStore.dispose")(function* (ctx: InstanceContext) {
98-
Log.Default.info("disposing instance", { directory: ctx.directory })
99-
yield* Effect.promise(() => disposeInstance(ctx.directory))
100-
cache.delete(ctx.directory)
101-
102-
GlobalBus.emit("event", {
103-
directory: ctx.directory,
104-
project: ctx.project.id,
105-
workspace: WorkspaceContext.workspaceID,
106-
payload: {
107-
type: "server.instance.disposed",
108-
properties: {
109-
directory: ctx.directory,
110-
},
111-
},
112-
})
140+
const entry = cache.get(ctx.directory)
141+
if (!entry) return yield* disposeContext(ctx)
142+
143+
const exit = yield* Deferred.await(entry.deferred).pipe(Effect.exit)
144+
if (Exit.isFailure(exit)) return yield* removeEntry(ctx.directory, entry).pipe(Effect.asVoid)
145+
if (exit.value !== ctx) return
146+
yield* disposeEntry(ctx.directory, entry, ctx).pipe(Effect.asVoid)
113147
})
114148

115149
const disposeAll = Effect.fn("InstanceStore.disposeAll")(function* () {
116-
if (disposal.all) return yield* Effect.promise(() => disposal.all!)
117-
118-
disposal.all = iife(async () => {
119-
Log.Default.info("disposing all instances")
120-
const entries = [...cache.entries()]
121-
for (const [key, value] of entries) {
122-
if (cache.get(key) !== value) continue
123-
124-
const ctx = await value.catch((error) => {
125-
Log.Default.warn("instance dispose failed", { key, error })
126-
return undefined
127-
})
128-
129-
if (!ctx) {
130-
if (cache.get(key) === value) cache.delete(key)
131-
continue
150+
return yield* Effect.uninterruptibleMask((restore) =>
151+
Effect.gen(function* () {
152+
const existing = disposal.all
153+
if (existing) return yield* restore(Deferred.await(existing))
154+
155+
const done = Deferred.makeUnsafe<void>()
156+
const entries = [...cache.entries()]
157+
disposal.all = done
158+
const exit = yield* Effect.gen(function* () {
159+
yield* Effect.logInfo("disposing all instances")
160+
yield* Effect.forEach(
161+
entries,
162+
(item) =>
163+
Effect.gen(function* () {
164+
const exit = yield* Deferred.await(item[1].deferred).pipe(Effect.exit)
165+
if (Exit.isFailure(exit)) {
166+
yield* Effect.logWarning("instance dispose failed", { key: item[0], cause: exit.cause })
167+
yield* removeEntry(item[0], item[1])
168+
return
169+
}
170+
yield* disposeEntry(item[0], item[1], exit.value)
171+
}),
172+
{ discard: true },
173+
)
174+
}).pipe(Effect.exit)
175+
yield* Deferred.done(done, exit).pipe(Effect.asVoid)
176+
if (disposal.all === done) {
177+
disposal.all = undefined
132178
}
133-
134-
if (cache.get(key) !== value) continue
135-
await Effect.runPromise(dispose(ctx))
136-
}
137-
}).finally(() => {
138-
disposal.all = undefined
139-
})
140-
141-
return yield* Effect.promise(() => disposal.all!)
179+
return yield* restore(Deferred.await(done))
180+
}),
181+
)
142182
})
143183

144184
yield* Effect.addFinalizer(() => disposeAll().pipe(Effect.ignore))

packages/opencode/test/project/instance.test.ts

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { afterEach, describe, expect } from "bun:test"
22
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
3-
import { Effect, Layer } from "effect"
3+
import { Effect, Fiber, Layer } from "effect"
4+
import { registerDisposer } from "../../src/effect/instance-registry"
45
import { Instance } from "../../src/project/instance"
56
import { InstanceStore } from "../../src/project/instance-store"
67
import { tmpdirScoped } from "../fixture/fixture"
@@ -67,6 +68,153 @@ describe("InstanceStore", () => {
6768
}),
6869
)
6970

71+
it.live("dedupes concurrent loads while init is in flight", () =>
72+
Effect.gen(function* () {
73+
const dir = yield* tmpdirScoped({ git: true })
74+
const store = yield* InstanceStore.Service
75+
const started = Promise.withResolvers<void>()
76+
const release = Promise.withResolvers<void>()
77+
let initialized = 0
78+
79+
const first = yield* store
80+
.load({
81+
directory: dir,
82+
init: async () => {
83+
initialized++
84+
started.resolve()
85+
await release.promise
86+
},
87+
})
88+
.pipe(Effect.forkScoped)
89+
90+
yield* Effect.promise(() => started.promise)
91+
92+
const second = yield* store
93+
.load({
94+
directory: dir,
95+
init: async () => {
96+
initialized++
97+
},
98+
})
99+
.pipe(Effect.forkScoped)
100+
101+
expect(initialized).toBe(1)
102+
release.resolve()
103+
104+
const [firstCtx, secondCtx] = yield* Effect.all([Fiber.join(first), Fiber.join(second)])
105+
expect(secondCtx).toBe(firstCtx)
106+
expect(initialized).toBe(1)
107+
}),
108+
)
109+
110+
it.live("removes failed loads from the cache", () =>
111+
Effect.gen(function* () {
112+
const dir = yield* tmpdirScoped({ git: true })
113+
const store = yield* InstanceStore.Service
114+
let attempts = 0
115+
116+
const failed = yield* store
117+
.load({
118+
directory: dir,
119+
init: async () => {
120+
attempts++
121+
throw new Error("init failed")
122+
},
123+
})
124+
.pipe(
125+
Effect.as(false),
126+
Effect.catchCause(() => Effect.succeed(true)),
127+
)
128+
129+
expect(failed).toBe(true)
130+
131+
const ctx = yield* store.load({
132+
directory: dir,
133+
init: async () => {
134+
attempts++
135+
},
136+
})
137+
138+
expect(ctx.directory).toBe(dir)
139+
expect(attempts).toBe(2)
140+
}),
141+
)
142+
143+
it.live("reload replaces the cached context", () =>
144+
Effect.gen(function* () {
145+
const dir = yield* tmpdirScoped({ git: true })
146+
const store = yield* InstanceStore.Service
147+
148+
const first = yield* store.load({ directory: dir })
149+
const second = yield* store.reload({ directory: dir })
150+
const cached = yield* store.load({ directory: dir })
151+
152+
expect(second).not.toBe(first)
153+
expect(cached).toBe(second)
154+
}),
155+
)
156+
157+
it.live("stale dispose does not delete an in-flight reload", () =>
158+
Effect.gen(function* () {
159+
const dir = yield* tmpdirScoped({ git: true })
160+
const store = yield* InstanceStore.Service
161+
const reloading = Promise.withResolvers<void>()
162+
const releaseReload = Promise.withResolvers<void>()
163+
const disposed: Array<string> = []
164+
const off = registerDisposer(async (directory) => {
165+
disposed.push(directory)
166+
})
167+
yield* Effect.addFinalizer(() => Effect.sync(off))
168+
169+
const first = yield* store.load({ directory: dir })
170+
const reload = yield* store
171+
.reload({
172+
directory: dir,
173+
init: async () => {
174+
reloading.resolve()
175+
await releaseReload.promise
176+
},
177+
})
178+
.pipe(Effect.forkScoped)
179+
180+
yield* Effect.promise(() => reloading.promise)
181+
const staleDispose = yield* store.dispose(first).pipe(Effect.forkScoped)
182+
releaseReload.resolve()
183+
184+
const second = yield* Fiber.join(reload)
185+
yield* Fiber.join(staleDispose)
186+
187+
expect(disposed).toEqual([dir])
188+
expect(yield* store.load({ directory: dir })).toBe(second)
189+
}),
190+
)
191+
192+
it.live("dedupes concurrent disposeAll calls", () =>
193+
Effect.gen(function* () {
194+
const dir = yield* tmpdirScoped({ git: true })
195+
const store = yield* InstanceStore.Service
196+
const disposing = Promise.withResolvers<void>()
197+
const releaseDispose = Promise.withResolvers<void>()
198+
const disposed: Array<string> = []
199+
const off = registerDisposer(async (directory) => {
200+
disposed.push(directory)
201+
disposing.resolve()
202+
await releaseDispose.promise
203+
})
204+
yield* Effect.addFinalizer(() => Effect.sync(off))
205+
206+
yield* store.load({ directory: dir })
207+
const first = yield* store.disposeAll().pipe(Effect.forkScoped)
208+
yield* Effect.promise(() => disposing.promise)
209+
const second = yield* store.disposeAll().pipe(Effect.forkScoped)
210+
211+
expect(disposed).toEqual([dir])
212+
releaseDispose.resolve()
213+
yield* Effect.all([Fiber.join(first), Fiber.join(second)])
214+
expect(disposed).toEqual([dir])
215+
}),
216+
)
217+
70218
it.live("keeps Instance.provide as the legacy ALS wrapper", () =>
71219
Effect.gen(function* () {
72220
const dir = yield* tmpdirScoped({ git: true })

0 commit comments

Comments
 (0)