diff --git a/changelog.d/1041.changed b/changelog.d/1041.changed new file mode 100644 index 000000000..08363c26e --- /dev/null +++ b/changelog.d/1041.changed @@ -0,0 +1 @@ +Stage 2 calibration package manifests now track the explicit target config identity and contract artifact path. diff --git a/docs/engineering/pipeline-map.md b/docs/engineering/pipeline-map.md index 19222b6a4..6b50c8949 100644 --- a/docs/engineering/pipeline-map.md +++ b/docs/engineering/pipeline-map.md @@ -378,19 +378,32 @@ Build sparse calibration matrix (targets x households x clones) | `takeup_rerand` Block-Level Takeup Re-randomization | `process` | `unknown` | `unknown` | | | `sparse_build` Sparse Matrix Construction | `process` | `unknown` | `unknown` | | | `out_pkg` calibration_package.pkl | `artifact` | `unknown` | `unknown` | | +| `out_contract` calibration_package_contract.json | `artifact` | `unknown` | `unknown` | | | `util_sql` sqlalchemy | `utility` | `unknown` | `unknown` | | | `util_pool` ProcessPoolExecutor | `utility` | `unknown` | `unknown` | | | `util_takeup_s5` compute_block_takeup_for_entities() | `utility` | `unknown` | `unknown` | | | `util_scipy` scipy.sparse | `utility` | `unknown` | `unknown` | | +| `stage2_target_config_identity` Stage 2 Target Config Identity | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.resolve_target_config_identity` | +| `stage2_target_config_load` Load Stage 2 Target Config | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.load_target_config` | +| `stage2_target_config_apply` Apply Stage 2 Target Config | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.apply_target_config_to_targets` | | `state_precomp` Per-State Simulation Precomputation | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_matrix_builder._compute_single_state` | | `clone_assembly` Clone Value Assembly | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_matrix_builder._assemble_clone_values_standalone` | +| `build_matrix` Build Calibration Matrix | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix` | +| `build_matrix_chunked` Build Calibration Matrix In Chunks | `library` | `current` | `experimental` | `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix_chunked` | +| `stage2_calibration_package_writer` Stage 2 Package Writer | `library` | `current` | `moving` | `policyengine_us_data.calibration.unified_calibration.save_calibration_package` | +| `stage2_artifact_specs` Stage 2 Artifact Specs | `library` | `current` | `moving` | `policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths` | +| `stage2_calibration_package_contract_writer` Stage 2 Contract Writer | `library` | `current` | `moving` | `policyengine_us_data.stage_contracts.calibration_package.write_calibration_package_contract` | +| `stage2_calibration_package_contract_validator` Stage 2 Contract Validator | `validation` | `current` | `moving` | `policyengine_us_data.stage_contracts.calibration_package.validate_calibration_package_contract` | #### Edges - `in_cps_s5` -> `target_resolve` `data_flow` - `in_db_s5` -> `target_resolve` `external_source` (SQL targets) -- `in_config_s5` -> `target_resolve` `data_flow` (include list) -- `target_resolve` -> `target_uprate` `data_flow` +- `in_config_s5` -> `stage2_target_config_identity` `data_flow` (config file) +- `stage2_target_config_identity` -> `stage2_target_config_load` `data_flow` (resolved path and checksum) +- `stage2_target_config_load` -> `stage2_target_config_apply` `data_flow` (include/exclude rules) +- `target_resolve` -> `stage2_target_config_apply` `data_flow` (candidate targets) +- `stage2_target_config_apply` -> `target_uprate` `data_flow` (selected targets) - `target_uprate` -> `geo_build` `data_flow` - `geo_build` -> `constraint_resolve` `data_flow` - `constraint_resolve` -> `state_precomp` `data_flow` @@ -399,7 +412,19 @@ Build sparse calibration matrix (targets x households x clones) - `in_blocks_s5` -> `clone_assembly` `data_flow` (block populations) - `clone_assembly` -> `takeup_rerand` `data_flow` - `takeup_rerand` -> `sparse_build` `data_flow` -- `sparse_build` -> `out_pkg` `produces_artifact` +- `sparse_build` -> `build_matrix` `uses_library` (non-chunked path) +- `sparse_build` -> `build_matrix_chunked` `uses_library` (chunked path) +- `build_matrix` -> `stage2_calibration_package_writer` `data_flow` +- `build_matrix_chunked` -> `stage2_calibration_package_writer` `data_flow` +- `stage2_artifact_specs` -> `stage2_calibration_package_writer` `uses_utility` (package path) +- `stage2_calibration_package_writer` -> `out_pkg` `produces_artifact` +- `out_pkg` -> `stage2_calibration_package_contract_writer` `data_flow` +- `stage2_artifact_specs` -> `stage2_calibration_package_contract_writer` `uses_utility` (contract path) +- `stage2_calibration_package_contract_writer` -> `out_contract` `produces_artifact` +- `out_pkg` -> `stage2_calibration_package_contract_validator` `validates` +- `out_contract` -> `stage2_calibration_package_contract_validator` `validates` +- `in_cps_s5` -> `stage2_calibration_package_contract_validator` `validates` +- `in_db_s5` -> `stage2_calibration_package_contract_validator` `validates` - `util_sql` -> `target_resolve` `uses_utility` - `util_pool` -> `state_precomp` `uses_utility` - `util_takeup_s5` -> `takeup_rerand` `uses_utility` @@ -778,22 +803,6 @@ def build_datasets(upload: bool = False, branch: str = 'main', sequential: bool Build all datasets with preemption-resilient checkpointing. -### `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix` - -```python -def build_matrix(self, geography, sim, target_filter: Optional[dict] = None, hierarchical_domains: Optional[List[str]] = None, cache_dir: Optional[str] = None, sim_modifier = None, rerandomize_takeup: bool = True, county_level: bool = True, workers: int = 1) -> Tuple[pd.DataFrame, sparse.csr_matrix, List[str]] -``` - -Build sparse calibration matrix. - -### `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix_chunked` - -```python -def build_matrix_chunked(self, geography, sim, target_filter: Optional[dict] = None, hierarchical_domains: Optional[List[str]] = None, chunk_size: int = 25000, chunk_dir: Optional[str] = None, keep_chunks: bool = False, resume_chunks: bool = False, rerandomize_takeup: bool = True, parallel: bool = False, num_matrix_workers: int = 50, run_id: str = '') -> Tuple[pd.DataFrame, sparse.csr_matrix, List[str]] -``` - -Build a sparse matrix by materializing mixed-geography chunks. - ### `modal_app.local_area._build_publishing_input_bundle` ```python @@ -1397,7 +1406,7 @@ Compute the scope fingerprint while preserving pinned resume values. ### `policyengine_us_data.calibration.unified_calibration.run_calibration` ```python -def run_calibration(dataset_path: str, db_path: str, n_clones: int = DEFAULT_N_CLONES, lambda_l0: float = 1e-08, epochs: int = DEFAULT_EPOCHS, device: str = 'cpu', seed: int = 42, domain_variables: list = None, hierarchical_domains: list = None, skip_takeup_rerandomize: bool = False, skip_source_impute: bool = True, skip_county: bool = True, target_config: dict = None, target_config_path: str = None, build_only: bool = False, package_path: str = None, package_output_path: str = None, beta: float = BETA, lambda_l2: float = LAMBDA_L2, learning_rate: float = LEARNING_RATE, log_freq: int = None, log_path: str = None, workers: int = 1, resume_from: str = None, checkpoint_path: str = None, chunked_matrix: bool = False, chunk_size: int = 25000, chunk_dir: str = None, keep_chunks: bool = False, resume_chunks: bool = False, parallel: bool = False, num_matrix_workers: int = 50, run_id: str = '') +def run_calibration(dataset_path: str, db_path: str, n_clones: int = DEFAULT_N_CLONES, lambda_l0: float = 1e-08, epochs: int = DEFAULT_EPOCHS, device: str = 'cpu', seed: int = 42, domain_variables: list = None, hierarchical_domains: list = None, skip_takeup_rerandomize: bool = False, skip_source_impute: bool = True, skip_county: bool = True, target_config: dict = None, target_config_path: str = None, target_config_identity: TargetConfigIdentity | None = None, build_only: bool = False, package_path: str = None, package_output_path: str = None, beta: float = BETA, lambda_l2: float = LAMBDA_L2, learning_rate: float = LEARNING_RATE, log_freq: int = None, log_path: str = None, workers: int = 1, resume_from: str = None, checkpoint_path: str = None, chunked_matrix: bool = False, chunk_size: int = 25000, chunk_dir: str = None, keep_chunks: bool = False, resume_chunks: bool = False, parallel: bool = False, num_matrix_workers: int = 50, run_id: str = '') ``` Run unified calibration pipeline. diff --git a/docs/generated/pipeline_api.json b/docs/generated/pipeline_api.json index 2fd53e3e3..08561c8a6 100644 --- a/docs/generated/pipeline_api.json +++ b/docs/generated/pipeline_api.json @@ -727,7 +727,7 @@ "docstring": "", "id": "calibration_diagnostics", "kind": "function", - "line": 1144, + "line": 1249, "metadata": { "api_refs": [ "policyengine_us_data.calibration.unified_calibration.compute_diagnostics" @@ -1091,7 +1091,7 @@ "docstring": "Fit L0-regularized calibration weights.\n\nArgs:\n X_sparse: Sparse matrix (targets x records).\n targets: Target values array.\n lambda_l0: L0 regularization strength.\n epochs: Training epochs.\n device: Torch device.\n verbose_freq: Print frequency. Defaults to 10%.\n beta: L0 gate temperature.\n lambda_l2: L2 regularization strength.\n learning_rate: Optimizer learning rate.\n log_freq: Epochs between per-target CSV logs.\n None disables logging.\n log_path: Path for the per-target calibration log CSV.\n target_names: Human-readable target names for the log.\n initial_weights: Pre-computed initial weights. If None,\n computed from targets_df age targets.\n targets_df: Targets DataFrame, used to compute\n initial_weights when not provided.\n target_groups: Optional group ID per target row for balanced loss.\n resume_from: Path to a `.checkpoint.pt` file or `.npy`\n weights file to continue fitting from.\n checkpoint_path: Where to save resumable fit checkpoints.\n\nReturns:\n Weight array of shape (n_records,).", "id": "fit_model", "kind": "function", - "line": 788, + "line": 893, "metadata": { "api_refs": [ "policyengine_us_data.calibration.unified_calibration.fit_l0_weights" @@ -1410,7 +1410,7 @@ "docstring": "Compute population-based initial weights from age targets.\n\nFor each congressional district, sums person_count targets where\ndomain_variable == \"age\" to get district population, then divides\nby the number of columns (households) active in that district.\n\nArgs:\n X_sparse: Sparse matrix (targets x records).\n targets_df: Targets DataFrame with columns: variable,\n domain_variable, geo_level, geographic_id, value.\n\nReturns:\n Weight array of shape (n_records,).", "id": "init_weights", "kind": "function", - "line": 709, + "line": 814, "metadata": { "api_refs": [ "policyengine_us_data.calibration.unified_calibration.compute_initial_weights" @@ -3086,7 +3086,7 @@ "docstring": "Promote a completed pipeline run to production.\n\n1. Verify run status is \"completed\"\n2. Promote every staged artifact in one Hugging Face commit\n3. Upload/copy every artifact to GCS\n4. Finalize release_manifest.json, tag the release, and update\n version_manifest.json\n5. Update run status to \"promoted\"\n\nArgs:\n run_id: The run ID to promote.\n candidate_version: Candidate staging scope used for staged source files.\n release_version: Stable version used for final release metadata.\n\nReturns:\n Summary message.", "id": "promote_pipeline_run", "kind": "function", - "line": 1898, + "line": 1910, "metadata": { "api_refs": [ "modal_app.pipeline.promote_run" @@ -3435,10 +3435,10 @@ "source_file": "policyengine_us_data/datasets/cps/enhanced_cps.py" }, "run_calibration": { - "docstring": "Run unified calibration pipeline.\n\nArgs:\n dataset_path: Path to CPS h5 file.\n db_path: Path to policy_data.db.\n n_clones: Number of dataset clones.\n lambda_l0: L0 regularization strength.\n epochs: Training epochs.\n device: Torch device.\n seed: Random seed.\n domain_variables: Filter targets by domain variable.\n hierarchical_domains: Domains for hierarchical\n uprating + CD reconciliation.\n skip_takeup_rerandomize: Skip takeup step.\n skip_source_impute: Skip ACS/SIPP/SCF imputations.\n target_config: Parsed target config dict.\n target_config_path: Path to target config, for provenance.\n build_only: If True, save package and skip fitting.\n package_path: Load pre-built package (skip build).\n package_output_path: Where to save calibration package.\n beta: L0 gate temperature.\n lambda_l2: L2 regularization strength.\n learning_rate: Optimizer learning rate.\n log_freq: Epochs between per-target CSV logs.\n log_path: Path for per-target calibration log CSV.\n resume_from: Path to a checkpoint or weights file to\n continue fitting from.\n checkpoint_path: Where to save resumable fit checkpoints.\n chunked_matrix: Build matrix in clone-household chunks.\n chunk_size: Clone-household columns per chunk.\n chunk_dir: Directory for chunked COO/H5 artifacts.\n keep_chunks: Keep temporary chunk H5 files.\n resume_chunks: Reuse existing chunk COO files.\n\nReturns:\n (weights, targets_df, X_sparse, target_names, geography_info)\n weights is None when build_only=True.\n geography_info is a dict with cd_geoid and base_n_records.", + "docstring": "Run unified calibration pipeline.\n\nArgs:\n dataset_path: Path to CPS h5 file.\n db_path: Path to policy_data.db.\n n_clones: Number of dataset clones.\n lambda_l0: L0 regularization strength.\n epochs: Training epochs.\n device: Torch device.\n seed: Random seed.\n domain_variables: Filter targets by domain variable.\n hierarchical_domains: Domains for hierarchical\n uprating + CD reconciliation.\n skip_takeup_rerandomize: Skip takeup step.\n skip_source_impute: Skip ACS/SIPP/SCF imputations.\n target_config: Parsed target config dict.\n target_config_path: Path to target config, for provenance.\n target_config_identity: Resolved target config path/checksum identity.\n build_only: If True, save package and skip fitting.\n package_path: Load pre-built package (skip build).\n package_output_path: Where to save calibration package.\n beta: L0 gate temperature.\n lambda_l2: L2 regularization strength.\n learning_rate: Optimizer learning rate.\n log_freq: Epochs between per-target CSV logs.\n log_path: Path for per-target calibration log CSV.\n resume_from: Path to a checkpoint or weights file to\n continue fitting from.\n checkpoint_path: Where to save resumable fit checkpoints.\n chunked_matrix: Build matrix in clone-household chunks.\n chunk_size: Clone-household columns per chunk.\n chunk_dir: Directory for chunked COO/H5 artifacts.\n keep_chunks: Keep temporary chunk H5 files.\n resume_chunks: Reuse existing chunk COO files.\n\nReturns:\n (weights, targets_df, X_sparse, target_names, geography_info)\n weights is None when build_only=True.\n geography_info is a dict with cd_geoid and base_n_records.", "id": "run_calibration", "kind": "function", - "line": 1270, + "line": 1375, "metadata": { "api_refs": [ "policyengine_us_data.calibration.unified_calibration.run_calibration" @@ -3470,7 +3470,7 @@ ] }, "object_path": "policyengine_us_data.calibration.unified_calibration.run_calibration", - "signature": "def run_calibration(dataset_path: str, db_path: str, n_clones: int = DEFAULT_N_CLONES, lambda_l0: float = 1e-08, epochs: int = DEFAULT_EPOCHS, device: str = 'cpu', seed: int = 42, domain_variables: list = None, hierarchical_domains: list = None, skip_takeup_rerandomize: bool = False, skip_source_impute: bool = True, skip_county: bool = True, target_config: dict = None, target_config_path: str = None, build_only: bool = False, package_path: str = None, package_output_path: str = None, beta: float = BETA, lambda_l2: float = LAMBDA_L2, learning_rate: float = LEARNING_RATE, log_freq: int = None, log_path: str = None, workers: int = 1, resume_from: str = None, checkpoint_path: str = None, chunked_matrix: bool = False, chunk_size: int = 25000, chunk_dir: str = None, keep_chunks: bool = False, resume_chunks: bool = False, parallel: bool = False, num_matrix_workers: int = 50, run_id: str = '')", + "signature": "def run_calibration(dataset_path: str, db_path: str, n_clones: int = DEFAULT_N_CLONES, lambda_l0: float = 1e-08, epochs: int = DEFAULT_EPOCHS, device: str = 'cpu', seed: int = 42, domain_variables: list = None, hierarchical_domains: list = None, skip_takeup_rerandomize: bool = False, skip_source_impute: bool = True, skip_county: bool = True, target_config: dict = None, target_config_path: str = None, target_config_identity: TargetConfigIdentity | None = None, build_only: bool = False, package_path: str = None, package_output_path: str = None, beta: float = BETA, lambda_l2: float = LAMBDA_L2, learning_rate: float = LEARNING_RATE, log_freq: int = None, log_path: str = None, workers: int = 1, resume_from: str = None, checkpoint_path: str = None, chunked_matrix: bool = False, chunk_size: int = 25000, chunk_dir: str = None, keep_chunks: bool = False, resume_chunks: bool = False, parallel: bool = False, num_matrix_workers: int = 50, run_id: str = '')", "source_file": "policyengine_us_data/calibration/unified_calibration.py" }, "run_local_h5_phase": { @@ -3507,7 +3507,7 @@ "docstring": "Run the full pipeline end-to-end.\n\nArgs:\n branch: Git branch to build from.\n gpu: GPU type for regional calibration.\n epochs: Training epochs for regional calibration.\n national_gpu: GPU type for national calibration.\n national_epochs: Training epochs for national.\n num_workers: Number of parallel H5 workers.\n n_clones: Number of clones for H5 building.\n skip_national: Skip national calibration/H5.\n resume_run_id: Resume a previously failed run.\n clear_checkpoints: Wipe ALL checkpoints before building\n (default False). Normally not needed \u2014 checkpoints are\n scoped by commit SHA, so stale ones from other commits\n are cleaned automatically. Use True only to force a\n full rebuild of the current commit.\n candidate_version: Candidate staging scope used for HF staging.\n release_version: Final stable release version. Usually empty until\n promotion.\n base_release_version: Stable release current when this candidate was\n built.\n release_bump: Intended SemVer bump for this candidate.\n sha_override: Exact source SHA deployed by GitHub Actions. When\n provided, this is recorded instead of reading the current\n branch tip.\n run_id: Cross-system run ID created by GitHub.\n run_context: Serialized run context from the launcher workflow.\n modal_app_name: Deployed Modal app name for this run.\n modal_environment: Modal environment used for this run.\n chunked_matrix: Build the calibration matrix in clone-household\n chunks instead of the non-chunked path. Opt-in; default off.\n chunk_size: Clone-household columns per chunk when\n ``chunked_matrix`` is True.\n parallel_matrix: Fan chunked matrix building across Modal\n workers via ``build_matrix_chunk_worker``. Only meaningful\n when ``chunked_matrix`` is True; ignored otherwise.\n num_matrix_workers: Number of Modal workers when\n ``parallel_matrix`` is True.\n\nReturns:\n The run ID for use with promote.", "id": "run_modal_pipeline", "kind": "function", - "line": 931, + "line": 943, "metadata": { "api_refs": [ "modal_app.pipeline.run_pipeline" @@ -3705,6 +3705,206 @@ "signature": "def reconcile_ss_subcomponents(data: Dict[str, Dict[int, np.ndarray]], n_cps: int, time_period: int) -> None", "source_file": "policyengine_us_data/calibration/puf_impute.py" }, + "stage2_artifact_specs": { + "docstring": "Return canonical Stage 2 paths rooted in an artifacts directory.", + "id": "stage2_artifact_specs", + "kind": "function", + "line": 96, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths" + ], + "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Centralize calibration package, contract, metadata, and matrix-build artifact paths.", + "id": "stage2_artifact_specs", + "label": "Stage 2 Artifact Specs", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + "object_path": "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths", + "signature": "def calibration_package_artifact_paths(artifacts_dir: str | Path) -> CalibrationPackageArtifactPaths", + "source_file": "policyengine_us_data/calibration_package/specs.py" + }, + "stage2_calibration_package_contract_validator": { + "docstring": "Validate that a Stage 2 sidecar describes the calibration package.", + "id": "stage2_calibration_package_contract_validator", + "kind": "function", + "line": 379, + "metadata": { + "api_refs": [ + "policyengine_us_data.stage_contracts.calibration_package.validate_calibration_package_contract" + ], + "artifacts_in": "['calibration_package.pkl', CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Validate that the persisted Stage 2 contract describes the calibration package and inputs.", + "id": "stage2_calibration_package_contract_validator", + "label": "Stage 2 Contract Validator", + "node_type": "validation", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/stage_contracts/calibration_package.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/test_calibration_package_stage_contract.py" + ] + }, + "object_path": "policyengine_us_data.stage_contracts.calibration_package.validate_calibration_package_contract", + "signature": "def validate_calibration_package_contract(*, package_path: Path, contract_path: Path | None = None, package: Mapping[str, Any] | None = None, dataset_path: Path | None = None, db_path: Path | None = None) -> StageContract", + "source_file": "policyengine_us_data/stage_contracts/calibration_package.py" + }, + "stage2_calibration_package_contract_writer": { + "docstring": "Write and return the Stage 2 calibration-package contract.", + "id": "stage2_calibration_package_contract_writer", + "kind": "function", + "line": 322, + "metadata": { + "api_refs": [ + "policyengine_us_data.stage_contracts.calibration_package.write_calibration_package_contract" + ], + "artifacts_in": [ + "calibration_package.pkl" + ], + "artifacts_out": "[CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Write the Stage 2 calibration-package handoff contract next to the package artifact.", + "id": "stage2_calibration_package_contract_writer", + "label": "Stage 2 Contract Writer", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/stage_contracts/calibration_package.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/test_calibration_package_stage_contract.py" + ] + }, + "object_path": "policyengine_us_data.stage_contracts.calibration_package.write_calibration_package_contract", + "signature": "def write_calibration_package_contract(*, package_path: Path, dataset_path: Path, db_path: Path, package: Mapping[str, Any], parameters: CalibrationPackageParameters | Mapping[str, Any], run_id: str | None, completed_at: str, started_at: str | None = None, duration_s: float | None = None, code_sha: str | None = None, package_version: str | None = None, contract_path: Path | None = None) -> StageContract", + "source_file": "policyengine_us_data/stage_contracts/calibration_package.py" + }, + "stage2_calibration_package_writer": { + "docstring": "Save calibration package to pickle.\n\nArgs:\n path: Output file path.\n X_sparse: Sparse matrix.\n targets_df: Targets DataFrame.\n target_names: Target name list.\n metadata: Run metadata dict.\n initial_weights: Pre-computed initial weight array.\n cd_geoid: CD GEOID array from geography assignment.\n block_geoid: Block GEOID array from geography assignment.", + "id": "stage2_calibration_package_writer", + "kind": "function", + "line": 661, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.save_calibration_package" + ], + "artifacts_out": [ + "calibration_package.pkl" + ], + "description": "Persist the Stage 2 sparse matrix, target rows, target names, geography arrays, and provenance metadata.", + "id": "stage2_calibration_package_writer", + "label": "Stage 2 Package Writer", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_unified_calibration.py" + ] + }, + "object_path": "policyengine_us_data.calibration.unified_calibration.save_calibration_package", + "signature": "def save_calibration_package(path: str, X_sparse, targets_df: 'pd.DataFrame', target_names: list, metadata: dict, initial_weights: np.ndarray = None, cd_geoid: np.ndarray = None, block_geoid: np.ndarray = None) -> None", + "source_file": "policyengine_us_data/calibration/unified_calibration.py" + }, + "stage2_target_config_apply": { + "docstring": "Filter target rows before matrix construction.", + "id": "stage2_target_config_apply", + "kind": "function", + "line": 631, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.apply_target_config_to_targets" + ], + "description": "Apply Stage 2 target include/exclude rules before matrix construction.", + "id": "stage2_target_config_apply", + "label": "Apply Stage 2 Target Config", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_target_config.py" + ] + }, + "object_path": "policyengine_us_data.calibration.unified_calibration.apply_target_config_to_targets", + "signature": "def apply_target_config_to_targets(targets_df: 'pd.DataFrame', config: dict) -> 'pd.DataFrame'", + "source_file": "policyengine_us_data/calibration/unified_calibration.py" + }, + "stage2_target_config_identity": { + "docstring": "Resolve the target config identity used by Stage 2 package construction.", + "id": "stage2_target_config_identity", + "kind": "function", + "line": 127, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.resolve_target_config_identity" + ], + "artifacts_in": "[DEFAULT_TARGET_CONFIG_PATH]", + "description": "Resolve the effective Stage 2 target config path and checksum before package reuse or rebuild.", + "id": "stage2_target_config_identity", + "label": "Stage 2 Target Config Identity", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + "object_path": "policyengine_us_data.calibration_package.specs.resolve_target_config_identity", + "signature": "def resolve_target_config_identity(target_config_path: str | Path | None = None, *, all_active_targets: bool = False, repo_root: str | Path | None = None) -> TargetConfigIdentity", + "source_file": "policyengine_us_data/calibration_package/specs.py" + }, + "stage2_target_config_load": { + "docstring": "Load target include/exclude config from YAML.\n\nArgs:\n path: Path to YAML config file.\n\nReturns:\n Parsed config dict with include and exclude lists.", + "id": "stage2_target_config_load", + "kind": "function", + "line": 525, + "metadata": { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.load_target_config" + ], + "artifacts_in": "[DEFAULT_TARGET_CONFIG_RELATIVE_PATH]", + "description": "Load the YAML include/exclude target-selection config used by Stage 2 package construction.", + "id": "stage2_target_config_load", + "label": "Load Stage 2 Target Config", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_target_config.py" + ] + }, + "object_path": "policyengine_us_data.calibration.unified_calibration.load_target_config", + "signature": "def load_target_config(path: str) -> dict", + "source_file": "policyengine_us_data/calibration/unified_calibration.py" + }, "stage4_release_candidate_bundle_builder": { "docstring": "Build a candidate bundle from a Stage 4 output contract shape.", "id": "stage4_release_candidate_bundle_builder", @@ -4187,7 +4387,7 @@ "docstring": "Verify deployed-image imports and subprocess seams.", "id": "verify_runtime_seams", "kind": "function", - "line": 558, + "line": 569, "metadata": { "api_refs": [ "modal_app.pipeline.verify_runtime_seams" diff --git a/docs/generated/pipeline_map.json b/docs/generated/pipeline_map.json index 6b1d74980..612344f3d 100644 --- a/docs/generated/pipeline_map.json +++ b/docs/generated/pipeline_map.json @@ -50,51 +50,6 @@ "uv run pytest tests/unit/test_modal_data_build.py" ] }, - { - "api_refs": [ - "policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix" - ], - "artifacts_out": [ - "X_sparse", - "targets_df", - "target_names" - ], - "description": "Build the in-memory sparse matrix for calibration targets and clone households.", - "id": "build_matrix", - "label": "Build Calibration Matrix", - "node_type": "library", - "pathways": [ - "calibration_package" - ], - "source_file": "policyengine_us_data/calibration/unified_matrix_builder.py", - "stability": "moving", - "status": "current", - "validation_commands": [ - "uv run pytest tests/unit/calibration/test_unified_matrix_builder.py" - ] - }, - { - "api_refs": [ - "policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix_chunked" - ], - "artifacts_out": [ - "chunked COO shards", - "X_sparse" - ], - "description": "Stream matrix construction through clone-household chunks with resumable shard caches.", - "id": "build_matrix_chunked", - "label": "Build Calibration Matrix In Chunks", - "node_type": "library", - "pathways": [ - "calibration_package" - ], - "source_file": "policyengine_us_data/calibration/unified_matrix_builder.py", - "stability": "experimental", - "status": "current", - "validation_commands": [ - "uv run pytest tests/integration/test_chunked_matrix_builder.py" - ] - }, { "api_refs": [ "policyengine_us_data.build_outputs.fingerprinting.PublishingInputBundle", @@ -2016,10 +1971,10 @@ } ], "metadata": { - "api_node_count": 97, + "api_node_count": 95, "canonical_stage_count": 5, - "decorated_object_count": 146, - "mapped_decorated_node_count": 49, + "decorated_object_count": 153, + "mapped_decorated_node_count": 58, "stage_count": 17, "substage_count": 17 }, @@ -3868,13 +3823,32 @@ }, { "edge_type": "data_flow", - "label": "include list", + "label": "config file", "source": "in_config_s5", - "target": "target_resolve" + "target": "stage2_target_config_identity" }, { "edge_type": "data_flow", + "label": "resolved path and checksum", + "source": "stage2_target_config_identity", + "target": "stage2_target_config_load" + }, + { + "edge_type": "data_flow", + "label": "include/exclude rules", + "source": "stage2_target_config_load", + "target": "stage2_target_config_apply" + }, + { + "edge_type": "data_flow", + "label": "candidate targets", "source": "target_resolve", + "target": "stage2_target_config_apply" + }, + { + "edge_type": "data_flow", + "label": "selected targets", + "source": "stage2_target_config_apply", "target": "target_uprate" }, { @@ -3920,10 +3894,74 @@ "target": "sparse_build" }, { - "edge_type": "produces_artifact", + "edge_type": "uses_library", + "label": "non-chunked path", + "source": "sparse_build", + "target": "build_matrix" + }, + { + "edge_type": "uses_library", + "label": "chunked path", "source": "sparse_build", + "target": "build_matrix_chunked" + }, + { + "edge_type": "data_flow", + "source": "build_matrix", + "target": "stage2_calibration_package_writer" + }, + { + "edge_type": "data_flow", + "source": "build_matrix_chunked", + "target": "stage2_calibration_package_writer" + }, + { + "edge_type": "uses_utility", + "label": "package path", + "source": "stage2_artifact_specs", + "target": "stage2_calibration_package_writer" + }, + { + "edge_type": "produces_artifact", + "source": "stage2_calibration_package_writer", "target": "out_pkg" }, + { + "edge_type": "data_flow", + "source": "out_pkg", + "target": "stage2_calibration_package_contract_writer" + }, + { + "edge_type": "uses_utility", + "label": "contract path", + "source": "stage2_artifact_specs", + "target": "stage2_calibration_package_contract_writer" + }, + { + "edge_type": "produces_artifact", + "source": "stage2_calibration_package_contract_writer", + "target": "out_contract" + }, + { + "edge_type": "validates", + "source": "out_pkg", + "target": "stage2_calibration_package_contract_validator" + }, + { + "edge_type": "validates", + "source": "out_contract", + "target": "stage2_calibration_package_contract_validator" + }, + { + "edge_type": "validates", + "source": "in_cps_s5", + "target": "stage2_calibration_package_contract_validator" + }, + { + "edge_type": "validates", + "source": "in_db_s5", + "target": "stage2_calibration_package_contract_validator" + }, { "edge_type": "uses_utility", "source": "util_sql", @@ -3951,7 +3989,10 @@ "id": "run_calibration_build", "label": "run_calibration()", "node_ids": [ + "stage2_target_config_identity", + "stage2_target_config_load", "target_resolve", + "stage2_target_config_apply", "target_uprate", "geo_build", "constraint_resolve", @@ -3959,7 +4000,14 @@ "clone_assembly", "takeup_rerand", "sparse_build", - "out_pkg" + "build_matrix", + "build_matrix_chunked", + "stage2_artifact_specs", + "stage2_calibration_package_writer", + "out_pkg", + "stage2_calibration_package_contract_writer", + "out_contract", + "stage2_calibration_package_contract_validator" ] } ], @@ -4036,6 +4084,12 @@ "label": "calibration_package.pkl", "node_type": "artifact" }, + { + "description": "Stage 2 package handoff contract written next to calibration_package.pkl", + "id": "out_contract", + "label": "calibration_package_contract.json", + "node_type": "artifact" + }, { "description": "Database queries", "id": "util_sql", @@ -4060,6 +4114,62 @@ "label": "scipy.sparse", "node_type": "utility" }, + { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.resolve_target_config_identity" + ], + "artifacts_in": "[DEFAULT_TARGET_CONFIG_PATH]", + "description": "Resolve the effective Stage 2 target config path and checksum before package reuse or rebuild.", + "id": "stage2_target_config_identity", + "label": "Stage 2 Target Config Identity", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.load_target_config" + ], + "artifacts_in": "[DEFAULT_TARGET_CONFIG_RELATIVE_PATH]", + "description": "Load the YAML include/exclude target-selection config used by Stage 2 package construction.", + "id": "stage2_target_config_load", + "label": "Load Stage 2 Target Config", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_target_config.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.apply_target_config_to_targets" + ], + "description": "Apply Stage 2 target include/exclude rules before matrix construction.", + "id": "stage2_target_config_apply", + "label": "Apply Stage 2 Target Config", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_target_config.py" + ] + }, { "api_refs": [ "policyengine_us_data.calibration.unified_matrix_builder._compute_single_state" @@ -4099,6 +4209,132 @@ "uv run pytest tests/unit/calibration/test_unified_matrix_builder.py", "uv run pytest tests/integration/test_chunked_matrix_builder.py" ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix" + ], + "artifacts_out": [ + "X_sparse", + "targets_df", + "target_names" + ], + "description": "Build the in-memory sparse matrix for calibration targets and clone households.", + "id": "build_matrix", + "label": "Build Calibration Matrix", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_matrix_builder.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_unified_matrix_builder.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder.build_matrix_chunked" + ], + "artifacts_out": [ + "chunked COO shards", + "X_sparse" + ], + "description": "Stream matrix construction through clone-household chunks with resumable shard caches.", + "id": "build_matrix_chunked", + "label": "Build Calibration Matrix In Chunks", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_matrix_builder.py", + "stability": "experimental", + "status": "current", + "validation_commands": [ + "uv run pytest tests/integration/test_chunked_matrix_builder.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration.unified_calibration.save_calibration_package" + ], + "artifacts_out": [ + "calibration_package.pkl" + ], + "description": "Persist the Stage 2 sparse matrix, target rows, target names, geography arrays, and provenance metadata.", + "id": "stage2_calibration_package_writer", + "label": "Stage 2 Package Writer", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration/unified_calibration.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration/test_unified_calibration.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.calibration_package.specs.calibration_package_artifact_paths" + ], + "artifacts_out": "[CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Centralize calibration package, contract, metadata, and matrix-build artifact paths.", + "id": "stage2_artifact_specs", + "label": "Stage 2 Artifact Specs", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/calibration_package/specs.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.stage_contracts.calibration_package.write_calibration_package_contract" + ], + "artifacts_in": [ + "calibration_package.pkl" + ], + "artifacts_out": "[CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Write the Stage 2 calibration-package handoff contract next to the package artifact.", + "id": "stage2_calibration_package_contract_writer", + "label": "Stage 2 Contract Writer", + "node_type": "library", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/stage_contracts/calibration_package.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/test_calibration_package_stage_contract.py" + ] + }, + { + "api_refs": [ + "policyengine_us_data.stage_contracts.calibration_package.validate_calibration_package_contract" + ], + "artifacts_in": "['calibration_package.pkl', CALIBRATION_PACKAGE_CONTRACT_FILENAME]", + "description": "Validate that the persisted Stage 2 contract describes the calibration package and inputs.", + "id": "stage2_calibration_package_contract_validator", + "label": "Stage 2 Contract Validator", + "node_type": "validation", + "pathways": [ + "calibration_package" + ], + "source_file": "policyengine_us_data/stage_contracts/calibration_package.py", + "stability": "moving", + "status": "current", + "validation_commands": [ + "uv run pytest tests/unit/test_calibration_package_stage_contract.py" + ] } ], "stability": "moving", diff --git a/docs/pipeline_map.yaml b/docs/pipeline_map.yaml index 2f5e66b28..91c8fa1c2 100644 --- a/docs/pipeline_map.yaml +++ b/docs/pipeline_map.yaml @@ -805,7 +805,10 @@ stages: label: run_calibration() description: 'Build phase: resolve targets and constraints, assemble clone values, and package the sparse calibration matrix' node_ids: + - stage2_target_config_identity + - stage2_target_config_load - target_resolve + - stage2_target_config_apply - target_uprate - geo_build - constraint_resolve @@ -813,7 +816,14 @@ stages: - clone_assembly - takeup_rerand - sparse_build + - build_matrix + - build_matrix_chunked + - stage2_artifact_specs + - stage2_calibration_package_writer - out_pkg + - stage2_calibration_package_contract_writer + - out_contract + - stage2_calibration_package_contract_validator extra_nodes: - id: in_cps_s5 label: source_imputed_stratified_extended_cps.h5 @@ -859,6 +869,10 @@ stages: label: calibration_package.pkl node_type: artifact description: X_sparse CSR matrix, targets_df, initial_weights, metadata + - id: out_contract + label: calibration_package_contract.json + node_type: artifact + description: Stage 2 package handoff contract written next to calibration_package.pkl - id: util_sql label: sqlalchemy node_type: utility @@ -884,12 +898,25 @@ stages: edge_type: external_source label: SQL targets - source: in_config_s5 - target: target_resolve + target: stage2_target_config_identity + edge_type: data_flow + label: config file + - source: stage2_target_config_identity + target: stage2_target_config_load + edge_type: data_flow + label: resolved path and checksum + - source: stage2_target_config_load + target: stage2_target_config_apply edge_type: data_flow - label: include list + label: include/exclude rules - source: target_resolve + target: stage2_target_config_apply + edge_type: data_flow + label: candidate targets + - source: stage2_target_config_apply target: target_uprate edge_type: data_flow + label: selected targets - source: target_uprate target: geo_build edge_type: data_flow @@ -917,8 +944,48 @@ stages: target: sparse_build edge_type: data_flow - source: sparse_build + target: build_matrix + edge_type: uses_library + label: non-chunked path + - source: sparse_build + target: build_matrix_chunked + edge_type: uses_library + label: chunked path + - source: build_matrix + target: stage2_calibration_package_writer + edge_type: data_flow + - source: build_matrix_chunked + target: stage2_calibration_package_writer + edge_type: data_flow + - source: stage2_artifact_specs + target: stage2_calibration_package_writer + edge_type: uses_utility + label: package path + - source: stage2_calibration_package_writer target: out_pkg edge_type: produces_artifact + - source: out_pkg + target: stage2_calibration_package_contract_writer + edge_type: data_flow + - source: stage2_artifact_specs + target: stage2_calibration_package_contract_writer + edge_type: uses_utility + label: contract path + - source: stage2_calibration_package_contract_writer + target: out_contract + edge_type: produces_artifact + - source: out_pkg + target: stage2_calibration_package_contract_validator + edge_type: validates + - source: out_contract + target: stage2_calibration_package_contract_validator + edge_type: validates + - source: in_cps_s5 + target: stage2_calibration_package_contract_validator + edge_type: validates + - source: in_db_s5 + target: stage2_calibration_package_contract_validator + edge_type: validates - source: util_sql target: target_resolve edge_type: uses_utility diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index 3b8b35a4f..e71b44d09 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -94,6 +94,10 @@ DatasetCommandError, ) from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402 +from policyengine_us_data.calibration_package.specs import ( # noqa: E402 + calibration_package_artifact_paths, + resolve_target_config_identity, +) from policyengine_us_data.utils.error_redaction import ( # noqa: E402 redacted_bounded_error_text, redact_error_text, @@ -172,6 +176,7 @@ def _calibration_package_parameters( workers: int, n_clones: int, target_config: str | None, + all_active_targets: bool = False, skip_county: bool, chunked_matrix: bool, chunk_size: int, @@ -179,11 +184,17 @@ def _calibration_package_parameters( num_matrix_workers: int, ) -> dict: """Return manifest parameters that affect package construction.""" + target_config_identity = resolve_target_config_identity( + target_config, + all_active_targets=all_active_targets, + ) effective_parallel = bool(chunked_matrix and parallel_matrix) params = { "workers": workers if not chunked_matrix else None, "n_clones": n_clones, - "target_config": target_config, + "target_config": target_config_identity.path, + "target_config_sha256": target_config_identity.sha256, + "target_config_mode": target_config_identity.mode, "skip_county": skip_county, "chunked_matrix": bool(chunked_matrix), "chunk_size": chunk_size if chunked_matrix else None, @@ -573,6 +584,7 @@ def verify_runtime_seams() -> dict: "modal_app/step_manifests/errors.py", "modal_app/step_manifests/status.py", "modal_app/fixtures/h5_cases.py", + "policyengine_us_data/calibration_package/specs.py", "tests/integration/test_fixture_50hh.h5", "policyengine_us_data/calibration/target_config.yaml", "policyengine_us_data/calibration/target_config_full.yaml", @@ -1264,6 +1276,7 @@ def run_pipeline( "database": _artifacts_dir(run_id) / "policy_data.db", } ) + package_artifacts = calibration_package_artifact_paths(_artifacts_dir(run_id)) package_parameters = _calibration_package_parameters( workers=num_workers, n_clones=n_clones, @@ -1328,8 +1341,7 @@ def run_pipeline( completed_package_manifest = _complete_step_manifest( active_step_manifest, outputs=collect_artifacts( - [_artifacts_dir(run_id) / "calibration_package.pkl"], - missing_ok=True, + package_artifacts.manifest_outputs, ), vol=pipeline_volume, ) diff --git a/modal_app/remote_calibration_runner.py b/modal_app/remote_calibration_runner.py index 28289be4f..a4ca08049 100644 --- a/modal_app/remote_calibration_runner.py +++ b/modal_app/remote_calibration_runner.py @@ -12,6 +12,9 @@ sys.path.insert(0, _p) from modal_app.images import gpu_image as image # noqa: E402 +from policyengine_us_data.calibration_package.specs import ( # noqa: E402 + calibration_package_artifact_paths, +) from policyengine_us_data.fit_weights import ( # noqa: E402 FitResultBytes, FitScope, @@ -421,7 +424,8 @@ def _build_package_impl( f"Missing {label} on pipeline volume: {p}. Run data_build first." ) - pkg_path = f"{artifacts}/calibration_package.pkl" + package_artifacts = calibration_package_artifact_paths(artifacts) + pkg_path = str(package_artifacts.package) cmd = [ *_python_cmd("-m", "policyengine_us_data.calibration.unified_calibration"), "--device", @@ -446,7 +450,7 @@ def _build_package_impl( if chunked_matrix: cmd.extend(["--chunked-matrix", "--chunk-size", str(chunk_size)]) if parallel_matrix: - chunk_dir = f"{artifacts}/matrix_build" + chunk_dir = str(package_artifacts.matrix_build_dir) cmd.extend( [ "--parallel", @@ -481,14 +485,12 @@ def _build_package_impl( raise RuntimeError(f"Package build failed with code {build_rc}") from policyengine_us_data.stage_contracts.calibration_package import ( - CALIBRATION_PACKAGE_CONTRACT_FILENAME, validate_persisted_calibration_package_contract, ) - contract_path = f"{artifacts}/{CALIBRATION_PACKAGE_CONTRACT_FILENAME}" validate_persisted_calibration_package_contract( - package_path=Path(pkg_path), - contract_path=Path(contract_path), + package_path=package_artifacts.package, + contract_path=package_artifacts.contract, dataset_path=Path(dataset_path), db_path=Path(db_path), ) @@ -567,8 +569,9 @@ def check_volume_package(artifacts_dir: str = "") -> dict: import json base = artifacts_dir if artifacts_dir else f"{PIPELINE_MOUNT}/artifacts" - pkg_path = f"{base}/calibration_package.pkl" - sidecar_path = f"{base}/calibration_package_meta.json" + package_artifacts = calibration_package_artifact_paths(base) + pkg_path = str(package_artifacts.package) + sidecar_path = str(package_artifacts.metadata) if not os.path.exists(pkg_path): return {"exists": False} diff --git a/policyengine_us_data/calibration/unified_calibration.py b/policyengine_us_data/calibration/unified_calibration.py index dcce949c1..f3943573b 100644 --- a/policyengine_us_data/calibration/unified_calibration.py +++ b/policyengine_us_data/calibration/unified_calibration.py @@ -41,6 +41,14 @@ build_checkpoint_signature, checkpoint_signature_mismatches, ) +from policyengine_us_data.calibration.calibration_utils import ( + create_target_groups, +) +from policyengine_us_data.calibration_package.specs import ( + DEFAULT_TARGET_CONFIG_PATH as DEFAULT_TARGET_CONFIG_RELATIVE_PATH, + TargetConfigIdentity, + resolve_target_config_identity, +) from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.stage_contracts.calibration_package import ( CalibrationPackageParameters, @@ -69,7 +77,9 @@ LEARNING_RATE = 0.15 DEFAULT_EPOCHS = 100 DEFAULT_N_CLONES = 430 -DEFAULT_TARGET_CONFIG_PATH = Path(__file__).resolve().parent / "target_config.yaml" +DEFAULT_TARGET_CONFIG_PATH = ( + Path(__file__).resolve().parents[2] / DEFAULT_TARGET_CONFIG_RELATIVE_PATH +) def _utc_now_isoformat() -> str: @@ -83,6 +93,8 @@ def _calibration_package_contract_parameters( workers: int, n_clones: int, target_config_path: str | None, + target_config_sha256: str | None, + target_config_mode: str | None, skip_county: bool, skip_source_impute: bool, skip_takeup_rerandomize: bool, @@ -97,6 +109,8 @@ def _calibration_package_contract_parameters( workers=workers, n_clones=n_clones, target_config_path=target_config_path, + target_config_sha256=target_config_sha256, + target_config_mode=target_config_mode, skip_county=skip_county, skip_source_impute=skip_source_impute, skip_takeup_rerandomize=skip_takeup_rerandomize, @@ -107,6 +121,50 @@ def _calibration_package_contract_parameters( ) +def _target_config_identity_for_metadata( + *, + target_config: dict | None, + target_config_path: str | None, + target_config_identity: TargetConfigIdentity | None, +) -> TargetConfigIdentity | None: + """Return a resolved identity consistent with the parsed target config.""" + + if target_config_identity is not None: + if ( + target_config is None + and target_config_identity.mode != "all_active_targets" + ): + raise ValueError( + "target_config_identity requires a parsed target_config unless " + "all_active_targets is selected" + ) + if ( + target_config is not None + and target_config_identity.mode == "all_active_targets" + ): + raise ValueError( + "all_active_targets identity cannot be paired with a target_config" + ) + return target_config_identity + if target_config is None: + if target_config_path is not None: + raise ValueError( + "target_config_path cannot be recorded unless target_config is parsed" + ) + return TargetConfigIdentity( + path=None, + sha256=None, + mode="all_active_targets", + resolved_path=None, + ) + if target_config_path is None: + raise ValueError( + "target_config_path or target_config_identity is required when " + "target_config is parsed" + ) + return resolve_target_config_identity(target_config_path) + + def get_git_provenance() -> dict: """Capture git state and package version for provenance tracking.""" import subprocess as _sp @@ -448,14 +506,30 @@ def parse_args(argv=None): return parser.parse_args(argv) +@pipeline_node( + PipelineNode( + id="stage2_target_config_load", + label="Load Stage 2 Target Config", + node_type="library", + description="Load the YAML include/exclude target-selection config used by Stage 2 package construction.", + source_file="policyengine_us_data/calibration/unified_calibration.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=[DEFAULT_TARGET_CONFIG_RELATIVE_PATH], + validation_commands=[ + "uv run pytest tests/unit/calibration/test_target_config.py" + ], + ) +) def load_target_config(path: str) -> dict: - """Load target exclusion config from YAML. + """Load target include/exclude config from YAML. Args: path: Path to YAML config file. Returns: - Parsed config dict with 'exclude' list. + Parsed config dict with include and exclude lists. """ import yaml @@ -539,6 +613,21 @@ def apply_target_config( return filtered_df, filtered_X, filtered_names +@pipeline_node( + PipelineNode( + id="stage2_target_config_apply", + label="Apply Stage 2 Target Config", + node_type="library", + description="Apply Stage 2 target include/exclude rules before matrix construction.", + source_file="policyengine_us_data/calibration/unified_calibration.py", + status="current", + stability="moving", + pathways=["calibration_package"], + validation_commands=[ + "uv run pytest tests/unit/calibration/test_target_config.py" + ], + ) +) def apply_target_config_to_targets( targets_df: "pd.DataFrame", config: dict, @@ -553,6 +642,22 @@ def apply_target_config_to_targets( return filtered_df +@pipeline_node( + PipelineNode( + id="stage2_calibration_package_writer", + label="Stage 2 Package Writer", + node_type="library", + description="Persist the Stage 2 sparse matrix, target rows, target names, geography arrays, and provenance metadata.", + source_file="policyengine_us_data/calibration/unified_calibration.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_out=["calibration_package.pkl"], + validation_commands=[ + "uv run pytest tests/unit/calibration/test_unified_calibration.py" + ], + ) +) def save_calibration_package( path: str, X_sparse, @@ -1282,6 +1387,7 @@ def run_calibration( skip_county: bool = True, target_config: dict = None, target_config_path: str = None, + target_config_identity: TargetConfigIdentity | None = None, build_only: bool = False, package_path: str = None, package_output_path: str = None, @@ -1319,6 +1425,7 @@ def run_calibration( skip_source_impute: Skip ACS/SIPP/SCF imputations. target_config: Parsed target config dict. target_config_path: Path to target config, for provenance. + target_config_identity: Resolved target config path/checksum identity. build_only: If True, save package and skip fitting. package_path: Load pre-built package (skip build). package_output_path: Where to save calibration package. @@ -1345,6 +1452,11 @@ def run_calibration( started_at = _utc_now_isoformat() t0 = time.time() + resolved_target_identity = _target_config_identity_for_metadata( + target_config=target_config, + target_config_path=target_config_path, + target_config_identity=target_config_identity, + ) # Early exit: load pre-built package if package_path is not None: @@ -1598,7 +1710,15 @@ def run_calibration( "base_n_records": n_records, "seed": seed, "created_at": _utc_now_isoformat(), - "target_config_path": target_config_path, + "target_config_path": ( + resolved_target_identity.path if resolved_target_identity else None + ), + "target_config_sha256": ( + resolved_target_identity.sha256 if resolved_target_identity else None + ), + "target_config_mode": ( + resolved_target_identity.mode if resolved_target_identity else "explicit" + ), "package_scope": "minimal" if target_config else "all_active_targets", "matrix_builder": "chunked" if chunked_matrix else "precompute", "chunk_size": chunk_size if chunked_matrix else None, @@ -1609,10 +1729,6 @@ def run_calibration( metadata["dataset_sha256"] = compute_file_checksum(Path(dataset_path)) metadata["db_sha256"] = compute_file_checksum(Path(db_path)) - if target_config_path: - metadata["target_config_sha256"] = compute_file_checksum( - Path(target_config_path) - ) initial_weights = compute_initial_weights(X_sparse, targets_df) if package_output_path: @@ -1649,7 +1765,9 @@ def run_calibration( parameters=_calibration_package_contract_parameters( workers=workers, n_clones=n_clones, - target_config_path=target_config_path, + target_config_path=metadata["target_config_path"], + target_config_sha256=metadata["target_config_sha256"], + target_config_mode=metadata["target_config_mode"], skip_county=skip_county, skip_source_impute=skip_source_impute, skip_takeup_rerandomize=skip_takeup_rerandomize, @@ -1813,9 +1931,15 @@ def main(argv=None): target_config = None target_config_path = None + target_config_identity = resolve_target_config_identity( + args.target_config, + all_active_targets=args.all_active_targets, + ) if not args.all_active_targets: - target_config_path = args.target_config or str(DEFAULT_TARGET_CONFIG_PATH) - target_config = load_target_config(target_config_path) + target_config_path = target_config_identity.path + target_config = load_target_config( + target_config_identity.resolved_path or target_config_path + ) package_output_path = args.package_output if args.build_only and not package_output_path: @@ -1851,6 +1975,7 @@ def main(argv=None): skip_county=not args.county_level, target_config=target_config, target_config_path=target_config_path, + target_config_identity=target_config_identity, build_only=args.build_only, package_path=args.package_path, package_output_path=package_output_path, diff --git a/policyengine_us_data/calibration_package/__init__.py b/policyengine_us_data/calibration_package/__init__.py new file mode 100644 index 000000000..762b17dc9 --- /dev/null +++ b/policyengine_us_data/calibration_package/__init__.py @@ -0,0 +1,29 @@ +"""Stage 2 calibration-package specifications.""" + +from .specs import ( + CALIBRATION_PACKAGE_CONTRACT_FILENAME, + CALIBRATION_PACKAGE_FILENAME, + CALIBRATION_PACKAGE_METADATA_FILENAME, + CALIBRATION_PACKAGE_SUBSTAGE_ID, + DEFAULT_TARGET_CONFIG_PATH, + MATRIX_BUILD_DIRNAME, + TARGET_CONFIG_IDENTITY_MODES, + CalibrationPackageArtifactPaths, + TargetConfigIdentity, + calibration_package_artifact_paths, + resolve_target_config_identity, +) + +__all__ = [ + "CALIBRATION_PACKAGE_CONTRACT_FILENAME", + "CALIBRATION_PACKAGE_FILENAME", + "CALIBRATION_PACKAGE_METADATA_FILENAME", + "CALIBRATION_PACKAGE_SUBSTAGE_ID", + "DEFAULT_TARGET_CONFIG_PATH", + "MATRIX_BUILD_DIRNAME", + "TARGET_CONFIG_IDENTITY_MODES", + "CalibrationPackageArtifactPaths", + "TargetConfigIdentity", + "calibration_package_artifact_paths", + "resolve_target_config_identity", +] diff --git a/policyengine_us_data/calibration_package/specs.py b/policyengine_us_data/calibration_package/specs.py new file mode 100644 index 000000000..637cc64a6 --- /dev/null +++ b/policyengine_us_data/calibration_package/specs.py @@ -0,0 +1,197 @@ +"""Shared Stage 2 calibration-package identity and artifact specifications.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +from policyengine_us_data.pipeline_metadata import pipeline_node +from policyengine_us_data.pipeline_schema import PipelineNode +from policyengine_us_data.utils.manifest import compute_file_checksum + +DEFAULT_TARGET_CONFIG_PATH = "policyengine_us_data/calibration/target_config.yaml" +CALIBRATION_PACKAGE_FILENAME = "calibration_package.pkl" +CALIBRATION_PACKAGE_METADATA_FILENAME = "calibration_package_meta.json" +CALIBRATION_PACKAGE_CONTRACT_FILENAME = "calibration_package_contract.json" +MATRIX_BUILD_DIRNAME = "matrix_build" +CALIBRATION_PACKAGE_SUBSTAGE_ID = "2a_matrix_build_calibration_target_construction" + +TargetConfigMode = Literal["default", "explicit", "all_active_targets"] +TARGET_CONFIG_IDENTITY_MODES: frozenset[str] = frozenset( + {"default", "explicit", "all_active_targets"} +) + + +@dataclass(frozen=True, kw_only=True) +class TargetConfigIdentity: + """Checksum-backed identity for the Stage 2 target selection config.""" + + path: str | None + sha256: str | None + mode: TargetConfigMode + resolved_path: str | None = None + + def __post_init__(self) -> None: + if self.mode not in TARGET_CONFIG_IDENTITY_MODES: + raise ValueError(f"Unknown target config identity mode: {self.mode!r}") + if self.mode == "all_active_targets": + if self.path is not None or self.sha256 is not None: + raise ValueError( + "all_active_targets target config identity cannot include " + "a path or checksum" + ) + return + if not self.path: + raise ValueError(f"{self.mode} target config identity requires a path") + if not self.sha256: + raise ValueError(f"{self.mode} target config identity requires a checksum") + + def to_parameters(self) -> dict[str, str | None]: + """Return the identity fields used in Stage 2 reuse parameters.""" + + return { + "target_config": self.path, + "target_config_sha256": self.sha256, + "target_config_mode": self.mode, + } + + +@dataclass(frozen=True, kw_only=True) +class CalibrationPackageArtifactPaths: + """Canonical run-scoped Stage 2 artifact paths.""" + + artifacts_dir: Path + package: Path + metadata: Path + contract: Path + matrix_build_dir: Path + + @property + def manifest_outputs(self) -> tuple[Path, Path]: + """Return the durable Stage 2 outputs recorded in step manifests.""" + + return (self.package, self.contract) + + +@pipeline_node( + PipelineNode( + id="stage2_artifact_specs", + label="Stage 2 Artifact Specs", + node_type="library", + description="Centralize calibration package, contract, metadata, and matrix-build artifact paths.", + source_file="policyengine_us_data/calibration_package/specs.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_out=[ + CALIBRATION_PACKAGE_FILENAME, + CALIBRATION_PACKAGE_CONTRACT_FILENAME, + ], + validation_commands=[ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ], + ) +) +def calibration_package_artifact_paths( + artifacts_dir: str | Path, +) -> CalibrationPackageArtifactPaths: + """Return canonical Stage 2 paths rooted in an artifacts directory.""" + + root = Path(artifacts_dir) + return CalibrationPackageArtifactPaths( + artifacts_dir=root, + package=root / CALIBRATION_PACKAGE_FILENAME, + metadata=root / CALIBRATION_PACKAGE_METADATA_FILENAME, + contract=root / CALIBRATION_PACKAGE_CONTRACT_FILENAME, + matrix_build_dir=root / MATRIX_BUILD_DIRNAME, + ) + + +@pipeline_node( + PipelineNode( + id="stage2_target_config_identity", + label="Stage 2 Target Config Identity", + node_type="library", + description="Resolve the effective Stage 2 target config path and checksum before package reuse or rebuild.", + source_file="policyengine_us_data/calibration_package/specs.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=[DEFAULT_TARGET_CONFIG_PATH], + validation_commands=[ + "uv run pytest tests/unit/calibration_package/test_specs.py" + ], + ) +) +def resolve_target_config_identity( + target_config_path: str | Path | None = None, + *, + all_active_targets: bool = False, + repo_root: str | Path | None = None, +) -> TargetConfigIdentity: + """Resolve the target config identity used by Stage 2 package construction.""" + + if all_active_targets: + if target_config_path is not None: + raise ValueError( + "--all-active-targets cannot be combined with a target config path" + ) + return TargetConfigIdentity( + path=None, + sha256=None, + mode="all_active_targets", + resolved_path=None, + ) + + root = Path(repo_root).resolve() if repo_root is not None else _repo_root() + mode: TargetConfigMode = "explicit" if target_config_path is not None else "default" + identity_path = Path(target_config_path or DEFAULT_TARGET_CONFIG_PATH) + resolved_path = _resolve_existing_config_path(identity_path, root) + logical_path = ( + DEFAULT_TARGET_CONFIG_PATH + if mode == "default" + else _logical_identity_path(identity_path, resolved_path, root) + ) + return TargetConfigIdentity( + path=logical_path, + sha256=f"sha256:{compute_file_checksum(resolved_path)}", + mode=mode, + resolved_path=str(resolved_path), + ) + + +def _repo_root() -> Path: + return Path(__file__).resolve().parents[2] + + +def _resolve_existing_config_path(path: Path, repo_root: Path) -> Path: + candidates = [path] if path.is_absolute() else [repo_root / path, Path.cwd() / path] + for candidate in candidates: + resolved = candidate.resolve() + if resolved.exists() and resolved.is_file(): + return resolved + raise FileNotFoundError(f"Target config not found: {path}") + + +def _logical_identity_path(path: Path, resolved_path: Path, repo_root: Path) -> str: + try: + return resolved_path.relative_to(repo_root).as_posix() + except ValueError: + return resolved_path.as_posix() if path.is_absolute() else path.as_posix() + + +__all__ = [ + "CALIBRATION_PACKAGE_CONTRACT_FILENAME", + "CALIBRATION_PACKAGE_FILENAME", + "CALIBRATION_PACKAGE_METADATA_FILENAME", + "CALIBRATION_PACKAGE_SUBSTAGE_ID", + "DEFAULT_TARGET_CONFIG_PATH", + "MATRIX_BUILD_DIRNAME", + "TARGET_CONFIG_IDENTITY_MODES", + "CalibrationPackageArtifactPaths", + "TargetConfigIdentity", + "TargetConfigMode", + "calibration_package_artifact_paths", + "resolve_target_config_identity", +] diff --git a/policyengine_us_data/stage_contracts/calibration_package.py b/policyengine_us_data/stage_contracts/calibration_package.py index dc0385321..27125c6b7 100644 --- a/policyengine_us_data/stage_contracts/calibration_package.py +++ b/policyengine_us_data/stage_contracts/calibration_package.py @@ -7,6 +7,12 @@ from pathlib import Path from typing import Any +from policyengine_us_data.calibration_package.specs import ( + CALIBRATION_PACKAGE_CONTRACT_FILENAME, + CALIBRATION_PACKAGE_SUBSTAGE_ID, +) +from policyengine_us_data.pipeline_metadata import pipeline_node +from policyengine_us_data.pipeline_schema import PipelineNode from policyengine_us_data.utils.step_manifest import sha256_file from policyengine_us_data.utils.geography_checksum import ( canonical_geography_checksum, @@ -26,11 +32,9 @@ from .stages import STAGE_2_BUILD_CALIBRATION_PACKAGE, contract_type_for_stage from .substages import SubstageRecord -CALIBRATION_PACKAGE_CONTRACT_FILENAME = "calibration_package_contract.json" CALIBRATION_PACKAGE_CONTRACT_TYPE = contract_type_for_stage( STAGE_2_BUILD_CALIBRATION_PACKAGE ) -CALIBRATION_PACKAGE_SUBSTAGE_ID = "2a_matrix_build_calibration_target_construction" def summarize_geography_assignment( @@ -200,8 +204,11 @@ def build_calibration_package_contract( _require_existing_file(db_path, "target database") parameter_schema = _calibration_package_parameters(parameters) - parameter_payload = parameter_schema.to_dict() metadata = _package_metadata(package) + parameter_payload = _parameters_with_package_identity( + parameter_schema.to_dict(), + metadata, + ) package_summary = summarize_calibration_package(package).to_dict() geography_summary = summarize_geography_assignment(package).to_dict() inputs = ( @@ -295,6 +302,23 @@ def build_calibration_package_contract( ) +@pipeline_node( + PipelineNode( + id="stage2_calibration_package_contract_writer", + label="Stage 2 Contract Writer", + node_type="library", + description="Write the Stage 2 calibration-package handoff contract next to the package artifact.", + source_file="policyengine_us_data/stage_contracts/calibration_package.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=["calibration_package.pkl"], + artifacts_out=[CALIBRATION_PACKAGE_CONTRACT_FILENAME], + validation_commands=[ + "uv run pytest tests/unit/test_calibration_package_stage_contract.py" + ], + ) +) def write_calibration_package_contract( *, package_path: Path, @@ -333,6 +357,25 @@ def write_calibration_package_contract( return contract +@pipeline_node( + PipelineNode( + id="stage2_calibration_package_contract_validator", + label="Stage 2 Contract Validator", + node_type="validation", + description="Validate that the persisted Stage 2 contract describes the calibration package and inputs.", + source_file="policyengine_us_data/stage_contracts/calibration_package.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=[ + "calibration_package.pkl", + CALIBRATION_PACKAGE_CONTRACT_FILENAME, + ], + validation_commands=[ + "uv run pytest tests/unit/test_calibration_package_stage_contract.py" + ], + ) +) def validate_calibration_package_contract( *, package_path: Path, @@ -488,6 +531,46 @@ def _calibration_package_parameters( return CalibrationPackageParameters.from_dict(parameters) +def _parameters_with_package_identity( + parameters: Mapping[str, Any], + metadata: Mapping[str, Any], +) -> dict[str, Any]: + payload = dict(parameters) + metadata_path = _optional_metadata_string(metadata, "target_config_path") + metadata_sha = _optional_metadata_string(metadata, "target_config_sha256") + metadata_mode = _optional_metadata_string(metadata, "target_config_mode") + + if metadata_path: + if payload.get("target_config") is None: + payload["target_config"] = metadata_path + if payload["target_config"] != metadata_path: + raise ValueError( + "Calibration package contract target_config does not match " + "package metadata" + ) + if metadata_sha: + if payload.get("target_config_sha256") is None: + payload["target_config_sha256"] = metadata_sha + if payload["target_config_sha256"] != metadata_sha: + raise ValueError( + "Calibration package contract target_config_sha256 does not match " + "package metadata" + ) + if metadata_mode: + if payload.get("target_config_mode") is None: + payload["target_config_mode"] = metadata_mode + if payload["target_config_mode"] != metadata_mode: + raise ValueError( + "Calibration package contract target_config_mode does not match " + "package metadata" + ) + if payload.get("target_config_mode") is None: + payload["target_config_mode"] = ( + "all_active_targets" if payload.get("target_config") is None else "explicit" + ) + return CalibrationPackageParameters.from_dict(payload).to_dict() + + def _require_existing_file(path: Path, label: str) -> None: if not path.exists(): raise FileNotFoundError(f"Missing {label}: {path}") diff --git a/policyengine_us_data/stage_contracts/calibration_package_schema.py b/policyengine_us_data/stage_contracts/calibration_package_schema.py index 06030812e..938f82498 100644 --- a/policyengine_us_data/stage_contracts/calibration_package_schema.py +++ b/policyengine_us_data/stage_contracts/calibration_package_schema.py @@ -7,6 +7,10 @@ from math import isfinite from typing import Any +from policyengine_us_data.calibration_package.specs import ( + TARGET_CONFIG_IDENTITY_MODES, +) + GEOGRAPHY_ASSIGNMENT_SOURCE_KINDS = frozenset( { "calibration_package", @@ -39,6 +43,8 @@ "skip_source_impute", "skip_takeup_rerandomize", "target_config", + "target_config_mode", + "target_config_sha256", "workers", } ) @@ -207,6 +213,8 @@ class CalibrationPackageParameters: workers: int | None n_clones: int target_config: str | None + target_config_sha256: str | None + target_config_mode: str | None skip_county: bool skip_source_impute: bool skip_takeup_rerandomize: bool @@ -230,6 +238,31 @@ def __post_init__(self) -> None: _validate_bool(self.parallel_matrix, "parallel_matrix") if self.target_config is not None and not isinstance(self.target_config, str): raise ValueError("target_config must be a string or None") + _validate_optional_sha256( + self.target_config_sha256, + "target_config_sha256", + ) + if self.target_config_mode is not None: + if not isinstance(self.target_config_mode, str): + raise ValueError("target_config_mode must be a string or None") + if self.target_config_mode not in TARGET_CONFIG_IDENTITY_MODES: + raise ValueError( + "target_config_mode must be one of " + f"{sorted(TARGET_CONFIG_IDENTITY_MODES)}" + ) + if self.target_config_mode == "all_active_targets": + if self.target_config is not None or self.target_config_sha256 is not None: + raise ValueError( + "all_active_targets target config parameters cannot include " + "a path or checksum" + ) + if self.target_config_mode in {"default", "explicit"} and ( + self.target_config is None or self.target_config_sha256 is None + ): + raise ValueError( + "default and explicit target config parameters require " + "target_config and target_config_sha256" + ) if self.chunked_matrix: if self.workers is not None: raise ValueError("workers must be None when chunked_matrix is true") @@ -265,14 +298,21 @@ def from_runtime_args( chunk_size: int, parallel: bool, num_matrix_workers: int, + target_config_sha256: str | None = None, + target_config_mode: str | None = None, ) -> "CalibrationPackageParameters": """Build canonical Stage 2 parameters from runtime CLI arguments.""" parallel_matrix = bool(chunked_matrix and parallel) + resolved_mode = target_config_mode or ( + "all_active_targets" if target_config_path is None else "explicit" + ) return cls( workers=workers if not chunked_matrix else None, n_clones=n_clones, target_config=target_config_path, + target_config_sha256=target_config_sha256, + target_config_mode=resolved_mode, skip_county=skip_county, skip_source_impute=skip_source_impute, skip_takeup_rerandomize=skip_takeup_rerandomize, @@ -291,15 +331,29 @@ def from_dict( if not isinstance(data, Mapping): raise ValueError("calibration package parameters must be a mapping") - _require_exact_keys( + _require_compatible_keys( data, "calibration package parameters", CALIBRATION_PACKAGE_PARAMETER_KEYS, + legacy_optional_keys=frozenset( + {"target_config_mode", "target_config_sha256"} + ), ) + target_config = _optional_string_field(data, "target_config") + target_config_sha256 = _optional_string_field(data, "target_config_sha256") + target_config_mode = _optional_string_field(data, "target_config_mode") + if "target_config_mode" not in data and "target_config_sha256" not in data: + resolved_mode = None + else: + resolved_mode = target_config_mode or ( + "all_active_targets" if target_config is None else "explicit" + ) return cls( workers=_optional_int_field(data, "workers"), n_clones=_required_int_field(data, "n_clones"), - target_config=_optional_string_field(data, "target_config"), + target_config=target_config, + target_config_sha256=target_config_sha256, + target_config_mode=resolved_mode, skip_county=_required_bool_field(data, "skip_county"), skip_source_impute=_required_bool_field(data, "skip_source_impute"), skip_takeup_rerandomize=_required_bool_field( @@ -325,6 +379,8 @@ def to_dict(self) -> dict[str, Any]: "skip_source_impute": self.skip_source_impute, "skip_takeup_rerandomize": self.skip_takeup_rerandomize, "target_config": self.target_config, + "target_config_mode": self.target_config_mode, + "target_config_sha256": self.target_config_sha256, "workers": self.workers, } @@ -463,9 +519,24 @@ def _require_exact_keys( data: Mapping[str, Any], label: str, expected_keys: frozenset[str], +) -> None: + _require_compatible_keys( + data, + label, + expected_keys, + legacy_optional_keys=frozenset(), + ) + + +def _require_compatible_keys( + data: Mapping[str, Any], + label: str, + expected_keys: frozenset[str], + *, + legacy_optional_keys: frozenset[str], ) -> None: keys = {str(key) for key in data} - missing = sorted(expected_keys - keys) + missing = sorted((expected_keys - legacy_optional_keys) - keys) unexpected = sorted(keys - expected_keys) if missing: raise ValueError(f"{label} missing required key: {missing[0]}") @@ -577,5 +648,8 @@ def _validate_optional_sha256(value: Any, key: str) -> None: return if not isinstance(value, str) or not value.startswith("sha256:"): raise ValueError(f"Calibration package field {key!r} must be a SHA-256 digest") - if len(value) != len("sha256:") + 64: + digest = value.removeprefix("sha256:") + if len(digest) != 64: + raise ValueError(f"Calibration package field {key!r} must be a SHA-256 digest") + if any(character not in "0123456789abcdef" for character in digest.lower()): raise ValueError(f"Calibration package field {key!r} must be a SHA-256 digest") diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 6cf73ba26..6cb159059 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -63,6 +63,7 @@ def test_pipeline_image_runtime_seams(): "modal_app/step_manifests/errors.py": True, "modal_app/step_manifests/status.py": True, "modal_app/fixtures/h5_cases.py": True, + "policyengine_us_data/calibration_package/specs.py": True, "tests/integration/test_fixture_50hh.h5": True, "policyengine_us_data/calibration/target_config.yaml": True, "policyengine_us_data/calibration/target_config_full.yaml": True, diff --git a/tests/unit/calibration/test_unified_calibration.py b/tests/unit/calibration/test_unified_calibration.py index 41022a17b..1342b0511 100644 --- a/tests/unit/calibration/test_unified_calibration.py +++ b/tests/unit/calibration/test_unified_calibration.py @@ -42,18 +42,24 @@ ) from policyengine_us_data.calibration.unified_calibration import ( _calibration_package_contract_parameters, + _target_config_identity_for_metadata, check_package_staleness, + run_calibration, ) from policyengine_us_data.stage_contracts.calibration_package import ( CalibrationPackageParameters, ) +TARGET_CONFIG_SHA256 = "sha256:" + "a" * 64 + def test_calibration_package_contract_parameters_track_effective_matrix_mode(): params = _calibration_package_contract_parameters( workers=8, n_clones=430, target_config_path="policyengine_us_data/calibration/target_config.yaml", + target_config_sha256=TARGET_CONFIG_SHA256, + target_config_mode="default", skip_county=True, skip_source_impute=True, skip_takeup_rerandomize=False, @@ -68,6 +74,8 @@ def test_calibration_package_contract_parameters_track_effective_matrix_mode(): "workers": None, "n_clones": 430, "target_config": "policyengine_us_data/calibration/target_config.yaml", + "target_config_sha256": TARGET_CONFIG_SHA256, + "target_config_mode": "default", "skip_county": True, "skip_source_impute": True, "skip_takeup_rerandomize": False, @@ -83,6 +91,8 @@ def test_calibration_package_contract_parameters_ignore_unused_chunk_options(): workers=8, n_clones=430, target_config_path=None, + target_config_sha256=None, + target_config_mode="all_active_targets", skip_county=True, skip_source_impute=True, skip_takeup_rerandomize=False, @@ -98,6 +108,33 @@ def test_calibration_package_contract_parameters_ignore_unused_chunk_options(): assert params.to_dict()["num_matrix_workers"] is None +def test_target_config_identity_for_metadata_requires_identity_for_parsed_config(): + with pytest.raises( + ValueError, match="target_config_path or target_config_identity" + ): + _target_config_identity_for_metadata( + target_config={"include": []}, + target_config_path=None, + target_config_identity=None, + ) + + +def test_run_calibration_validates_target_identity_before_dataset_loading(): + with ( + patch.dict(sys.modules, {"policyengine_us": None}), + pytest.raises( + ValueError, + match="target_config_path or target_config_identity", + ), + ): + run_calibration( + dataset_path="/missing/source.h5", + db_path="/missing/policy_data.db", + target_config={"include": []}, + target_config_path=None, + ) + + def test_check_package_staleness_warns_for_old_utc_timestamp( capsys, monkeypatch, diff --git a/tests/unit/calibration_package/test_specs.py b/tests/unit/calibration_package/test_specs.py new file mode 100644 index 000000000..152ad7ab1 --- /dev/null +++ b/tests/unit/calibration_package/test_specs.py @@ -0,0 +1,119 @@ +from pathlib import Path + +import pytest + +from policyengine_us_data.calibration_package.specs import ( + CALIBRATION_PACKAGE_CONTRACT_FILENAME, + CALIBRATION_PACKAGE_FILENAME, + CALIBRATION_PACKAGE_METADATA_FILENAME, + DEFAULT_TARGET_CONFIG_PATH, + MATRIX_BUILD_DIRNAME, + TargetConfigIdentity, + calibration_package_artifact_paths, + resolve_target_config_identity, +) +from policyengine_us_data.stage_contracts.calibration_package import ( + CalibrationPackageParameters, +) +from policyengine_us_data.utils.manifest import compute_file_checksum + + +def _sha256_digest(path: Path) -> str: + return f"sha256:{compute_file_checksum(path)}" + + +def _write_default_target_config(repo_root: Path, body: str = "include: []\n") -> Path: + config_path = repo_root / DEFAULT_TARGET_CONFIG_PATH + config_path.parent.mkdir(parents=True) + config_path.write_text(body, encoding="utf-8") + return config_path + + +def test_default_target_config_identity_resolution(tmp_path): + config_path = _write_default_target_config(tmp_path) + + identity = resolve_target_config_identity(repo_root=tmp_path) + + assert identity == TargetConfigIdentity( + path=DEFAULT_TARGET_CONFIG_PATH, + sha256=_sha256_digest(config_path), + mode="default", + resolved_path=str(config_path.resolve()), + ) + assert identity.to_parameters() == { + "target_config": DEFAULT_TARGET_CONFIG_PATH, + "target_config_sha256": _sha256_digest(config_path), + "target_config_mode": "default", + } + + +def test_explicit_target_config_identity_resolution(tmp_path): + config_path = _write_default_target_config(tmp_path) + + identity = resolve_target_config_identity( + DEFAULT_TARGET_CONFIG_PATH, + repo_root=tmp_path, + ) + + assert identity.path == DEFAULT_TARGET_CONFIG_PATH + assert identity.sha256 == _sha256_digest(config_path) + assert identity.mode == "explicit" + assert identity.resolved_path == str(config_path.resolve()) + + +def test_resolved_target_config_identity_is_contract_compatible(tmp_path): + _write_default_target_config(tmp_path) + identity = resolve_target_config_identity(repo_root=tmp_path) + + params = CalibrationPackageParameters.from_runtime_args( + workers=8, + n_clones=430, + target_config_path=identity.path, + target_config_sha256=identity.sha256, + target_config_mode=identity.mode, + skip_county=True, + skip_source_impute=True, + skip_takeup_rerandomize=False, + chunked_matrix=False, + chunk_size=25_000, + parallel=False, + num_matrix_workers=50, + ) + + assert params.target_config_sha256 == identity.sha256 + + +def test_all_active_targets_identity_resolution(): + identity = resolve_target_config_identity(all_active_targets=True) + + assert identity.to_parameters() == { + "target_config": None, + "target_config_sha256": None, + "target_config_mode": "all_active_targets", + } + + +def test_all_active_targets_rejects_config_path(): + with pytest.raises(ValueError, match="all-active-targets"): + resolve_target_config_identity( + DEFAULT_TARGET_CONFIG_PATH, + all_active_targets=True, + ) + + +def test_calibration_package_artifact_paths(): + paths = calibration_package_artifact_paths("/pipeline/artifacts/run-a") + + assert paths.package == Path("/pipeline/artifacts/run-a") / ( + CALIBRATION_PACKAGE_FILENAME + ) + assert paths.metadata == Path("/pipeline/artifacts/run-a") / ( + CALIBRATION_PACKAGE_METADATA_FILENAME + ) + assert paths.contract == Path("/pipeline/artifacts/run-a") / ( + CALIBRATION_PACKAGE_CONTRACT_FILENAME + ) + assert paths.matrix_build_dir == Path("/pipeline/artifacts/run-a") / ( + MATRIX_BUILD_DIRNAME + ) + assert paths.manifest_outputs == (paths.package, paths.contract) diff --git a/tests/unit/fixtures/calibration_package_stage_contract.py b/tests/unit/fixtures/calibration_package_stage_contract.py index f11640ee9..780ec7fbf 100644 --- a/tests/unit/fixtures/calibration_package_stage_contract.py +++ b/tests/unit/fixtures/calibration_package_stage_contract.py @@ -22,6 +22,7 @@ CALIBRATION_COMPLETED_AT = "2026-05-08T12:02:00Z" CALIBRATION_DURATION_S = 120.0 TARGET_CONFIG_PATH = "policyengine_us_data/calibration/target_config.yaml" +TARGET_CONFIG_SHA256 = "sha256:" + "a" * 64 CALIBRATION_BLOCK_GEOIDS = ("010010001", "010010002", "020010001") CALIBRATION_CD_GEOIDS = ("0101", "0102", "0201") @@ -66,7 +67,8 @@ def calibration_package_payload() -> dict[str, Any]: "dataset_sha256": "sha256:dataset", "db_sha256": "sha256:db", "target_config_path": TARGET_CONFIG_PATH, - "target_config_sha256": "sha256:target-config", + "target_config_sha256": TARGET_CONFIG_SHA256, + "target_config_mode": "explicit", "n_clones": 3, "seed": 42, "base_n_records": 1, @@ -148,6 +150,8 @@ def calibration_package_parameters() -> dict[str, Any]: "workers": None, "n_clones": 3, "target_config": TARGET_CONFIG_PATH, + "target_config_sha256": TARGET_CONFIG_SHA256, + "target_config_mode": "explicit", "skip_county": True, "skip_source_impute": True, "skip_takeup_rerandomize": False, diff --git a/tests/unit/test_calibration_package_stage_contract.py b/tests/unit/test_calibration_package_stage_contract.py index f00f646da..b99a12c78 100644 --- a/tests/unit/test_calibration_package_stage_contract.py +++ b/tests/unit/test_calibration_package_stage_contract.py @@ -1,4 +1,5 @@ from tests.unit.fixtures.calibration_package_stage_contract import ( + TARGET_CONFIG_SHA256, TARGET_CONFIG_PATH, calibration_package_contract, calibration_package_parameters, @@ -60,6 +61,8 @@ def test_calibration_package_parameters_parse_runtime_args(): workers=8, n_clones=430, target_config_path=TARGET_CONFIG_PATH, + target_config_sha256=TARGET_CONFIG_SHA256, + target_config_mode="explicit", skip_county=True, skip_source_impute=True, skip_takeup_rerandomize=False, @@ -79,16 +82,110 @@ def test_calibration_package_parameters_parse_runtime_args(): "skip_source_impute": True, "skip_takeup_rerandomize": False, "target_config": TARGET_CONFIG_PATH, + "target_config_mode": "explicit", + "target_config_sha256": TARGET_CONFIG_SHA256, "workers": None, } +def test_calibration_package_parameters_require_identity_for_config_modes(): + try: + CalibrationPackageParameters.from_runtime_args( + workers=8, + n_clones=430, + target_config_path=TARGET_CONFIG_PATH, + target_config_sha256=None, + target_config_mode="explicit", + skip_county=True, + skip_source_impute=True, + skip_takeup_rerandomize=False, + chunked_matrix=False, + chunk_size=25_000, + parallel=False, + num_matrix_workers=50, + ) + except ValueError as exc: + assert "target_config and target_config_sha256" in str(exc) + else: + raise AssertionError("Explicit target config mode should require checksum") + + +def test_calibration_package_parameters_reject_malformed_target_config_checksum(): + try: + CalibrationPackageParameters.from_runtime_args( + workers=8, + n_clones=430, + target_config_path=TARGET_CONFIG_PATH, + target_config_sha256="sha256:target-config", + target_config_mode="explicit", + skip_county=True, + skip_source_impute=True, + skip_takeup_rerandomize=False, + chunked_matrix=False, + chunk_size=25_000, + parallel=False, + num_matrix_workers=50, + ) + except ValueError as exc: + assert "SHA-256 digest" in str(exc) + else: + raise AssertionError("Malformed target config checksum should fail") + + +def test_calibration_package_parameters_accept_legacy_identity_fields_missing(): + params = CalibrationPackageParameters.from_dict( + { + "chunk_size": None, + "chunked_matrix": False, + "n_clones": 430, + "num_matrix_workers": None, + "parallel_matrix": False, + "skip_county": True, + "skip_source_impute": True, + "skip_takeup_rerandomize": False, + "target_config": TARGET_CONFIG_PATH, + "workers": 8, + } + ) + + assert params.target_config == TARGET_CONFIG_PATH + assert params.target_config_mode is None + assert params.target_config_sha256 is None + + +def test_calibration_package_contract_revalidates_backfilled_identity(tmp_path): + dataset_path, db_path, package_path = contract_input_paths(tmp_path) + package = calibration_package_payload() + package["metadata"].pop("target_config_sha256") + write_calibration_package_payload(package_path, package) + parameters = calibration_package_parameters() + parameters.pop("target_config_mode") + parameters.pop("target_config_sha256") + + try: + build_calibration_package_contract( + package_path=package_path, + dataset_path=dataset_path, + db_path=db_path, + package=package, + parameters=parameters, + run_id="run-a", + completed_at="2026-05-08T12:02:00Z", + ) + except ValueError as exc: + assert "target_config and target_config_sha256" in str(exc) + else: + raise AssertionError("Backfilled target config identity should be revalidated") + + def test_calibration_package_parameters_reject_inconsistent_chunk_shape(): try: CalibrationPackageParameters( workers=8, n_clones=430, target_config=None, + target_config_sha256=None, + target_config_mode="all_active_targets", skip_county=True, skip_source_impute=True, skip_takeup_rerandomize=False, @@ -203,7 +300,7 @@ def test_calibration_package_contract_records_matrix_summary(tmp_path): assert summary["matrix_density"] == 0.5 assert summary["n_targets"] == 2 assert summary["target_name_count"] == 2 - assert summary["target_config_sha256"] == "sha256:target-config" + assert summary["target_config_sha256"] == TARGET_CONFIG_SHA256 assert summary["n_clones"] == 3 assert summary["seed"] == 42 assert summary["matrix_builder"] == "chunked" diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 95235e3ba..199d63747 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -9,6 +9,10 @@ modal = pytest.importorskip("modal") +from policyengine_us_data.calibration_package.specs import ( # noqa: E402 + DEFAULT_TARGET_CONFIG_PATH, +) +from policyengine_us_data.utils.manifest import compute_file_checksum # noqa: E402 from modal_app.pipeline import ( # noqa: E402 NATIONAL_FIT_LAMBDA_L0, _build_diagnostics_upload_script, @@ -48,6 +52,11 @@ def test_calibration_package_parameters_track_matrix_mode(): assert params["chunked_matrix"] is True assert "workers" not in params + assert params["target_config"] == DEFAULT_TARGET_CONFIG_PATH + assert params["target_config_sha256"] == ( + f"sha256:{compute_file_checksum(DEFAULT_TARGET_CONFIG_PATH)}" + ) + assert params["target_config_mode"] == "default" assert params["chunk_size"] == 10_000 assert params["parallel_matrix"] is True assert params["num_matrix_workers"] == 25 @@ -67,6 +76,11 @@ def test_calibration_package_parameters_ignore_unused_matrix_options(): assert params["chunked_matrix"] is False assert params["workers"] == 50 + assert params["target_config"] == DEFAULT_TARGET_CONFIG_PATH + assert params["target_config_sha256"] == ( + f"sha256:{compute_file_checksum(DEFAULT_TARGET_CONFIG_PATH)}" + ) + assert params["target_config_mode"] == "default" assert "chunk_size" not in params assert params["parallel_matrix"] is False assert "num_matrix_workers" not in params diff --git a/tests/unit/test_pipeline_docs_extractor.py b/tests/unit/test_pipeline_docs_extractor.py index 986aa8a4c..27f16b177 100644 --- a/tests/unit/test_pipeline_docs_extractor.py +++ b/tests/unit/test_pipeline_docs_extractor.py @@ -123,6 +123,21 @@ def test_pipeline_map_manifest_validates(): assert bundle["metadata"]["mapped_decorated_node_count"] >= 45 assert sum(len(stage["nodes"]) for stage in bundle["stages"]) >= 160 assert sum(len(stage["edges"]) for stage in bundle["stages"]) >= 170 + stage2 = next( + stage + for stage in bundle["stages"] + if stage["id"] == "2a_matrix_build_calibration_target_construction" + ) + stage2_node_ids = {node["id"] for node in stage2["nodes"]} + assert { + "stage2_target_config_identity", + "stage2_target_config_load", + "build_matrix", + "build_matrix_chunked", + "stage2_calibration_package_writer", + "stage2_calibration_package_contract_writer", + "stage2_calibration_package_contract_validator", + } <= stage2_node_ids def test_pipeline_map_stage_1_substages_match_dataset_build_specs(): diff --git a/tests/unit/test_pipeline_source_contracts.py b/tests/unit/test_pipeline_source_contracts.py index 189ff9fd4..36820915b 100644 --- a/tests/unit/test_pipeline_source_contracts.py +++ b/tests/unit/test_pipeline_source_contracts.py @@ -58,6 +58,28 @@ def test_run_pipeline_stage_1_stages_datasets_without_promoting() -> None: assert keywords["version"].id == "candidate_version" +def test_calibration_package_parameters_record_target_config_identity() -> None: + source_text = PIPELINE_SOURCE.read_text() + tree = ast.parse(source_text) + helper = _function_def(tree, "_calibration_package_parameters") + source = ast.get_source_segment(source_text, helper) + + assert "resolve_target_config_identity(" in source + assert '"target_config": target_config_identity.path' in source + assert '"target_config_sha256": target_config_identity.sha256' in source + assert '"target_config_mode": target_config_identity.mode' in source + + +def test_stage_2_manifest_records_package_and_contract_outputs() -> None: + source_text = PIPELINE_SOURCE.read_text() + tree = ast.parse(source_text) + run_pipeline = _function_def(tree, "run_pipeline") + source = ast.get_source_segment(source_text, run_pipeline) + + assert "package_artifacts = calibration_package_artifact_paths(" in source + assert "package_artifacts.manifest_outputs" in source + + def test_promote_run_fails_closed_for_required_promotion_steps() -> None: tree = ast.parse(PIPELINE_SOURCE.read_text()) promote_run = _function_def(tree, "promote_run")