Skip to content

Commit e0c10e9

Browse files
committed
refactor(core): allow SyncEvent.define and BusEvent.define to accept Effect Schema
Overloads BusEvent.define and SyncEvent.define so payload schemas can be passed as Effect Schema values directly. Effect Schemas are converted to Zod via the effect-zod walker since the sync/bus pipelines still use Zod internally. Migrates MessageV2.Event.* to use Schema.Struct directly instead of z.object with .zod references.
1 parent 51334b9 commit e0c10e9

3 files changed

Lines changed: 80 additions & 32 deletions

File tree

packages/opencode/src/bus/bus-event.ts

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,38 @@
11
import z from "zod"
22
import type { ZodType } from "zod"
3+
import { Schema, Types } from "effect"
4+
import { zod } from "@/util/effect-zod"
35

46
export type Definition = ReturnType<typeof define>
57

68
const registry = new Map<string, Definition>()
79

8-
export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
9-
const result = {
10-
type,
11-
properties,
12-
}
13-
registry.set(type, result)
10+
/**
11+
* Define a bus event type with a payload schema.
12+
*
13+
* Accepts either a Zod schema or an Effect Schema. Effect Schemas are
14+
* converted to Zod internally via the effect-zod walker so that the bus
15+
* continues to use Zod as the lingua franca for serialization/validation.
16+
*/
17+
export function define<Type extends string, P extends Schema.Top>(
18+
type: Type,
19+
properties: P,
20+
): { type: Type; properties: z.ZodType<Types.DeepMutable<Schema.Schema.Type<P>>> }
21+
export function define<Type extends string, P extends ZodType>(
22+
type: Type,
23+
properties: P,
24+
): { type: Type; properties: P }
25+
export function define(type: string, properties: unknown) {
26+
const zodProperties = isEffectSchema(properties) ? zod(properties) : (properties as ZodType)
27+
const result = { type, properties: zodProperties }
28+
registry.set(type, result as Definition)
1429
return result
1530
}
1631

32+
function isEffectSchema(value: unknown): value is Schema.Top {
33+
return typeof value === "object" && value !== null && "ast" in value
34+
}
35+
1736
export function payloads() {
1837
return registry
1938
.entries()

packages/opencode/src/session/message-v2.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -575,48 +575,48 @@ export const Event = {
575575
type: "message.updated",
576576
version: 1,
577577
aggregate: "sessionID",
578-
schema: z.object({
579-
sessionID: SessionID.zod,
580-
info: Info.zod,
578+
schema: Schema.Struct({
579+
sessionID: SessionID,
580+
info: _Info,
581581
}),
582582
}),
583583
Removed: SyncEvent.define({
584584
type: "message.removed",
585585
version: 1,
586586
aggregate: "sessionID",
587-
schema: z.object({
588-
sessionID: SessionID.zod,
589-
messageID: MessageID.zod,
587+
schema: Schema.Struct({
588+
sessionID: SessionID,
589+
messageID: MessageID,
590590
}),
591591
}),
592592
PartUpdated: SyncEvent.define({
593593
type: "message.part.updated",
594594
version: 1,
595595
aggregate: "sessionID",
596-
schema: z.object({
597-
sessionID: SessionID.zod,
598-
part: Part.zod,
599-
time: z.number(),
596+
schema: Schema.Struct({
597+
sessionID: SessionID,
598+
part: _Part,
599+
time: Schema.Number,
600600
}),
601601
}),
602602
PartDelta: BusEvent.define(
603603
"message.part.delta",
604-
z.object({
605-
sessionID: SessionID.zod,
606-
messageID: MessageID.zod,
607-
partID: PartID.zod,
608-
field: z.string(),
609-
delta: z.string(),
604+
Schema.Struct({
605+
sessionID: SessionID,
606+
messageID: MessageID,
607+
partID: PartID,
608+
field: Schema.String,
609+
delta: Schema.String,
610610
}),
611611
),
612612
PartRemoved: SyncEvent.define({
613613
type: "message.part.removed",
614614
version: 1,
615615
aggregate: "sessionID",
616-
schema: z.object({
617-
sessionID: SessionID.zod,
618-
messageID: MessageID.zod,
619-
partID: PartID.zod,
616+
schema: Schema.Struct({
617+
sessionID: SessionID,
618+
messageID: MessageID,
619+
partID: PartID,
620620
}),
621621
}),
622622
}

packages/opencode/src/sync/index.ts

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import z from "zod"
22
import type { ZodObject } from "zod"
3+
import { Schema, Types } from "effect"
34
import { Database, eq } from "@/storage"
45
import { GlobalBus } from "@/bus/global"
56
import { Bus as ProjectBus } from "@/bus"
@@ -9,6 +10,7 @@ import { EventSequenceTable, EventTable } from "./event.sql"
910
import { WorkspaceContext } from "@/control-plane/workspace-context"
1011
import { EventID } from "./schema"
1112
import { Flag } from "@/flag/flag"
13+
import { zod } from "@/util/effect-zod"
1214

1315
export type Definition = {
1416
type: string
@@ -69,31 +71,58 @@ export function versionedType(type: string, version?: number) {
6971
return version ? `${type}.${version}` : type
7072
}
7173

74+
type SchemaLike<Agg extends string> =
75+
| ZodObject<Record<Agg, z.ZodType<string>>>
76+
| Schema.Struct<Record<Agg, Schema.Top>>
77+
78+
type BusSchemaLike = ZodObject | Schema.Struct<Schema.Struct.Fields>
79+
80+
type Mutable<T> = Types.DeepMutable<T>
81+
type ToZodObject<S> = S extends Schema.Top
82+
? z.ZodObject<{ [K in keyof Mutable<Schema.Schema.Type<S>>]: z.ZodType<Mutable<Schema.Schema.Type<S>>[K]> }>
83+
: S
84+
85+
/**
86+
* Define a sync event. Accepts either a Zod schema or an Effect Schema for
87+
* both `schema` and `busSchema`. Effect Schemas are converted to Zod via the
88+
* `effect-zod` walker since the sync pipeline uses Zod for validation and
89+
* JSON Schema generation.
90+
*/
7291
export function define<
7392
Type extends string,
7493
Agg extends string,
75-
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
76-
BusSchema extends ZodObject = Schema,
77-
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
94+
S extends SchemaLike<Agg>,
95+
B extends BusSchemaLike = S,
96+
>(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) {
7897
if (frozen) {
7998
throw new Error("Error defining sync event: sync system has been frozen")
8099
}
81100

101+
const schema = toZodObject(input.schema) as ToZodObject<S>
102+
const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject<B>
103+
82104
const def = {
83105
type: input.type,
84106
version: input.version,
85107
aggregate: input.aggregate,
86-
schema: input.schema,
87-
properties: input.busSchema ? input.busSchema : input.schema,
108+
schema,
109+
properties,
88110
}
89111

90112
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
91113

92-
registry.set(versionedType(def.type, def.version), def)
114+
registry.set(versionedType(def.type, def.version), def as unknown as Definition)
93115

94116
return def
95117
}
96118

119+
function toZodObject(value: ZodObject | Schema.Top): z.ZodObject {
120+
if (typeof value === "object" && value !== null && "ast" in value) {
121+
return zod(value as Schema.Top) as unknown as z.ZodObject
122+
}
123+
return value as z.ZodObject
124+
}
125+
97126
export function project<Def extends Definition>(
98127
def: Def,
99128
func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,

0 commit comments

Comments
 (0)