Skip to content

Commit f444aff

Browse files
committed
progress
1 parent 8131d98 commit f444aff

5 files changed

Lines changed: 164 additions & 15 deletions

File tree

packages/opencode/src/server/routes/instance/httpapi/v2.ts

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,80 @@ import { SessionMessage } from "@/v2/session-message"
33
import { Prompt } from "@/v2/session-prompt"
44
import { SessionV2 } from "@/v2/session"
55
import { Effect, Layer, Schema } from "effect"
6-
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, HttpApiSchema, OpenApi } from "effect/unstable/httpapi"
6+
import * as DateTime from "effect/DateTime"
7+
import {
8+
HttpApi,
9+
HttpApiBuilder,
10+
HttpApiEndpoint,
11+
HttpApiError,
12+
HttpApiGroup,
13+
HttpApiSchema,
14+
OpenApi,
15+
} from "effect/unstable/httpapi"
716
import { Authorization } from "./auth"
817

18+
const DefaultMessagesLimit = 50
19+
20+
const Cursor = Schema.Struct({
21+
id: SessionMessage.ID,
22+
time: Schema.Number,
23+
})
24+
25+
const decodeCursor = Schema.decodeUnknownSync(Cursor)
26+
27+
const cursor = {
28+
encode(message: SessionMessage.Message) {
29+
return Buffer.from(
30+
JSON.stringify({ id: message.id, time: DateTime.toEpochMillis(message.time.created) }),
31+
).toString("base64url")
32+
},
33+
decode(input: string) {
34+
return decodeCursor(JSON.parse(Buffer.from(input, "base64url").toString("utf8")))
35+
},
36+
}
37+
938
export const V2Api = HttpApi.make("v2")
1039
.add(
1140
HttpApiGroup.make("v2")
1241
.add(
1342
HttpApiEndpoint.get("messages", "/api/session/:sessionID/message", {
1443
params: { sessionID: SessionID },
15-
success: Schema.Array(SessionMessage.Message),
44+
query: Schema.Struct({
45+
limit: Schema.optional(
46+
Schema.NumberFromString.check(
47+
Schema.isInt(),
48+
Schema.isGreaterThanOrEqualTo(1),
49+
Schema.isLessThanOrEqualTo(200),
50+
),
51+
).annotate({
52+
description:
53+
"Maximum number of messages to return. When omitted, the endpoint returns its default page size. Use limit without a cursor to fetch the newest page for chat history.",
54+
}),
55+
before: Schema.optional(Schema.String).annotate({
56+
description:
57+
"Opaque pagination cursor for the item at the start of the current window. Returns messages older than this cursor. Mutually exclusive with after.",
58+
}),
59+
after: Schema.optional(Schema.String).annotate({
60+
description:
61+
"Opaque pagination cursor for the item at the end of the current window. Returns messages newer than this cursor. Mutually exclusive with before.",
62+
}),
63+
from: Schema.optional(Schema.Literal("start")).annotate({
64+
description:
65+
"Start from the beginning of session history instead of the newest messages. Mutually exclusive with before and after.",
66+
}),
67+
}).annotate({ identifier: "V2SessionMessagesQuery" }),
68+
success: Schema.Struct({
69+
items: Schema.Array(SessionMessage.Message),
70+
before: Schema.String.pipe(Schema.optional),
71+
after: Schema.String.pipe(Schema.optional),
72+
}).annotate({ identifier: "V2SessionMessagesResponse" }),
73+
error: HttpApiError.BadRequest,
1674
}).annotateMerge(
1775
OpenApi.annotations({
1876
identifier: "v2.session.messages",
1977
summary: "Get v2 session messages",
20-
description: "Retrieve projected v2 messages for a session directly from the message database.",
78+
description:
79+
"Retrieve projected v2 messages for a session. For chat clients, request the latest page with limit, page backward through older history with before, and catch up with newer messages using after.",
2180
}),
2281
),
2382
)
@@ -80,11 +139,37 @@ export const V2Api = HttpApi.make("v2")
80139
export const v2Handlers = HttpApiBuilder.group(V2Api, "v2", (handlers) =>
81140
Effect.gen(function* () {
82141
const session = yield* SessionV2.Service
142+
83143
return handlers
84144
.handle(
85145
"messages",
86146
Effect.fn(function* (ctx) {
87-
return yield* session.messages(ctx.params.sessionID)
147+
if (ctx.query.before && ctx.query.after) return yield* new HttpApiError.BadRequest({})
148+
if (ctx.query.from && (ctx.query.before || ctx.query.after)) return yield* new HttpApiError.BadRequest({})
149+
const decoded = yield* Effect.try({
150+
try: () => {
151+
return {
152+
before: ctx.query.before ? cursor.decode(ctx.query.before) : undefined,
153+
after: ctx.query.after ? cursor.decode(ctx.query.after) : undefined,
154+
}
155+
},
156+
catch: () => new HttpApiError.BadRequest({}),
157+
})
158+
const limit = ctx.query.limit ?? DefaultMessagesLimit
159+
const messages = yield* session.messages({
160+
sessionID: ctx.params.sessionID,
161+
limit,
162+
before: decoded.before,
163+
after: decoded.after,
164+
start: ctx.query.from === "start",
165+
})
166+
const oldest = messages[0]
167+
const newest = messages.at(-1)
168+
return {
169+
items: messages,
170+
before: oldest ? cursor.encode(oldest) : undefined,
171+
after: newest ? cursor.encode(newest) : undefined,
172+
}
88173
}),
89174
)
90175
.handle(

packages/opencode/src/server/routes/instance/v2.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export const V2Routes = lazy(() =>
3535
return jsonRequest("V2Routes.messages", c, function* () {
3636
return yield* Effect.gen(function* () {
3737
const session = yield* SessionV2.Service
38-
return yield* session.messages(sessionID)
38+
return yield* session.messages({ sessionID })
3939
}).pipe(Effect.provide(SessionV2.defaultLayer))
4040
})
4141
},

packages/opencode/src/v2/session.ts

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { SessionMessageTable } from "@/session/session.sql"
22
import type { SessionID } from "@/session/schema"
3-
import { asc, eq } from "@/storage/db"
3+
import { and, asc, desc, eq, gt, inArray, lt, or } from "@/storage/db"
44
import * as Database from "@/storage/db"
55
import { Context, Effect, Layer, Schema } from "effect"
66
import { SessionMessage } from "./session-message"
@@ -16,8 +16,32 @@ export type Delivery = Schema.Schema.Type<typeof Delivery>
1616

1717
export const DefaultDelivery = "immediate" satisfies Delivery
1818

19+
export type MessagesCursor = {
20+
id: SessionMessage.ID
21+
time: number
22+
}
23+
24+
export type MessagesInput = {
25+
sessionID: SessionID
26+
limit?: number
27+
cursor?: MessagesCursor
28+
direction?: "before" | "after"
29+
}
30+
31+
const older = (item: MessagesCursor) =>
32+
or(
33+
lt(SessionMessageTable.time_created, item.time),
34+
and(eq(SessionMessageTable.time_created, item.time), lt(SessionMessageTable.id, item.id)),
35+
)
36+
37+
const newer = (item: MessagesCursor) =>
38+
or(
39+
gt(SessionMessageTable.time_created, item.time),
40+
and(eq(SessionMessageTable.time_created, item.time), gt(SessionMessageTable.id, item.id)),
41+
)
42+
1943
export interface Interface {
20-
readonly messages: (sessionID: SessionID) => Effect.Effect<SessionMessage.Message[], never>
44+
readonly messages: (input: MessagesInput) => Effect.Effect<SessionMessage.Message[], never>
2145
readonly prompt: (input: {
2246
id?: Event.ID
2347
sessionID: SessionID
@@ -39,16 +63,54 @@ export const layer = Layer.effect(
3963
decodeMessage({ ...row.data, id: row.id, type: row.type })
4064

4165
const result: Interface = {
42-
messages: Effect.fn("V2Session.messages")(function* (sessionID) {
43-
return Database.use((db) =>
44-
db
66+
messages: Effect.fn("V2Session.messages")(function* (input) {
67+
if (input.limit === undefined) {
68+
const rows = Database.use((db) =>
69+
db
70+
.select()
71+
.from(SessionMessageTable)
72+
.where(eq(SessionMessageTable.session_id, input.sessionID))
73+
.orderBy(asc(SessionMessageTable.time_created), asc(SessionMessageTable.id))
74+
.all(),
75+
)
76+
return rows.map((row) => decode(row))
77+
}
78+
79+
const limit = input.limit
80+
const direction = input.direction ?? "before"
81+
const where = input.cursor
82+
? and(
83+
eq(SessionMessageTable.session_id, input.sessionID),
84+
direction === "after" ? newer(input.cursor) : older(input.cursor),
85+
)
86+
: eq(SessionMessageTable.session_id, input.sessionID)
87+
const rows = Database.use((db) => {
88+
if (direction === "after") {
89+
return db
90+
.select()
91+
.from(SessionMessageTable)
92+
.where(where)
93+
.orderBy(asc(SessionMessageTable.time_created), asc(SessionMessageTable.id))
94+
.limit(limit)
95+
.all()
96+
}
97+
const ids = db
98+
.select({ id: SessionMessageTable.id })
99+
.from(SessionMessageTable)
100+
.where(where)
101+
.orderBy(desc(SessionMessageTable.time_created), desc(SessionMessageTable.id))
102+
.limit(limit)
103+
.all()
104+
.map((row) => row.id)
105+
if (ids.length === 0) return []
106+
return db
45107
.select()
46108
.from(SessionMessageTable)
47-
.where(eq(SessionMessageTable.session_id, sessionID))
109+
.where(inArray(SessionMessageTable.id, ids))
48110
.orderBy(asc(SessionMessageTable.time_created), asc(SessionMessageTable.id))
49111
.all()
50-
.map((row) => decode(row)),
51-
)
112+
})
113+
return rows.map((row) => decode(row))
52114
}),
53115
prompt: Effect.fn("V2Session.prompt")(function* (input) {
54116
const delivery = input.delivery ?? DefaultDelivery

packages/opencode/test/session/compaction.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,9 @@ describe("session.compaction.create", () => {
597597
overflow: true,
598598
})
599599

600-
const v2 = yield* SessionV2.Service.use((svc) => svc.messages(info.id)).pipe(Effect.provide(SessionV2.defaultLayer))
600+
const v2 = yield* SessionV2.Service.use((svc) => svc.messages({ sessionID: info.id })).pipe(
601+
Effect.provide(SessionV2.defaultLayer),
602+
)
601603
expect(v2.at(-1)).toMatchObject({
602604
type: "compaction",
603605
reason: "auto",

packages/opencode/test/session/prompt.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ it.live("prompt emits v2 prompted and synthetic events", () =>
396396
],
397397
})
398398

399-
const messages = yield* SessionV2.Service.use((session) => session.messages(chat.id)).pipe(
399+
const messages = yield* SessionV2.Service.use((session) => session.messages({ sessionID: chat.id })).pipe(
400400
Effect.provide(SessionV2.layer),
401401
)
402402
const row = Database.use((db) =>

0 commit comments

Comments
 (0)