From 21bd13b523288233d49795517b9888e5256681f0 Mon Sep 17 00:00:00 2001 From: Denys Kashkovskyi Date: Fri, 19 Jun 2026 19:32:52 +0200 Subject: [PATCH] feat: repair-semantic-queue command + post-update bridge for the OV poison loop (#2734) Temporary bridge until the pinned OpenViking includes upstream PR #2735. - New `threadnote repair-semantic-queue [--apply]` patches the installed OpenViking semantic_processor.py to skip non-directory/missing memory URIs (so a memory file reindexed with mode=semantic_and_vectors cannot poison the AGFS-persisted semantic queue), then restarts the server so a stuck message drains. Idempotent (no-op once OV already has the guard or a prior hot-fix), keeps a .threadnote-bak, and compile-checks the patched file before writing. Locates the package from the OV server's venv, common uv/pipx roots, or the venv python; override with THREADNOTE_OPENVIKING_SEMANTIC_PROCESSOR. - Post-update migration (introducedIn 1.4.3) offers it on `threadnote update` with the usual consent prompt. - Unit tests for the pure idempotent patcher; troubleshooting doc. Marked for removal once DEFAULT_OPENVIKING_VERSION is bumped to an OpenViking release containing PR #2735. --- config/post-update-migrations.json | 15 ++ docs/troubleshooting.md | 32 +++ src/lifecycle.ts | 2 +- src/semantic_queue_repair.ts | 248 ++++++++++++++++++++++++ src/threadnote.ts | 13 ++ src/types.ts | 5 + test/unit/semantic_queue_repair.test.ts | 73 +++++++ 7 files changed, 387 insertions(+), 1 deletion(-) create mode 100644 src/semantic_queue_repair.ts create mode 100644 test/unit/semantic_queue_repair.test.ts diff --git a/config/post-update-migrations.json b/config/post-update-migrations.json index fc58d00..ac6e629 100644 --- a/config/post-update-migrations.json +++ b/config/post-update-migrations.json @@ -18,6 +18,21 @@ "If Threadnote reported any original files were still being processed, rerun the printed threadnote forget command later." ], "requiresLegacyHandoffs": true + }, + { + "id": "ov-semantic-poison-hotfix-2734", + "introducedIn": "1.4.3", + "title": "Patch OpenViking against the semantic-queue poison loop (#2734)", + "description": [ + "OpenViking 0.4.x can deadlock its semantic queue: a memory file reindexed with mode=semantic_and_vectors enqueues a directory-level semantic message whose URI is a file; the processor lists it, fails, and the message re-enqueues forever (it is AGFS-persisted, so it survives a server restart).", + "This patches the installed OpenViking to skip non-directory/missing memory URIs (upstream fix PR #2735) and restarts the server so any stuck message drains on the next dequeue.", + "It is a no-op when the installed OpenViking already includes the fix, a backup of the original file is kept, and the patched file is compile-checked before it is written." + ], + "commandArgs": ["repair-semantic-queue", "--apply"], + "instructions": [ + "If the semantic queue was stuck it should now drain. Verify with: ov observer queue", + "This is a temporary local patch; it is superseded automatically once Threadnote pins an OpenViking release that includes the fix. Re-run manually any time with: threadnote repair-semantic-queue --apply" + ] } ] } diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 87b2618..cc9a950 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -94,6 +94,38 @@ threadnote doctor --dry-run If it still is not healthy, open that log. Certificate failures during the first embedding model download are covered above. +## Semantic Queue Stuck / Memory Writes Hang + +Symptom: agents hang or `remember`/`handoff` get very slow, and `~/.openviking/logs/server.log` repeats: + +``` +RuntimeError: Failed to list memory directory viking://user/.../memories/.../.md: Directory not found +``` + +A memory _file_ got enqueued for directory-level semantic processing; OpenViking's `_process_memory_directory` lists +it, fails, and the message re-enqueues forever. The entry is AGFS-persisted, so it survives a server restart. Check the +queue — a non-zero `Errors`/`Requeued` on the `Semantic` row is the signature: + +```bash +ov observer queue +``` + +Fix it by patching the installed OpenViking and restarting the server: + +```bash +threadnote repair-semantic-queue --apply +``` + +It skips non-directory/missing memory URIs (OpenViking PR #2735), keeps a `.threadnote-bak`, compile-checks the patched +file before writing, and is a no-op once the installed OpenViking already includes the fix. This is a **temporary local +patch** — `threadnote update` also offers it as a post-update step, and it is superseded automatically once Threadnote +pins an OpenViking release containing the fix. To revert manually: + +```bash +mv .threadnote-bak +threadnote stop && threadnote start +``` + ## Port Already In Use The default bind address is `127.0.0.1:1933`. This does not conflict with projects serving `localhost:80`, diff --git a/src/lifecycle.ts b/src/lifecycle.ts index d45151d..f094eff 100644 --- a/src/lifecycle.ts +++ b/src/lifecycle.ts @@ -434,7 +434,7 @@ async function configureOpenVikingCliLanguage(config: RuntimeConfig, dryRun: boo * three times. The resolved path itself is not memoised: a `threadnote install` * may create the binary mid-process and the second resolution must see it. */ -async function findOpenVikingServer(): Promise { +export async function findOpenVikingServer(): Promise { const onPath = await findExecutable([OPENVIKING_SERVER_COMMAND]); if (onPath) { return onPath; diff --git a/src/semantic_queue_repair.ts b/src/semantic_queue_repair.ts new file mode 100644 index 0000000..db34c75 --- /dev/null +++ b/src/semantic_queue_repair.ts @@ -0,0 +1,248 @@ +import {existsSync} from 'node:fs'; +import {copyFile, mkdtemp, readFile, readdir, rm, writeFile} from 'node:fs/promises'; +import {homedir, tmpdir} from 'node:os'; +import {dirname, join} from 'node:path'; +import {findOpenVikingServer, runStart, runStop} from './lifecycle.js'; +import type {RepairSemanticQueueOptions, RuntimeConfig} from './types.js'; +import {errorMessage, expandPath, findExecutable, runCommand} from './utils.js'; + +// Temporary bridge for the OpenViking semantic-queue poison loop (#2734): a +// memory FILE reindexed with mode=semantic_and_vectors enqueues a directory-level +// semantic message; OpenViking's _process_memory_directory lists the file, fails, +// and the AGFS-persisted message re-enqueues forever. This patches the installed +// OpenViking to skip non-directory/missing memory URIs (upstream PR #2735). +// +// REMOVE-WHEN: DEFAULT_OPENVIKING_VERSION (src/constants.ts) is bumped to an +// OpenViking release that includes PR #2735. Then delete this module, its +// `repair-semantic-queue` command, the post-update migration entry, and the docs +// note. + +const HOTFIX_MARKER = 'THREADNOTE-HOTFIX-2734'; +const ENV_OVERRIDE = 'THREADNOTE_OPENVIKING_SEMANTIC_PROCESSOR'; + +export type SemanticPatchResult = + | {readonly status: 'patched'; readonly source: string} + | {readonly status: 'already-fixed'} + | {readonly status: 'no-anchor'}; + +/** + * Insert the upstream #2735 guard into OpenViking's semantic_processor.py source + * so a context_type="memory" message whose URI is a file (or a vanished + * directory) is skipped instead of re-enqueued forever. Pure and idempotent: + * returns `already-fixed` when a stat(dir_uri) guard (or this hot-fix marker) is + * already present, and `no-anchor` when the expected _process_memory_directory / + * ls(dir_uri) shape is missing so the caller makes no changes. + */ +export function patchSemanticProcessorSource(source: string): SemanticPatchResult { + if (source.includes(HOTFIX_MARKER)) { + return {status: 'already-fixed'}; + } + const lines = source.split('\n'); + const defIndex = lines.findIndex(line => /^\s*async def _process_memory_directory\b/.test(line)); + if (defIndex < 0) { + return {status: 'no-anchor'}; + } + + const lsPattern = /^(\s+)entries = await viking_fs\.ls\(dir_uri\b/; + let lsIndex = -1; + let lsIndent = ''; + for (let i = defIndex + 1; i < lines.length; i += 1) { + const match = lsPattern.exec(lines[i]); + if (match) { + lsIndex = i; + lsIndent = match[1]; + break; + } + if (/^\s*async def \w/.test(lines[i])) { + break; // left the method without finding the anchor + } + } + if (lsIndex < 0) { + return {status: 'no-anchor'}; + } + + // A stat(dir_uri) guard already above the ls (e.g. the upstream fix) means done. + if (lines.slice(defIndex, lsIndex).some(line => line.includes('viking_fs.stat(dir_uri'))) { + return {status: 'already-fixed'}; + } + + // The first meaningful line above the ls must be the `try:` that wraps it. + let tryIndex = -1; + let base = ''; + for (let j = lsIndex - 1; j > defIndex; j -= 1) { + const text = lines[j]; + if (text.trim() === '' || text.trimStart().startsWith('#')) { + continue; + } + const match = /^(\s*)try:\s*$/.exec(text); + if (match && match[1].length < lsIndent.length) { + tryIndex = j; + base = match[1]; + } + break; // only the first meaningful line above the ls is considered + } + if (tryIndex < 0) { + return {status: 'no-anchor'}; + } + + const unit = ' '.repeat(lsIndent.length - base.length); + const at = (level: number, text: string): string => `${base}${unit.repeat(level)}${text}`; + const guard = [ + at(0, `# ${HOTFIX_MARKER}: skip non-directory / missing memory URIs so a memory file`), + at(0, '# reindexed with mode=semantic_and_vectors cannot poison the semantic queue.'), + at(0, '# Temporary local hot-fix; superseded once OpenViking ships upstream PR #2735.'), + at(0, 'try:'), + at(1, '_tn_dir_stat = await viking_fs.stat(dir_uri, ctx=ctx)'), + at(0, 'except Exception as _tn_err:'), + at(1, 'if isinstance(_tn_err, FileNotFoundError) or "not found" in str(_tn_err).lower():'), + at(2, '_mark_done()'), + at(2, 'return'), + at(1, '_tn_dir_stat = None'), + at(0, 'if _tn_dir_stat is not None and not _tn_dir_stat.get("isDir", _tn_dir_stat.get("is_dir", False)):'), + at(1, '_mark_done()'), + at(1, 'return'), + ]; + const patched = [...lines.slice(0, tryIndex), ...guard, ...lines.slice(tryIndex)].join('\n'); + return {status: 'patched', source: patched}; +} + +async function semanticProcessorCandidates(root: string): Promise { + const lib = join(root, 'lib'); + if (!existsSync(lib)) { + return []; + } + let entries: string[]; + try { + entries = await readdir(lib); + } catch { + return []; + } + return entries + .filter(name => name.startsWith('python')) + .map(name => join(lib, name, 'site-packages', 'openviking', 'storage', 'queuefs', 'semantic_processor.py')); +} + +export async function locateSemanticProcessorPath(): Promise { + const override = process.env[ENV_OVERRIDE]?.trim(); + if (override) { + const resolved = expandPath(override); + if (existsSync(resolved)) { + return resolved; + } + } + + const roots: string[] = []; + const server = await findOpenVikingServer(); + if (server) { + roots.push(dirname(dirname(server))); // /bin/openviking-server -> + } + roots.push(join(homedir(), '.local', 'share', 'uv', 'tools', 'openviking')); + roots.push(join(homedir(), '.local', 'pipx', 'venvs', 'openviking')); + + for (const root of roots) { + for (const candidate of await semanticProcessorCandidates(root)) { + if (existsSync(candidate)) { + return candidate; + } + } + } + + // Last resort: ask the OpenViking venv python where the package lives. + if (server) { + const venvPython = join(dirname(server), process.platform === 'win32' ? 'python.exe' : 'python3'); + if (existsSync(venvPython)) { + const result = await runCommand( + venvPython, + [ + '-c', + 'import openviking, os; print(os.path.join(os.path.dirname(openviking.__file__), "storage", "queuefs", "semantic_processor.py"))', + ], + {allowFailure: true}, + ); + const path = result.stdout.trim(); + if (result.exitCode === 0 && path && existsSync(path)) { + return path; + } + } + } + return undefined; +} + +async function assertPatchedSourceCompiles(source: string): Promise { + const python = await findExecutable(['python3', 'python']); + if (!python) { + console.warn('WARN python3 not found; skipping compile validation of the patched OpenViking source.'); + return; + } + const dir = await mkdtemp(join(tmpdir(), 'threadnote-ov-patch-')); + const file = join(dir, 'semantic_processor_patched.py'); + try { + await writeFile(file, source, 'utf8'); + const result = await runCommand(python, ['-m', 'py_compile', file], {allowFailure: true}); + if (result.exitCode !== 0) { + throw new Error(`patched OpenViking source failed to compile: ${result.stderr.trim() || result.stdout.trim()}`); + } + } finally { + await rm(dir, {force: true, recursive: true}); + } +} + +export async function runRepairSemanticQueue( + config: RuntimeConfig, + options: RepairSemanticQueueOptions, +): Promise { + const apply = options.apply === true && options.dryRun !== true; + + const path = await locateSemanticProcessorPath(); + if (!path) { + console.error( + 'Could not locate the installed OpenViking semantic_processor.py. ' + + `Set ${ENV_OVERRIDE}=/path/to/openviking/storage/queuefs/semantic_processor.py and retry.`, + ); + process.exitCode = 1; + return; + } + console.log(`OpenViking semantic processor: ${path}`); + + let original: string; + try { + original = await readFile(path, 'utf8'); + } catch (err: unknown) { + console.error(`Could not read ${path}: ${errorMessage(err)}`); + process.exitCode = 1; + return; + } + + const result = patchSemanticProcessorSource(original); + if (result.status === 'already-fixed') { + console.log('OpenViking already guards non-directory memory URIs (#2734/#2735); nothing to patch.'); + return; + } + if (result.status === 'no-anchor') { + console.warn( + 'WARN Could not find the _process_memory_directory ls(dir_uri) anchor; this OpenViking layout is ' + + 'unexpected, so no changes were made.', + ); + return; + } + + if (!apply) { + console.log('Dry run: would patch OpenViking to skip non-directory/missing memory URIs, then restart the server.'); + console.log('Re-run with --apply to perform it.'); + return; + } + + await assertPatchedSourceCompiles(result.source); + + const backup = `${path}.threadnote-bak`; + if (!existsSync(backup)) { + await copyFile(path, backup); + } + await writeFile(path, result.source, 'utf8'); + console.log(`Patched OpenViking semantic processor (backup: ${backup}).`); + + console.log('Restarting the OpenViking server so the patch loads and any stuck message drains...'); + await runStop(config, {}); + await runStart(config, {}); + console.log('Done. A stuck semantic message drains on the next dequeue; verify with: ov observer queue'); +} diff --git a/src/threadnote.ts b/src/threadnote.ts index 901a1e1..a8844f5 100755 --- a/src/threadnote.ts +++ b/src/threadnote.ts @@ -23,6 +23,7 @@ import type { RecallOptions, RememberOptions, RepairOptions, + RepairSemanticQueueOptions, SeedOptions, ShareInstallArtifactsOptions, ShareInitOptions, @@ -77,6 +78,7 @@ import { runShareUnpublish, } from './share.js'; import {parsePackageManager, runDoctor, runInstall, runRepair, runStart, runStop, runUninstall} from './lifecycle.js'; +import {runRepairSemanticQueue} from './semantic_queue_repair.js'; import {maybeNotifyUpdate, parseUpdateRuntime, runPostUpdate, runUpdate} from './update.js'; import {runVersion} from './version_command.js'; import {runManage} from './manager.js'; @@ -350,6 +352,17 @@ async function main(): Promise { await runMigrateLifecycle(getRuntimeConfig(program), options); }); + program + // Temporary bridge for the OpenViking semantic-queue poison loop (#2734); + // remove once the pinned OpenViking includes upstream PR #2735. + .command('repair-semantic-queue') + .description('Patch the installed OpenViking to drain/avoid the semantic-queue poison loop (#2734)') + .option('--apply', 'Apply the patch and restart the server; without this, prints a dry run') + .option('--dry-run', 'Print what would change without patching or restarting') + .action(async (options: RepairSemanticQueueOptions) => { + await runRepairSemanticQueue(getRuntimeConfig(program), options); + }); + program .command('recall') .description('Search shared OpenViking context') diff --git a/src/types.ts b/src/types.ts index 9d1bb3b..4543748 100644 --- a/src/types.ts +++ b/src/types.ts @@ -192,6 +192,11 @@ export interface MigrateLifecycleOptions { readonly limit?: string; } +export interface RepairSemanticQueueOptions { + readonly apply?: boolean; + readonly dryRun?: boolean; +} + export interface RecallOptions { readonly dryRun?: boolean; readonly inferScope?: boolean; diff --git a/test/unit/semantic_queue_repair.test.ts b/test/unit/semantic_queue_repair.test.ts new file mode 100644 index 0000000..49ec37f --- /dev/null +++ b/test/unit/semantic_queue_repair.test.ts @@ -0,0 +1,73 @@ +import {describe, expect, it} from 'vitest'; +import {patchSemanticProcessorSource} from '../../src/semantic_queue_repair.js'; + +// Minimal stand-in for OpenViking's _process_memory_directory: the outer try, +// the inner try wrapping ls(dir_uri), and the _mark_done helper the guard calls. +const SAMPLE = [ + 'class SemanticProcessor:', + ' async def _process_memory_directory(self, msg, ctx=None):', + ' viking_fs = get_viking_fs()', + ' dir_uri = msg.uri', + '', + ' def _mark_done():', + ' pass', + '', + ' try:', + ' try:', + ' entries = await viking_fs.ls(dir_uri, node_limit=LS_ALL_NODES, ctx=ctx)', + ' except Exception as e:', + ' raise RuntimeError(f"Failed to list memory directory {dir_uri}: {e}") from e', + ' for entry in entries:', + ' pass', + ' finally:', + ' await lock.close()', + '', +].join('\n'); + +describe('patchSemanticProcessorSource', () => { + it('inserts the guard, at the right indent, before the ls(dir_uri) call', () => { + const result = patchSemanticProcessorSource(SAMPLE); + expect(result.status).toBe('patched'); + if (result.status !== 'patched') { + return; + } + const src = result.source; + expect(src).toContain('THREADNOTE-HOTFIX-2734'); + expect(src.indexOf('_tn_dir_stat = await viking_fs.stat(dir_uri')).toBeLessThan( + src.indexOf('entries = await viking_fs.ls(dir_uri'), + ); + // guard `try:` at the inner-try indent (12), stat one level deeper (16) + expect(src).toContain('\n try:\n _tn_dir_stat = await viking_fs.stat(dir_uri, ctx=ctx)'); + // original ls line is preserved untouched + expect(src).toContain(' entries = await viking_fs.ls(dir_uri, node_limit=LS_ALL_NODES, ctx=ctx)'); + }); + + it('is idempotent: re-patching detects the marker', () => { + const once = patchSemanticProcessorSource(SAMPLE); + expect(once.status).toBe('patched'); + if (once.status !== 'patched') { + return; + } + expect(patchSemanticProcessorSource(once.source).status).toBe('already-fixed'); + }); + + it('treats an existing stat(dir_uri) guard (upstream fix) as already-fixed', () => { + const upstream = SAMPLE.replace( + ' try:\n try:', + ' try:\n dir_stat = await viking_fs.stat(dir_uri, ctx=ctx)\n try:', + ); + expect(patchSemanticProcessorSource(upstream).status).toBe('already-fixed'); + }); + + it('returns no-anchor when the ls(dir_uri) call is absent', () => { + const noLs = SAMPLE.replace( + 'entries = await viking_fs.ls(dir_uri, node_limit=LS_ALL_NODES, ctx=ctx)', + 'entries = []', + ); + expect(patchSemanticProcessorSource(noLs).status).toBe('no-anchor'); + }); + + it('returns no-anchor when _process_memory_directory is absent', () => { + expect(patchSemanticProcessorSource('class Foo:\n pass\n').status).toBe('no-anchor'); + }); +});