Skip to content

feat(v0.14): Connector + Orchestrator + Storage forward path (DEV-471)#35

Open
torresmateo wants to merge 1 commit into
v0.14from
mateo/dev-471-slice-1-agent-library-v014-tracer-connector-abc-orchestrator
Open

feat(v0.14): Connector + Orchestrator + Storage forward path (DEV-471)#35
torresmateo wants to merge 1 commit into
v0.14from
mateo/dev-471-slice-1-agent-library-v014-tracer-connector-abc-orchestrator

Conversation

@torresmateo
Copy link
Copy Markdown
Collaborator

Introduce the v0.14 single forward path and reimplement file indexing on top of it, keeping CLI/Python/MCP behavior the same from the user's POV while breaking (and rebuilding) the on-disk schema.

Core spine:

  • librarian.connectors: Connector ABC, ChangeEvent (DocumentUpsert / DocumentSoftDelete), ChunkInput, and the built-in LocalFileConnector (stateless, DB-free; replicates .gitignore/.librariantrack/force-include).
  • librarian.orchestrator.Orchestrator: parses/chunks/embeds and writes content
    • advances the sync cursor in one atomic transaction (idempotent, crash-safe).
  • librarian.storage.protocols: MetadataStore/VectorStore/FTSStore/StateStore bundle + SyncState; SQLiteStorage concrete implementation.
  • Deterministic document_id/chunk_id hashing (librarian.ids).
  • librarian.storage.migrate: additive v0.14 schema (new chunks/documents columns, chunk_embeddings.model_version, sync_state table).

User-facing integration:

  • libr / MCP ingest now route through LocalFileConnector + Orchestrator; IndexingService is a deprecated shim (DeprecationWarning -> Orchestrator).
  • Database mutators emit DeprecationWarning; Database.connection() is now private (_connection).
  • search_library returns chunk_id as a deterministic str hash and adds chunk_source_uri/chunk_index/document_size/source_created_at.
  • v0.13 -> v0.14 detect-and-rebuild: startup guard refuses on the old schema; libr index --rebuild [--no-backup] backs up to .v0-backup, wipes, recreates under v0.14, and re-ingests configured sources.
  • import-linter contracts wall connectors/protocols off from private internals.

Tests: connector contract, orchestrator atomic/idempotent/crash/soft-delete, schema detection + rebuild gating, enriched search shape, and import contracts. Also fixes a latent clean_db fixture that never isolated the test database.

Introduce the v0.14 single forward path and reimplement file indexing on top
of it, keeping CLI/Python/MCP behavior the same from the user's POV while
breaking (and rebuilding) the on-disk schema.

Core spine:
- librarian.connectors: Connector ABC, ChangeEvent (DocumentUpsert /
  DocumentSoftDelete), ChunkInput, and the built-in LocalFileConnector
  (stateless, DB-free; replicates .gitignore/.librariantrack/force-include).
- librarian.orchestrator.Orchestrator: parses/chunks/embeds and writes content
  + advances the sync cursor in one atomic transaction (idempotent, crash-safe).
- librarian.storage.protocols: MetadataStore/VectorStore/FTSStore/StateStore
  bundle + SyncState; SQLiteStorage concrete implementation.
- Deterministic document_id/chunk_id hashing (librarian.ids).
- librarian.storage.migrate: additive v0.14 schema (new chunks/documents
  columns, chunk_embeddings.model_version, sync_state table).

User-facing integration:
- libr / MCP ingest now route through LocalFileConnector + Orchestrator;
  IndexingService is a deprecated shim (DeprecationWarning -> Orchestrator).
- Database mutators emit DeprecationWarning; Database.connection() is now
  private (_connection).
- search_library returns chunk_id as a deterministic str hash and adds
  chunk_source_uri/chunk_index/document_size/source_created_at.
- v0.13 -> v0.14 detect-and-rebuild: startup guard refuses on the old schema;
  `libr index --rebuild [--no-backup]` backs up to <db>.v0-backup, wipes,
  recreates under v0.14, and re-ingests configured sources.
