Merged
26 changes: 26 additions & 0 deletions .changeset/migration-mode-async.md
@@ -0,0 +1,26 @@
---
"chkit": patch
---

Add `mode=async` annotation for long-running migration operations.

Mark an operation as async by adding `mode=async` to its `-- operation:` header line, for example:

```sql
-- operation: load_table_data key=table:default.hits risk=caution mode=async
INSERT INTO default.hits SELECT * FROM s3(...);
```

When `chkit migrate --apply` encounters an async operation it:

1. Computes a deterministic `query_id` from `sha256(migration_filename + ':' + statement_index)`.
2. Checks `system.processes` / `system.query_log` for any prior attempt with that id.
3. Fires the INSERT via the existing `submit()` path without blocking on its HTTP response, and polls `queryStatus(query_id)` every 5 seconds — printing a one-line update (`written=N.NM rows (N.N GiB), elapsed Ns`) so the operator sees the load advance.
4. On `QueryFinish` → records the journal entry and proceeds. On `ExceptionWhileProcessing` → throws with the server's exception. On any prior run's failure → resubmits (retry semantics).
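
Steps 1 and 3 can be sketched in TypeScript as follows. This is an illustration only, not chkit's actual implementation: `computeQueryId` and `formatProgress` are hypothetical names, and both the hex encoding of the digest and the rounding rules are assumptions.

```typescript
import { createHash } from 'node:crypto'

// Hypothetical helper: derive a stable query_id from the migration file name
// and the statement's index, so a re-run of the same migration computes the
// same id. Assumption: the id is the hex-encoded SHA-256 digest.
function computeQueryId(migrationFilename: string, statementIndex: number): string {
  return createHash('sha256')
    .update(`${migrationFilename}:${statementIndex}`)
    .digest('hex')
}

// Hypothetical helper: render the one-line progress update in the
// `written=N.NM rows (N.N GiB), elapsed Ns` shape described above.
function formatProgress(p: {
  writtenRows: number
  writtenBytes: number
  elapsedMs: number
}): string {
  const millionRows = (p.writtenRows / 1e6).toFixed(1)
  const gib = (p.writtenBytes / 1024 ** 3).toFixed(1)
  const seconds = Math.floor(p.elapsedMs / 1000)
  return `written=${millionRows}M rows (${gib} GiB), elapsed ${seconds}s`
}
```

Because the id depends only on the file name and the statement index, two separate invocations of the CLI agree on it without sharing any local state, which is what lets a later run look up the earlier attempt on the server.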

This unblocks two scenarios chkit could not previously handle:

- **Long INSERTs through a proxy/LB with an HTTP request-duration ceiling**: the operator sees progress, and a connection drop mid-poll no longer cancels the work — the deterministic id lets a re-run attach to the in-flight query on the server.
- **Transient client-side errors during a multi-minute load**: re-running chkit picks up where it left off rather than starting over.
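
The re-run behaviour in these two scenarios amounts to a small decision table keyed on what the server reports for the deterministic id. A minimal sketch follows, assuming a hypothetical `PriorAttempt` classification and hypothetical step names; the real chkit code may model this differently. In particular, treating an already finished prior attempt as "record only" is an assumption drawn from the "picks up where it left off" wording, and the `before-retry` step refers to the optional `-- before-retry:` line used in the ClickBench example migration.

```typescript
// Hypothetical classification of what the server knows about the query_id.
type PriorAttempt = 'none' | 'running' | 'finished' | 'failed'

// Hypothetical names for the steps the client takes next, in order.
type Step = 'before-retry' | 'submit' | 'poll' | 'record-journal'

function planAsyncApply(prior: PriorAttempt, hasBeforeRetry: boolean): Step[] {
  switch (prior) {
    case 'none':
      // First attempt: fire the statement, then watch it.
      return ['submit', 'poll', 'record-journal']
    case 'running':
      // A previous run's query is still in flight: attach instead of resubmitting.
      return ['poll', 'record-journal']
    case 'finished':
      // Assumption: the work is already done, only the journal entry is missing.
      return ['record-journal']
    case 'failed':
      // Retry semantics: optionally compensate for partial writes, then resubmit.
      return hasBeforeRetry
        ? ['before-retry', 'submit', 'poll', 'record-journal']
        : ['submit', 'poll', 'record-journal']
  }
}
```

Keeping the decision in one pure function makes the retry rules easy to test without a ClickHouse server.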

Existing migrations without `mode=async` continue to use the synchronous path; the annotation is opt-in and forward-compatible (an unknown mode value falls back to sync).
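
The opt-in, fall-back-to-sync rule can be illustrated with a small header parser. This is a sketch under assumptions, not the parser chkit ships: `parseOperationHeader` and the `OperationHeader` shape are hypothetical, and only the `key`, `risk`, and `mode` attributes from the example header are read.

```typescript
type OperationMode = 'sync' | 'async'

// Hypothetical shape for a parsed `-- operation:` header line.
interface OperationHeader {
  type: string
  key: string | undefined
  risk: string | undefined
  mode: OperationMode
}

// Parse a line such as
//   -- operation: load_table_data key=table:default.hits risk=caution mode=async
// Returns null when the line is not an operation header.
function parseOperationHeader(line: string): OperationHeader | null {
  const match = /^--\s*operation:\s*(\S+)(.*)$/.exec(line.trim())
  if (!match) return null
  const type = match[1] ?? ''
  const rest = match[2] ?? ''

  // Collect space-separated name=value attributes; split on the first '='
  // so values like `table:default.hits` survive intact.
  const attrs = new Map<string, string>()
  for (const token of rest.trim().split(/\s+/)) {
    const eq = token.indexOf('=')
    if (eq > 0) attrs.set(token.slice(0, eq), token.slice(eq + 1))
  }

  return {
    type,
    key: attrs.get('key'),
    risk: attrs.get('risk'),
    // Only the exact value `async` opts in; a missing or unknown mode is sync.
    mode: attrs.get('mode') === 'async' ? 'async' : 'sync',
  }
}
```

Comparing against the single known value, rather than validating against a list, is what makes an unrecognised `mode=` value degrade to the synchronous path instead of failing the migration.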
32 changes: 21 additions & 11 deletions bun.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions examples/clickbench/README.md
@@ -1,6 +1,6 @@
# ClickBench CHKit example

-This example creates the ClickBench `hits` schema and loads the full public ClickBench dataset from `datasets.clickhouse.com`.
+This example creates the ClickBench `hits` schema and loads the full public ClickBench dataset from the ObsessionDB-hosted mirror at `fsn1.your-objectstorage.com/obsessiondb-datasets/clickbench/`.

The data load migration truncates `default.hits` before inserting so an interrupted load can be retried by clearing the migration journal entry or using a fresh database. Run it against a disposable ClickHouse database.

@@ -25,6 +25,6 @@ bun run migrate -- --service <service-name-or-alias>
## Migrations

- `20260525133129_create_clickbench_schema.sql` creates the ClickBench `hits` table.
-- `20260525133130_load_clickbench_data.sql` truncates `hits` and loads the full partitioned Parquet dataset from `https://datasets.clickhouse.com/hits_compatible/athena_partitioned/` via ClickHouse's `url()` table function.
+- `20260525133130_load_clickbench_data.sql` truncates `hits` and loads the full partitioned Parquet dataset from `https://fsn1.your-objectstorage.com/obsessiondb-datasets/clickbench/` via ClickHouse's `s3()` table function.

The benchmark query set is intentionally not included yet; this example focuses on schema creation and dataset loading.
Original file line number Diff line number Diff line change
@@ -11,9 +11,13 @@
TRUNCATE TABLE default.hits SETTINGS max_table_size_to_drop = 0, max_partition_size_to_drop = 0;

-- Load the full ClickBench dataset (100 Parquet files, ~100M rows) in a single
--- INSERT. We use the s3() table function against the public dataset bucket
--- (datasets.clickhouse.com is a CloudFront alias for clickhouse-public-datasets)
--- because s3() does native partitioned-Parquet parallelism that url() does not.
+-- INSERT. We use the s3() table function against the ObsessionDB-hosted mirror
+-- of the dataset (Hetzner Object Storage, FSN1 region) because s3() does
+-- native partitioned-Parquet parallelism that url() does not.
+--
+-- The executable TRUNCATE above clears the target before the first attempt.
+-- The `before-retry:` line below repeats that compensation before any retry
+-- where a prior async INSERT may have partially loaded rows.
--
-- Tuning (measured against an ObsessionDB customer-benchmark instance):
-- * max_download_threads = 64, max_insert_threads = 16 lands at ~178s for
@@ -26,15 +30,16 @@ TRUNCATE TABLE default.hits SETTINGS max_table_size_to_drop = 0, max_partition_s
-- * max_execution_time = 0 lifts the server-side query timer (the load is
-- intentionally long-running).

--- operation: load_table_data key=table:default.hits risk=caution
+-- operation: load_table_data key=table:default.hits risk=caution mode=async
+-- before-retry: TRUNCATE TABLE default.hits SETTINGS max_table_size_to_drop = 0, max_partition_size_to_drop = 0;
INSERT INTO default.hits
SELECT *
FROM s3(
-'https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/athena_partitioned/hits_{0..99}.parquet',
+'https://fsn1.your-objectstorage.com/obsessiondb-datasets/clickbench/hits_{0..99}.parquet',
'Parquet'
)
SETTINGS
max_execution_time = 0,
max_memory_usage = 6500000000,
-max_download_threads = 64,
-max_insert_threads = 16;
+max_download_threads = 2,
+max_insert_threads = 2;
6 changes: 3 additions & 3 deletions examples/clickbench/package.json
@@ -9,8 +9,8 @@
     "check": "chkit check"
   },
   "devDependencies": {
-    "@chkit/core": "0.1.0-beta.24",
-    "@chkit/plugin-obsessiondb": "0.1.0-beta.24",
-    "chkit": "0.1.0-beta.24"
+    "@chkit/core": "0.1.0-beta.26",
+    "@chkit/plugin-obsessiondb": "0.1.0-beta.26",
+    "chkit": "0.1.0-beta.26"
   }
 }
3 changes: 2 additions & 1 deletion package.json
@@ -13,7 +13,8 @@
   "packageManager": "[email protected]",
   "workspaces": [
     "packages/*",
-    "apps/*"
+    "apps/*",
+    "examples/clickbench"
   ],
   "scripts": {
     "build": "turbo run build",
23 changes: 21 additions & 2 deletions packages/cli/src/commands/migrate/apply.ts
@@ -12,10 +12,11 @@
   extractExecutableStatements,
   extractMigrationOperationSummaries,
 } from '../../runtime/safety-markers.js'
+import { applyAsyncStatement } from './async-apply.js'

 type JournalStore = ReturnType<typeof createJournalStore>

 export async function applyMigration(input: {

Check warning on line 19 in packages/cli/src/commands/migrate/apply.ts (GitHub Actions / verify): High CRAP score (moderate). Function 'applyMigration' has a CRAP score of 30.0 (threshold: 30.0). Severity: moderate; cyclomatic: 5; cognitive: 5; lines: 74. CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.

   db: ClickHouseExecutor
   journalStore: JournalStore
   pluginRuntime: PluginRuntime
@@ -43,10 +44,28 @@
     statements: parsedStatements,
   })

+  const migrationChecksum = checksumSQL(sql)
+
   for (let i = 0; i < statements.length; i++) {
     const statement = statements[i] as string
-    await db.command(statement)
     const operation = operationSummaries[i]
+    if (operation?.mode === 'async') {
+      await applyAsyncStatement({
+        db,
+        journalStore,
+        sql: statement,
+        migrationName: file,
+        migrationChecksum,
+        statementIndex: i,
+        operationType: operation.type,
+        operationKey: operation.key,
+        beforeRetry: operation.beforeRetry,
+        log: (line) => console.log(line),
+      })
+      // Async ops are DML (loads, backfills) — no DDL propagation to wait on.
+      continue
+    }
+    await db.command(statement)
     if (operation) {
       await waitForDDLPropagation(db, operation.type, operation.key)
     }
@@ -55,7 +74,7 @@
   const entry: MigrationJournalEntry = {
     name: file,
     appliedAt: new Date().toISOString().replace('Z', ''),
-    checksum: checksumSQL(sql),
+    checksum: migrationChecksum,
   }
   await journalStore.appendEntry(entry)
