Skip to content

Commit 467de8c

Browse files
feat(session): rework message pagination
Add bidirectional cursor pagination with Link headers across the server, app, TUI, and experimental HttpApi surfaces. Keep large-history revert and restore flows correct with server-backed revert previews, bounded client windows, and paged boundary hydration.
1 parent 4781564 commit 467de8c

27 files changed

Lines changed: 3072 additions & 436 deletions

File tree

packages/app/src/context/global-sync/event-reducer.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@ import type { State, VcsCache } from "./types"
1515
import { trimSessions } from "./session-trim"
1616
import { dropSessionCaches } from "./session-cache"
1717
import { diffs as list, message as clean } from "@/utils/diffs"
18+
import { compareMessages } from "@/context/revert-page"
1819

1920
const SKIP_PARTS = new Set(["patch", "step-start", "step-finish"])
21+
const messageIndex = (messages: readonly Message[], id: string) => messages.findIndex((message) => message.id === id)
22+
const messageInsertIndex = (messages: readonly Message[], message: Message) => {
23+
const index = messages.findIndex((item) => compareMessages(message, item) < 0)
24+
return index === -1 ? messages.length : index
25+
}
2026

2127
export function applyGlobalEvent(input: {
2228
event: { type: string; properties?: unknown }
@@ -188,16 +194,16 @@ export function applyDirectoryEvent(input: {
188194
input.setStore("message", info.sessionID, [info])
189195
break
190196
}
191-
const result = Binary.search(messages, info.id, (m) => m.id)
192-
if (result.found) {
193-
input.setStore("message", info.sessionID, result.index, reconcile(info))
197+
const index = messageIndex(messages, info.id)
198+
if (index !== -1) {
199+
input.setStore("message", info.sessionID, index, reconcile(info))
194200
break
195201
}
196202
input.setStore(
197203
"message",
198204
info.sessionID,
199205
produce((draft) => {
200-
draft.splice(result.index, 0, info)
206+
draft.splice(messageInsertIndex(draft, info), 0, info)
201207
}),
202208
)
203209
break
@@ -208,8 +214,8 @@ export function applyDirectoryEvent(input: {
208214
produce((draft) => {
209215
const messages = draft.message[props.sessionID]
210216
if (messages) {
211-
const result = Binary.search(messages, props.messageID, (m) => m.id)
212-
if (result.found) messages.splice(result.index, 1)
217+
const index = messageIndex(messages, props.messageID)
218+
if (index !== -1) messages.splice(index, 1)
213219
}
214220
delete draft.part[props.messageID]
215221
}),
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import { describe, expect, test } from "bun:test"
2+
import type { Message, Part } from "@opencode-ai/sdk/v2/client"
3+
import { hasVisibleUserBeforeRevert, loadRevertAwareLatestPage } from "./revert-page"
4+
5+
const message = (id: string, role: Message["role"]): Message =>
6+
role === "assistant"
7+
? ({
8+
id,
9+
sessionID: "ses_1",
10+
role: "assistant",
11+
agent: "default",
12+
model: { providerID: "openai", modelID: "gpt-4" },
13+
time: { created: Number(id.slice(2)) },
14+
} as unknown as Message)
15+
: ({
16+
id,
17+
sessionID: "ses_1",
18+
role: "user",
19+
agent: "default",
20+
model: { providerID: "openai", modelID: "gpt-4" },
21+
time: { created: Number(id.slice(2)) },
22+
} as unknown as Message)
23+
24+
const textPart = (id: string, messageID: string): Extract<Part, { type: "text" }> => ({
25+
id,
26+
sessionID: "ses_1",
27+
messageID,
28+
type: "text",
29+
text: id,
30+
})
31+
32+
describe("revert page helpers", () => {
33+
test("detects when the loaded page has no visible user before revert", () => {
34+
expect(hasVisibleUserBeforeRevert([message("m6", "user"), message("m7", "assistant")], "m6")).toBe(false)
35+
expect(hasVisibleUserBeforeRevert([message("m5", "user"), message("m6", "user")], "m6")).toBe(true)
36+
})
37+
38+
test("loads and merges an older boundary window when latest page is fully reverted", async () => {
39+
const olderPart = textPart("p5", "m5")
40+
const boundaryPart = textPart("p6", "m6")
41+
42+
const result = await loadRevertAwareLatestPage({
43+
current: {
44+
session: [message("m6", "user"), message("m7", "assistant"), message("m8", "user")],
45+
part: [
46+
{ id: "m6", part: [boundaryPart] },
47+
{ id: "m7", part: [] },
48+
{ id: "m8", part: [] },
49+
],
50+
cursor: undefined,
51+
complete: true,
52+
},
53+
revertMessageID: "m6",
54+
fetchMessage: async () => ({ info: message("m6", "user"), parts: [boundaryPart], cursor: "boundary" }),
55+
fetchPage: async (before) => {
56+
expect(before).toBe("boundary")
57+
return {
58+
session: [message("m4", "assistant"), message("m5", "user")],
59+
part: [
60+
{ id: "m4", part: [] },
61+
{ id: "m5", part: [olderPart] },
62+
],
63+
cursor: "older",
64+
complete: false,
65+
}
66+
},
67+
})
68+
69+
expect(result.session.map((item) => item.id)).toEqual(["m4", "m5", "m6", "m7", "m8"])
70+
expect(result.part.find((item) => item.id === "m5")?.part).toEqual([olderPart])
71+
expect(result.part.find((item) => item.id === "m6")?.part).toEqual([boundaryPart])
72+
expect(result.cursor).toBe("older")
73+
expect(result.complete).toBe(false)
74+
})
75+
76+
test("keeps loading older pages until a visible user exists before revert", async () => {
77+
const boundaryPart = textPart("p6", "m6")
78+
const olderPart = textPart("p3", "m3")
79+
let call = 0
80+
81+
const result = await loadRevertAwareLatestPage({
82+
current: {
83+
session: [message("m6", "user"), message("m7", "assistant"), message("m8", "user")],
84+
part: [
85+
{ id: "m6", part: [boundaryPart] },
86+
{ id: "m7", part: [] },
87+
{ id: "m8", part: [] },
88+
],
89+
cursor: undefined,
90+
complete: true,
91+
},
92+
revertMessageID: "m6",
93+
fetchMessage: async () => ({ info: message("m6", "user"), parts: [boundaryPart], cursor: "boundary" }),
94+
fetchPage: async () => {
95+
call += 1
96+
if (call === 1) {
97+
return {
98+
session: [message("m4", "assistant"), message("m5", "assistant")],
99+
part: [
100+
{ id: "m4", part: [] },
101+
{ id: "m5", part: [] },
102+
],
103+
cursor: "older-2",
104+
complete: false,
105+
}
106+
}
107+
return {
108+
session: [message("m3", "user")],
109+
part: [{ id: "m3", part: [olderPart] }],
110+
cursor: undefined,
111+
complete: true,
112+
}
113+
},
114+
})
115+
116+
expect(call).toBe(2)
117+
expect(result.session.map((item) => item.id)).toEqual(["m3", "m4", "m5", "m6", "m7", "m8"])
118+
expect(result.part.find((item) => item.id === "m3")?.part).toEqual([olderPart])
119+
expect(result.cursor).toBeUndefined()
120+
expect(result.complete).toBe(true)
121+
})
122+
123+
test("marks stale revert boundaries for clearing when the boundary message is missing", async () => {
124+
const result = await loadRevertAwareLatestPage({
125+
current: {
126+
session: [message("m7", "assistant"), message("m8", "user")],
127+
part: [
128+
{ id: "m7", part: [] },
129+
{ id: "m8", part: [] },
130+
],
131+
cursor: undefined,
132+
complete: true,
133+
},
134+
revertMessageID: "m6",
135+
fetchMessage: async () => undefined,
136+
fetchPage: async () => ({
137+
session: [],
138+
part: [],
139+
cursor: undefined,
140+
complete: true,
141+
}),
142+
})
143+
144+
expect(result.clearedRevert).toBe(true)
145+
expect(result.session.map((item) => item.id)).toEqual(["m7", "m8"])
146+
})
147+
})
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import type { Message, Part } from "@opencode-ai/sdk/v2/client"
2+
3+
type MessagePage = {
4+
session: Message[]
5+
part: { id: string; part: Part[] }[]
6+
cursor?: string
7+
complete: boolean
8+
clearedRevert?: boolean
9+
}
10+
11+
type MessageWithParts = {
12+
info: Message
13+
parts: Part[]
14+
cursor?: string
15+
}
16+
17+
const cmp = (a: string, b: string) => (a < b ? -1 : a > b ? 1 : 0)
18+
export const compareMessages = (a: Message, b: Message) => a.time.created - b.time.created || cmp(a.id, b.id)
19+
export const messageBefore = (message: Message, boundary: Message) => compareMessages(message, boundary) < 0
20+
21+
const sortParts = (parts: Part[]) => parts.filter((part) => !!part?.id).sort((a, b) => cmp(a.id, b.id))
22+
23+
export function hasVisibleUserBeforeRevert(messages: Message[], revertMessageID?: string, boundary?: Message) {
24+
if (!revertMessageID) return true
25+
const revert = boundary ?? messages.find((message) => message.id === revertMessageID)
26+
if (!revert) return messages.some((message) => message.role === "user" && message.id < revertMessageID)
27+
return messages.some((message) => message.role === "user" && messageBefore(message, revert))
28+
}
29+
30+
function mergeMessages(current: Message[], older: Message[], boundary: Message) {
31+
const merged = new Map(current.filter((message) => !!message?.id).map((message) => [message.id, message] as const))
32+
for (const message of older) {
33+
if (!message?.id) continue
34+
merged.set(message.id, message)
35+
}
36+
merged.set(boundary.id, boundary)
37+
return [...merged.values()].sort(compareMessages)
38+
}
39+
40+
function mergeParts(current: MessagePage["part"], older: MessagePage["part"], boundary: MessageWithParts) {
41+
const merged = new Map(current.filter((item) => !!item?.id).map((item) => [item.id, sortParts(item.part)] as const))
42+
for (const item of older) {
43+
if (!item?.id) continue
44+
merged.set(item.id, sortParts(item.part))
45+
}
46+
merged.set(boundary.info.id, sortParts(boundary.parts))
47+
return [...merged.entries()].sort((a, b) => cmp(a[0], b[0])).map(([id, part]) => ({ id, part }))
48+
}
49+
50+
export async function loadRevertAwareLatestPage(input: {
51+
current: MessagePage
52+
revertMessageID?: string
53+
fetchMessage: (messageID: string) => Promise<MessageWithParts | undefined>
54+
fetchPage: (before: string) => Promise<MessagePage>
55+
}) {
56+
if (!input.revertMessageID) return input.current
57+
58+
const boundary = await input.fetchMessage(input.revertMessageID)
59+
if (!boundary) return { ...input.current, clearedRevert: true }
60+
if (hasVisibleUserBeforeRevert(input.current.session, input.revertMessageID, boundary.info)) return input.current
61+
if (!boundary.cursor) return input.current
62+
63+
let older = await input.fetchPage(boundary.cursor)
64+
let session = mergeMessages(input.current.session, older.session, boundary.info)
65+
let part = mergeParts(input.current.part, older.part, boundary)
66+
while (!hasVisibleUserBeforeRevert(session, input.revertMessageID) && older.cursor) {
67+
older = await input.fetchPage(older.cursor)
68+
session = mergeMessages(session, older.session, boundary.info)
69+
part = mergeParts(part, older.part, boundary)
70+
}
71+
return {
72+
session,
73+
part,
74+
cursor: older.cursor,
75+
complete: older.complete,
76+
}
77+
}

0 commit comments

Comments
 (0)