- import-linter contracts wall connectors/protocols off from private internals.

Tests: connector contract, orchestrator atomic/idempotent/crash/soft-delete,
schema detection + rebuild gating, enriched search shape, and import contracts.
Also fixes a latent clean_db fixture that never isolated the test database.
@torresmateo torresmateo requested review from Spartee and jottakka June 1, 2026 15:42
@torresmateo
Copy link
Copy Markdown
Collaborator Author

@Spartee @jottakka for when you're reviewing this. I'm not merging this into main until I've added Postgres support with a working search (later this week). 0.14 changes the schema so it will force users to rebuild the index (safe and tested!)

I will open a couple of PRs against this branch in case the connector shape needs some fine tuning (unlikely but futurology is not my thing lol)

now = datetime.now().isoformat()
cursor = conn.execute(
"""
UPDATE chunks
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — soft-deleted content stays fully searchable (tombstone is cosmetic).

soft_delete_document only sets chunks.deleted_at. But no read path filters deleted_at IS NULL: FTSStore.search (fts_store.py:84-92), VectorStore.search/search_by_modality (vector_store.py:156-164, :248-265) and the HybridSearcher paths in retrieval/search.py all join chunksdocuments with no tombstone predicate. The chunks_au AFTER UPDATE trigger (database.py:289) even re-inserts the unchanged content back into chunks_fts. The idx_chunks_deleted_at index added in migrate.py:164 is never used by any query.

Verified locally: after soft_delete_document, the doc still came back from both keyword AND semantic search. The whole feature is a no-op from the consumer's POV.

Fix: add AND c.deleted_at IS NULL to the FTS + vector SELECTs and surface deleted_at on results. (Currently latent because the only emitter — Orchestrator.sync / connector.fetch_changes — isn't wired into the CLI/MCP flow yet, but it's reachable via the public SQLiteStorage API and breaks the moment sync() ships.)

state = self.storage.get_sync_state(source_key) or SyncState(
source_key=source_key, cursor={"mtimes": {}}
)
state.cursor.setdefault("mtimes", {})[str(file_path)] = mtime
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 (perf) — sync_state cursor is rewritten in full on every file → O(N²).

index_file appends one {path: mtime} entry to the growing cursor["mtimes"] map and put_sync_state json.dumps-es and rewrites the entire blob every file (sqlite_storage.py:143). The streaming path has the same blow-up (local_file.py:101 sets checkpoint = {"mtimes": dict(known)} per event). For an initial ingest of N new files this is ~N²/2 serialization + a commit each time.

Fix: store per-file mtimes in a dedicated indexed table (source_key, path, mtime) upserting one row per file, or write the full-map cursor once at end-of-sync.

FileReadTimeoutError: If ``stat()`` times out (cloud storage not synced).
FileReadError: For other I/O errors accessing the file.
"""
file_path = Path(file_path)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (correctness) — non-deterministic document_id across ingest paths → duplicate rows.

index_file does Path(file_path) with no .resolve(), while the streaming connector resolves before hashing (local_file.py:132). _upsert_document_row dedups on document_id = ? OR path = ?, so a relative / symlinked / .. path yields a different id and path than the resolved form → a second documents row for the same physical file with duplicate chunks/embeddings. Defeats the deterministic-id idempotency guarantee.

Fix: Path(file_path).resolve() here (and normalize in LocalFileConnector.build_upsert) so every value feeding ids.document_id is canonical.

self,
storage: SQLiteStorage | None = None,
embedder: Any = None,
) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 (DIP) — Orchestrator hard-depends on concrete SQLiteStorage, not the Storage protocol it was built for.

The PR adds a clean Storage/MetadataStore/VectorStore/FTSStore/StateStore bundle (protocols.py) promising substrate-agnostic callers, but the engine annotates storage: SQLiteStorage | None and defaults to get_storage() (concrete). The protocol is never used as a type where it matters, so the abstraction is currently dead and the Postgres-portability claim is false.

Fix: annotate storage: Storage and inject the concrete backend at the composition root.

