|
1 | | -export * as SyncEvent from "./sync-event" |
| 1 | +import z from "zod" |
| 2 | +import type { ZodObject } from "zod" |
| 3 | +import { Database, eq } from "@/storage" |
| 4 | +import { GlobalBus } from "@/bus/global" |
| 5 | +import { Bus as ProjectBus } from "@/bus" |
| 6 | +import { BusEvent } from "@/bus/bus-event" |
| 7 | +import { Instance } from "@/project/instance" |
| 8 | +import { EventSequenceTable, EventTable } from "./event.sql" |
| 9 | +import { WorkspaceContext } from "@/control-plane/workspace-context" |
| 10 | +import { EventID } from "./schema" |
| 11 | +import { Flag } from "@/flag/flag" |
| 12 | + |
| 13 | +export type Definition = { |
| 14 | + type: string |
| 15 | + version: number |
| 16 | + aggregate: string |
| 17 | + schema: z.ZodObject |
| 18 | + |
| 19 | + // This is temporary and only exists for compatibility with bus |
| 20 | + // event definitions |
| 21 | + properties: z.ZodObject |
| 22 | +} |
| 23 | + |
| 24 | +export type Event<Def extends Definition = Definition> = { |
| 25 | + id: string |
| 26 | + seq: number |
| 27 | + aggregateID: string |
| 28 | + data: z.infer<Def["schema"]> |
| 29 | +} |
| 30 | + |
| 31 | +export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string } |
| 32 | + |
| 33 | +type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void |
| 34 | + |
| 35 | +export const registry = new Map<string, Definition>() |
| 36 | +let projectors: Map<Definition, ProjectorFunc> | undefined |
| 37 | +const versions = new Map<string, number>() |
| 38 | +let frozen = false |
| 39 | +let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown> |
| 40 | + |
| 41 | +export function reset() { |
| 42 | + frozen = false |
| 43 | + projectors = undefined |
| 44 | + convertEvent = (_, data) => data |
| 45 | +} |
| 46 | + |
| 47 | +export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) { |
| 48 | + projectors = new Map(input.projectors) |
| 49 | + |
| 50 | + // Install all the latest event defs to the bus. We only ever emit |
| 51 | + // latest versions from code, and keep around old versions for |
| 52 | + // replaying. Replaying does not go through the bus, and it |
| 53 | + // simplifies the bus to only use unversioned latest events |
| 54 | + for (let [type, version] of versions.entries()) { |
| 55 | + let def = registry.get(versionedType(type, version))! |
| 56 | + |
| 57 | + BusEvent.define(def.type, def.properties || def.schema) |
| 58 | + } |
| 59 | + |
| 60 | + // Freeze the system so it clearly errors if events are defined |
| 61 | + // after `init` which would cause bugs |
| 62 | + frozen = true |
| 63 | + convertEvent = input.convertEvent || ((_, data) => data) |
| 64 | +} |
| 65 | + |
| 66 | +export function versionedType<A extends string>(type: A): A |
| 67 | +export function versionedType<A extends string, B extends number>(type: A, version: B): `${A}/${B}` |
| 68 | +export function versionedType(type: string, version?: number) { |
| 69 | + return version ? `${type}.${version}` : type |
| 70 | +} |
| 71 | + |
| 72 | +export function define< |
| 73 | + Type extends string, |
| 74 | + Agg extends string, |
| 75 | + Schema extends ZodObject<Record<Agg, z.ZodType<string>>>, |
| 76 | + BusSchema extends ZodObject = Schema, |
| 77 | +>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) { |
| 78 | + if (frozen) { |
| 79 | + throw new Error("Error defining sync event: sync system has been frozen") |
| 80 | + } |
| 81 | + |
| 82 | + const def = { |
| 83 | + type: input.type, |
| 84 | + version: input.version, |
| 85 | + aggregate: input.aggregate, |
| 86 | + schema: input.schema, |
| 87 | + properties: input.busSchema ? input.busSchema : input.schema, |
| 88 | + } |
| 89 | + |
| 90 | + versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0)) |
| 91 | + |
| 92 | + registry.set(versionedType(def.type, def.version), def) |
| 93 | + |
| 94 | + return def |
| 95 | +} |
| 96 | + |
| 97 | +export function project<Def extends Definition>( |
| 98 | + def: Def, |
| 99 | + func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void, |
| 100 | +): [Definition, ProjectorFunc] { |
| 101 | + return [def, func as ProjectorFunc] |
| 102 | +} |
| 103 | + |
| 104 | +function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) { |
| 105 | + if (projectors == null) { |
| 106 | + throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") |
| 107 | + } |
| 108 | + |
| 109 | + const projector = projectors.get(def) |
| 110 | + if (!projector) { |
| 111 | + throw new Error(`Projector not found for event: ${def.type}`) |
| 112 | + } |
| 113 | + |
| 114 | + // idempotent: need to ignore any events already logged |
| 115 | + |
| 116 | + Database.transaction((tx) => { |
| 117 | + projector(tx, event.data) |
| 118 | + |
| 119 | + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { |
| 120 | + tx.insert(EventSequenceTable) |
| 121 | + .values({ |
| 122 | + aggregate_id: event.aggregateID, |
| 123 | + seq: event.seq, |
| 124 | + }) |
| 125 | + .onConflictDoUpdate({ |
| 126 | + target: EventSequenceTable.aggregate_id, |
| 127 | + set: { seq: event.seq }, |
| 128 | + }) |
| 129 | + .run() |
| 130 | + tx.insert(EventTable) |
| 131 | + .values({ |
| 132 | + id: event.id, |
| 133 | + seq: event.seq, |
| 134 | + aggregate_id: event.aggregateID, |
| 135 | + type: versionedType(def.type, def.version), |
| 136 | + data: event.data as Record<string, unknown>, |
| 137 | + }) |
| 138 | + .run() |
| 139 | + } |
| 140 | + |
| 141 | + Database.effect(() => { |
| 142 | + if (options?.publish) { |
| 143 | + const result = convertEvent(def.type, event.data) |
| 144 | + if (result instanceof Promise) { |
| 145 | + void result.then((data) => { |
| 146 | + void ProjectBus.publish({ type: def.type, properties: def.schema }, data) |
| 147 | + }) |
| 148 | + } else { |
| 149 | + void ProjectBus.publish({ type: def.type, properties: def.schema }, result) |
| 150 | + } |
| 151 | + |
| 152 | + GlobalBus.emit("event", { |
| 153 | + directory: Instance.directory, |
| 154 | + project: Instance.project.id, |
| 155 | + workspace: WorkspaceContext.workspaceID, |
| 156 | + payload: { |
| 157 | + type: "sync", |
| 158 | + syncEvent: { |
| 159 | + type: versionedType(def.type, def.version), |
| 160 | + ...event, |
| 161 | + }, |
| 162 | + }, |
| 163 | + }) |
| 164 | + } |
| 165 | + }) |
| 166 | + }) |
| 167 | +} |
| 168 | + |
| 169 | +export function replay(event: SerializedEvent, options?: { publish: boolean }) { |
| 170 | + const def = registry.get(event.type) |
| 171 | + if (!def) { |
| 172 | + throw new Error(`Unknown event type: ${event.type}`) |
| 173 | + } |
| 174 | + |
| 175 | + const row = Database.use((db) => |
| 176 | + db |
| 177 | + .select({ seq: EventSequenceTable.seq }) |
| 178 | + .from(EventSequenceTable) |
| 179 | + .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) |
| 180 | + .get(), |
| 181 | + ) |
| 182 | + |
| 183 | + const latest = row?.seq ?? -1 |
| 184 | + if (event.seq <= latest) { |
| 185 | + return |
| 186 | + } |
| 187 | + |
| 188 | + const expected = latest + 1 |
| 189 | + if (event.seq !== expected) { |
| 190 | + throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`) |
| 191 | + } |
| 192 | + |
| 193 | + process(def, event, { publish: !!options?.publish }) |
| 194 | +} |
| 195 | + |
| 196 | +export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) { |
| 197 | + const source = events[0]?.aggregateID |
| 198 | + if (!source) return |
| 199 | + if (events.some((item) => item.aggregateID !== source)) { |
| 200 | + throw new Error("Replay events must belong to the same session") |
| 201 | + } |
| 202 | + const start = events[0].seq |
| 203 | + for (const [i, item] of events.entries()) { |
| 204 | + const seq = start + i |
| 205 | + if (item.seq !== seq) { |
| 206 | + throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`) |
| 207 | + } |
| 208 | + } |
| 209 | + for (const item of events) { |
| 210 | + replay(item, options) |
| 211 | + } |
| 212 | + return source |
| 213 | +} |
| 214 | + |
| 215 | +export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) { |
| 216 | + const agg = (data as Record<string, string>)[def.aggregate] |
| 217 | + // This should never happen: we've enforced it via typescript in |
| 218 | + // the definition |
| 219 | + if (agg == null) { |
| 220 | + throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) |
| 221 | + } |
| 222 | + |
| 223 | + if (def.version !== versions.get(def.type)) { |
| 224 | + throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) |
| 225 | + } |
| 226 | + |
| 227 | + const { publish = true } = options || {} |
| 228 | + |
| 229 | + // Note that this is an "immediate" transaction which is critical. |
| 230 | + // We need to make sure we can safely read and write with nothing |
| 231 | + // else changing the data from under us |
| 232 | + Database.transaction( |
| 233 | + (tx) => { |
| 234 | + const id = EventID.ascending() |
| 235 | + const row = tx |
| 236 | + .select({ seq: EventSequenceTable.seq }) |
| 237 | + .from(EventSequenceTable) |
| 238 | + .where(eq(EventSequenceTable.aggregate_id, agg)) |
| 239 | + .get() |
| 240 | + const seq = row?.seq != null ? row.seq + 1 : 0 |
| 241 | + |
| 242 | + const event = { id, seq, aggregateID: agg, data } |
| 243 | + process(def, event, { publish }) |
| 244 | + }, |
| 245 | + { |
| 246 | + behavior: "immediate", |
| 247 | + }, |
| 248 | + ) |
| 249 | +} |
| 250 | + |
| 251 | +export function remove(aggregateID: string) { |
| 252 | + Database.transaction((tx) => { |
| 253 | + tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() |
| 254 | + tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() |
| 255 | + }) |
| 256 | +} |
| 257 | + |
| 258 | +export function payloads() { |
| 259 | + return registry |
| 260 | + .entries() |
| 261 | + .map(([type, def]) => { |
| 262 | + return z |
| 263 | + .object({ |
| 264 | + type: z.literal("sync"), |
| 265 | + name: z.literal(type), |
| 266 | + id: z.string(), |
| 267 | + seq: z.number(), |
| 268 | + aggregateID: z.literal(def.aggregate), |
| 269 | + data: def.schema, |
| 270 | + }) |
| 271 | + .meta({ |
| 272 | + ref: `SyncEvent.${def.type}`, |
| 273 | + }) |
| 274 | + }) |
| 275 | + .toArray() |
| 276 | +} |
| 277 | + |
| 278 | +export * as SyncEvent from "." |
0 commit comments