Skip to content

Commit 420c522

Browse files
committed
feat(ws): persist WebSocket connections in SQLite with recovery/rollback
Add a `WsConnection` module that durably persists WebSocket connection state, outbound message queues, and fragmented inbound streams to the existing opencode SQLite database. Key features: - **Connection lifecycle** – `open()` / `close()` / `get()` / `setState()` track each connection with a cryptographically-random recovery token and a configurable recovery window (default 1 h). - **Durable message queue** – `enqueue()` / `markSent()` / `ack()` implement a pending → sent → acked state machine. Backpressure is handled by keeping "sent" messages in the queue until the client explicitly acks them, so nothing is lost if the network drops. - **Connection recovery** – `recover()` looks up a connection by token and replays all non-acked messages since `last_acked_seq`, enabling clients that reconnect mid-stream to continue without data loss. - **Stream / fragment support** – `beginStream()` / `appendFragment()` / `commitStream()` accumulate out-of-order fragments atomically and enqueue the fully-assembled message only when all fragments arrive. - **Stale-stream rollback** – `rollbackStaleStreams()` deletes uncommitted streams older than a given threshold, so partial transmissions from senders that disconnect mid-message do not accumulate indefinitely. - **Housekeeping** – `cleanup()` removes closed connections (and their cascaded messages / streams) outside the recovery window. Schema (migration 20260225000000): ws_connection – connection state, token, acked cursor ws_message – ordered outbound queue with delivery tracking ws_stream – fragmented inbound stream accumulator All functions accept an optional `_db` parameter for isolated unit testing without needing a full Instance context. 54 tests cover the full API surface including the end-to-end recovery and mid-stream-drop scenarios. https://claude.ai/code/session_01Urv7ih36PJX12fvs5DAQZn
1 parent 21f5078 commit 420c522

5 files changed

Lines changed: 1400 additions & 0 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
CREATE TABLE `ws_connection` (
2+
`id` text PRIMARY KEY,
3+
`user_id` text,
4+
`session_id` text,
5+
`recovery_token` text NOT NULL UNIQUE,
6+
`recovery_expires_at` integer NOT NULL,
7+
`state` text,
8+
`last_acked_seq` integer NOT NULL DEFAULT 0,
9+
`opened_at` integer NOT NULL,
10+
`closed_at` integer,
11+
`last_activity_at` integer NOT NULL,
12+
`time_created` integer NOT NULL,
13+
`time_updated` integer NOT NULL
14+
);
15+
--> statement-breakpoint
16+
CREATE TABLE `ws_message` (
17+
`seq` integer PRIMARY KEY AUTOINCREMENT,
18+
`connection_id` text NOT NULL,
19+
`type` text NOT NULL,
20+
`payload` text NOT NULL,
21+
`status` text NOT NULL DEFAULT 'pending',
22+
`delivery_attempts` integer NOT NULL DEFAULT 0,
23+
`acked_at` integer,
24+
`time_created` integer NOT NULL,
25+
`time_updated` integer NOT NULL,
26+
CONSTRAINT `ws_message_connection_id_fk` FOREIGN KEY (`connection_id`) REFERENCES `ws_connection`(`id`) ON DELETE CASCADE
27+
);
28+
--> statement-breakpoint
29+
CREATE TABLE `ws_stream` (
30+
`id` text PRIMARY KEY,
31+
`connection_id` text NOT NULL,
32+
`message_type` text NOT NULL,
33+
`fragments` text NOT NULL DEFAULT '[]',
34+
`total_fragments` integer,
35+
`committed` integer NOT NULL DEFAULT 0,
36+
`time_created` integer NOT NULL,
37+
`time_updated` integer NOT NULL,
38+
CONSTRAINT `ws_stream_connection_id_fk` FOREIGN KEY (`connection_id`) REFERENCES `ws_connection`(`id`) ON DELETE CASCADE
39+
);
40+
--> statement-breakpoint
41+
CREATE INDEX `ws_connection_recovery_idx` ON `ws_connection` (`recovery_token`);
42+
--> statement-breakpoint
43+
CREATE INDEX `ws_connection_session_idx` ON `ws_connection` (`session_id`);
44+
--> statement-breakpoint
45+
CREATE INDEX `ws_connection_closed_idx` ON `ws_connection` (`closed_at`);
46+
--> statement-breakpoint
47+
CREATE INDEX `ws_message_connection_status_idx` ON `ws_message` (`connection_id`, `status`);
48+
--> statement-breakpoint
49+
CREATE INDEX `ws_message_connection_seq_idx` ON `ws_message` (`connection_id`, `seq`);
50+
--> statement-breakpoint
51+
CREATE INDEX `ws_stream_connection_idx` ON `ws_stream` (`connection_id`);
52+
--> statement-breakpoint
53+
CREATE INDEX `ws_stream_committed_idx` ON `ws_stream` (`connection_id`, `committed`);