Comment thread librarian/server.py

storage = SQLiteStorage(database=get_database())
storage.migrate()
return Orchestrator(storage=storage).index_file(file_path)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 — storage.migrate() + a fresh SQLiteStorage/Orchestrator run on every single file.

_process_and_index_file is called per file inside the index_directory_to_library loop (server.py:314), and each call rebuilds storage and runs migrate() (PRAGMA introspection, 3× CREATE INDEX, sync_state DDL, commit) — bypassing the memoized get_storage() singleton (sqlite_storage.py:298-304) that exists to migrate exactly once. N schema passes + N extra commits per directory import.

Fix: build storage + orchestrator once per ingest run and reuse them in the loop.

def put_sync_state(self, state: SyncState, conn: Any = None) -> None: ...

def write_upsert(self, conn: Any, prepared: Any) -> None: ...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (ISP / type-boundary leak) — the Storage protocol leaks the SQLite handle and defeats typing.

write_upsert(self, conn: Any, prepared: Any) / soft_delete_document(self, conn: Any, ...) thread a raw sqlite3.Connection and a PreparedDocument through as Any, so the "substrate-agnostic" protocol hard-bakes "there is a SQL connection the caller passes around" (a Postgres/async backend wouldn't honor it) and loses type-checking at the one boundary that matters. Storage also re-declares the StateStore methods.

Fix: type prepared: PreparedDocument; replace conn: Any with an opaque Transaction/TxnHandle; compose state: StateStore instead of duplicating its methods.

def _replace_chunks(
self, conn: sqlite3.Connection, doc_pk: int, prepared: PreparedDocument
) -> None:
for chunk in prepared.chunks:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 (perf) — whole-document re-embed on any change; deterministic chunk_ids not used to skip unchanged chunks.

_replace_chunks deletes every chunk + embedding for the document and re-inserts/re-embeds all of them whenever a file's mtime changes (orchestrator.py:234 always embeds all chunks). Editing one line of a 2,000-chunk file re-embeds all 2,000 — embedding is the dominant cost. Also: per-row INSERTs here each fire 3 FTS triggers; consider executemany.

Fix: diff prepared chunk_ids against existing ones for the document; embed/insert only new/changed ids, delete removed ones, leave unchanged rows in place.

deleted_at, reason = conn.execute(
"SELECT deleted_at, deletion_reason FROM chunks LIMIT 1"
).fetchone()
assert deleted_at is not None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 (testing) — soft-delete is verified only by column state; no test searches, so the broken read path shipped green.

This asserts deleted_at/deletion_reason are set but never runs a search. Because the read paths don't filter tombstones (see the sqlite_storage.py comment), a soft-deleted chunk is still returned by keyword and semantic search.

Add: ingest → assert search_library(...) returns it → soft-delete → assert the same query returns [] for both SearchMode.KEYWORD and SearchMode.SEMANTIC.

Comment thread librarian/cli.py
# Wipe and recreate cleanly under the v0.14 schema.
db_module._db_instance = None
if db_path.exists():
db_path.unlink()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 (testing) — the destructive _rebuild_v014 path is entirely untested.

Only the v0.13 guard is covered (test_schema_rebuild.py); the remedy itself is not. Untested: backup is actually created at <db>.v0-backup; --no-backup skips it; a backup failure (shutil.copy2 raising) must NOT proceed to unlink() and lose data; wipe+recreate yields v0.14; re-ingest repopulates. This is the single most data-destructive path in the PR.

Add CliRunner tests for each of those, including a monkeypatch that makes copy2 raise and asserts the original DB still exists.

Comment on lines +154 to +158
Context manager for database connections (private as of v0.14).

The public ``connection()`` accessor was removed in v0.14: external code
must go through ``Orchestrator`` / ``Storage`` rather than reaching into
raw connections. Kept private for internal store implementations.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some comments that talk about past elements, are they really needed? Maybe ask the agent do a check in comments in general to be more about the behavior of the methods instead of those meta references of temporal states of the code (ine case you find it useful)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants