Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/obsessiondb-service-slug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chkit": patch
"@chkit/plugin-obsessiondb": patch
---

Fix broken ObsessionDB integration after the platform moved service-scoped RPC endpoints from `serviceId` to `serviceSlug`. The plugin now sources and stores the service slug instead of the id and sends `serviceSlug` to `workbench.query.execute`, `jobs.list`/`submit`, and `services.get`. This repairs `migrate`, `generate`, `status`, `drift`, `check`, `query`, and remote backfill against ObsessionDB targets. The bundled oRPC contract copies were also refreshed to match the current platform schemas (services now expose `slug`, query results are array-of-arrays, job details carry per-task runs).

Notes:

- The selected-service file (`obsessiondb.json`) now stores `service_slug` instead of `service_id`. Existing selections will need to be re-run with `chkit obsessiondb service select` (and any aliases re-set).
- The remote backfill flag `--service-id` is renamed to `--service-slug`.
6 changes: 4 additions & 2 deletions packages/cli/src/test/obsessiondb-service.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ async function runCliAsync(
function service(overrides: {
id: string
name: string
slug?: string
status?: 'running' | 'stopped'
}) {
return {
id: overrides.id,
slug: overrides.slug ?? overrides.name,
name: overrides.name,
status: overrides.status ?? 'running',
tier: 1,
Expand Down Expand Up @@ -186,10 +188,10 @@ describe('@chkit/cli obsessiondb service e2e', () => {
expect(aliasList.stdout).toContain(' login -> production')

const stored = JSON.parse(await readFile(servicePath, 'utf8')) as {
aliases?: Record<string, { service_id?: string; service_name?: string }>
aliases?: Record<string, { service_slug?: string; service_name?: string }>
}
expect(stored.aliases?.login).toEqual({
service_id: 'svc-prod',
service_slug: 'production',
service_name: 'production',
})
expect(requests.map((request) => request.path)).toEqual([
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-obsessiondb/src/auth/login.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async function promptServiceSelection(
const selected = await selectServiceInteractive(organizations, print)
if (selected) {
await saveSelectedService(configPath, {
service_id: selected.service.id,
service_slug: selected.service.slug,
service_name: selected.service.name,
})
print(`Service selected: ${serviceChoiceLabel(selected)}`)
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-obsessiondb/src/backfill/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ describe('handleBackfillCommand', () => {

globalThis.fetch = mock(async () => orpcResponse(listResponse)) as typeof fetch

const { context, printed } = makeContext({ command: 'list', flags: { '--service-id': 'svc-1' } })
const { context, printed } = makeContext({ command: 'list', flags: { '--service-slug': 'svc-1' } })
const result = await handleBackfillCommand(context)

expect(result).toEqual({ handled: true, exitCode: 0 })
Expand Down
11 changes: 6 additions & 5 deletions packages/plugin-obsessiondb/src/backfill/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,28 @@
}
}

async function dispatchCommand(

Check warning on line 57 in packages/plugin-obsessiondb/src/backfill/handler.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (moderate)

Function 'dispatchCommand' has a CRAP score of 31.6 (threshold: 30.0). • Severity: moderate • Cyclomatic: 10 • Cognitive: 11 • CRAP: 31.6 (threshold: 30.0) • Lines: 27 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
client: JobsClient,
command: string,
flags: Record<string, string | string[] | boolean | undefined>,
): Promise<unknown> {
const jobId = typeof flags['--job-id'] === 'string' ? flags['--job-id'] : undefined
const serviceId = typeof flags['--service-id'] === 'string' ? flags['--service-id'] : undefined
const serviceSlug =
typeof flags['--service-slug'] === 'string' ? flags['--service-slug'] : undefined

switch (command) {
case 'status': {
if (jobId) return client.get({ jobId })
if (serviceId) return client.list({ serviceId })
throw new Error('Either --job-id or --service-id is required for remote status')
if (serviceSlug) return client.list({ serviceSlug })
throw new Error('Either --job-id or --service-slug is required for remote status')
}
case 'cancel': {
if (!jobId) throw new Error('--job-id is required for remote cancel')
return client.cancel({ jobId })
}
case 'list': {
if (!serviceId) throw new Error('--service-id is required for remote list')
return client.list({ serviceId })
if (!serviceSlug) throw new Error('--service-slug is required for remote list')
return client.list({ serviceSlug })
}
default:
throw new Error(`Unsupported remote command: ${command}`)
Expand Down
4 changes: 2 additions & 2 deletions packages/plugin-obsessiondb/src/backfill/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ export const BACKFILL_EXTEND_COMMANDS = [
description: 'Remote job ID for status/cancel',
},
{
name: '--service-id',
name: '--service-slug',
type: 'string' as const,
description: 'ObsessionDB service ID for listing jobs',
description: 'ObsessionDB service slug for listing jobs',
},
],
},
Expand Down
95 changes: 75 additions & 20 deletions packages/plugin-obsessiondb/src/contract/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,71 @@
import { oc } from '@orpc/contract'
import { z } from 'zod'

const jobStatusSchema = z.enum(['pending', 'running', 'completed', 'failed', 'cancelled'])
const jobStatusSchema = z.enum([
'pending',
'running',
'draining',
'paused',
'completed',
'failed',
'cancelled',
])

const taskStatusSchema = z.enum(['pending', 'running', 'done', 'failed'])
const runTypeSchema = z.enum(['insert', 'cleanup'])

const jobTaskSchema = z.object({
const runStatusSchema = z.enum(['pending', 'running', 'done', 'failed'])

const cleanupTypeSchema = z.enum(['drop_partition', 'delete_range'])

const taskRunSchema = z.object({
id: z.string(),
taskIndex: z.number().int(),
status: taskStatusSchema,
sql: z.string(),
attempt: z.number().int(),
type: runTypeSchema,
status: runStatusSchema,
queryId: z.string().nullable(),
estimatedBytes: z.number().nullable(),
sql: z.string().nullable(),
settings: z.record(z.string(), z.string()).nullable(),
mutationId: z.string().nullable(),
readRows: z.number().nullable(),
readBytes: z.number().nullable(),
readBytesCompressed: z.number().nullable(),
writtenRows: z.number().nullable(),
writtenBytes: z.number().nullable(),
writtenBytesCompressed: z.number().nullable(),
durationMs: z.number().nullable(),
error: z.string().nullable(),
node: z.number().int().nullable(),
executingNode: z.number().int().nullable(),
executingHost: z.string().nullable(),
startedAt: z.string().datetime().nullable(),
finishedAt: z.string().datetime().nullable(),
})

const jobTaskSchema = z.object({
id: z.string(),
taskIndex: z.number().int(),
taskGroup: z.string().nullable(),
sql: z.string(),
settings: z.record(z.string(), z.string()).nullable(),
cleanupSql: z.string().nullable(),
cleanupType: cleanupTypeSchema.nullable(),
maxRetries: z.number().int(),
estimatedBytes: z.number().nullable(),
estimatedBytesUncompressed: z.number().nullable(),
runs: z.array(taskRunSchema),
})

const jobSummarySchema = z.object({
id: z.string(),
serviceId: z.string(),
title: z.string().nullable(),
type: z.string(),
target: z.string(),
status: jobStatusSchema,
concurrency: z.number().int(),
taskLimit: z.number().int().nullable(),
pollIntervalBaseSec: z.number().int().nullable(),
pollIntervalMaxSec: z.number().int().nullable(),
totalTasks: z.number().int(),
completedTasks: z.number().int(),
failedTasks: z.number().int(),
Expand All @@ -39,27 +78,38 @@ const jobSummarySchema = z.object({
})

const jobDetailSchema = jobSummarySchema.extend({
maxRetries: z.number().int(),
workflowId: z.string().nullable(),
metadata: z.record(z.unknown()).nullable(),
metadata: z.record(z.string(), z.unknown()).nullable(),
tasks: z.array(jobTaskSchema),
})

export const jobsContract = {
submit: oc
.input(
z.object({
serviceId: z.string(),
serviceSlug: z.string(),
title: z.string().max(200).optional(),
type: z.enum(['backfill']),
target: z.string(),
concurrency: z.number().int().min(1).max(12).optional(),
tasks: z.array(
z.object({
id: z.string(),
sql: z.string(),
estimatedBytes: z.number().optional(),
}),
),
metadata: z.record(z.unknown()).optional(),
concurrency: z.number().int().min(0).max(48).optional(),
taskLimit: z.number().int().min(1).optional(),
tasks: z
.array(
z.object({
id: z.string(),
sql: z.string(),
settings: z.record(z.string(), z.string()).optional(),
group: z.string().optional(),
estimatedBytes: z.number().optional(),
estimatedBytesUncompressed: z.number().optional(),
cleanupSql: z.string().optional(),
cleanupType: cleanupTypeSchema.optional(),
maxRetries: z.number().int().min(0).max(10).optional(),
}),
)
.min(1),
metadata: z.record(z.string(), z.unknown()).optional(),
}),
)
.output(z.object({ jobId: z.string() })),
Expand All @@ -69,8 +119,13 @@ export const jobsContract = {
.output(jobDetailSchema),

list: oc
.input(z.object({ serviceId: z.string() }))
.output(z.object({ jobs: z.array(jobSummarySchema) })),
.input(z.object({ serviceSlug: z.string() }))
.output(
z.object({
jobs: z.array(jobSummarySchema),
total: z.number().int().min(0),
}),
),

cancel: oc
.input(z.object({ jobId: z.string() }))
Expand Down
14 changes: 13 additions & 1 deletion packages/plugin-obsessiondb/src/contract/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,20 @@
'error',
])

const RESERVED_SLUGS = ['new', 'settings', 'select', 'members', 'profile'] as const

export const serviceSlugSchema = z

Check warning on line 22 in packages/plugin-obsessiondb/src/contract/services.ts

View workflow job for this annotation

GitHub Actions / verify

Unused export

Exported value 'serviceSlugSchema' is never imported by other modules. If this export is part of a public API, consider adding it to the entry configuration. Otherwise, remove the export keyword or delete the declaration.

Check failure

Code scanning / fallow

Export is never imported Error

Export 'serviceSlugSchema' is never imported by other modules
.string()
.min(2)
.max(64)
.regex(/^[a-z0-9][a-z0-9-]{0,62}[a-z0-9]$/)
.refine((s) => !RESERVED_SLUGS.includes(s as (typeof RESERVED_SLUGS)[number]), {
message: 'Reserved slug',
})

export const serviceSchema = z.object({
id: z.string(),
slug: serviceSlugSchema,
name: z.string(),
status: serviceStatusSchema,
tier: z.number().int(),
Expand Down Expand Up @@ -51,6 +63,6 @@
),

get: oc
.input(z.object({ serviceId: z.string() }))
.input(z.object({ serviceSlug: serviceSlugSchema }))
.output(serviceSchema),
}
15 changes: 11 additions & 4 deletions packages/plugin-obsessiondb/src/contract/workbench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
import { oc } from '@orpc/contract'
import { z } from 'zod'

const queryColumnMetaSchema = z.object({
name: z.string(),
type: z.string(),
})

const queryResultSchema = z.object({
data: z.array(z.record(z.unknown())),
meta: z.array(z.object({ name: z.string(), type: z.string() })),
data: z.array(z.array(z.string().nullable())),
meta: z.array(queryColumnMetaSchema),
rows: z.number().int(),
statistics: z
.object({
Expand All @@ -26,9 +31,11 @@ export const workbenchContract = {
execute: oc
.input(
z.object({
serviceId: z.string(),
serviceSlug: z.string(),
query: z.string().min(1),
settings: z.record(z.union([z.string(), z.number()])).optional(),
settings: z
.record(z.string(), z.union([z.string(), z.number()]))
.optional(),
database: z.string().optional(),
}),
)
Expand Down
2 changes: 1 addition & 1 deletion packages/plugin-obsessiondb/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ describe('obsessiondb getContext', () => {
await setupAuth()
const configPath = join(tempDir, 'project', 'clickhouse.config.ts')
await saveServiceAlias(configPath, 'prod', {
service_id: 'svc-prod',
service_slug: 'svc-prod',
service_name: 'production',
})

Expand Down
4 changes: 2 additions & 2 deletions packages/plugin-obsessiondb/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async function resolveServiceOverride(input: {
const services = await listServices(input.credentials)
const service = services.find((candidate) => candidate.name === input.name)
if (service) {
return { service_id: service.id, service_name: service.name }
return { service_slug: service.slug, service_name: service.name }
}

const aliases = await loadServiceAliases(input.configPath)
Expand Down Expand Up @@ -276,7 +276,7 @@ function createObsessionDBPlugin(
return {
executor: createRemoteExecutor({
credentials: effectiveCreds,
serviceId: service.service_id,
serviceSlug: service.service_slug,
}),
}
},
Expand Down
12 changes: 6 additions & 6 deletions packages/plugin-obsessiondb/src/query/remote-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,23 @@ export function normalizeQueryJsonResult<T extends Record<string, unknown>>(

export function createRemoteExecutor(deps: {
credentials: Credentials
serviceId: string
serviceSlug: string
}): ClickHouseExecutor {
const { credentials, serviceId } = deps
const { credentials, serviceSlug } = deps
const client = createApiClient(credentials)

const executor: ClickHouseExecutor = {
async command(sql) {
const res = await client.workbench.query.execute({
serviceId,
serviceSlug,
query: sql,
})
throwIfError(res)
},

async query<T>(sql: string): Promise<T[]> {
const res = await client.workbench.query.execute({
serviceId,
serviceSlug,
query: sql,
})
throwIfError(res)
Expand All @@ -76,7 +76,7 @@ export function createRemoteExecutor(deps: {
sql: string,
): Promise<ClickHouseJsonQueryResult<T>> {
const res = await client.workbench.query.execute({
serviceId,
serviceSlug,
query: sql,
})
throwIfError(res)
Expand Down Expand Up @@ -112,7 +112,7 @@ export function createRemoteExecutor(deps: {

async submit(sql, queryId?) {
const res = await client.workbench.query.execute({
serviceId,
serviceSlug,
query: sql,
settings: queryId ? { query_id: queryId } : undefined,
})
Expand Down
Loading
Loading