packages/opencode/src/storage/schema.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ export { ControlAccountTable } from "../control/control.sql"
22
export { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../session/session.sql"
33
export { SessionShareTable } from "../share/share.sql"
44
export { ProjectTable } from "../project/project.sql"
5+
export { WsConnectionTable, WsMessageTable, WsStreamTable } from "../ws/ws.sql"

packages/opencode/src/ws/ws.sql.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { sqliteTable, text, integer, index } from "drizzle-orm/sqlite-core"
2+
import { Timestamps } from "@/storage/schema.sql"
3+
4+
/**
5+
* Tracks the lifecycle of every WebSocket connection.
6+
* Closed connections are kept for the duration of the recovery window so
7+
* clients can reconnect and replay undelivered messages.
8+
*/
9+
export const WsConnectionTable = sqliteTable(
10+
"ws_connection",
11+
{
12+
id: text().primaryKey(),
13+
user_id: text(),
14+
session_id: text(),
15+
/** Opaque token the client stores and presents on reconnect. */
16+
recovery_token: text().notNull().unique(),
17+
/** Epoch-ms after which the recovery token is invalid. */
18+
recovery_expires_at: integer().notNull(),
19+
/** Arbitrary subscriber/room state, JSON-serialised. */
20+
state: text({ mode: "json" }).$type<Record<string, unknown>>(),
21+
/** Highest message seq the client has explicitly acknowledged. */
22+
last_acked_seq: integer().notNull().$default(() => 0),
23+
opened_at: integer().notNull(),
24+
/** NULL while the connection is still active. */
25+
closed_at: integer(),
26+
last_activity_at: integer().notNull(),
27+
...Timestamps,
28+
},
29+
(t) => [
30+
index("ws_connection_recovery_idx").on(t.recovery_token),
31+
index("ws_connection_session_idx").on(t.session_id),
32+
index("ws_connection_closed_idx").on(t.closed_at),
33+
],
34+
)
35+
36+
/**
37+
* Durable outbound message queue for a connection.
38+
* Messages progress through: pending → sent → acked (or failed).
39+
* Acked messages are retained briefly then purged by cleanup().
40+
*/
41+
export const WsMessageTable = sqliteTable(
42+
"ws_message",
43+
{
44+
/** Monotonically-increasing delivery sequence number. */
45+
seq: integer().primaryKey({ autoIncrement: true }),
46+
connection_id: text()
47+
.notNull()
48+
.references(() => WsConnectionTable.id, { onDelete: "cascade" }),
49+
type: text().notNull(),
50+
payload: text({ mode: "json" }).notNull().$type<unknown>(),
51+
status: text()
52+
.notNull()
53+
.$type<"pending" | "sent" | "acked" | "failed">()
54+
.$default(() => "pending"),
55+
delivery_attempts: integer().notNull().$default(() => 0),
56+
acked_at: integer(),
57+
...Timestamps,
58+
},
59+
(t) => [
60+
index("ws_message_connection_status_idx").on(t.connection_id, t.status),
61+
index("ws_message_connection_seq_idx").on(t.connection_id, t.seq),
62+
],
63+
)
64+
65+
/**
66+
* Tracks fragmented/streaming inbound messages.
67+
* When a large message arrives in fragments, each piece is appended here.
68+
* commitStream() assembles the fragments into a single WsMessage.
69+
* Fragments belonging to streams that never complete are rolled back by
70+
* rollbackStaleStreams().
71+
*/
72+
export const WsStreamTable = sqliteTable(
73+
"ws_stream",
74+
{
75+
id: text().primaryKey(),
76+
connection_id: text()
77+
.notNull()
78+
.references(() => WsConnectionTable.id, { onDelete: "cascade" }),
79+
message_type: text().notNull(),
80+
/** Ordered list of received fragments, JSON-serialised. */
81+
fragments: text({ mode: "json" })
82+
.notNull()
83+
.$type<{ index: number; data: string }[]>()
84+
.$default(() => []),
85+
/** Set on commitStream(); null means the total is not yet known. */
86+
total_fragments: integer(),
87+
committed: integer({ mode: "boolean" }).notNull().$default(() => false),
88+
...Timestamps,
89+
},
90+
(t) => [
91+
index("ws_stream_connection_idx").on(t.connection_id),
92+
index("ws_stream_committed_idx").on(t.connection_id, t.committed),
93+
],
94+
)

0 commit comments

Comments
 (0)