Skip to content

Commit cdfbb26

Browse files
authored
refactor: collapse bus barrel into bus/index.ts (#22902)
1 parent 610c036 commit cdfbb26

2 files changed

Lines changed: 193 additions & 192 deletions

File tree

packages/opencode/src/bus/bus.ts

Lines changed: 0 additions & 191 deletions
This file was deleted.

packages/opencode/src/bus/index.ts

Lines changed: 193 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,193 @@
1-
export * as Bus from "./bus"
1+
import z from "zod"
2+
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
3+
import { EffectBridge } from "@/effect"
4+
import { Log } from "../util"
5+
import { BusEvent } from "./bus-event"
6+
import { GlobalBus } from "./global"
7+
import { InstanceState } from "@/effect"
8+
import { makeRuntime } from "@/effect/run-service"
9+
10+
const log = Log.create({ service: "bus" })
11+
12+
export const InstanceDisposed = BusEvent.define(
13+
"server.instance.disposed",
14+
z.object({
15+
directory: z.string(),
16+
}),
17+
)
18+
19+
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
20+
type: D["type"]
21+
properties: z.infer<D["properties"]>
22+
}
23+
24+
type State = {
25+
wildcard: PubSub.PubSub<Payload>
26+
typed: Map<string, PubSub.PubSub<Payload>>
27+
}
28+
29+
export interface Interface {
30+
readonly publish: <D extends BusEvent.Definition>(
31+
def: D,
32+
properties: z.output<D["properties"]>,
33+
) => Effect.Effect<void>
34+
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
35+
readonly subscribeAll: () => Stream.Stream<Payload>
36+
readonly subscribeCallback: <D extends BusEvent.Definition>(
37+
def: D,
38+
callback: (event: Payload<D>) => unknown,
39+
) => Effect.Effect<() => void>
40+
readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
41+
}
42+
43+
export class Service extends Context.Service<Service, Interface>()("@opencode/Bus") {}
44+
45+
export const layer = Layer.effect(
46+
Service,
47+
Effect.gen(function* () {
48+
const state = yield* InstanceState.make<State>(
49+
Effect.fn("Bus.state")(function* (ctx) {
50+
const wildcard = yield* PubSub.unbounded<Payload>()
51+
const typed = new Map<string, PubSub.PubSub<Payload>>()
52+
53+
yield* Effect.addFinalizer(() =>
54+
Effect.gen(function* () {
55+
// Publish InstanceDisposed before shutting down so subscribers see it
56+
yield* PubSub.publish(wildcard, {
57+
type: InstanceDisposed.type,
58+
properties: { directory: ctx.directory },
59+
})
60+
yield* PubSub.shutdown(wildcard)
61+
for (const ps of typed.values()) {
62+
yield* PubSub.shutdown(ps)
63+
}
64+
}),
65+
)
66+
67+
return { wildcard, typed }
68+
}),
69+
)
70+
71+
function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
72+
return Effect.gen(function* () {
73+
let ps = state.typed.get(def.type)
74+
if (!ps) {
75+
ps = yield* PubSub.unbounded<Payload>()
76+
state.typed.set(def.type, ps)
77+
}
78+
return ps as unknown as PubSub.PubSub<Payload<D>>
79+
})
80+
}
81+
82+
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
83+
return Effect.gen(function* () {
84+
const s = yield* InstanceState.get(state)
85+
const payload: Payload = { type: def.type, properties }
86+
log.info("publishing", { type: def.type })
87+
88+
const ps = s.typed.get(def.type)
89+
if (ps) yield* PubSub.publish(ps, payload)
90+
yield* PubSub.publish(s.wildcard, payload)
91+
92+
const dir = yield* InstanceState.directory
93+
const context = yield* InstanceState.context
94+
const workspace = yield* InstanceState.workspaceID
95+
96+
GlobalBus.emit("event", {
97+
directory: dir,
98+
project: context.project.id,
99+
workspace,
100+
payload,
101+
})
102+
})
103+
}
104+
105+
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
106+
log.info("subscribing", { type: def.type })
107+
return Stream.unwrap(
108+
Effect.gen(function* () {
109+
const s = yield* InstanceState.get(state)
110+
const ps = yield* getOrCreate(s, def)
111+
return Stream.fromPubSub(ps)
112+
}),
113+
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
114+
}
115+
116+
function subscribeAll(): Stream.Stream<Payload> {
117+
log.info("subscribing", { type: "*" })
118+
return Stream.unwrap(
119+
Effect.gen(function* () {
120+
const s = yield* InstanceState.get(state)
121+
return Stream.fromPubSub(s.wildcard)
122+
}),
123+
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
124+
}
125+
126+
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
127+
return Effect.gen(function* () {
128+
log.info("subscribing", { type })
129+
const bridge = yield* EffectBridge.make()
130+
const scope = yield* Scope.make()
131+
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
132+
133+
yield* Scope.provide(scope)(
134+
Stream.fromSubscription(subscription).pipe(
135+
Stream.runForEach((msg) =>
136+
Effect.tryPromise({
137+
try: () => Promise.resolve().then(() => callback(msg)),
138+
catch: (cause) => {
139+
log.error("subscriber failed", { type, cause })
140+
},
141+
}).pipe(Effect.ignore),
142+
),
143+
Effect.forkScoped,
144+
),
145+
)
146+
147+
return () => {
148+
log.info("unsubscribing", { type })
149+
bridge.fork(Scope.close(scope, Exit.void))
150+
}
151+
})
152+
}
153+
154+
const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
155+
def: D,
156+
callback: (event: Payload<D>) => unknown,
157+
) {
158+
const s = yield* InstanceState.get(state)
159+
const ps = yield* getOrCreate(s, def)
160+
return yield* on(ps, def.type, callback)
161+
})
162+
163+
const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
164+
const s = yield* InstanceState.get(state)
165+
return yield* on(s.wildcard, "*", callback)
166+
})
167+
168+
return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
169+
}),
170+
)
171+
172+
export const defaultLayer = layer
173+
174+
const { runPromise, runSync } = makeRuntime(Service, layer)
175+
176+
// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
177+
// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
178+
export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
179+
return runPromise((svc) => svc.publish(def, properties))
180+
}
181+
182+
export function subscribe<D extends BusEvent.Definition>(
183+
def: D,
184+
callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
185+
) {
186+
return runSync((svc) => svc.subscribeCallback(def, callback))
187+
}
188+
189+
export function subscribeAll(callback: (event: any) => unknown) {
190+
return runSync((svc) => svc.subscribeAllCallback(callback))
191+
}
192+
193+
export * as Bus from "."

0 commit comments

Comments
 (0)