From 8fee072d9312d82e0c53b826f7bba696991ac9fd Mon Sep 17 00:00:00 2001 From: Vignesh Narayanaswamy Date: Wed, 10 Jun 2026 10:09:40 -0700 Subject: [PATCH] perf(snowflake): composite_summary as one self-contained SQL statement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Snowflake backend previously delegated composite_summary() to a V_COMPOSITES view it never creates — an undocumented deployment dependency — and that view computed derived fields with per-row correlated subqueries. Deployments without the view fell back to the SDK's sequential path: list_models() over the full inventory plus per-composite snapshot replays (measured: 92.6s and 568 round trips for a 41-composite production ledger). composite_summary() now computes everything in ONE CTE statement over the backend's own MODELS and SNAPSHOTS tables, replicating the SDK fallback semantics exactly: - composites: MODEL_TYPE IN (...) pushed down to SQL - member_count: replay of Ledger.members() — baseline from depends_on snapshots with payload.relationship='member_of' resolved against MODELS, overlaid by member_added/member_removed in timestamp order, latest op wins per (composite, member); unresolvable adds are no-ops - last_validated: MAX(timestamp) of 'validated' snapshots - open_observation_count: distinct observation_id issued and never resolved (set semantics, ignoring events without an observation_id) Parity proven against a production ledger (28.8k models / 212k snapshots): row-for-row identical with both the live sequential fallback (41/41 rows) and an in-memory replay oracle over bulk-loaded data (2334/2334 rows). Warm runtime 0.3-0.6s. InMemoryLedgerBackend intentionally keeps no composite_summary — the SDK fallback remains the reference implementation and the dispatch seam is covered by a new SDK test. Co-Authored-By: Claude Fable 5 --- CHANGELOG.md | 4 + src/model_ledger/backends/snowflake.py | 142 +++++++++++-- tests/test_backends/test_snowflake_ledger.py | 204 +++++++++++++++++++ tests/test_sdk/test_ledger.py | 19 ++ 4 files changed, 356 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f4013f..f9e7cb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- perf: `SnowflakeLedgerBackend.composite_summary()` computes the full composite inventory in ONE self-contained SQL statement over MODELS + SNAPSHOTS (CTE-based event replay), replacing the previous delegation to an externally-managed `V_COMPOSITES` view. Removes the undocumented requirement that deployments create that view, and replicates the SDK fallback semantics exactly (membership baseline from `member_of` dependency links + `member_added`/`member_removed` replay with latest-op-wins; distinct-id open-observation set semantics). Measured against a production-scale ledger (28.8k models / 212k snapshots): 92.6s sequential fallback → sub-second warm (~0.3-0.6s). + ## v0.7.3 - Add `metadata: dict` field to `ModelRef`. Thread through `register()` and `register_group()`. Replaces the unintended per-link broadcast of `register_group(metadata=...)` to member links; metadata now lives on the composite ModelRef itself. Backward compatible: existing data loads with `metadata={}`. diff --git a/src/model_ledger/backends/snowflake.py b/src/model_ledger/backends/snowflake.py index 35535df..fb1079f 100644 --- a/src/model_ledger/backends/snowflake.py +++ b/src/model_ledger/backends/snowflake.py @@ -479,26 +479,142 @@ def composite_summary( self, model_types: list[str] | None = None, ) -> list[dict[str, Any]]: - """Query V_COMPOSITES view — single query replaces N+1 per-model calls.""" + """Flat composite inventory in ONE SQL statement. + + Replicates the semantics of the SDK fallback in + ``Ledger.composite_summary`` (which issues 2 round trips per + composite) as a single set-based query: + + - membership baseline: ``depends_on`` snapshots on the composite with + ``payload.relationship = 'member_of'``, resolved against MODELS via + ``payload.upstream_hash`` (name match first, then hash — mirroring + ``Ledger.get``); unresolvable links are dropped. + - membership events: ``member_added`` / ``member_removed`` overlay the + baseline in timestamp order, latest op wins per (composite, member). + ``member_added`` events whose member resolves to no registered model + (by ``payload.member_hash`` or ``payload.member_name``) are no-ops + in the SDK replay and are dropped here too. + - last_validated: MAX(timestamp) of ``validated`` snapshots. + - open_observation_count: distinct ``observation_id`` values issued + and never resolved; events without an ``observation_id`` payload + key are ignored (set semantics, matching + ``Ledger.open_observation_count``). + """ self._flush_models() self._flush_snapshots() target_types = model_types or ["composite"] placeholders = ", ".join(_esc(t) for t in target_types) - sql = ( - f"SELECT v.NAME, v.OWNER, v.TIER, v.STATUS, v.MODEL_TYPE," - f" v.MEMBER_COUNT, v.LAST_VALIDATED, v.OPEN_OBSERVATION_COUNT," - f" m.METADATA" - f" FROM {self._schema}.V_COMPOSITES v" - f" JOIN {self._schema}.MODELS m ON m.NAME = v.NAME" - f" WHERE v.MODEL_TYPE IN ({placeholders})" - f" ORDER BY v.NAME" - ) + sql = f""" + WITH composites AS ( + SELECT MODEL_HASH, NAME, OWNER, TIER, STATUS, MODEL_TYPE, METADATA + FROM {self._schema}.MODELS + WHERE MODEL_TYPE IN ({placeholders}) + ), + relevant_snaps AS ( + SELECT s.MODEL_HASH, s.SNAPSHOT_HASH, s.EVENT_TYPE, s.TIMESTAMP, s.PAYLOAD + FROM {self._schema}.SNAPSHOTS s + JOIN composites c ON c.MODEL_HASH = s.MODEL_HASH + WHERE s.EVENT_TYPE IN ('depends_on', 'member_added', 'member_removed', + 'validated', 'observation_issued', 'observation_resolved') + ), + member_baseline AS ( + SELECT DISTINCT + s.MODEL_HASH AS COMPOSITE_HASH, + COALESCE(m_name.MODEL_HASH, m_hash.MODEL_HASH) AS MEMBER_KEY + FROM relevant_snaps s + LEFT JOIN {self._schema}.MODELS m_name + ON m_name.NAME = s.PAYLOAD:upstream_hash::VARCHAR + LEFT JOIN {self._schema}.MODELS m_hash + ON m_hash.MODEL_HASH = s.PAYLOAD:upstream_hash::VARCHAR + WHERE s.EVENT_TYPE = 'depends_on' + AND COALESCE(s.PAYLOAD:relationship::VARCHAR, 'depends_on') = 'member_of' + AND COALESCE(m_name.MODEL_HASH, m_hash.MODEL_HASH) IS NOT NULL + ), + member_events AS ( + SELECT + s.MODEL_HASH AS COMPOSITE_HASH, + COALESCE(s.PAYLOAD:member_hash::VARCHAR, '') AS MEMBER_KEY, + s.PAYLOAD:member_name::VARCHAR AS MEMBER_NAME, + s.EVENT_TYPE, + s.TIMESTAMP, + s.SNAPSHOT_HASH + FROM relevant_snaps s + WHERE s.EVENT_TYPE IN ('member_added', 'member_removed') + ), + effective_events AS ( + SELECT e.COMPOSITE_HASH, e.MEMBER_KEY, e.EVENT_TYPE, e.TIMESTAMP, e.SNAPSHOT_HASH + FROM member_events e + WHERE e.EVENT_TYPE = 'member_removed' + OR EXISTS ( + SELECT 1 FROM {self._schema}.MODELS m + WHERE m.NAME = e.MEMBER_KEY OR m.MODEL_HASH = e.MEMBER_KEY + OR m.NAME = e.MEMBER_NAME OR m.MODEL_HASH = e.MEMBER_NAME + ) + ), + last_op AS ( + SELECT COMPOSITE_HASH, MEMBER_KEY, EVENT_TYPE + FROM effective_events + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY COMPOSITE_HASH, MEMBER_KEY + ORDER BY TIMESTAMP DESC, SNAPSHOT_HASH DESC + ) = 1 + ), + membership AS ( + ( + SELECT COMPOSITE_HASH, MEMBER_KEY FROM member_baseline + UNION + SELECT COMPOSITE_HASH, MEMBER_KEY FROM last_op + WHERE EVENT_TYPE = 'member_added' + ) + EXCEPT + SELECT COMPOSITE_HASH, MEMBER_KEY FROM last_op + WHERE EVENT_TYPE = 'member_removed' + ), + member_counts AS ( + SELECT COMPOSITE_HASH, COUNT(*) AS MEMBER_COUNT + FROM membership + GROUP BY COMPOSITE_HASH + ), + validations AS ( + SELECT MODEL_HASH AS COMPOSITE_HASH, MAX(TIMESTAMP) AS LAST_VALIDATED + FROM relevant_snaps + WHERE EVENT_TYPE = 'validated' + GROUP BY MODEL_HASH + ), + open_obs AS ( + SELECT COMPOSITE_HASH, COUNT(*) AS OPEN_OBSERVATION_COUNT + FROM ( + SELECT MODEL_HASH AS COMPOSITE_HASH, + PAYLOAD:observation_id::VARCHAR AS OBS_ID + FROM relevant_snaps + WHERE EVENT_TYPE IN ('observation_issued', 'observation_resolved') + AND PAYLOAD:observation_id::VARCHAR IS NOT NULL + AND PAYLOAD:observation_id::VARCHAR != '' + GROUP BY COMPOSITE_HASH, OBS_ID + HAVING COUNT_IF(EVENT_TYPE = 'observation_issued') > 0 + AND COUNT_IF(EVENT_TYPE = 'observation_resolved') = 0 + ) + GROUP BY COMPOSITE_HASH + ) + SELECT c.NAME, c.OWNER, c.TIER, c.STATUS, c.MODEL_TYPE, + COALESCE(mc.MEMBER_COUNT, 0) AS MEMBER_COUNT, + v.LAST_VALIDATED, + COALESCE(oo.OPEN_OBSERVATION_COUNT, 0) AS OPEN_OBSERVATION_COUNT, + c.METADATA + FROM composites c + LEFT JOIN member_counts mc ON mc.COMPOSITE_HASH = c.MODEL_HASH + LEFT JOIN validations v ON v.COMPOSITE_HASH = c.MODEL_HASH + LEFT JOIN open_obs oo ON oo.COMPOSITE_HASH = c.MODEL_HASH + ORDER BY c.NAME""" rows = _exec(self._session, sql) results = [] for r in rows: raw = r.get("METADATA") or {} if isinstance(raw, str): raw = json.loads(raw) if raw else {} + last_validated = r.get("LAST_VALIDATED") + if last_validated is not None and not isinstance(last_validated, datetime): + last_validated = datetime.fromisoformat(str(last_validated)) results.append( { "name": r["NAME"], @@ -506,9 +622,9 @@ def composite_summary( "tier": r["TIER"], "status": r["STATUS"], "model_type": r["MODEL_TYPE"], - "member_count": r["MEMBER_COUNT"] or 0, - "last_validated": r.get("LAST_VALIDATED"), - "open_observation_count": r["OPEN_OBSERVATION_COUNT"] or 0, + "member_count": int(r["MEMBER_COUNT"] or 0), + "last_validated": last_validated, + "open_observation_count": int(r["OPEN_OBSERVATION_COUNT"] or 0), "metadata": raw if isinstance(raw, dict) else {}, } ) diff --git a/tests/test_backends/test_snowflake_ledger.py b/tests/test_backends/test_snowflake_ledger.py index 6ffaef6..5de29f9 100644 --- a/tests/test_backends/test_snowflake_ledger.py +++ b/tests/test_backends/test_snowflake_ledger.py @@ -322,3 +322,207 @@ def sql(self, query: str, params: Any = None) -> MockCollectResult: assert seen_hashes.count(model.model_hash) == 1, ( f"model written {seen_hashes.count(model.model_hash)}x to MERGE source, expected 1" ) + + +class FakeCompositeSummarySession: + """Captures every SQL statement; returns canned rows for the summary query.""" + + def __init__(self, rows=None): + self.queries: list[str] = [] + self._rows = rows or [] + + def sql(self, query: str, params: Any = None) -> MockCollectResult: + self.queries.append(query) + if "WITH composites AS" in query: + return MockCollectResult(self._rows) + return MockCollectResult([]) + + +class TestCompositeSummarySQL: + def _backend(self, rows=None): + from model_ledger.backends.snowflake import SnowflakeLedgerBackend + + session = FakeCompositeSummarySession(rows) + backend = SnowflakeLedgerBackend(schema="TEST_SCHEMA", connection=session) + session.queries.clear() # drop the _ensure_tables DDL + return backend, session + + def test_single_statement_with_type_pushdown(self): + backend, session = self._backend() + backend.composite_summary(model_types=["ml_model", "heuristic"]) + assert len(session.queries) == 1, "composite_summary must issue exactly one statement" + sql = session.queries[0] + assert "MODEL_TYPE IN ('ml_model', 'heuristic')" in sql + assert "V_COMPOSITES" not in sql, "must not depend on an externally-managed view" + + def test_default_model_type_is_composite(self): + backend, session = self._backend() + backend.composite_summary() + assert "MODEL_TYPE IN ('composite')" in session.queries[0] + + def test_model_types_are_escaped(self): + backend, session = self._backend() + backend.composite_summary(model_types=["ty'pe"]) + assert "'ty''pe'" in session.queries[0] + + def test_replicates_sdk_replay_semantics_in_sql(self): + """The single statement must encode the SDK fallback semantics.""" + backend, session = self._backend() + backend.composite_summary() + sql = session.queries[0] + # membership baseline: member_of dependency links resolved against MODELS + assert "'member_of'" in sql + assert "upstream_hash" in sql + # event overlay: latest op wins per (composite, member) + assert "member_added" in sql and "member_removed" in sql + assert "ROW_NUMBER() OVER" in sql + # open observations: distinct-id set semantics, not raw event counts + assert "observation_id" in sql + assert "COUNT_IF(EVENT_TYPE = 'observation_resolved') = 0" in sql + + def test_row_mapping_and_null_coalescing(self): + ts = datetime(2026, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + rows = [ + { + "NAME": "credit-scorecard", + "OWNER": "risk-team", + "TIER": "high", + "STATUS": "active", + "MODEL_TYPE": "composite", + "MEMBER_COUNT": 3, + "LAST_VALIDATED": ts, + "OPEN_OBSERVATION_COUNT": 1, + "METADATA": '{"source": "registry"}', + }, + { + "NAME": "empty-group", + "OWNER": "ops-team", + "TIER": "low", + "STATUS": "active", + "MODEL_TYPE": "composite", + "MEMBER_COUNT": None, + "LAST_VALIDATED": None, + "OPEN_OBSERVATION_COUNT": None, + "METADATA": None, + }, + ] + backend, _ = self._backend(rows) + result = backend.composite_summary() + assert result == [ + { + "name": "credit-scorecard", + "owner": "risk-team", + "tier": "high", + "status": "active", + "model_type": "composite", + "member_count": 3, + "last_validated": ts, + "open_observation_count": 1, + "metadata": {"source": "registry"}, + }, + { + "name": "empty-group", + "owner": "ops-team", + "tier": "low", + "status": "active", + "model_type": "composite", + "member_count": 0, + "last_validated": None, + "open_observation_count": 0, + "metadata": {}, + }, + ] + + def test_last_validated_string_coerced_to_datetime(self): + rows = [ + { + "NAME": "g", + "OWNER": "o", + "TIER": "t", + "STATUS": "active", + "MODEL_TYPE": "composite", + "MEMBER_COUNT": 0, + "LAST_VALIDATED": "2026-01-02T03:04:05+00:00", + "OPEN_OBSERVATION_COUNT": 0, + "METADATA": None, + } + ] + backend, _ = self._backend(rows) + result = backend.composite_summary() + assert result[0]["last_validated"] == datetime(2026, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + + def test_flushes_buffered_writes_before_querying(self): + from model_ledger.core.ledger_models import ModelRef + + backend, session = self._backend() + backend.save_model( + ModelRef(name="g", owner="o", model_type="composite", tier="high", purpose="") + ) + backend.composite_summary() + merge_idx = next(i for i, q in enumerate(session.queries) if "MERGE INTO" in q) + select_idx = next(i for i, q in enumerate(session.queries) if "WITH composites AS" in q) + assert merge_idx < select_idx + + def test_parity_with_in_memory_fallback(self): + """Recorded-rows parity: the Snowflake mapping must produce exactly the + dicts the SDK fallback produces for an equivalent event history. + + Scenario (mirrored between the in-memory ledger and the recorded rows): + - 'credit-scorecard' seeded with one member via register_group (dep-link + baseline), one member added via add_member, one added then removed + (latest op wins -> excluded): member_count == 2 + - OBS-1 issued then resolved, OBS-2 issued: open_observation_count == 1 + - one validated event: last_validated == its timestamp + """ + from model_ledger.sdk.ledger import Ledger + + ledger = Ledger() # InMemoryLedgerBackend: no composite_summary -> fallback + for name in ["feature_pipeline", "scoring_model", "alert_queue"]: + ledger.register( + name=name, owner="risk-team", model_type="ml_model", tier="high", purpose="x" + ) + ledger.register_group( + name="credit-scorecard", + owner="risk-team", + model_type="composite", + tier="high", + purpose="Credit risk scoring pipeline", + members=["feature_pipeline"], + actor="test", + metadata={"source": "registry"}, + ) + ledger.add_member("credit-scorecard", "scoring_model", actor="test") + ledger.add_member("credit-scorecard", "alert_queue", actor="test") + ledger.remove_member("credit-scorecard", "alert_queue", actor="test") + ledger.record_observation( + "credit-scorecard", observation_id="OBS-1", observation="a", status="open", actor="t" + ) + ledger.record_observation( + "credit-scorecard", observation_id="OBS-2", observation="b", status="open", actor="t" + ) + ledger.resolve_observation( + "credit-scorecard", observation_id="OBS-1", resolution="fixed", actor="t" + ) + ledger.record_validation("credit-scorecard", result="passed", actor="t") + + expected = ledger.composite_summary() + assert len(expected) == 1 + assert expected[0]["member_count"] == 2 + assert expected[0]["open_observation_count"] == 1 + + # Recorded rows: what the single-statement SQL returns for this history. + rows = [ + { + "NAME": "credit-scorecard", + "OWNER": "risk-team", + "TIER": "high", + "STATUS": "active", + "MODEL_TYPE": "composite", + "MEMBER_COUNT": 2, + "LAST_VALIDATED": expected[0]["last_validated"], + "OPEN_OBSERVATION_COUNT": 1, + "METADATA": '{"source": "registry"}', + } + ] + backend, _ = self._backend(rows) + assert backend.composite_summary() == expected diff --git a/tests/test_sdk/test_ledger.py b/tests/test_sdk/test_ledger.py index 686f62b..88bb045 100644 --- a/tests/test_sdk/test_ledger.py +++ b/tests/test_sdk/test_ledger.py @@ -1054,6 +1054,25 @@ def test_composite_summary_custom_model_types(self, ledger): assert "Group B" in names assert "Group C" in names + def test_dispatches_to_backend_composite_summary_when_available(self): + """Backends exposing composite_summary get the call pushed down + (with the resolved model_types), bypassing the N+1 fallback.""" + + class StubBackend(InMemoryLedgerBackend): + def __init__(self): + super().__init__() + self.calls: list[list[str]] = [] + + def composite_summary(self, model_types=None): + self.calls.append(model_types) + return [{"name": "from-backend"}] + + backend = StubBackend() + ledger = Ledger(backend) + assert ledger.composite_summary() == [{"name": "from-backend"}] + assert ledger.composite_summary(model_types=["ml_model"]) == [{"name": "from-backend"}] + assert backend.calls == [["composite"], ["ml_model"]] + class TestInvestigateComposite: def test_investigate_composite_includes_governance(self, ledger):