Skip to content

Commit a8c78fc

Browse files
authored
fix(core): add historical sync on workspace connect (#23121)
1 parent fcb473f commit a8c78fc

8 files changed

Lines changed: 234 additions & 24 deletions

File tree

packages/opencode/src/cli/cmd/tui/context/sdk.tsx

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2"
22
import type { GlobalEvent } from "@opencode-ai/sdk/v2"
33
import { createSimpleContext } from "./helper"
44
import { createGlobalEmitter } from "@solid-primitives/event-bus"
5+
import { Flag } from "@/flag/flag"
56
import { batch, onCleanup, onMount } from "solid-js"
67

78
export type EventSource = {
@@ -39,6 +40,8 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
3940
let queue: GlobalEvent[] = []
4041
let timer: Timer | undefined
4142
let last = 0
43+
const retryDelay = 1000
44+
const maxRetryDelay = 30000
4245

4346
const flush = () => {
4447
if (queue.length === 0) return
@@ -73,9 +76,20 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
7376
const ctrl = new AbortController()
7477
sse = ctrl
7578
;(async () => {
79+
let attempt = 0
7680
while (true) {
7781
if (abort.signal.aborted || ctrl.signal.aborted) break
78-
const events = await sdk.global.event({ signal: ctrl.signal })
82+
83+
const events = await sdk.global.event({
84+
signal: ctrl.signal,
85+
sseMaxRetryAttempts: 0,
86+
})
87+
88+
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
89+
// Start syncing workspaces, it's important to do this after
90+
// we've started listening to events
91+
await sdk.sync.start().catch(() => {})
92+
}
7993

8094
for await (const event of events.stream) {
8195
if (ctrl.signal.aborted) break
@@ -84,6 +98,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
8498

8599
if (timer) clearTimeout(timer)
86100
if (queue.length > 0) flush()
101+
attempt += 1
102+
if (abort.signal.aborted || ctrl.signal.aborted) break
103+
104+
// Exponential backoff
105+
const backoff = Math.min(retryDelay * 2 ** (attempt - 1), maxRetryDelay)
106+
await new Promise((resolve) => setTimeout(resolve, backoff))
87107
}
88108
})().catch(() => {})
89109
}
@@ -92,6 +112,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
92112
if (props.events) {
93113
const unsub = await props.events.subscribe(handleEvent)
94114
onCleanup(unsub)
115+
116+
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
117+
// Start syncing workspaces, it's important to do this after
118+
// we've started listening to events
119+
await sdk.sync.start().catch(() => {})
120+
}
95121
} else {
96122
startSSE()
97123
}

packages/opencode/src/control-plane/workspace.ts

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { BusEvent } from "@/bus/bus-event"
77
import { GlobalBus } from "@/bus/global"
88
import { Auth } from "@/auth"
99
import { SyncEvent } from "@/sync"
10-
import { EventTable } from "@/sync/event.sql"
10+
import { EventSequenceTable, EventTable } from "@/sync/event.sql"
1111
import { Flag } from "@/flag/flag"
1212
import { Log } from "@/util"
1313
import { Filesystem } from "@/util"
@@ -23,8 +23,8 @@ import { SessionTable } from "@/session/session.sql"
2323
import { SessionID } from "@/session/schema"
2424
import { errorData } from "@/util/error"
2525
import { AppRuntime } from "@/effect/app-runtime"
26-
import { EventSequenceTable } from "@/sync/event.sql"
2726
import { waitEvent } from "./util"
27+
import { WorkspaceContext } from "./workspace-context"
2828

2929
export const Info = WorkspaceInfo.meta({
3030
ref: "Workspace",
@@ -297,22 +297,13 @@ export function list(project: Project.Info) {
297297
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
298298
)
299299
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
300-
301-
for (const space of spaces) startSync(space)
302300
return spaces
303301
}
304302

305-
function lookup(id: WorkspaceID) {
303+
export const get = fn(WorkspaceID.zod, async (id) => {
306304
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
307305
if (!row) return
308306
return fromRow(row)
309-
}
310-
311-
export const get = fn(WorkspaceID.zod, async (id) => {
312-
const space = lookup(id)
313-
if (!space) return
314-
startSync(space)
315-
return space
316307
})
317308

318309
export const remove = fn(WorkspaceID.zod, async (id) => {
@@ -437,6 +428,70 @@ async function connectSSE(url: URL | string, headers: HeadersInit | undefined, s
437428
return res.body
438429
}
439430

431+
async function syncHistory(space: Info, url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) {
432+
const sessionIDs = Database.use((db) =>
433+
db
434+
.select({ id: SessionTable.id })
435+
.from(SessionTable)
436+
.where(eq(SessionTable.workspace_id, space.id))
437+
.all()
438+
.map((row) => row.id),
439+
)
440+
const state = sessionIDs.length
441+
? Object.fromEntries(
442+
Database.use((db) =>
443+
db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(),
444+
).map((row) => [row.aggregate_id, row.seq]),
445+
)
446+
: {}
447+
448+
log.info("syncing workspace history", {
449+
workspaceID: space.id,
450+
sessions: sessionIDs.length,
451+
known: Object.keys(state).length,
452+
})
453+
454+
const requestHeaders = new Headers(headers)
455+
requestHeaders.set("content-type", "application/json")
456+
457+
const res = await fetch(route(url, "/sync/history"), {
458+
method: "POST",
459+
headers: requestHeaders,
460+
body: JSON.stringify(state),
461+
signal,
462+
})
463+
464+
if (!res.ok) {
465+
const body = await res.text()
466+
throw new Error(`Workspace history HTTP failure: ${res.status} ${body}`)
467+
}
468+
469+
const events = await res.json()
470+
471+
return WorkspaceContext.provide({
472+
workspaceID: space.id,
473+
fn: () => {
474+
for (const event of events) {
475+
SyncEvent.replay(
476+
{
477+
id: event.id,
478+
aggregateID: event.aggregate_id,
479+
seq: event.seq,
480+
type: event.type,
481+
data: event.data,
482+
},
483+
{ publish: true },
484+
)
485+
}
486+
},
487+
})
488+
489+
log.info("workspace history synced", {
490+
workspaceID: space.id,
491+
events: events.length,
492+
})
493+
}
494+
440495
async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
441496
const adaptor = await getAdaptor(space.projectID, space.type)
442497
const target = await adaptor.target(space)
@@ -452,7 +507,9 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
452507
let stream
453508
try {
454509
stream = await connectSSE(target.url, target.headers, signal)
510+
await syncHistory(space, target.url, target.headers, signal)
455511
} catch (err) {
512+
stream = null
456513
setStatus(space.id, "error")
457514
log.info("failed to connect to global sync", {
458515
workspace: space.name,
@@ -469,6 +526,7 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
469526
await parseSSE(stream, signal, (evt: any) => {
470527
try {
471528
if (!("payload" in evt)) return
529+
if (evt.payload.type === "server.heartbeat") return
472530

473531
if (evt.payload.type === "sync") {
474532
SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
@@ -536,4 +594,19 @@ function stopSync(id: WorkspaceID) {
536594
connections.delete(id)
537595
}
538596

597+
export function startWorkspaceSyncing(projectID: ProjectID) {
598+
const spaces = Database.use((db) =>
599+
db
600+
.select({ workspace: WorkspaceTable })
601+
.from(WorkspaceTable)
602+
.innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id))
603+
.where(eq(WorkspaceTable.project_id, projectID))
604+
.all(),
605+
)
606+
607+
for (const row of new Map(spaces.map((row) => [row.workspace.id, row.workspace])).values()) {
608+
void startSync(fromRow(row))
609+
}
610+
}
611+
539612
export * as Workspace from "./workspace"

packages/opencode/src/server/proxy.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,6 @@ export async function http(url: string | URL, extra: HeadersInit | undefined, re
130130
const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve()
131131

132132
return done.then(async () => {
133-
console.log("proxy http response", {
134-
method: req.method,
135-
request: req.url,
136-
url: String(url),
137-
status: res.status,
138-
statusText: res.statusText,
139-
})
140133
return new Response(res.body, {
141134
status: res.status,
142135
statusText: res.statusText,

packages/opencode/src/server/routes/instance/sync.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { Database, asc, and, not, or, lte, eq } from "@/storage"
66
import { EventTable } from "@/sync/event.sql"
77
import { lazy } from "@/util/lazy"
88
import { Log } from "@/util"
9+
import { startWorkspaceSyncing } from "@/control-plane/workspace"
10+
import { Instance } from "@/project/instance"
911
import { errors } from "../../error"
1012

1113
const ReplayEvent = z.object({
@@ -20,6 +22,28 @@ const log = Log.create({ service: "server.sync" })
2022

2123
export const SyncRoutes = lazy(() =>
2224
new Hono()
25+
.post(
26+
"/start",
27+
describeRoute({
28+
summary: "Start workspace sync",
29+
description: "Start sync loops for workspaces in the current project that have active sessions.",
30+
operationId: "sync.start",
31+
responses: {
32+
200: {
33+
description: "Workspace sync started",
34+
content: {
35+
"application/json": {
36+
schema: resolver(z.boolean()),
37+
},
38+
},
39+
},
40+
},
41+
}),
42+
async (c) => {
43+
startWorkspaceSyncing(Instance.project.id)
44+
return c.json(true)
45+
},
46+
)
2347
.post(
2448
"/replay",
2549
describeRoute({
@@ -75,7 +99,7 @@ export const SyncRoutes = lazy(() =>
7599
})
76100
},
77101
)
78-
.get(
102+
.post(
79103
"/history",
80104
describeRoute({
81105
summary: "List sync events",

packages/opencode/test/workspace/workspace-restore.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,12 @@ describe("Workspace.sessionRestore", () => {
141141
Object.assign(
142142
async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => {
143143
const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url)
144-
if (url.pathname !== "/base/sync/replay") {
144+
if (url.pathname === "/base/global/event") {
145145
return eventStreamResponse()
146146
}
147+
if (url.pathname === "/base/sync/history") {
148+
return Response.json([])
149+
}
147150
const body = JSON.parse(String(init?.body))
148151
posts.push({
149152
path: url.pathname,

packages/sdk/js/src/v2/gen/sdk.gen.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ import type {
163163
SyncHistoryListResponses,
164164
SyncReplayErrors,
165165
SyncReplayResponses,
166+
SyncStartResponses,
166167
TextPartInput,
167168
ToolIdsErrors,
168169
ToolIdsResponses,
@@ -3038,7 +3039,7 @@ export class History extends HeyApiClient {
30383039
},
30393040
],
30403041
)
3041-
return (options?.client ?? this.client).get<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
3042+
return (options?.client ?? this.client).post<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
30423043
url: "/sync/history",
30433044
...options,
30443045
...params,
@@ -3052,6 +3053,36 @@ export class History extends HeyApiClient {
30523053
}
30533054

30543055
export class Sync extends HeyApiClient {
3056+
/**
3057+
* Start workspace sync
3058+
*
3059+
* Start sync loops for workspaces in the current project that have active sessions.
3060+
*/
3061+
public start<ThrowOnError extends boolean = false>(
3062+
parameters?: {
3063+
directory?: string
3064+
workspace?: string
3065+
},
3066+
options?: Options<never, ThrowOnError>,
3067+
) {
3068+
const params = buildClientParams(
3069+
[parameters],
3070+
[
3071+
{
3072+
args: [
3073+
{ in: "query", key: "directory" },
3074+
{ in: "query", key: "workspace" },
3075+
],
3076+
},
3077+
],
3078+
)
3079+
return (options?.client ?? this.client).post<SyncStartResponses, unknown, ThrowOnError>({
3080+
url: "/sync/start",
3081+
...options,
3082+
...params,
3083+
})
3084+
}
3085+
30553086
/**
30563087
* Replay sync events
30573088
*

packages/sdk/js/src/v2/gen/types.gen.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4502,6 +4502,25 @@ export type ProviderOauthCallbackResponses = {
45024502

45034503
export type ProviderOauthCallbackResponse = ProviderOauthCallbackResponses[keyof ProviderOauthCallbackResponses]
45044504

4505+
export type SyncStartData = {
4506+
body?: never
4507+
path?: never
4508+
query?: {
4509+
directory?: string
4510+
workspace?: string
4511+
}
4512+
url: "/sync/start"
4513+
}
4514+
4515+
export type SyncStartResponses = {
4516+
/**
4517+
* Workspace sync started
4518+
*/
4519+
200: boolean
4520+
}
4521+
4522+
export type SyncStartResponse = SyncStartResponses[keyof SyncStartResponses]
4523+
45054524
export type SyncReplayData = {
45064525
body?: {
45074526
directory: string

0 commit comments

Comments
 (0)