diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index efaed944066c..90a0668dba52 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -1,16 +1,31 @@ import z from "zod" import type { ZodType } from "zod" +import { Schema, Types } from "effect" +import { zod } from "@/util/effect-zod" export type Definition = ReturnType const registry = new Map() -export function define(type: Type, properties: Properties) { - const result = { - type, - properties, - } - registry.set(type, result) +/** + * Define a bus event type with a payload schema. + * + * Accepts either a Zod schema or an Effect Schema. Effect Schemas are + * converted to Zod internally via the effect-zod walker so that the bus + * continues to use Zod as the lingua franca for serialization/validation. + */ +export function define( + type: Type, + properties: P, +): { type: Type; properties: z.ZodType>> } +export function define( + type: Type, + properties: P, +): { type: Type; properties: P } +export function define(type: string, properties: unknown) { + const zodProperties = Schema.isSchema(properties) ? zod(properties) : (properties as ZodType) + const result = { type, properties: zodProperties } + registry.set(type, result as Definition) return result } diff --git a/packages/opencode/src/server/projectors.ts b/packages/opencode/src/server/projectors.ts index cfecce526599..b5cc446b870e 100644 --- a/packages/opencode/src/server/projectors.ts +++ b/packages/opencode/src/server/projectors.ts @@ -1,16 +1,18 @@ -import z from "zod" +import { Schema } from "effect" import sessionProjectors from "../session/projectors" import { SyncEvent } from "@/sync" import { Session } from "@/session" import { SessionTable } from "@/session/session.sql" import { Database, eq } from "@/storage" +const isSessionUpdated = Schema.is(Session.Event.Updated.schema) + export function initProjectors() { SyncEvent.init({ projectors: sessionProjectors, convertEvent: (type, data) => { - if (type === "session.updated") { - const id = (data as z.infer).sessionID + if (type === Session.Event.Updated.type && isSessionUpdated(data)) { + const id = data.sessionID const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get()) if (!row) return data diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 980dd4da844f..7664b09adaa5 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -581,48 +581,48 @@ export const Event = { type: "message.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, + schema: Schema.Struct({ + sessionID: SessionID, + info: _Info, }), }), Removed: SyncEvent.define({ type: "message.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, + schema: Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, }), }), PartUpdated: SyncEvent.define({ type: "message.part.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - part: Part.zod, - time: z.number(), + schema: Schema.Struct({ + sessionID: SessionID, + part: _Part, + time: Schema.Number, }), }), PartDelta: BusEvent.define( "message.part.delta", - z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, - field: z.string(), - delta: z.string(), + Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, + field: Schema.String, + delta: Schema.String, }), ), PartRemoved: SyncEvent.define({ type: "message.part.removed", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - partID: PartID.zod, + schema: Schema.Struct({ + sessionID: SessionID, + messageID: MessageID, + partID: PartID, }), }), } diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index d2bdbccb7dbc..3b9e2ebb4be2 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -28,7 +28,7 @@ import type { Provider } from "@/provider" import { Permission } from "@/permission" import { Global } from "@/global" import { Effect, Layer, Option, Context, Schema, Types } from "effect" -import { zod, zodObject } from "@/util/effect-zod" +import { fromZod, zod, zodObject } from "@/util/effect-zod" import { withStatics } from "@/util/schema" const log = Log.create({ service: "session" }) @@ -215,40 +215,44 @@ export const MessagesInput = Schema.Struct({ limit: Schema.optional(Schema.Number), }).pipe(withStatics((s) => ({ zod: zod(s) }))) +const SessionUpdateInfoSchema = fromZod( + updateSchema(zodObject(Info)).extend({ + share: updateSchema(zodObject(Share)).optional(), + time: updateSchema(zodObject(Time)).optional(), + }), +) + export const Event = { Created: SyncEvent.define({ type: "session.created", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: { + sessionID: SessionID, + info: Info, + }, }), Updated: SyncEvent.define({ type: "session.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: updateSchema(zodObject(Info)).extend({ - share: updateSchema(zodObject(Share)).optional(), - time: updateSchema(zodObject(Time)).optional(), - }), - }), - busSchema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: { + sessionID: SessionID, + info: SessionUpdateInfoSchema, + }, + busSchema: { + sessionID: SessionID, + info: Info, + }, }), Deleted: SyncEvent.define({ type: "session.deleted", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: { + sessionID: SessionID, + info: Info, + }, }), Diff: BusEvent.define( "session.diff", @@ -394,7 +398,7 @@ export interface Interface { export class Service extends Context.Service()("@opencode/Session") {} -type Patch = z.infer["info"] +type Patch = Schema.Schema.Type["info"] const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 125d8c95506e..2248ed15d2ab 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,5 +1,5 @@ import z from "zod" -import type { ZodObject } from "zod" +import { Schema, Types } from "effect" import { Database, eq } from "@/storage" import { GlobalBus } from "@/bus/global" import { Bus as ProjectBus } from "@/bus" @@ -9,23 +9,36 @@ import { EventSequenceTable, EventTable } from "./event.sql" import { WorkspaceContext } from "@/control-plane/workspace-context" import { EventID } from "./schema" import { Flag } from "@/flag/flag" +import { zod } from "@/util/effect-zod" + +type StructLike = Fields | Schema.Struct export type Definition = { type: string version: number aggregate: string - schema: z.ZodObject + schema: Schema.Top + busSchema: Schema.Top + properties: z.ZodTypeAny +} - // This is temporary and only exists for compatibility with bus - // event definitions - properties: z.ZodObject +type MutableType = Types.DeepMutable> + +type DefinedEvent = Definition & { + type: Type + aggregate: Agg + schema: SchemaDef + busSchema: BusDef + properties: z.ZodType> } +type Data = MutableType + export type Event = { id: string seq: number aggregateID: string - data: z.infer + data: Data } export type SerializedEvent = Event & { type: string } @@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co for (let [type, version] of versions.entries()) { let def = registry.get(versionedType(type, version))! - BusEvent.define(def.type, def.properties || def.schema) + BusEvent.define(def.type, def.properties) } // Freeze the system so it clearly errors if events are defined @@ -69,22 +82,37 @@ export function versionedType(type: string, version?: number) { return version ? `${type}.${version}` : type } +function struct(value: StructLike) { + return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct +} + export function define< Type extends string, Agg extends string, - Schema extends ZodObject>>, - BusSchema extends ZodObject = Schema, ->(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) { + Fields extends Schema.Struct.Fields & Record, + BusFields extends Schema.Struct.Fields = Fields, +>(input: { + type: Type + version: number + aggregate: Agg + schema: StructLike + busSchema?: StructLike +}): DefinedEvent, Schema.Struct> { if (frozen) { throw new Error("Error defining sync event: sync system has been frozen") } - const def = { + const schema = struct(input.schema) + const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct + const properties = zod(busSchema) as unknown as z.ZodType> + + const def: DefinedEvent = { type: input.type, version: input.version, aggregate: input.aggregate, - schema: input.schema, - properties: input.busSchema ? input.busSchema : input.schema, + schema, + busSchema, + properties, } versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0)) @@ -143,10 +171,10 @@ function process(def: Def, event: Event, options: { const result = convertEvent(def.type, event.data) if (result instanceof Promise) { void result.then((data) => { - void ProjectBus.publish({ type: def.type, properties: def.schema }, data) + void ProjectBus.publish({ type: def.type, properties: def.properties }, data) }) } else { - void ProjectBus.publish({ type: def.type, properties: def.schema }, result) + void ProjectBus.publish({ type: def.type, properties: def.properties }, result) } GlobalBus.emit("event", { @@ -266,7 +294,7 @@ export function payloads() { id: z.string(), seq: z.number(), aggregateID: z.literal(def.aggregate), - data: def.schema, + data: zod(def.schema), }) .meta({ ref: `SyncEvent.${def.type}`, diff --git a/packages/opencode/src/util/effect-zod.ts b/packages/opencode/src/util/effect-zod.ts index edbbf4d542c9..e949b255f545 100644 --- a/packages/opencode/src/util/effect-zod.ts +++ b/packages/opencode/src/util/effect-zod.ts @@ -49,6 +49,14 @@ function isZodType(value: unknown): value is z.ZodTypeAny { return typeof value === "object" && value !== null && "_zod" in value } +// Bridge a Zod-first schema into Effect Schema while preserving the original +// Zod for downstream JSON Schema/OpenAPI generation. +export function fromZod(value: T) { + return Schema.declare((input): input is z.output => value.safeParse(input).success).annotate({ + [ZodOverride]: value, + }) +} + function walk(ast: SchemaAST.AST): z.ZodTypeAny { const cached = walkCache.get(ast) if (cached) return cached diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index 3df179787dd3..7cde38a01426 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -1,4 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test" +import { Schema } from "effect" import z from "zod" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" @@ -10,6 +11,8 @@ const TestEvent = { Pong: BusEvent.define("test.pong", z.object({ message: z.string() })), } +const EffectTestEvent = BusEvent.define("test.effect-schema.ping", Schema.Struct({ value: Schema.Number })) + function withInstance(directory: string, fn: () => Promise) { return Instance.provide({ directory, fn }) } @@ -76,6 +79,22 @@ describe("Bus", () => { await Bus.publish(TestEvent.Ping, { value: 1 }) }) }) + + test("accepts Effect Schema event definitions", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(EffectTestEvent, (evt) => { + received.push(evt.properties.value) + }) + await Bun.sleep(10) + await Bus.publish(EffectTestEvent, { value: 42 }) + await Bun.sleep(10) + }) + + expect(received).toEqual([42]) + }) }) describe("unsubscribe", () => { diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 866bcaa31ad0..9c9ff3081894 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -1,4 +1,5 @@ import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test" +import { Schema } from "effect" import { tmpdir } from "../fixture/fixture" import z from "zod" import { Bus } from "../../src/bus" @@ -43,13 +44,13 @@ describe("SyncEvent", () => { type: "item.created", version: 1, aggregate: "id", - schema: z.object({ id: z.string(), name: z.string() }), + schema: { id: Schema.String, name: Schema.String }, }) const Sent = SyncEvent.define({ type: "item.sent", version: 1, aggregate: "item_id", - schema: z.object({ item_id: z.string(), to: z.string() }), + schema: { item_id: Schema.String, to: Schema.String }, }) SyncEvent.init({ @@ -128,6 +129,51 @@ describe("SyncEvent", () => { }) }), ) + + test( + "accepts Effect Schema event definitions", + withInstance(async () => { + SyncEvent.reset() + try { + const Created = SyncEvent.define({ + type: "item.effect.created", + version: 1, + aggregate: "id", + schema: { id: Schema.String, name: Schema.String }, + }) + + SyncEvent.init({ + projectors: [SyncEvent.project(Created, () => {})], + }) + + const events: Array<{ + type: string + properties: { id: string; name: string } + }> = [] + const received = new Promise((resolve) => { + Bus.subscribeAll((event) => { + events.push(event) + resolve() + }) + }) + + SyncEvent.run(Created, { id: "evt_1", name: "schema" }) + + await received + expect(events).toEqual([ + { + type: "item.effect.created", + properties: { + id: "evt_1", + name: "schema", + }, + }, + ]) + } finally { + setup() + } + }), + ) }) describe("replay", () => {