From db06cd4354717849417f3b7498368c52063c9721 Mon Sep 17 00:00:00 2001 From: KeKs0r Date: Wed, 27 May 2026 02:52:31 +0200 Subject: [PATCH] Add async migration operation mode --- .changeset/migration-mode-async.md | 26 ++ bun.lock | 32 +- examples/clickbench/README.md | 4 +- .../20260525133130_load_clickbench_data.sql | 19 +- examples/clickbench/package.json | 6 +- package.json | 3 +- packages/cli/src/commands/migrate/apply.ts | 23 +- .../cli/src/commands/migrate/async-apply.ts | 386 ++++++++++++++++++ packages/cli/src/runtime/journal-store.ts | 164 +++++++- packages/cli/src/runtime/safety-markers.ts | 66 ++- .../test/commands/migrate/async-apply.test.ts | 383 +++++++++++++++++ .../src/test/runtime/journal-store.test.ts | 211 ++++++++++ .../src/test/runtime/safety-markers.test.ts | 99 ++++- 13 files changed, 1366 insertions(+), 56 deletions(-) create mode 100644 .changeset/migration-mode-async.md create mode 100644 packages/cli/src/commands/migrate/async-apply.ts create mode 100644 packages/cli/src/test/commands/migrate/async-apply.test.ts create mode 100644 packages/cli/src/test/runtime/journal-store.test.ts diff --git a/.changeset/migration-mode-async.md b/.changeset/migration-mode-async.md new file mode 100644 index 0000000..7a3ab12 --- /dev/null +++ b/.changeset/migration-mode-async.md @@ -0,0 +1,26 @@ +--- +"chkit": patch +--- + +Add `mode=async` annotation for long-running migration operations. + +Mark an operation as async by adding `mode=async` to its `-- operation:` header line, for example: + +```sql +-- operation: load_table_data key=table:default.hits risk=caution mode=async +INSERT INTO default.hits SELECT * FROM s3(...); +``` + +When `chkit migrate --apply` encounters an async operation it: + +1. Computes a deterministic `query_id` from `sha256('chkit:' + migration_filename + ':' + statement_index)`, using the first 32 hex digits formatted as a UUID-shaped string. +2. Checks `system.processes` / `system.query_log` for any prior attempt with that id. +3. 
Fires the INSERT via the existing `submit()` path without blocking on its HTTP response, and polls `queryStatus(query_id)` every 5 seconds — printing a one-line update (`written=N.NNM rows (N.NN GiB), elapsed Ns`) so the operator sees the load advance. +4. On `QueryFinish` → records the journal entry and proceeds. On `ExceptionWhileProcessing` → throws with the server's exception. On any prior run's failure → runs the operation's optional `-- before-retry:` SQL, then resubmits (retry semantics). + +This unblocks two scenarios chkit could not previously handle: + +- **Long INSERTs through a proxy/LB with an HTTP request-duration ceiling**: the operator sees progress, and a connection drop mid-poll no longer cancels the work — the deterministic id lets a re-run attach to the in-flight query on the server. +- **Transient client-side errors during a multi-minute load**: re-running chkit picks up where it left off rather than starting over. + +Existing migrations without `mode=async` continue to use the synchronous path; the annotation is opt-in and forward-compatible (an unknown mode value falls back to sync). 
diff --git a/bun.lock b/bun.lock index 9eeb8a4..9768a4a 100644 --- a/bun.lock +++ b/bun.lock @@ -19,7 +19,7 @@ }, "apps/docs": { "name": "@chkit/docs", - "version": "0.0.2-beta.13", + "version": "0.0.2-beta.15", "dependencies": { "@astrojs/starlight": "^0.37.6", "astro": "^5.6.1", @@ -29,9 +29,17 @@ "wrangler": "^4.65.0", }, }, + "examples/clickbench": { + "name": "chkit-example-clickbench", + "devDependencies": { + "@chkit/core": "0.1.0-beta.26", + "@chkit/plugin-obsessiondb": "0.1.0-beta.26", + "chkit": "0.1.0-beta.26", + }, + }, "packages/cli": { "name": "chkit", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "bin": { "chkit": "./dist/bin/chkit.js", }, @@ -48,7 +56,7 @@ }, "packages/clickhouse": { "name": "@chkit/clickhouse", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { "@chkit/core": "workspace:*", "@clickhouse/client": "^1.11.0", @@ -58,14 +66,14 @@ }, "packages/codegen": { "name": "@chkit/codegen", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { - "@chkit/core": "workspace:*", + "@chkit/core": "0.1.0-beta.26", }, }, "packages/core": { "name": "@chkit/core", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { "fast-glob": "^3.3.2", }, @@ -75,7 +83,7 @@ }, "packages/create-chkit": { "name": "create-chkit", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.25", "bin": { "create-chkit": "./dist/bin/create-chkit.js", }, @@ -91,7 +99,7 @@ }, "packages/plugin-backfill": { "name": "@chkit/plugin-backfill", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { "@chkit/clickhouse": "workspace:*", "@chkit/core": "workspace:*", @@ -102,7 +110,7 @@ }, "packages/plugin-codegen": { "name": "@chkit/plugin-codegen", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { "@chkit/core": "workspace:*", "zod": "^4.3.6", @@ -110,7 +118,7 @@ }, "packages/plugin-obsessiondb": { "name": "@chkit/plugin-obsessiondb", - "version": 
"0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { "@chkit/clickhouse": "workspace:*", "@chkit/core": "workspace:*", @@ -121,7 +129,7 @@ }, "packages/plugin-pull": { "name": "@chkit/plugin-pull", - "version": "0.1.0-beta.24", + "version": "0.1.0-beta.26", "dependencies": { "@chkit/clickhouse": "workspace:*", "@chkit/core": "workspace:*", @@ -598,6 +606,8 @@ "chkit": ["chkit@workspace:packages/cli"], + "chkit-example-clickbench": ["chkit-example-clickbench@workspace:examples/clickbench"], + "chokidar": ["chokidar@5.0.0", "", { "dependencies": { "readdirp": "^5.0.0" } }, "sha512-TQMmc3w+5AxjpL8iIiwebF73dRDF4fBIieAqGn9RGCWaEVwQ6Fb2cGe31Yns0RRIzii5goJ1Y7xbMwo1TxMplw=="], "ci-info": ["ci-info@3.9.0", "", {}, "sha512-NIxF55hv4nSqQswkAeiOi1r83xy8JldOFDTWiug55KBu9Jnblncd2U6ViHmYgHf01TPZS77NJBhBMKdWj9HQMQ=="], diff --git a/examples/clickbench/README.md b/examples/clickbench/README.md index 1685981..6981750 100644 --- a/examples/clickbench/README.md +++ b/examples/clickbench/README.md @@ -1,6 +1,6 @@ # ClickBench CHKit example -This example creates the ClickBench `hits` schema and loads the full public ClickBench dataset from `datasets.clickhouse.com`. +This example creates the ClickBench `hits` schema and loads the full public ClickBench dataset from the ObsessionDB-hosted mirror at `fsn1.your-objectstorage.com/obsessiondb-datasets/clickbench/`. The data load migration truncates `default.hits` before inserting so an interrupted load can be retried by clearing the migration journal entry or using a fresh database. Run it against a disposable ClickHouse database. @@ -25,6 +25,6 @@ bun run migrate -- --service ## Migrations - `20260525133129_create_clickbench_schema.sql` creates the ClickBench `hits` table. -- `20260525133130_load_clickbench_data.sql` truncates `hits` and loads the full partitioned Parquet dataset from `https://datasets.clickhouse.com/hits_compatible/athena_partitioned/` via ClickHouse's `url()` table function. 
+- `20260525133130_load_clickbench_data.sql` truncates `hits` and loads the full partitioned Parquet dataset from `https://fsn1.your-objectstorage.com/obsessiondb-datasets/clickbench/` via ClickHouse's `s3()` table function. The benchmark query set is intentionally not included yet; this example focuses on schema creation and dataset loading. diff --git a/examples/clickbench/chkit/migrations/20260525133130_load_clickbench_data.sql b/examples/clickbench/chkit/migrations/20260525133130_load_clickbench_data.sql index 0afa209..9c68e04 100644 --- a/examples/clickbench/chkit/migrations/20260525133130_load_clickbench_data.sql +++ b/examples/clickbench/chkit/migrations/20260525133130_load_clickbench_data.sql @@ -11,9 +11,13 @@ TRUNCATE TABLE default.hits SETTINGS max_table_size_to_drop = 0, max_partition_size_to_drop = 0; -- Load the full ClickBench dataset (100 Parquet files, ~100M rows) in a single --- INSERT. We use the s3() table function against the public dataset bucket --- (datasets.clickhouse.com is a CloudFront alias for clickhouse-public-datasets) --- because s3() does native partitioned-Parquet parallelism that url() does not. +-- INSERT. We use the s3() table function against the ObsessionDB-hosted mirror +-- of the dataset (Hetzner Object Storage, FSN1 region) because s3() does +-- native partitioned-Parquet parallelism that url() does not. +-- +-- The executable TRUNCATE above clears the target before the first attempt. +-- The `before-retry:` line below repeats that compensation before any retry +-- where a prior async INSERT may have partially loaded rows. -- -- Tuning (measured against an ObsessionDB customer-benchmark instance): -- * max_download_threads = 64, max_insert_threads = 16 lands at ~178s for @@ -26,15 +30,16 @@ TRUNCATE TABLE default.hits SETTINGS max_table_size_to_drop = 0, max_partition_s -- * max_execution_time = 0 lifts the server-side query timer (the load is -- intentionally long-running). 
--- operation: load_table_data key=table:default.hits risk=caution +-- operation: load_table_data key=table:default.hits risk=caution mode=async +-- before-retry: TRUNCATE TABLE default.hits SETTINGS max_table_size_to_drop = 0, max_partition_size_to_drop = 0; INSERT INTO default.hits SELECT * FROM s3( - 'https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/athena_partitioned/hits_{0..99}.parquet', + 'https://fsn1.your-objectstorage.com/obsessiondb-datasets/clickbench/hits_{0..99}.parquet', 'Parquet' ) SETTINGS max_execution_time = 0, max_memory_usage = 6500000000, - max_download_threads = 64, - max_insert_threads = 16; + max_download_threads = 2, + max_insert_threads = 2; diff --git a/examples/clickbench/package.json b/examples/clickbench/package.json index 3159487..2d40533 100644 --- a/examples/clickbench/package.json +++ b/examples/clickbench/package.json @@ -9,8 +9,8 @@ "check": "chkit check" }, "devDependencies": { - "@chkit/core": "0.1.0-beta.24", - "@chkit/plugin-obsessiondb": "0.1.0-beta.24", - "chkit": "0.1.0-beta.24" + "@chkit/core": "0.1.0-beta.26", + "@chkit/plugin-obsessiondb": "0.1.0-beta.26", + "chkit": "0.1.0-beta.26" } } diff --git a/package.json b/package.json index e6ce9cd..7201643 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,8 @@ "packageManager": "bun@1.3.5", "workspaces": [ "packages/*", - "apps/*" + "apps/*", + "examples/clickbench" ], "scripts": { "build": "turbo run build", diff --git a/packages/cli/src/commands/migrate/apply.ts b/packages/cli/src/commands/migrate/apply.ts index fc1895a..a17fd62 100644 --- a/packages/cli/src/commands/migrate/apply.ts +++ b/packages/cli/src/commands/migrate/apply.ts @@ -12,6 +12,7 @@ import { extractExecutableStatements, extractMigrationOperationSummaries, } from '../../runtime/safety-markers.js' +import { applyAsyncStatement } from './async-apply.js' type JournalStore = ReturnType @@ -43,10 +44,28 @@ export async function applyMigration(input: { statements: parsedStatements, }) + 
const migrationChecksum = checksumSQL(sql) + for (let i = 0; i < statements.length; i++) { const statement = statements[i] as string - await db.command(statement) const operation = operationSummaries[i] + if (operation?.mode === 'async') { + await applyAsyncStatement({ + db, + journalStore, + sql: statement, + migrationName: file, + migrationChecksum, + statementIndex: i, + operationType: operation.type, + operationKey: operation.key, + beforeRetry: operation.beforeRetry, + log: (line) => console.log(line), + }) + // Async ops are DML (loads, backfills) — no DDL propagation to wait on. + continue + } + await db.command(statement) if (operation) { await waitForDDLPropagation(db, operation.type, operation.key) } @@ -55,7 +74,7 @@ export async function applyMigration(input: { const entry: MigrationJournalEntry = { name: file, appliedAt: new Date().toISOString().replace('Z', ''), - checksum: checksumSQL(sql), + checksum: migrationChecksum, } await journalStore.appendEntry(entry) diff --git a/packages/cli/src/commands/migrate/async-apply.ts b/packages/cli/src/commands/migrate/async-apply.ts new file mode 100644 index 0000000..1cbcde3 --- /dev/null +++ b/packages/cli/src/commands/migrate/async-apply.ts @@ -0,0 +1,386 @@ +import { createHash } from 'node:crypto' + +import type { ClickHouseExecutor, QueryStatus } from '@chkit/clickhouse' + +import { debug } from '../../runtime/debug.js' +import type { + JournalStore, + MigrationRowState, + OperationState, +} from '../../runtime/journal-store.js' + +const POLL_INTERVAL_MS = 5_000 + +export interface AsyncApplyInput { + db: ClickHouseExecutor + journalStore: JournalStore + sql: string + migrationName: string + migrationChecksum: string + statementIndex: number + operationType: string + operationKey: string + beforeRetry: string | null + log: (line: string) => void + pollIntervalMs?: number + sleep?: (ms: number) => Promise + now?: () => number +} + +export type AsyncApplyResult = + | { kind: 'completed'; operation: 
OperationState } + | { kind: 'skipped'; operation: OperationState } + +export async function applyAsyncStatement(input: AsyncApplyInput): Promise { + const { + db, + journalStore, + sql, + migrationName, + migrationChecksum, + statementIndex, + operationType, + operationKey, + beforeRetry, + log, + pollIntervalMs = POLL_INTERVAL_MS, + sleep = defaultSleep, + now = Date.now, + } = input + + const queryId = makeDeterministicQueryId(migrationName, statementIndex) + debug( + 'migrate:async', + `${migrationName}#${statementIndex} query_id=${queryId} type=${operationType}`, + ) + + const initialMigrationState = await journalStore.readMigrationState(migrationName) + if ( + initialMigrationState !== null && + !initialMigrationState.migrationCompleted && + initialMigrationState.checksum !== migrationChecksum + ) { + throw new Error( + `Migration ${migrationName} has in-progress async journal state for checksum ${initialMigrationState.checksum}, but the current file checksum is ${migrationChecksum}. Restore the original migration file or clear the in-progress journal state before retrying.`, + ) + } + const priorOpState = initialMigrationState?.operations.find( + (op) => op.operationIndex === statementIndex, + ) + + // 1. Already completed → skip entirely + if (priorOpState?.status === 'completed') { + log( + ` ${operationType}: query_id=${queryId} already completed in prior run — skipping`, + ) + return { kind: 'skipped', operation: priorOpState } + } + + // 2. 
Currently in flight on the server → attach + const inFlight = await db.queryStatus(queryId) + if (inFlight.status === 'running') { + log( + ` ${operationType}: query_id=${queryId} already running on server — attaching to in-flight query`, + ) + return await pollUntilTerminal({ + db, + journalStore, + migrationState: initialMigrationState, + migrationName, + migrationChecksum, + statementIndex, + operationType, + operationKey, + queryId, + pollAfterTime: '1970-01-01 00:00:00', + log, + pollIntervalMs, + sleep, + now, + // No submit promise — query was started by a prior chkit run that's + // now gone. We only observe its eventual completion. + submitPromise: null, + startedAt: priorOpState?.startedAt ?? isoWithoutZone(new Date(now())), + }) + } + + // 3. Prior in-progress row exists but query is not in system.processes → + // RETRY. Run before-retry compensation, then resubmit forward. + // 4. No prior row → FIRST attempt. Skip compensation, submit forward. + // (priorOpState.status === 'completed' was already returned above.) + if (priorOpState !== undefined) { + const errorTail = priorOpState.lastError + ? `: ${firstLine(priorOpState.lastError)}` + : '' + log( + ` ${operationType}: previous attempt of query_id=${queryId} is no longer running (status=${priorOpState.status}${errorTail}) — running before-retry then resubmitting`, + ) + if (beforeRetry !== null) { + log(` ${operationType}: running before-retry SQL`) + await db.command(beforeRetry) + } + } else { + log(` ${operationType}: submitting async (query_id=${queryId})`) + } + + const submitAfterTime = + priorOpState === undefined ? undefined : isoWithoutZone(new Date(now() - 60_000)) + const startedAt = isoWithoutZone(new Date(now())) + + // Persist the "started" intent BEFORE submitting, so a crash between here + // and the next event still leaves chkit able to detect "this op was tried." + await journalStore.writeMigrationState( + upsertOperation( + initialMigrationState ?? 
freshMigrationState(migrationName, migrationChecksum), + { + operationIndex: statementIndex, + operationKey, + operationType, + queryId, + status: 'started', + startedAt, + finishedAt: null, + lastError: '', + }, + now, + ), + ) + + // Re-read so we have the row's actual stored applied_at (matters for + // subsequent ReplacingMergeTree writes during polling). + const stateAfterStart = await journalStore.readMigrationState(migrationName) + + const submitPromise = db.submit(sql, queryId) + + return await pollUntilTerminal({ + db, + journalStore, + migrationState: stateAfterStart, + migrationName, + migrationChecksum, + statementIndex, + operationType, + operationKey, + queryId, + pollAfterTime: submitAfterTime, + log, + pollIntervalMs, + sleep, + now, + submitPromise, + startedAt, + }) +} + +interface PollUntilTerminalInput { + db: ClickHouseExecutor + journalStore: JournalStore + migrationState: MigrationRowState | null + migrationName: string + migrationChecksum: string + statementIndex: number + operationType: string + operationKey: string + queryId: string + pollAfterTime: string | undefined + log: (line: string) => void + pollIntervalMs: number + sleep: (ms: number) => Promise + now: () => number + submitPromise: Promise | null + startedAt: string +} + +async function pollUntilTerminal(input: PollUntilTerminalInput): Promise { + const { + db, + journalStore, + migrationState, + migrationName, + migrationChecksum, + statementIndex, + operationType, + operationKey, + queryId, + pollAfterTime, + log, + pollIntervalMs, + sleep, + now, + submitPromise, + startedAt, + } = input + + let submitError: Error | null = null + // Capture the submit promise rejection so it doesn't surface as an + // unhandled rejection. Polling is the source of truth for outcome. + const submitPromiseGuarded = + submitPromise === null + ? null + : submitPromise.catch((error: unknown) => { + submitError = error instanceof Error ? 
error : new Error(String(error)) + }) + + const pollStartedAt = now() + try { + for (;;) { + await sleep(pollIntervalMs) + const status = await db.queryStatus( + queryId, + pollAfterTime === undefined ? undefined : { afterTime: pollAfterTime }, + ) + const elapsedSec = Math.floor((now() - pollStartedAt) / 1000) + + if (status.status === 'finished') { + const finishedSec = Math.round((status.durationMs ?? 0) / 1000) + const finishedOp: OperationState = { + operationIndex: statementIndex, + operationKey, + operationType, + queryId, + status: 'completed', + startedAt, + finishedAt: isoWithoutZone(new Date(now())), + lastError: '', + } + const baseState = + migrationState ?? freshMigrationState(migrationName, migrationChecksum) + await journalStore.writeMigrationState( + upsertOperation(baseState, finishedOp, now), + ) + log( + ` ${operationType}: finished — written=${formatRows(status.writtenRows)} (${formatBytes(status.writtenBytes)}) in ${finishedSec}s`, + ) + return { kind: 'completed', operation: finishedOp } + } + + if (status.status === 'failed') { + const failedOp: OperationState = { + operationIndex: statementIndex, + operationKey, + operationType, + queryId, + status: 'failed', + startedAt, + finishedAt: isoWithoutZone(new Date(now())), + lastError: status.error ?? '', + } + const baseState = + migrationState ?? freshMigrationState(migrationName, migrationChecksum) + await journalStore.writeMigrationState( + upsertOperation(baseState, failedOp, now), + ) + throw new Error( + `Async migration step ${operationType} failed (query_id ${queryId}): ${status.error ?? ''}`, + ) + } + + if (status.status === 'running') { + log(progressLine(operationType, status, elapsedSec)) + continue + } + + // status === 'unknown' + // If submit has already rejected, the query never made it server-side + // (e.g. SQL parse error) — surface that error. Otherwise it's a + // transient gap (just-submitted or just-finished); loop. 
+ if (submitError) { + throw submitError + } + log( + ` ${operationType}: status unknown — still polling (elapsed ${elapsedSec}s)`, + ) + } + } finally { + if (submitPromiseGuarded) { + await submitPromiseGuarded.catch(() => {}) + } + } +} + +function upsertOperation( + state: MigrationRowState, + op: OperationState, + now: () => number, +): MigrationRowState { + const others = state.operations.filter( + (existing) => existing.operationIndex !== op.operationIndex, + ) + const operations = [...others, op].sort( + (a, b) => a.operationIndex - b.operationIndex, + ) + return { + ...state, + appliedAt: isoWithoutZone(new Date(now())), + operations, + // migrationCompleted stays false until applyMigration explicitly flips it + migrationCompleted: state.migrationCompleted, + } +} + +function freshMigrationState( + migrationName: string, + checksum: string, +): MigrationRowState { + return { + name: migrationName, + appliedAt: '1970-01-01 00:00:00.000', + checksum, + chkitVersion: '', + migrationCompleted: false, + operations: [], + } +} + +export function makeDeterministicQueryId( + migrationName: string, + statementIndex: number, +): string { + const hex = createHash('sha256') + .update(`chkit:${migrationName}:${statementIndex}`) + .digest('hex') + return [ + hex.slice(0, 8), + hex.slice(8, 12), + hex.slice(12, 16), + hex.slice(16, 20), + hex.slice(20, 32), + ].join('-') +} + +function progressLine( + operationLabel: string, + status: QueryStatus, + elapsedSec: number, +): string { + const rows = formatRows(status.writtenRows) + const bytes = formatBytes(status.writtenBytes) + return ` ${operationLabel}: written=${rows} (${bytes}), elapsed ${elapsedSec}s` +} + +function formatRows(value: number | undefined): string { + if (value === undefined || value === 0) return '0 rows' + if (value >= 1_000_000) return `${(value / 1_000_000).toFixed(2)}M rows` + if (value >= 1_000) return `${(value / 1_000).toFixed(1)}K rows` + return `${value} rows` +} + +function formatBytes(value: 
number | undefined): string { + if (value === undefined || value === 0) return '0 B' + if (value >= 1024 ** 3) return `${(value / 1024 ** 3).toFixed(2)} GiB` + if (value >= 1024 ** 2) return `${(value / 1024 ** 2).toFixed(1)} MiB` + if (value >= 1024) return `${(value / 1024).toFixed(1)} KiB` + return `${value} B` +} + +function isoWithoutZone(date: Date): string { + return date.toISOString().replace('Z', '') +} + +function firstLine(value: string): string { + return value.split('\n')[0] ?? value +} + +function defaultSleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/packages/cli/src/runtime/journal-store.ts b/packages/cli/src/runtime/journal-store.ts index 56ec054..bf13271 100644 --- a/packages/cli/src/runtime/journal-store.ts +++ b/packages/cli/src/runtime/journal-store.ts @@ -4,8 +4,32 @@ import type { MigrationJournal, MigrationJournalEntry } from './migration-store. import { CLI_VERSION } from './version.js' import { debug } from './debug.js' -interface JournalStore { +export type OperationStatus = 'started' | 'completed' | 'failed' + +export interface OperationState { + operationIndex: number + operationKey: string + operationType: string + queryId: string + status: OperationStatus + startedAt: string + finishedAt: string | null + lastError: string +} + +export interface MigrationRowState { + name: string + appliedAt: string + checksum: string + chkitVersion: string + migrationCompleted: boolean + operations: OperationState[] +} + +export interface JournalStore { readJournal(): Promise + readMigrationState(migrationName: string): Promise + writeMigrationState(state: MigrationRowState): Promise appendEntry(entry: MigrationJournalEntry): Promise readonly databaseMissing: boolean } @@ -28,14 +52,68 @@ interface MigrationRow extends Record { applied_at: string checksum: string chkit_version: string + migration_completed?: boolean | number + // ClickHouse JSONEachRow returns named-tuple columns as objects keyed 
by the + // tuple field names, not as positional arrays. + operations?: OperationTupleRow[] } +interface OperationTupleRow { + operation_index: number | string + operation_key: string + operation_type: string + query_id: string + status: string + started_at: string + finished_at: string | null + last_error: string +} + +const OPERATIONS_TUPLE_TYPE = + 'Array(Tuple(operation_index Int32, operation_key String, operation_type String, query_id String, status LowCardinality(String), started_at DateTime64(3, \'UTC\'), finished_at Nullable(DateTime64(3, \'UTC\')), last_error String))' + function isRetryableInsertRace(error: unknown): boolean { if (!(error instanceof Error)) return false const message = error.message return message.includes('INSERT race condition') || message.includes('Please retry the INSERT') } +function escapeSqlString(value: string): string { + return value.replace(/\\/g, '\\\\').replace(/'/g, "\\'") +} + +function operationToTupleLiteral(op: OperationState): string { + const parts = [ + String(op.operationIndex), + `'${escapeSqlString(op.operationKey)}'`, + `'${escapeSqlString(op.operationType)}'`, + `'${escapeSqlString(op.queryId)}'`, + `'${escapeSqlString(op.status)}'`, + `'${escapeSqlString(op.startedAt)}'`, + op.finishedAt === null ? 
'NULL' : `'${escapeSqlString(op.finishedAt)}'`, + `'${escapeSqlString(op.lastError)}'`, + ] + return `(${parts.join(',')})` +} + +function operationsArrayLiteral(operations: OperationState[]): string { + if (operations.length === 0) return '[]' + return `[${operations.map(operationToTupleLiteral).join(',')}]` +} + +function operationFromTuple(row: OperationTupleRow): OperationState { + return { + operationIndex: Number(row.operation_index), + operationKey: row.operation_key, + operationType: row.operation_type, + queryId: row.query_id, + status: row.status as OperationStatus, + startedAt: row.started_at, + finishedAt: row.finished_at, + lastError: row.last_error, + } +} + export function createJournalStore(db: ClickHouseExecutor): JournalStore { const journalTable = resolveJournalTableName() debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}`) @@ -43,10 +121,13 @@ export function createJournalStore(db: ClickHouseExecutor): JournalStore { name String, applied_at DateTime64(3, 'UTC'), checksum String, - chkit_version String + chkit_version String, + migration_completed Bool DEFAULT true, + operations ${OPERATIONS_TUPLE_TYPE} DEFAULT [] ) ENGINE = ReplacingMergeTree(applied_at) ORDER BY (name) SETTINGS index_granularity = 1` + let bootstrapped = false let _databaseMissing = false @@ -55,7 +136,8 @@ SETTINGS index_granularity = 1` debug('journal', `probing journal table "${journalTable}"`) try { await db.query(`SELECT name FROM ${journalTable} LIMIT 0`) - debug('journal', 'journal table exists') + debug('journal', 'journal table exists — checking schema') + await ensureSchemaUpgraded() bootstrapped = true return } catch (error) { @@ -90,10 +172,32 @@ SETTINGS index_granularity = 1` bootstrapped = true } + async function ensureSchemaUpgraded(): Promise { + // Pre-existing journal tables predate per-operation tracking. 
Add the + // columns idempotently so older deployments pick them up on first run + // of the new chkit. ALTER ADD COLUMN IF NOT EXISTS is a metadata op, + // no data rewrite. + await db.command( + `ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`, + ) + await db.command( + `ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`, + ) + } + + async function trySyncReplica(): Promise { + try { + await db.command(`SYSTEM SYNC REPLICA ${journalTable}`) + } catch { + // Non-replicated or single-node setups don't support SYSTEM SYNC REPLICA. + } + } + return { get databaseMissing() { return _databaseMissing }, + async readJournal(): Promise { debug('journal', 'reading journal') await ensureTable() @@ -101,13 +205,9 @@ SETTINGS index_granularity = 1` debug('journal', 'database missing — returning empty journal') return { version: 1, applied: [] } } - try { - await db.command(`SYSTEM SYNC REPLICA ${journalTable}`) - } catch { - // Non-replicated or single-node setups don't support SYSTEM SYNC REPLICA. 
- } + await trySyncReplica() const rows = await db.query( - `SELECT name, applied_at, checksum, chkit_version FROM ${journalTable} ORDER BY name SETTINGS select_sequential_consistency = 1` + `SELECT name, applied_at, checksum, chkit_version FROM ${journalTable} FINAL WHERE migration_completed = true ORDER BY name SETTINGS select_sequential_consistency = 1`, ) debug('journal', `journal has ${rows.length} applied entries`) return { @@ -120,16 +220,33 @@ SETTINGS index_granularity = 1` } }, - async appendEntry(entry: MigrationJournalEntry): Promise { - debug('journal', `appending entry: ${entry.name} (checksum: ${entry.checksum})`) + async readMigrationState(migrationName: string): Promise { + await ensureTable() + if (_databaseMissing) return null + await trySyncReplica() + const rows = await db.query( + `SELECT name, applied_at, checksum, chkit_version, migration_completed, operations FROM ${journalTable} FINAL WHERE name = '${escapeSqlString(migrationName)}' LIMIT 1 SETTINGS select_sequential_consistency = 1`, + ) + const row = rows[0] + if (!row) return null + return { + name: row.name, + appliedAt: row.applied_at, + checksum: row.checksum, + chkitVersion: row.chkit_version, + migrationCompleted: Boolean(row.migration_completed), + operations: (row.operations ?? 
[]).map(operationFromTuple), + } + }, + + async writeMigrationState(state: MigrationRowState): Promise { if (_databaseMissing) { debug('journal', 'resetting databaseMissing flag — migration may have created the database') _databaseMissing = false bootstrapped = false } await ensureTable() - const esc = (s: string) => s.replace(/\\/g, '\\\\').replace(/'/g, "\\'") - const insertSql = `INSERT INTO ${journalTable} (name, applied_at, checksum, chkit_version) VALUES ('${esc(entry.name)}', '${esc(entry.appliedAt)}', '${esc(entry.checksum)}', '${esc(CLI_VERSION)}')` + const insertSql = `INSERT INTO ${journalTable} (name, applied_at, checksum, chkit_version, migration_completed, operations) VALUES ('${escapeSqlString(state.name)}', '${escapeSqlString(state.appliedAt)}', '${escapeSqlString(state.checksum)}', '${escapeSqlString(state.chkitVersion || CLI_VERSION)}', ${state.migrationCompleted ? 'true' : 'false'}, ${operationsArrayLiteral(state.operations)})` const maxAttempts = 5 for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { try { @@ -143,11 +260,22 @@ SETTINGS index_granularity = 1` await new Promise((r) => setTimeout(r, attempt * 150)) } } - try { - await db.command(`SYSTEM SYNC REPLICA ${journalTable}`) - } catch { - // Non-replicated or single-node setups don't support SYSTEM SYNC REPLICA. - } + await trySyncReplica() + }, + + async appendEntry(entry: MigrationJournalEntry): Promise { + debug('journal', `appending entry: ${entry.name} (checksum: ${entry.checksum})`) + // Preserve any per-operation history that async statements wrote + // earlier in this migration's apply — we just flip migrationCompleted. + const existing = await this.readMigrationState(entry.name) + await this.writeMigrationState({ + name: entry.name, + appliedAt: entry.appliedAt, + checksum: entry.checksum, + chkitVersion: CLI_VERSION, + migrationCompleted: true, + operations: existing?.operations ?? 
[], + }) }, } } diff --git a/packages/cli/src/runtime/safety-markers.ts b/packages/cli/src/runtime/safety-markers.ts index 1f498c5..32bd5b1 100644 --- a/packages/cli/src/runtime/safety-markers.ts +++ b/packages/cli/src/runtime/safety-markers.ts @@ -14,10 +14,21 @@ export interface DestructiveOperationMarker { summary: string } -interface MigrationOperationSummary { +export type MigrationOperationMode = 'sync' | 'async' + +export interface MigrationOperationSummary { type: string key: string risk: string + mode: MigrationOperationMode + /** + * SQL to run BEFORE re-attempting this operation, parsed from a + * `-- before-retry: ` header line that follows the `-- operation:` + * line. Empty / null on first attempt; runs on every retry. Compensates + * for partial side effects left behind by a failed prior attempt + * (e.g. TRUNCATE a partially-loaded table before re-INSERTing). + */ + beforeRetry: string | null summary: string } @@ -33,25 +44,58 @@ function extractDestructiveOperationSummaries(sql: string): string[] { .map((line) => line.replace(/^-- operation:\s*/, '')) } -function parseOperationLine(summary: string): MigrationOperationSummary | null { - const match = summary.match(/^([a-z_]+)\s+key=([^\s]+)\s+risk=([a-z_]+)$/) +function parseOperationLine( + summary: string, + beforeRetry: string | null, +): MigrationOperationSummary | null { + const match = summary.match( + /^([a-z_]+)\s+key=([^\s]+)\s+risk=([a-z_]+)(?:\s+mode=([a-z_]+))?$/, + ) if (!match) return null + const rawMode = match[4] + const mode: MigrationOperationMode = rawMode === 'async' ? 'async' : 'sync' return { type: match[1] ?? 'unknown', key: match[2] ?? 'unknown', risk: match[3] ?? 
'unknown', + mode, + beforeRetry, summary, } } +const BEFORE_RETRY_PREFIX = '-- before-retry:' + +function stripTrailingSemicolon(value: string): string { + return value.replace(/;\s*$/, '').trim() +} + export function extractMigrationOperationSummaries(sql: string): MigrationOperationSummary[] { - return sql - .split('\n') - .map((line) => line.trim()) - .filter((line) => line.startsWith('-- operation:')) - .map((line) => line.replace(/^-- operation:\s*/, '')) - .map((summary) => parseOperationLine(summary)) - .filter((item): item is MigrationOperationSummary => item !== null) + const lines = sql.split('\n').map((line) => line.trim()) + const summaries: MigrationOperationSummary[] = [] + for (let i = 0; i < lines.length; i++) { + const line = lines[i] + if (!line?.startsWith('-- operation:')) continue + const summary = line.replace(/^-- operation:\s*/, '') + // A `-- before-retry: ` line is associated with the operation if it + // appears in the comment block immediately following the operation line + // (allowing blank/comment lines in between, but not executable SQL). + let beforeRetry: string | null = null + for (let j = i + 1; j < lines.length; j++) { + const next = lines[j] + if (next === undefined) break + if (next === '') continue + if (!next.startsWith('--')) break + if (next.startsWith(BEFORE_RETRY_PREFIX)) { + beforeRetry = stripTrailingSemicolon(next.slice(BEFORE_RETRY_PREFIX.length).trim()) + if (beforeRetry === '') beforeRetry = null + break + } + } + const parsed = parseOperationLine(summary, beforeRetry) + if (parsed !== null) summaries.push(parsed) + } + return summaries } function describeDestructiveOperation(type: string): { @@ -98,7 +142,7 @@ export function collectDestructiveOperationMarkers( sql: string ): DestructiveOperationMarker[] { return extractDestructiveOperationSummaries(sql).map((summary) => { - const parsed = parseOperationLine(summary) + const parsed = parseOperationLine(summary, null) const type = parsed?.type ?? 
'unknown' const key = parsed?.key ?? 'unknown' const risk = parsed?.risk ?? 'danger' diff --git a/packages/cli/src/test/commands/migrate/async-apply.test.ts b/packages/cli/src/test/commands/migrate/async-apply.test.ts new file mode 100644 index 0000000..6cd3c3e --- /dev/null +++ b/packages/cli/src/test/commands/migrate/async-apply.test.ts @@ -0,0 +1,383 @@ +import { describe, expect, test } from 'bun:test' + +import type { ClickHouseExecutor, QueryStatus } from '@chkit/clickhouse' + +import { + applyAsyncStatement, + makeDeterministicQueryId, +} from '../../../commands/migrate/async-apply.js' +import type { + JournalStore, + MigrationRowState, + OperationState, +} from '../../../runtime/journal-store.js' + +type StatusCall = { queryId: string; afterTime?: string } +type SubmitCall = { sql: string; queryId?: string } +type CommandCall = { sql: string } + +interface FakeExecutor { + db: ClickHouseExecutor + statusCalls: StatusCall[] + submitCalls: SubmitCall[] + commandCalls: CommandCall[] +} + +function createFakeExecutor( + statuses: QueryStatus[], + options: { failSubmit?: () => Error } = {}, +): FakeExecutor { + const statusCalls: StatusCall[] = [] + const submitCalls: SubmitCall[] = [] + const commandCalls: CommandCall[] = [] + const queue = [...statuses] + const db = { + async submit(sql: string, queryId?: string) { + submitCalls.push({ sql, queryId }) + if (options.failSubmit) throw options.failSubmit() + return queryId ?? 
'fallback-id' + }, + async queryStatus(queryId: string, opts?: { afterTime?: string }) { + statusCalls.push({ queryId, afterTime: opts?.afterTime }) + const next = queue.shift() + if (!next) { + throw new Error('queryStatus called more times than fake has answers for') + } + return next + }, + async command(sql: string) { + commandCalls.push({ sql }) + }, + } as unknown as ClickHouseExecutor + return { db, statusCalls, submitCalls, commandCalls } +} + +interface FakeStore { + store: JournalStore + writes: MigrationRowState[] +} + +function createFakeJournalStore(initial: MigrationRowState | null = null): FakeStore { + let current: MigrationRowState | null = initial + const writes: MigrationRowState[] = [] + const store: JournalStore = { + databaseMissing: false, + async readJournal() { + return { version: 1, applied: [] } + }, + async readMigrationState() { + return current + }, + async writeMigrationState(state) { + writes.push(state) + current = state + }, + async appendEntry() { + // not used in these tests + }, + } + return { store, writes } +} + +const NO_SLEEP = (_ms: number) => Promise.resolve() +const FIXED_NOW = () => 1_700_000_000_000 + +const BASE_INPUT = { + sql: 'INSERT INTO t SELECT 1', + migrationName: 'm.sql', + migrationChecksum: 'deadbeef', + statementIndex: 0, + operationType: 'load_table_data', + operationKey: 'table:t', + beforeRetry: null, +} as const + +function freshStateWith(op: OperationState): MigrationRowState { + return { + name: BASE_INPUT.migrationName, + appliedAt: '1970-01-01 00:00:00.000', + checksum: BASE_INPUT.migrationChecksum, + chkitVersion: '', + migrationCompleted: false, + operations: [op], + } +} + +describe('applyAsyncStatement', () => { + test('produces a deterministic UUID-shaped query_id from (migration, statement_index)', () => { + const a = makeDeterministicQueryId('20260526_load.sql', 0) + const b = makeDeterministicQueryId('20260526_load.sql', 0) + const c = makeDeterministicQueryId('20260526_load.sql', 1) + 
expect(a).toBe(b) + expect(a).not.toBe(c) + expect(a).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/) + }) + + test('first attempt: writes started, submits, polls to completion', async () => { + const queryId = makeDeterministicQueryId(BASE_INPUT.migrationName, BASE_INPUT.statementIndex) + const { db, statusCalls, submitCalls, commandCalls } = createFakeExecutor([ + { status: 'unknown' }, // initial in-flight check + { status: 'running', writtenRows: 50_000 }, + { status: 'finished', writtenRows: 100_000, durationMs: 8000 }, + ]) + const { store, writes } = createFakeJournalStore(null) + const lines: string[] = [] + + const result = await applyAsyncStatement({ + ...BASE_INPUT, + db, + journalStore: store, + log: (line) => lines.push(line), + sleep: NO_SLEEP, + now: FIXED_NOW, + }) + + expect(result.kind).toBe('completed') + expect(submitCalls).toHaveLength(1) + expect(submitCalls[0]?.queryId).toBe(queryId) + expect(commandCalls).toEqual([]) // no before-retry on first attempt + expect(statusCalls).toHaveLength(3) + expect(statusCalls.every((call) => call.afterTime === undefined)).toBe(true) + // Two writes: started + completed. 
+ expect(writes).toHaveLength(2) + expect(writes[0]?.operations[0]?.status).toBe('started') + expect(writes[1]?.operations[0]?.status).toBe('completed') + expect(lines.some((line) => line.includes('submitting async'))).toBe(true) + }) + + test('refuses to resume in-progress async state when the migration checksum changed', async () => { + const queryId = makeDeterministicQueryId(BASE_INPUT.migrationName, BASE_INPUT.statementIndex) + const { db, statusCalls, submitCalls, commandCalls } = createFakeExecutor([]) + const { store, writes } = createFakeJournalStore( + freshStateWith({ + operationIndex: 0, + operationKey: BASE_INPUT.operationKey, + operationType: BASE_INPUT.operationType, + queryId, + status: 'started', + startedAt: '2026-05-26 12:00:00.000', + finishedAt: null, + lastError: '', + }), + ) + + await expect( + applyAsyncStatement({ + ...BASE_INPUT, + migrationChecksum: 'changed-checksum', + beforeRetry: 'TRUNCATE TABLE t', + db, + journalStore: store, + log: () => {}, + sleep: NO_SLEEP, + now: FIXED_NOW, + }), + ).rejects.toThrow(/in-progress async journal state/) + + expect(statusCalls).toEqual([]) + expect(submitCalls).toEqual([]) + expect(commandCalls).toEqual([]) + expect(writes).toEqual([]) + }) + + test('completed-skip: prior operation marked completed → no submit, no command', async () => { + const queryId = makeDeterministicQueryId(BASE_INPUT.migrationName, BASE_INPUT.statementIndex) + const { db, statusCalls, submitCalls, commandCalls } = createFakeExecutor([]) + const { store, writes } = createFakeJournalStore( + freshStateWith({ + operationIndex: 0, + operationKey: BASE_INPUT.operationKey, + operationType: BASE_INPUT.operationType, + queryId, + status: 'completed', + startedAt: '2026-05-26 12:00:00.000', + finishedAt: '2026-05-26 12:01:00.000', + lastError: '', + }), + ) + const lines: string[] = [] + + const result = await applyAsyncStatement({ + ...BASE_INPUT, + db, + journalStore: store, + log: (line) => lines.push(line), + sleep: NO_SLEEP, + 
now: FIXED_NOW, + }) + + expect(result.kind).toBe('skipped') + expect(submitCalls).toEqual([]) + expect(statusCalls).toEqual([]) // didn't even consult system.processes + expect(commandCalls).toEqual([]) + expect(writes).toEqual([]) // nothing changed + expect(lines.some((line) => line.includes('already completed'))).toBe(true) + }) + + test('in-flight attach: query already running on server → poll without resubmit', async () => { + const queryId = makeDeterministicQueryId(BASE_INPUT.migrationName, BASE_INPUT.statementIndex) + const { db, statusCalls, submitCalls, commandCalls } = createFakeExecutor([ + { status: 'running', writtenRows: 50 }, // initial check sees it running + { status: 'running', writtenRows: 75 }, + { status: 'finished', writtenRows: 100, durationMs: 5000 }, + ]) + const { store, writes } = createFakeJournalStore( + freshStateWith({ + operationIndex: 0, + operationKey: BASE_INPUT.operationKey, + operationType: BASE_INPUT.operationType, + queryId, + status: 'started', + startedAt: '2026-05-26 11:00:00.000', + finishedAt: null, + lastError: '', + }), + ) + const lines: string[] = [] + + const result = await applyAsyncStatement({ + ...BASE_INPUT, + beforeRetry: 'TRUNCATE TABLE t', + db, + journalStore: store, + log: (line) => lines.push(line), + sleep: NO_SLEEP, + now: FIXED_NOW, + }) + + expect(result.kind).toBe('completed') + expect(submitCalls).toEqual([]) // never resubmitted + expect(commandCalls).toEqual([]) // before-retry NOT run on attach + expect(statusCalls).toHaveLength(3) + expect(lines.some((line) => line.includes('attaching'))).toBe(true) + expect(writes).toHaveLength(1) + expect(writes[0]?.operations[0]?.status).toBe('completed') + }) + + test('retry: prior failed op + query no longer running → run before-retry, then resubmit', async () => { + const queryId = makeDeterministicQueryId(BASE_INPUT.migrationName, BASE_INPUT.statementIndex) + const { db, statusCalls, submitCalls, commandCalls } = createFakeExecutor([ + { status: 'unknown' 
}, // initial check: not running on server anymore + { status: 'running', writtenRows: 25_000 }, + { status: 'finished', writtenRows: 100_000, durationMs: 3000 }, + ]) + const { store, writes } = createFakeJournalStore( + freshStateWith({ + operationIndex: 0, + operationKey: BASE_INPUT.operationKey, + operationType: BASE_INPUT.operationType, + queryId, + status: 'failed', + startedAt: '2026-05-26 10:00:00.000', + finishedAt: '2026-05-26 10:05:00.000', + lastError: 'Memory limit exceeded', + }), + ) + const lines: string[] = [] + + const result = await applyAsyncStatement({ + ...BASE_INPUT, + beforeRetry: 'TRUNCATE TABLE t SETTINGS max_table_size_to_drop = 0', + db, + journalStore: store, + log: (line) => lines.push(line), + sleep: NO_SLEEP, + now: FIXED_NOW, + }) + + expect(result.kind).toBe('completed') + expect(commandCalls).toEqual([ + { sql: 'TRUNCATE TABLE t SETTINGS max_table_size_to_drop = 0' }, + ]) + expect(submitCalls).toHaveLength(1) + expect(submitCalls[0]?.queryId).toBe(queryId) + expect(statusCalls).toHaveLength(3) + expect(statusCalls[1]?.afterTime).toBe('2023-11-14T22:12:20.000') + expect(statusCalls[2]?.afterTime).toBe('2023-11-14T22:12:20.000') + expect(lines.some((line) => line.includes('running before-retry SQL'))).toBe(true) + expect(lines.some((line) => line.includes('Memory limit exceeded'))).toBe(true) + // started (overwrite prior failed) + completed + expect(writes).toHaveLength(2) + expect(writes[0]?.operations[0]?.status).toBe('started') + expect(writes[1]?.operations[0]?.status).toBe('completed') + }) + + test('retry without before-retry SQL: still resubmits forward', async () => { + const queryId = makeDeterministicQueryId(BASE_INPUT.migrationName, BASE_INPUT.statementIndex) + const { db, submitCalls, commandCalls } = createFakeExecutor([ + { status: 'unknown' }, + { status: 'finished', writtenRows: 1, durationMs: 100 }, + ]) + const { store } = createFakeJournalStore( + freshStateWith({ + operationIndex: 0, + operationKey: 
BASE_INPUT.operationKey, + operationType: BASE_INPUT.operationType, + queryId, + status: 'failed', + startedAt: '2026-05-26 10:00:00.000', + finishedAt: '2026-05-26 10:05:00.000', + lastError: 'NETWORK_ERROR', + }), + ) + + await applyAsyncStatement({ + ...BASE_INPUT, + beforeRetry: null, + db, + journalStore: store, + log: () => {}, + sleep: NO_SLEEP, + now: FIXED_NOW, + }) + + expect(commandCalls).toEqual([]) // no before-retry SQL → no command + expect(submitCalls).toHaveLength(1) + }) + + test('polling-failure: query transitions to failed → write failed state and throw', async () => { + const { db } = createFakeExecutor([ + { status: 'unknown' }, + { status: 'running' }, + { status: 'failed', error: 'NETWORK_ERROR: broken pipe' }, + ]) + const { store, writes } = createFakeJournalStore(null) + + await expect( + applyAsyncStatement({ + ...BASE_INPUT, + db, + journalStore: store, + log: () => {}, + sleep: NO_SLEEP, + now: FIXED_NOW, + }), + ).rejects.toThrow(/NETWORK_ERROR: broken pipe/) + + // started + failed + expect(writes).toHaveLength(2) + expect(writes[0]?.operations[0]?.status).toBe('started') + expect(writes[1]?.operations[0]?.status).toBe('failed') + expect(writes[1]?.operations[0]?.lastError).toContain('NETWORK_ERROR: broken pipe') + }) + + test('surfaces submit error when status remains unknown (SQL parse failure case)', async () => { + const { db } = createFakeExecutor( + [{ status: 'unknown' }, { status: 'unknown' }, { status: 'unknown' }], + { failSubmit: () => new Error('Syntax error: failed at position 1') }, + ) + const { store } = createFakeJournalStore(null) + + await expect( + applyAsyncStatement({ + ...BASE_INPUT, + sql: 'NOT VALID SQL', + db, + journalStore: store, + log: () => {}, + sleep: NO_SLEEP, + now: FIXED_NOW, + }), + ).rejects.toThrow(/Syntax error/) + }) +}) diff --git a/packages/cli/src/test/runtime/journal-store.test.ts b/packages/cli/src/test/runtime/journal-store.test.ts new file mode 100644 index 0000000..43c3692 --- 
/dev/null +++ b/packages/cli/src/test/runtime/journal-store.test.ts @@ -0,0 +1,211 @@ +import { describe, expect, test } from 'bun:test' + +import type { ClickHouseExecutor } from '@chkit/clickhouse' + +import { createJournalStore } from '../../runtime/journal-store.js' + +interface ScriptedExecutor { + db: ClickHouseExecutor + commandCalls: string[] +} + +function createScriptedExecutor( + queryAnswers: Map, +): ScriptedExecutor { + const commandCalls: string[] = [] + const db = { + async command(sql: string) { + commandCalls.push(sql) + }, + async query(sql: string): Promise { + for (const [key, value] of queryAnswers) { + if (key instanceof RegExp) { + if (key.test(sql)) return value as T[] + } else if (sql.includes(key)) { + return value as T[] + } + } + return [] + }, + async queryStatus() { + throw new Error('queryStatus not implemented in this fake') + }, + async submit() { + throw new Error('submit not implemented in this fake') + }, + async insert() { + throw new Error('insert not implemented in this fake') + }, + } as unknown as ClickHouseExecutor + return { db, commandCalls } +} + +describe('createJournalStore', () => { + test('readMigrationState parses named-tuple operations returned as objects (JSONEachRow shape)', async () => { + // Regression: ClickHouse's JSONEachRow format returns named Tuple columns + // as objects keyed by tuple field names, NOT positional arrays. An earlier + // implementation read by index (`tuple[0]`) and silently produced + // `operationIndex: NaN`, which made retry detection fail and corrupted + // subsequent writes. Make sure named-tuple object-shape is parsed by name. + const { db } = createScriptedExecutor( + new Map([ + // ensureTable probe: pretend the table exists with operations column already. 
+ [/SELECT name FROM .* LIMIT 0/, []], + // The actual SELECT for readMigrationState — return JSONEachRow shape: + [ + /FROM .* FINAL WHERE name = /, + [ + { + name: 'm.sql', + applied_at: '2026-05-26 12:00:00.000', + checksum: 'deadbeef', + chkit_version: '0.1.0-test', + migration_completed: false, + operations: [ + { + operation_index: 0, + operation_key: 'table:default.hits', + operation_type: 'load_table_data', + query_id: '17977426-2184-de85-8142-3b6b04a1fded', + status: 'started', + started_at: '2026-05-26 12:00:00.000', + finished_at: null, + last_error: '', + }, + ], + }, + ], + ], + ]), + ) + + const store = createJournalStore(db) + const state = await store.readMigrationState('m.sql') + + expect(state).not.toBeNull() + expect(state?.name).toBe('m.sql') + expect(state?.migrationCompleted).toBe(false) + expect(state?.operations).toHaveLength(1) + const op = state?.operations[0] + expect(op?.operationIndex).toBe(0) + expect(Number.isNaN(op?.operationIndex)).toBe(false) + expect(op?.operationKey).toBe('table:default.hits') + expect(op?.operationType).toBe('load_table_data') + expect(op?.queryId).toBe('17977426-2184-de85-8142-3b6b04a1fded') + expect(op?.status).toBe('started') + expect(op?.startedAt).toBe('2026-05-26 12:00:00.000') + expect(op?.finishedAt).toBeNull() + expect(op?.lastError).toBe('') + }) + + test('readMigrationState handles operations column missing entirely (legacy row pre-ALTER)', async () => { + const { db } = createScriptedExecutor( + new Map([ + [/SELECT name FROM .* LIMIT 0/, []], + [ + /FROM .* FINAL WHERE name = /, + [ + { + name: 'legacy.sql', + applied_at: '2026-04-01 09:00:00.000', + checksum: 'cafebabe', + chkit_version: '0.0.9', + migration_completed: true, + // operations field omitted on purpose — legacy journal row. 
+ }, + ], + ], + ]), + ) + + const store = createJournalStore(db) + const state = await store.readMigrationState('legacy.sql') + + expect(state).not.toBeNull() + expect(state?.migrationCompleted).toBe(true) + expect(state?.operations).toEqual([]) + }) + + test('readMigrationState returns null when no row exists', async () => { + const { db } = createScriptedExecutor( + new Map([ + [/SELECT name FROM .* LIMIT 0/, []], + [/FROM .* FINAL WHERE name = /, []], + ]), + ) + + const store = createJournalStore(db) + const state = await store.readMigrationState('absent.sql') + expect(state).toBeNull() + }) + + test('writeMigrationState serializes operations as a tuple array literal in INSERT VALUES', async () => { + const { db, commandCalls } = createScriptedExecutor( + new Map([ + [/SELECT name FROM .* LIMIT 0/, []], + ]), + ) + + const store = createJournalStore(db) + await store.writeMigrationState({ + name: 'm.sql', + appliedAt: '2026-05-26 12:00:00.000', + checksum: 'deadbeef', + chkitVersion: '0.1.0-test', + migrationCompleted: false, + operations: [ + { + operationIndex: 0, + operationKey: 'table:default.hits', + operationType: 'load_table_data', + queryId: '17977426-2184-de85-8142-3b6b04a1fded', + status: 'started', + startedAt: '2026-05-26 12:00:00.000', + finishedAt: null, + lastError: '', + }, + ], + }) + + const insert = commandCalls.find((sql) => sql.startsWith('INSERT INTO')) + expect(insert).toBeDefined() + // Operations encoded as a positional tuple-array literal, with NULL for + // finished_at when the op is still in flight. 
+ expect(insert).toMatch( + /\[\(0,'table:default\.hits','load_table_data','17977426-2184-de85-8142-3b6b04a1fded','started','2026-05-26 12:00:00\.000',NULL,''\)\]/, + ) + }) + + test('writeMigrationState escapes single quotes inside lastError so SQL stays valid', async () => { + const { db, commandCalls } = createScriptedExecutor( + new Map([ + [/SELECT name FROM .* LIMIT 0/, []], + ]), + ) + + const store = createJournalStore(db) + await store.writeMigrationState({ + name: 'm.sql', + appliedAt: '2026-05-26 12:00:00.000', + checksum: 'cs', + chkitVersion: 'v', + migrationCompleted: false, + operations: [ + { + operationIndex: 0, + operationKey: 'k', + operationType: 't', + queryId: 'q', + status: 'failed', + startedAt: '2026-05-26 12:00:00.000', + finishedAt: '2026-05-26 12:01:00.000', + lastError: "It's broken: 'unterminated", + }, + ], + }) + + const insert = commandCalls.find((sql) => sql.startsWith('INSERT INTO')) + expect(insert).toBeDefined() + expect(insert).toContain("It\\'s broken: \\'unterminated") + }) +}) diff --git a/packages/cli/src/test/runtime/safety-markers.test.ts b/packages/cli/src/test/runtime/safety-markers.test.ts index 23a2ae5..8c3ba24 100644 --- a/packages/cli/src/test/runtime/safety-markers.test.ts +++ b/packages/cli/src/test/runtime/safety-markers.test.ts @@ -1,6 +1,9 @@ import { describe, expect, test } from 'bun:test' -import { extractExecutableStatements } from '../../runtime/safety-markers.js' +import { + extractExecutableStatements, + extractMigrationOperationSummaries, +} from '../../runtime/safety-markers.js' describe('extractExecutableStatements', () => { test('splits simple statement batches', () => { @@ -42,6 +45,100 @@ describe('extractExecutableStatements', () => { ]) }) + test('extractMigrationOperationSummaries defaults mode to sync and beforeRetry to null', () => { + const sql = ` + -- operation: create_table key=table:app.events risk=safe + CREATE TABLE app.events (id UInt64) ENGINE = MergeTree() ORDER BY id; + ` + + 
expect(extractMigrationOperationSummaries(sql)).toEqual([ + { + type: 'create_table', + key: 'table:app.events', + risk: 'safe', + mode: 'sync', + beforeRetry: null, + summary: 'create_table key=table:app.events risk=safe', + }, + ]) + }) + + test('extractMigrationOperationSummaries parses mode=async', () => { + const sql = ` + -- operation: load_table_data key=table:app.events risk=caution mode=async + INSERT INTO app.events SELECT * FROM s3('https://example.com/file.parquet','Parquet'); + ` + + expect(extractMigrationOperationSummaries(sql)).toEqual([ + { + type: 'load_table_data', + key: 'table:app.events', + risk: 'caution', + mode: 'async', + beforeRetry: null, + summary: + 'load_table_data key=table:app.events risk=caution mode=async', + }, + ]) + }) + + test('extractMigrationOperationSummaries parses before-retry SQL attached to an operation', () => { + const sql = ` + -- operation: load_table_data key=table:app.events risk=caution mode=async + -- before-retry: TRUNCATE TABLE app.events SETTINGS max_table_size_to_drop = 0; + INSERT INTO app.events SELECT * FROM s3('…','Parquet'); + ` + + const ops = extractMigrationOperationSummaries(sql) + expect(ops).toHaveLength(1) + expect(ops[0]?.beforeRetry).toBe( + 'TRUNCATE TABLE app.events SETTINGS max_table_size_to_drop = 0', + ) + }) + + test('extractMigrationOperationSummaries does not pick up before-retry separated by SQL', () => { + const sql = ` + -- operation: load_table_data key=table:app.events risk=caution mode=async + INSERT INTO app.events SELECT 1; + -- before-retry: TRUNCATE TABLE app.events; + ` + + // The before-retry line comes AFTER an executable statement — that's + // for a different (later) operation, not this one. This operation's + // beforeRetry should be null. 
+ const ops = extractMigrationOperationSummaries(sql) + expect(ops).toHaveLength(1) + expect(ops[0]?.beforeRetry).toBeNull() + }) + + test('extractMigrationOperationSummaries handles mixed sync + async ops', () => { + const sql = ` + -- operation: truncate_table key=table:app.events risk=caution + TRUNCATE TABLE app.events; + -- operation: load_table_data key=table:app.events risk=caution mode=async + INSERT INTO app.events SELECT 1; + ` + + const ops = extractMigrationOperationSummaries(sql) + expect(ops.map((op) => ({ type: op.type, mode: op.mode }))).toEqual([ + { type: 'truncate_table', mode: 'sync' }, + { type: 'load_table_data', mode: 'async' }, + ]) + }) + + test('extractMigrationOperationSummaries treats unrecognized mode as sync (forward-compat)', () => { + const sql = ` + -- operation: load_table_data key=table:app.events risk=caution mode=batched + INSERT INTO app.events SELECT 1; + ` + + // Forward compat: a future mode value an older chkit doesn't know about + // should fall back to sync execution rather than silently dropping the op. + const ops = extractMigrationOperationSummaries(sql) + expect(ops).toHaveLength(1) + expect(ops[0]?.mode).toBe('sync') + }) + test('ignores full-line comments while preserving executable statements', () => { const sql = ` -- operation: alter_table_drop_column key=table:app.events:column:old_col risk=danger