Skip to content

Commit 11ffed6

Browse files
committed
refactor(core): make SyncEvent schema-first
Store sync event definitions as Effect Schema and derive Zod only at the bus and OpenAPI edges. Bridge the remaining Zod-native session payloads through Schema annotations so the sync layer no longer needs mixed-schema definition helpers.
1 parent 6430564 commit 11ffed6

4 files changed

Lines changed: 77 additions & 66 deletions

File tree

packages/opencode/src/server/projectors.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import z from "zod"
1+
import { Schema } from "effect"
22
import sessionProjectors from "../session/projectors"
33
import { SyncEvent } from "@/sync"
44
import { Session } from "@/session"
@@ -10,7 +10,7 @@ export function initProjectors() {
1010
projectors: sessionProjectors,
1111
convertEvent: (type, data) => {
1212
if (type === "session.updated") {
13-
const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
13+
const id = (data as Schema.Schema.Type<typeof Session.Event.Updated.schema>).sessionID
1414
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
1515

1616
if (!row) return data

packages/opencode/src/session/session.ts

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import { SessionID, MessageID, PartID } from "./schema"
2727
import type { Provider } from "@/provider"
2828
import { Permission } from "@/permission"
2929
import { Global } from "@/global"
30-
import { Effect, Layer, Option, Context } from "effect"
30+
import { Effect, Layer, Option, Context, Schema } from "effect"
31+
import { ZodOverride } from "@/util/effect-zod"
3132

3233
const log = Log.create({ service: "session" })
3334

@@ -200,40 +201,51 @@ export const SetRevertInput = z.object({
200201
})
201202
export const MessagesInput = z.object({ sessionID: SessionID.zod, limit: z.number().optional() })
202203

204+
function schemaFromZod<T extends z.ZodTypeAny>(value: T) {
205+
return Schema.declare((input): input is z.output<T> => value.safeParse(input).success).annotate({
206+
[ZodOverride]: value,
207+
})
208+
}
209+
210+
const SessionInfoSchema = schemaFromZod(Info)
211+
const SessionUpdateInfoSchema = schemaFromZod(
212+
updateSchema(Info).extend({
213+
share: updateSchema(Info.shape.share.unwrap()).optional(),
214+
time: updateSchema(Info.shape.time).optional(),
215+
}),
216+
)
217+
203218
export const Event = {
204219
Created: SyncEvent.define({
205220
type: "session.created",
206221
version: 1,
207222
aggregate: "sessionID",
208-
schema: z.object({
209-
sessionID: SessionID.zod,
210-
info: Info,
211-
}),
223+
schema: {
224+
sessionID: SessionID,
225+
info: SessionInfoSchema,
226+
},
212227
}),
213228
Updated: SyncEvent.define({
214229
type: "session.updated",
215230
version: 1,
216231
aggregate: "sessionID",
217-
schema: z.object({
218-
sessionID: SessionID.zod,
219-
info: updateSchema(Info).extend({
220-
share: updateSchema(Info.shape.share.unwrap()).optional(),
221-
time: updateSchema(Info.shape.time).optional(),
222-
}),
223-
}),
224-
busSchema: z.object({
225-
sessionID: SessionID.zod,
226-
info: Info,
227-
}),
232+
schema: {
233+
sessionID: SessionID,
234+
info: SessionUpdateInfoSchema,
235+
},
236+
busSchema: {
237+
sessionID: SessionID,
238+
info: SessionInfoSchema,
239+
},
228240
}),
229241
Deleted: SyncEvent.define({
230242
type: "session.deleted",
231243
version: 1,
232244
aggregate: "sessionID",
233-
schema: z.object({
234-
sessionID: SessionID.zod,
235-
info: Info,
236-
}),
245+
schema: {
246+
sessionID: SessionID,
247+
info: SessionInfoSchema,
248+
},
237249
}),
238250
Diff: BusEvent.define(
239251
"session.diff",
@@ -379,7 +391,7 @@ export interface Interface {
379391

380392
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
381393

382-
type Patch = z.infer<typeof Event.Updated.schema>["info"]
394+
type Patch = Schema.Schema.Type<typeof Event.Updated.schema>["info"]
383395

384396
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
385397
Effect.sync(() => Database.use(fn))

packages/opencode/src/sync/index.ts

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import z from "zod"
2-
import type { ZodObject } from "zod"
32
import { Schema, Types } from "effect"
43
import { Database, eq } from "@/storage"
54
import { GlobalBus } from "@/bus/global"
@@ -12,22 +11,34 @@ import { EventID } from "./schema"
1211
import { Flag } from "@/flag/flag"
1312
import { zod } from "@/util/effect-zod"
1413

14+
type StructLike<Fields extends Schema.Struct.Fields> = Fields | Schema.Struct<Fields>
15+
1516
export type Definition = {
1617
type: string
1718
version: number
1819
aggregate: string
19-
schema: z.ZodObject
20+
schema: Schema.Top
21+
busSchema: Schema.Top
22+
properties: z.ZodTypeAny
23+
}
24+
25+
type MutableType<S extends Schema.Top> = Types.DeepMutable<Schema.Schema.Type<S>>
2026

21-
// This is temporary and only exists for compatibility with bus
22-
// event definitions
23-
properties: z.ZodObject
27+
type DefinedEvent<Type extends string, Agg extends string, SchemaDef extends Schema.Top, BusDef extends Schema.Top> = Definition & {
28+
type: Type
29+
aggregate: Agg
30+
schema: SchemaDef
31+
busSchema: BusDef
32+
properties: z.ZodType<MutableType<BusDef>>
2433
}
2534

35+
type Data<Def extends Definition> = MutableType<Def["schema"]>
36+
2637
export type Event<Def extends Definition = Definition> = {
2738
id: string
2839
seq: number
2940
aggregateID: string
30-
data: z.infer<Def["schema"]>
41+
data: Data<Def>
3142
}
3243

3344
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
@@ -56,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
5667
for (let [type, version] of versions.entries()) {
5768
let def = registry.get(versionedType(type, version))!
5869

59-
BusEvent.define(def.type, def.properties || def.schema)
70+
BusEvent.define(def.type, def.properties)
6071
}
6172

6273
// Freeze the system so it clearly errors if events are defined
@@ -71,58 +82,46 @@ export function versionedType(type: string, version?: number) {
7182
return version ? `${type}.${version}` : type
7283
}
7384

74-
type SchemaLike<Agg extends string> =
75-
| ZodObject<Record<Agg, z.ZodType<string>>>
76-
| Schema.Struct<Record<Agg, Schema.Top>>
77-
78-
type BusSchemaLike = ZodObject | Schema.Struct<Schema.Struct.Fields>
79-
80-
type Mutable<T> = Types.DeepMutable<T>
81-
type ToZodObject<S> = S extends Schema.Top
82-
? z.ZodObject<{ [K in keyof Mutable<Schema.Schema.Type<S>>]: z.ZodType<Mutable<Schema.Schema.Type<S>>[K]> }>
83-
: S
85+
function struct<Fields extends Schema.Struct.Fields>(value: StructLike<Fields>) {
86+
return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct<Fields>
87+
}
8488

85-
/**
86-
* Define a sync event. Accepts either a Zod schema or an Effect Schema for
87-
* both `schema` and `busSchema`. Effect Schemas are converted to Zod via the
88-
* `effect-zod` walker since the sync pipeline uses Zod for validation and
89-
* JSON Schema generation.
90-
*/
9189
export function define<
9290
Type extends string,
9391
Agg extends string,
94-
S extends SchemaLike<Agg>,
95-
B extends BusSchemaLike = S,
96-
>(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) {
92+
Fields extends Schema.Struct.Fields & Record<Agg, Schema.Top>,
93+
BusFields extends Schema.Struct.Fields = Fields,
94+
>(input: {
95+
type: Type
96+
version: number
97+
aggregate: Agg
98+
schema: StructLike<Fields>
99+
busSchema?: StructLike<BusFields>
100+
}): DefinedEvent<Type, Agg, Schema.Struct<Fields>, Schema.Struct<BusFields>> {
97101
if (frozen) {
98102
throw new Error("Error defining sync event: sync system has been frozen")
99103
}
100104

101-
const schema = toZodObject(input.schema) as ToZodObject<S>
102-
const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject<B>
105+
const schema = struct(input.schema)
106+
const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct<BusFields>
107+
const properties = zod(busSchema) as unknown as z.ZodType<MutableType<typeof busSchema>>
103108

104-
const def = {
109+
const def: DefinedEvent<Type, Agg, typeof schema, typeof busSchema> = {
105110
type: input.type,
106111
version: input.version,
107112
aggregate: input.aggregate,
108113
schema,
114+
busSchema,
109115
properties,
110116
}
111117

112118
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
113119

114-
registry.set(versionedType(def.type, def.version), def as unknown as Definition)
120+
registry.set(versionedType(def.type, def.version), def)
115121

116122
return def
117123
}
118124

119-
function toZodObject(value: ZodObject | Schema.Top): z.ZodObject {
120-
if (Schema.isSchema(value)) {
121-
return zod(value as Schema.Top) as unknown as z.ZodObject
122-
}
123-
return value as z.ZodObject
124-
}
125-
126125
export function project<Def extends Definition>(
127126
def: Def,
128127
func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,
@@ -172,10 +171,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
172171
const result = convertEvent(def.type, event.data)
173172
if (result instanceof Promise) {
174173
void result.then((data) => {
175-
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
174+
void ProjectBus.publish({ type: def.type, properties: def.properties }, data)
176175
})
177176
} else {
178-
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
177+
void ProjectBus.publish({ type: def.type, properties: def.properties }, result)
179178
}
180179

181180
GlobalBus.emit("event", {
@@ -295,7 +294,7 @@ export function payloads() {
295294
id: z.string(),
296295
seq: z.number(),
297296
aggregateID: z.literal(def.aggregate),
298-
data: def.schema,
297+
data: zod(def.schema),
299298
})
300299
.meta({
301300
ref: `SyncEvent.${def.type}`,

packages/opencode/test/sync/index.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ describe("SyncEvent", () => {
4444
type: "item.created",
4545
version: 1,
4646
aggregate: "id",
47-
schema: z.object({ id: z.string(), name: z.string() }),
47+
schema: { id: Schema.String, name: Schema.String },
4848
})
4949
const Sent = SyncEvent.define({
5050
type: "item.sent",
5151
version: 1,
5252
aggregate: "item_id",
53-
schema: z.object({ item_id: z.string(), to: z.string() }),
53+
schema: { item_id: Schema.String, to: Schema.String },
5454
})
5555

5656
SyncEvent.init({
@@ -139,7 +139,7 @@ describe("SyncEvent", () => {
139139
type: "item.effect.created",
140140
version: 1,
141141
aggregate: "id",
142-
schema: Schema.Struct({ id: Schema.String, name: Schema.String }),
142+
schema: { id: Schema.String, name: Schema.String },
143143
})
144144

145145
SyncEvent.init({

0 commit comments

Comments
 (0)