Skip to content

Commit 58244eb

Browse files
authored
feat(httpapi): bridge event stream (#24518)
1 parent e9071b0 commit 58244eb

5 files changed

Lines changed: 104 additions & 2 deletions

File tree

packages/opencode/specs/effect/http-api.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ Use raw Effect HTTP routes where `HttpApi` does not fit. The goal is deleting Ho
184184
| experimental JSON routes | `bridged` | console, tool, worktree list/mutations, global session list, resource list |
185185
| `session` | `bridged` | read, lifecycle, prompt, message/part mutations, revert, permission reply |
186186
| `sync` | `bridged` | start/replay/history |
187-
| `event` | `special` | SSE |
187+
| `event` | `bridged` | SSE via raw Effect HTTP |
188188
| `pty` | `special` | websocket |
189189
| `tui` | `special` | UI bridge |
190190

@@ -316,7 +316,7 @@ This checklist tracks bridge parity only. Checked routes are available through t
316316

317317
### Event Routes
318318

319-
- [ ] `GET /event` - SSE event stream; replace with raw Effect HTTP, not `HttpApi`.
319+
- [x] `GET /event` - SSE event stream via raw Effect HTTP.
320320

321321
### PTY Routes
322322

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { Bus } from "@/bus"
2+
import { Log } from "@/util"
3+
import { Effect } from "effect"
4+
import * as Stream from "effect/Stream"
5+
import { HttpRouter, HttpServerResponse } from "effect/unstable/http"
6+
7+
const log = Log.create({ service: "server" })
8+
9+
export const EventPaths = {
10+
event: "/event",
11+
} as const
12+
13+
function eventData(data: unknown) {
14+
return `data: ${JSON.stringify(data)}\n\n`
15+
}
16+
17+
export const eventRoute = HttpRouter.add(
18+
"GET",
19+
EventPaths.event,
20+
Effect.gen(function* () {
21+
const bus = yield* Bus.Service
22+
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
23+
const heartbeat = Stream.tick("10 seconds").pipe(
24+
Stream.drop(1),
25+
Stream.map(() => ({ type: "server.heartbeat", properties: {} })),
26+
)
27+
28+
log.info("event connected")
29+
return HttpServerResponse.stream(
30+
Stream.make({ type: "server.connected", properties: {} }).pipe(
31+
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
32+
Stream.map(eventData),
33+
Stream.encodeText,
34+
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
35+
),
36+
{
37+
contentType: "text/event-stream",
38+
headers: {
39+
"Cache-Control": "no-cache, no-transform",
40+
"X-Accel-Buffering": "no",
41+
"X-Content-Type-Options": "nosniff",
42+
},
43+
},
44+
)
45+
}).pipe(Effect.provide(Bus.layer)),
46+
)

packages/opencode/src/server/routes/instance/httpapi/server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { lazy } from "@/util/lazy"
1010
import { Filesystem } from "@/util"
1111
import { authorizationLayer } from "./auth"
1212
import { ConfigApi, configHandlers } from "./config"
13+
import { eventRoute } from "./event"
1314
import { FileApi, fileHandlers } from "./file"
1415
import { ExperimentalApi, experimentalHandlers } from "./experimental"
1516
import { InstanceApi, instanceHandlers } from "./instance"
@@ -66,6 +67,7 @@ const instance = HttpRouter.middleware()(
6667
).layer
6768

6869
export const routes = Layer.mergeAll(
70+
eventRoute,
6971
HttpApiBuilder.layer(ConfigApi).pipe(Layer.provide(configHandlers)),
7072
HttpApiBuilder.layer(ExperimentalApi).pipe(Layer.provide(experimentalHandlers)),
7173
HttpApiBuilder.layer(FileApi).pipe(Layer.provide(fileHandlers)),

packages/opencode/src/server/routes/instance/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { QuestionRoutes } from "./question"
1616
import { PermissionRoutes } from "./permission"
1717
import { Flag } from "@opencode-ai/core/flag/flag"
1818
import { ExperimentalHttpApiServer } from "./httpapi/server"
19+
import { EventPaths } from "./httpapi/event"
1920
import { ExperimentalPaths } from "./httpapi/experimental"
2021
import { FilePaths } from "./httpapi/file"
2122
import { InstancePaths } from "./httpapi/instance"
@@ -41,6 +42,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => {
4142
if (Flag.OPENCODE_EXPERIMENTAL_HTTPAPI) {
4243
const handler = ExperimentalHttpApiServer.webHandler().handler
4344
const context = Context.empty() as Context.Context<unknown>
45+
app.get(EventPaths.event, (c) => handler(c.req.raw, context))
4446
app.get("/question", (c) => handler(c.req.raw, context))
4547
app.post("/question/:requestID/reply", (c) => handler(c.req.raw, context))
4648
app.post("/question/:requestID/reject", (c) => handler(c.req.raw, context))
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { afterEach, describe, expect, test } from "bun:test"
2+
import type { UpgradeWebSocket } from "hono/ws"
3+
import { Flag } from "@opencode-ai/core/flag/flag"
4+
import { Instance } from "../../src/project/instance"
5+
import { InstanceRoutes } from "../../src/server/routes/instance"
6+
import { EventPaths } from "../../src/server/routes/instance/httpapi/event"
7+
import { Log } from "../../src/util"
8+
import { resetDatabase } from "../fixture/db"
9+
import { tmpdir } from "../fixture/fixture"
10+
11+
void Log.init({ print: false })
12+
13+
const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
14+
const websocket = (() => () => new Response(null, { status: 501 })) as unknown as UpgradeWebSocket
15+
16+
function app() {
17+
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
18+
return InstanceRoutes(websocket)
19+
}
20+
21+
async function readFirstChunk(response: Response) {
22+
if (!response.body) throw new Error("missing response body")
23+
const reader = response.body.getReader()
24+
const result = await Promise.race([
25+
reader.read(),
26+
new Promise<never>((_, reject) => setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)),
27+
])
28+
await reader.cancel()
29+
return new TextDecoder().decode(result.value)
30+
}
31+
32+
afterEach(async () => {
33+
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
34+
await Instance.disposeAll()
35+
await resetDatabase()
36+
})
37+
38+
describe("event HttpApi bridge", () => {
39+
test("serves event stream through experimental Effect route", async () => {
40+
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
41+
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
42+
43+
expect(response.status).toBe(200)
44+
expect(response.headers.get("content-type")).toContain("text/event-stream")
45+
expect(response.headers.get("cache-control")).toBe("no-cache, no-transform")
46+
expect(response.headers.get("x-accel-buffering")).toBe("no")
47+
expect(response.headers.get("x-content-type-options")).toBe("nosniff")
48+
expect(await readFirstChunk(response)).toContain(
49+
'data: {"type":"server.connected","properties":{}}\n\n',
50+
)
51+
})
52+
})

0 commit comments

Comments
 (0)