Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions packages/opencode/src/bus/bus-event.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
import z from "zod"
import type { ZodType } from "zod"
import { Schema, Types } from "effect"
import { zod } from "@/util/effect-zod"

export type Definition = ReturnType<typeof define>

const registry = new Map<string, Definition>()

export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
properties,
}
registry.set(type, result)
/**
* Define a bus event type with a payload schema.
*
* Accepts either a Zod schema or an Effect Schema. Effect Schemas are
* converted to Zod internally via the effect-zod walker so that the bus
* continues to use Zod as the lingua franca for serialization/validation.
*/
export function define<Type extends string, P extends Schema.Top>(
type: Type,
properties: P,
): { type: Type; properties: z.ZodType<Types.DeepMutable<Schema.Schema.Type<P>>> }
export function define<Type extends string, P extends ZodType>(
type: Type,
properties: P,
): { type: Type; properties: P }
export function define(type: string, properties: unknown) {
const zodProperties = Schema.isSchema(properties) ? zod(properties) : (properties as ZodType)
const result = { type, properties: zodProperties }
registry.set(type, result as Definition)
return result
}

Expand Down
8 changes: 5 additions & 3 deletions packages/opencode/src/server/projectors.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import z from "zod"
import { Schema } from "effect"
import sessionProjectors from "../session/projectors"
import { SyncEvent } from "@/sync"
import { Session } from "@/session"
import { SessionTable } from "@/session/session.sql"
import { Database, eq } from "@/storage"

const isSessionUpdated = Schema.is(Session.Event.Updated.schema)

export function initProjectors() {
SyncEvent.init({
projectors: sessionProjectors,
convertEvent: (type, data) => {
if (type === "session.updated") {
const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
if (type === Session.Event.Updated.type && isSessionUpdated(data)) {
const id = data.sessionID
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())

if (!row) return data
Expand Down
40 changes: 20 additions & 20 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -581,48 +581,48 @@ export const Event = {
type: "message.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
schema: Schema.Struct({
sessionID: SessionID,
info: _Info,
}),
}),
Removed: SyncEvent.define({
type: "message.removed",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
schema: Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
}),
}),
PartUpdated: SyncEvent.define({
type: "message.part.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
part: Part.zod,
time: z.number(),
schema: Schema.Struct({
sessionID: SessionID,
part: _Part,
time: Schema.Number,
}),
}),
PartDelta: BusEvent.define(
"message.part.delta",
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
field: z.string(),
delta: z.string(),
Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
field: Schema.String,
delta: Schema.String,
}),
),
PartRemoved: SyncEvent.define({
type: "message.part.removed",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
partID: PartID.zod,
schema: Schema.Struct({
sessionID: SessionID,
messageID: MessageID,
partID: PartID,
}),
}),
}
Expand Down
46 changes: 25 additions & 21 deletions packages/opencode/src/session/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import type { Provider } from "@/provider"
import { Permission } from "@/permission"
import { Global } from "@/global"
import { Effect, Layer, Option, Context, Schema, Types } from "effect"
import { zod, zodObject } from "@/util/effect-zod"
import { fromZod, zod, zodObject } from "@/util/effect-zod"
import { withStatics } from "@/util/schema"

const log = Log.create({ service: "session" })
Expand Down Expand Up @@ -215,40 +215,44 @@ export const MessagesInput = Schema.Struct({
limit: Schema.optional(Schema.Number),
}).pipe(withStatics((s) => ({ zod: zod(s) })))

const SessionUpdateInfoSchema = fromZod(
updateSchema(zodObject(Info)).extend({
share: updateSchema(zodObject(Share)).optional(),
time: updateSchema(zodObject(Time)).optional(),
}),
)

