Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- perf: `SnowflakeLedgerBackend.composite_summary()` computes the full composite inventory in ONE self-contained SQL statement over MODELS + SNAPSHOTS (CTE-based event replay), replacing the previous delegation to an externally-managed `V_COMPOSITES` view. Removes the undocumented requirement that deployments create that view, and replicates the SDK fallback semantics exactly (membership baseline from `member_of` dependency links + `member_added`/`member_removed` replay with latest-op-wins; distinct-id open-observation set semantics). Measured against a production-scale ledger (28.8k models / 212k snapshots): 92.6s sequential fallback → sub-second warm (~0.3-0.6s).

## v0.7.3

- Add `metadata: dict` field to `ModelRef`. Thread through `register()` and `register_group()`. Replaces the unintended per-link broadcast of `register_group(metadata=...)` to member links; metadata now lives on the composite ModelRef itself. Backward compatible: existing data loads with `metadata={}`.
Expand Down
142 changes: 129 additions & 13 deletions src/model_ledger/backends/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,36 +479,152 @@ def composite_summary(
self,
model_types: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Query V_COMPOSITES view — single query replaces N+1 per-model calls."""
"""Flat composite inventory in ONE SQL statement.

Replicates the semantics of the SDK fallback in
``Ledger.composite_summary`` (which issues 2 round trips per
composite) as a single set-based query:

- membership baseline: ``depends_on`` snapshots on the composite with
``payload.relationship = 'member_of'``, resolved against MODELS via
``payload.upstream_hash`` (name match first, then hash — mirroring
``Ledger.get``); unresolvable links are dropped.
- membership events: ``member_added`` / ``member_removed`` overlay the
baseline in timestamp order, latest op wins per (composite, member).
``member_added`` events whose member resolves to no registered model
(by ``payload.member_hash`` or ``payload.member_name``) are no-ops
in the SDK replay and are dropped here too.
- last_validated: MAX(timestamp) of ``validated`` snapshots.
- open_observation_count: distinct ``observation_id`` values issued
and never resolved; events without an ``observation_id`` payload
key are ignored (set semantics, matching
``Ledger.open_observation_count``).
"""
self._flush_models()
self._flush_snapshots()
target_types = model_types or ["composite"]
placeholders = ", ".join(_esc(t) for t in target_types)
sql = (
f"SELECT v.NAME, v.OWNER, v.TIER, v.STATUS, v.MODEL_TYPE,"
f" v.MEMBER_COUNT, v.LAST_VALIDATED, v.OPEN_OBSERVATION_COUNT,"
f" m.METADATA"
f" FROM {self._schema}.V_COMPOSITES v"
f" JOIN {self._schema}.MODELS m ON m.NAME = v.NAME"
f" WHERE v.MODEL_TYPE IN ({placeholders})"
f" ORDER BY v.NAME"
)
sql = f"""
WITH composites AS (
SELECT MODEL_HASH, NAME, OWNER, TIER, STATUS, MODEL_TYPE, METADATA
FROM {self._schema}.MODELS
WHERE MODEL_TYPE IN ({placeholders})
),
relevant_snaps AS (
SELECT s.MODEL_HASH, s.SNAPSHOT_HASH, s.EVENT_TYPE, s.TIMESTAMP, s.PAYLOAD
FROM {self._schema}.SNAPSHOTS s
JOIN composites c ON c.MODEL_HASH = s.MODEL_HASH
WHERE s.EVENT_TYPE IN ('depends_on', 'member_added', 'member_removed',
'validated', 'observation_issued', 'observation_resolved')
),
member_baseline AS (
SELECT DISTINCT
s.MODEL_HASH AS COMPOSITE_HASH,
COALESCE(m_name.MODEL_HASH, m_hash.MODEL_HASH) AS MEMBER_KEY
FROM relevant_snaps s
LEFT JOIN {self._schema}.MODELS m_name
ON m_name.NAME = s.PAYLOAD:upstream_hash::VARCHAR
LEFT JOIN {self._schema}.MODELS m_hash
ON m_hash.MODEL_HASH = s.PAYLOAD:upstream_hash::VARCHAR
WHERE s.EVENT_TYPE = 'depends_on'
AND COALESCE(s.PAYLOAD:relationship::VARCHAR, 'depends_on') = 'member_of'
AND COALESCE(m_name.MODEL_HASH, m_hash.MODEL_HASH) IS NOT NULL
),
member_events AS (
SELECT
s.MODEL_HASH AS COMPOSITE_HASH,
COALESCE(s.PAYLOAD:member_hash::VARCHAR, '') AS MEMBER_KEY,
s.PAYLOAD:member_name::VARCHAR AS MEMBER_NAME,
s.EVENT_TYPE,
s.TIMESTAMP,
s.SNAPSHOT_HASH
FROM relevant_snaps s
WHERE s.EVENT_TYPE IN ('member_added', 'member_removed')
),
effective_events AS (
SELECT e.COMPOSITE_HASH, e.MEMBER_KEY, e.EVENT_TYPE, e.TIMESTAMP, e.SNAPSHOT_HASH
FROM member_events e
WHERE e.EVENT_TYPE = 'member_removed'
OR EXISTS (
SELECT 1 FROM {self._schema}.MODELS m
WHERE m.NAME = e.MEMBER_KEY OR m.MODEL_HASH = e.MEMBER_KEY
OR m.NAME = e.MEMBER_NAME OR m.MODEL_HASH = e.MEMBER_NAME
)
),
last_op AS (
SELECT COMPOSITE_HASH, MEMBER_KEY, EVENT_TYPE
FROM effective_events
QUALIFY ROW_NUMBER() OVER (
PARTITION BY COMPOSITE_HASH, MEMBER_KEY
ORDER BY TIMESTAMP DESC, SNAPSHOT_HASH DESC
) = 1
),
membership AS (
(
SELECT COMPOSITE_HASH, MEMBER_KEY FROM member_baseline
UNION
SELECT COMPOSITE_HASH, MEMBER_KEY FROM last_op
WHERE EVENT_TYPE = 'member_added'
)
EXCEPT
SELECT COMPOSITE_HASH, MEMBER_KEY FROM last_op
WHERE EVENT_TYPE = 'member_removed'
),
member_counts AS (
SELECT COMPOSITE_HASH, COUNT(*) AS MEMBER_COUNT
FROM membership
GROUP BY COMPOSITE_HASH
),
validations AS (
SELECT MODEL_HASH AS COMPOSITE_HASH, MAX(TIMESTAMP) AS LAST_VALIDATED
FROM relevant_snaps
WHERE EVENT_TYPE = 'validated'
GROUP BY MODEL_HASH
),
open_obs AS (
SELECT COMPOSITE_HASH, COUNT(*) AS OPEN_OBSERVATION_COUNT
FROM (
SELECT MODEL_HASH AS COMPOSITE_HASH,
PAYLOAD:observation_id::VARCHAR AS OBS_ID
FROM relevant_snaps
WHERE EVENT_TYPE IN ('observation_issued', 'observation_resolved')
AND PAYLOAD:observation_id::VARCHAR IS NOT NULL
AND PAYLOAD:observation_id::VARCHAR != ''
GROUP BY COMPOSITE_HASH, OBS_ID
HAVING COUNT_IF(EVENT_TYPE = 'observation_issued') > 0
AND COUNT_IF(EVENT_TYPE = 'observation_resolved') = 0
Comment on lines +594 to +595

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Count unresolved observations despite COUNT_IF NULLs

For an observation ID that has only observation_issued rows, Snowflake documents COUNT_IF(EVENT_TYPE = 'observation_resolved') as returning NULL when no rows satisfy the condition, so the = 0 predicate is not true. That drops every genuinely open observation from open_obs, causing open_observation_count to come back as 0 for the common case of issued-but-not-resolved observations; coalesce the count or use a SUM/IFF expression before comparing to zero.

Useful? React with 👍 / 👎.

)
GROUP BY COMPOSITE_HASH
)
SELECT c.NAME, c.OWNER, c.TIER, c.STATUS, c.MODEL_TYPE,
COALESCE(mc.MEMBER_COUNT, 0) AS MEMBER_COUNT,
v.LAST_VALIDATED,
COALESCE(oo.OPEN_OBSERVATION_COUNT, 0) AS OPEN_OBSERVATION_COUNT,
c.METADATA
FROM composites c
LEFT JOIN member_counts mc ON mc.COMPOSITE_HASH = c.MODEL_HASH
LEFT JOIN validations v ON v.COMPOSITE_HASH = c.MODEL_HASH
LEFT JOIN open_obs oo ON oo.COMPOSITE_HASH = c.MODEL_HASH
ORDER BY c.NAME"""
rows = _exec(self._session, sql)
results = []
for r in rows:
raw = r.get("METADATA") or {}
if isinstance(raw, str):
raw = json.loads(raw) if raw else {}
last_validated = r.get("LAST_VALIDATED")
if last_validated is not None and not isinstance(last_validated, datetime):
last_validated = datetime.fromisoformat(str(last_validated))
results.append(
{
"name": r["NAME"],
"owner": r["OWNER"],
"tier": r["TIER"],
"status": r["STATUS"],
"model_type": r["MODEL_TYPE"],
"member_count": r["MEMBER_COUNT"] or 0,
"last_validated": r.get("LAST_VALIDATED"),
"open_observation_count": r["OPEN_OBSERVATION_COUNT"] or 0,
"member_count": int(r["MEMBER_COUNT"] or 0),
"last_validated": last_validated,
"open_observation_count": int(r["OPEN_OBSERVATION_COUNT"] or 0),
"metadata": raw if isinstance(raw, dict) else {},
}
)
Expand Down
204 changes: 204 additions & 0 deletions tests/test_backends/test_snowflake_ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,207 @@ def sql(self, query: str, params: Any = None) -> MockCollectResult:
assert seen_hashes.count(model.model_hash) == 1, (
f"model written {seen_hashes.count(model.model_hash)}x to MERGE source, expected 1"
)


class FakeCompositeSummarySession:
"""Captures every SQL statement; returns canned rows for the summary query."""

def __init__(self, rows=None):
self.queries: list[str] = []
self._rows = rows or []

def sql(self, query: str, params: Any = None) -> MockCollectResult:
self.queries.append(query)
if "WITH composites AS" in query:
return MockCollectResult(self._rows)
return MockCollectResult([])


class TestCompositeSummarySQL:
def _backend(self, rows=None):
from model_ledger.backends.snowflake import SnowflakeLedgerBackend

session = FakeCompositeSummarySession(rows)
backend = SnowflakeLedgerBackend(schema="TEST_SCHEMA", connection=session)
session.queries.clear() # drop the _ensure_tables DDL
return backend, session

def test_single_statement_with_type_pushdown(self):
backend, session = self._backend()
backend.composite_summary(model_types=["ml_model", "heuristic"])
assert len(session.queries) == 1, "composite_summary must issue exactly one statement"
sql = session.queries[0]
assert "MODEL_TYPE IN ('ml_model', 'heuristic')" in sql
assert "V_COMPOSITES" not in sql, "must not depend on an externally-managed view"

def test_default_model_type_is_composite(self):
backend, session = self._backend()
backend.composite_summary()
assert "MODEL_TYPE IN ('composite')" in session.queries[0]

def test_model_types_are_escaped(self):
backend, session = self._backend()
backend.composite_summary(model_types=["ty'pe"])
assert "'ty''pe'" in session.queries[0]

def test_replicates_sdk_replay_semantics_in_sql(self):
"""The single statement must encode the SDK fallback semantics."""
backend, session = self._backend()
backend.composite_summary()
sql = session.queries[0]
# membership baseline: member_of dependency links resolved against MODELS
assert "'member_of'" in sql
assert "upstream_hash" in sql
# event overlay: latest op wins per (composite, member)
assert "member_added" in sql and "member_removed" in sql
assert "ROW_NUMBER() OVER" in sql
# open observations: distinct-id set semantics, not raw event counts
assert "observation_id" in sql
assert "COUNT_IF(EVENT_TYPE = 'observation_resolved') = 0" in sql

def test_row_mapping_and_null_coalescing(self):
ts = datetime(2026, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
rows = [
{
"NAME": "credit-scorecard",
"OWNER": "risk-team",
"TIER": "high",
"STATUS": "active",
"MODEL_TYPE": "composite",
"MEMBER_COUNT": 3,
"LAST_VALIDATED": ts,
"OPEN_OBSERVATION_COUNT": 1,
"METADATA": '{"source": "registry"}',
},
{
"NAME": "empty-group",
"OWNER": "ops-team",
"TIER": "low",
"STATUS": "active",
"MODEL_TYPE": "composite",
"MEMBER_COUNT": None,
"LAST_VALIDATED": None,
"OPEN_OBSERVATION_COUNT": None,
"METADATA": None,
},
]
backend, _ = self._backend(rows)
result = backend.composite_summary()
assert result == [
{
"name": "credit-scorecard",
"owner": "risk-team",
"tier": "high",
"status": "active",
"model_type": "composite",
"member_count": 3,
"last_validated": ts,
"open_observation_count": 1,
"metadata": {"source": "registry"},
},
{
"name": "empty-group",
"owner": "ops-team",
"tier": "low",
"status": "active",
"model_type": "composite",
"member_count": 0,
"last_validated": None,
"open_observation_count": 0,
"metadata": {},
},
]

def test_last_validated_string_coerced_to_datetime(self):
rows = [
{
"NAME": "g",
"OWNER": "o",
"TIER": "t",
"STATUS": "active",
"MODEL_TYPE": "composite",
"MEMBER_COUNT": 0,
"LAST_VALIDATED": "2026-01-02T03:04:05+00:00",
"OPEN_OBSERVATION_COUNT": 0,
"METADATA": None,
}
]
backend, _ = self._backend(rows)
result = backend.composite_summary()
assert result[0]["last_validated"] == datetime(2026, 1, 2, 3, 4, 5, tzinfo=timezone.utc)

def test_flushes_buffered_writes_before_querying(self):
from model_ledger.core.ledger_models import ModelRef

backend, session = self._backend()
backend.save_model(
ModelRef(name="g", owner="o", model_type="composite", tier="high", purpose="")
)
backend.composite_summary()
merge_idx = next(i for i, q in enumerate(session.queries) if "MERGE INTO" in q)
select_idx = next(i for i, q in enumerate(session.queries) if "WITH composites AS" in q)
assert merge_idx < select_idx

def test_parity_with_in_memory_fallback(self):
"""Recorded-rows parity: the Snowflake mapping must produce exactly the
dicts the SDK fallback produces for an equivalent event history.

Scenario (mirrored between the in-memory ledger and the recorded rows):
- 'credit-scorecard' seeded with one member via register_group (dep-link
baseline), one member added via add_member, one added then removed
(latest op wins -> excluded): member_count == 2
- OBS-1 issued then resolved, OBS-2 issued: open_observation_count == 1
- one validated event: last_validated == its timestamp
"""
from model_ledger.sdk.ledger import Ledger

ledger = Ledger() # InMemoryLedgerBackend: no composite_summary -> fallback
for name in ["feature_pipeline", "scoring_model", "alert_queue"]:
ledger.register(
name=name, owner="risk-team", model_type="ml_model", tier="high", purpose="x"
)
ledger.register_group(
name="credit-scorecard",
owner="risk-team",
model_type="composite",
tier="high",
purpose="Credit risk scoring pipeline",
members=["feature_pipeline"],
actor="test",
metadata={"source": "registry"},
)
ledger.add_member("credit-scorecard", "scoring_model", actor="test")
ledger.add_member("credit-scorecard", "alert_queue", actor="test")
ledger.remove_member("credit-scorecard", "alert_queue", actor="test")
ledger.record_observation(
"credit-scorecard", observation_id="OBS-1", observation="a", status="open", actor="t"
)
ledger.record_observation(
"credit-scorecard", observation_id="OBS-2", observation="b", status="open", actor="t"
)
ledger.resolve_observation(
"credit-scorecard", observation_id="OBS-1", resolution="fixed", actor="t"
)
ledger.record_validation("credit-scorecard", result="passed", actor="t")

expected = ledger.composite_summary()
assert len(expected) == 1
assert expected[0]["member_count"] == 2
assert expected[0]["open_observation_count"] == 1

# Recorded rows: what the single-statement SQL returns for this history.
rows = [
{
"NAME": "credit-scorecard",
"OWNER": "risk-team",
"TIER": "high",
"STATUS": "active",
"MODEL_TYPE": "composite",
"MEMBER_COUNT": 2,
"LAST_VALIDATED": expected[0]["last_validated"],
"OPEN_OBSERVATION_COUNT": 1,
"METADATA": '{"source": "registry"}',
}
]
backend, _ = self._backend(rows)
assert backend.composite_summary() == expected
Loading
Loading