export const Event = {
Created: SyncEvent.define({
type: "session.created",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: Info,
},
}),
Updated: SyncEvent.define({
type: "session.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: updateSchema(zodObject(Info)).extend({
share: updateSchema(zodObject(Share)).optional(),
time: updateSchema(zodObject(Time)).optional(),
}),
}),
busSchema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: SessionUpdateInfoSchema,
},
busSchema: {
sessionID: SessionID,
info: Info,
},
}),
Deleted: SyncEvent.define({
type: "session.deleted",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: Info,
},
}),
Diff: BusEvent.define(
"session.diff",
Expand Down Expand Up @@ -394,7 +398,7 @@ export interface Interface {

export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}

type Patch = z.infer<typeof Event.Updated.schema>["info"]
type Patch = Schema.Schema.Type<typeof Event.Updated.schema>["info"]

const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
Expand Down
60 changes: 44 additions & 16 deletions packages/opencode/src/sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import z from "zod"
import type { ZodObject } from "zod"
import { Schema, Types } from "effect"
import { Database, eq } from "@/storage"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
Expand All @@ -9,23 +9,36 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { zod } from "@/util/effect-zod"

type StructLike<Fields extends Schema.Struct.Fields> = Fields | Schema.Struct<Fields>

export type Definition = {
type: string
version: number
aggregate: string
schema: z.ZodObject
schema: Schema.Top
busSchema: Schema.Top
properties: z.ZodTypeAny
}

// This is temporary and only exists for compatibility with bus
// event definitions
properties: z.ZodObject
type MutableType<S extends Schema.Top> = Types.DeepMutable<Schema.Schema.Type<S>>

type DefinedEvent<Type extends string, Agg extends string, SchemaDef extends Schema.Top, BusDef extends Schema.Top> = Definition & {
type: Type
aggregate: Agg
schema: SchemaDef
busSchema: BusDef
properties: z.ZodType<MutableType<BusDef>>
}

type Data<Def extends Definition> = MutableType<Def["schema"]>

export type Event<Def extends Definition = Definition> = {
id: string
seq: number
aggregateID: string
data: z.infer<Def["schema"]>
data: Data<Def>
}

export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
Expand Down Expand Up @@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!

BusEvent.define(def.type, def.properties || def.schema)
BusEvent.define(def.type, def.properties)
}

// Freeze the system so it clearly errors if events are defined
Expand All @@ -69,22 +82,37 @@ export function versionedType(type: string, version?: number) {
return version ? `${type}.${version}` : type
}

function struct<Fields extends Schema.Struct.Fields>(value: StructLike<Fields>) {
return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct<Fields>
}

export function define<
Type extends string,
Agg extends string,
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
BusSchema extends ZodObject = Schema,
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
Fields extends Schema.Struct.Fields & Record<Agg, Schema.Top>,
BusFields extends Schema.Struct.Fields = Fields,
>(input: {
type: Type
version: number
aggregate: Agg
schema: StructLike<Fields>
busSchema?: StructLike<BusFields>
}): DefinedEvent<Type, Agg, Schema.Struct<Fields>, Schema.Struct<BusFields>> {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}

const def = {
const schema = struct(input.schema)
const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct<BusFields>
const properties = zod(busSchema) as unknown as z.ZodType<MutableType<typeof busSchema>>

const def: DefinedEvent<Type, Agg, typeof schema, typeof busSchema> = {
type: input.type,
version: input.version,
aggregate: input.aggregate,
schema: input.schema,
properties: input.busSchema ? input.busSchema : input.schema,
schema,
busSchema,
properties,
}

versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
Expand Down Expand Up @@ -143,10 +171,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
void result.then((data) => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
void ProjectBus.publish({ type: def.type, properties: def.properties }, data)
})
} else {
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
void ProjectBus.publish({ type: def.type, properties: def.properties }, result)
}

GlobalBus.emit("event", {
Expand Down Expand Up @@ -266,7 +294,7 @@ export function payloads() {
id: z.string(),
seq: z.number(),
aggregateID: z.literal(def.aggregate),
data: def.schema,
data: zod(def.schema),
})
.meta({
ref: `SyncEvent.${def.type}`,
Expand Down
8 changes: 8 additions & 0 deletions packages/opencode/src/util/effect-zod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ function isZodType(value: unknown): value is z.ZodTypeAny {
return typeof value === "object" && value !== null && "_zod" in value
}

// Bridge a Zod-first schema into Effect Schema while preserving the original
// Zod for downstream JSON Schema/OpenAPI generation.
export function fromZod<T extends z.ZodTypeAny>(value: T) {
return Schema.declare((input): input is z.output<T> => value.safeParse(input).success).annotate({
[ZodOverride]: value,
})
}

function walk(ast: SchemaAST.AST): z.ZodTypeAny {
const cached = walkCache.get(ast)
if (cached) return cached
Expand Down
19 changes: 19 additions & 0 deletions packages/opencode/test/bus/bus.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Schema } from "effect"
import z from "zod"
import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
Expand All @@ -10,6 +11,8 @@ const TestEvent = {
Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
}

const EffectTestEvent = BusEvent.define("test.effect-schema.ping", Schema.Struct({ value: Schema.Number }))

function withInstance(directory: string, fn: () => Promise<void>) {
return Instance.provide({ directory, fn })
}
Expand Down Expand Up @@ -76,6 +79,22 @@ describe("Bus", () => {
await Bus.publish(TestEvent.Ping, { value: 1 })
})
})

test("accepts Effect Schema event definitions", async () => {
await using tmp = await tmpdir()
const received: number[] = []

await withInstance(tmp.path, async () => {
Bus.subscribe(EffectTestEvent, (evt) => {
received.push(evt.properties.value)
})
await Bun.sleep(10)
await Bus.publish(EffectTestEvent, { value: 42 })
await Bun.sleep(10)
})

expect(received).toEqual([42])
})
})

describe("unsubscribe", () => {
Expand Down
Loading
Loading