Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/sipp-positive-weights.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Filter non-positive SIPP donor weights before fitting source imputation models.
Interpret SIPP status flags with Census status semantics when filtering observed donor targets.
Bump policyengine-us to 1.703.1.
19 changes: 19 additions & 0 deletions policyengine_us_data/calibration/source_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from policyengine_us_data.pipeline_schema import PipelineNode
from policyengine_us_data.utils.source_quality import (
cap_training_sample,
filter_positive_finite_weight_rows,
require_columns_present,
target_observed_source_masks,
)
Expand Down Expand Up @@ -710,6 +711,12 @@ def _impute_sipp(
"household_weight",
]
tip_train = sipp_df[tip_cols].dropna()
tip_train, tip_target_filters = filter_positive_finite_weight_rows(
tip_train,
weight_col="household_weight",
target_filters=tip_target_filters,
context_name="SIPP source tip donor",
)
tip_train, tip_target_filters = cap_training_sample(
tip_train,
max_train_samples=10_000,
Expand Down Expand Up @@ -849,6 +856,12 @@ def _impute_sipp(
target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS,
target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS,
)
asset_train, asset_target_filters = filter_positive_finite_weight_rows(
asset_train,
weight_col="household_weight",
target_filters=asset_target_filters,
context_name="SIPP source asset donor",
)
asset_train, asset_target_filters = cap_training_sample(
asset_train,
max_train_samples=20_000,
Expand Down Expand Up @@ -1013,6 +1026,12 @@ def _impute_sipp(
targets=vehicle_vars,
target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
)
vehicle_train, vehicle_target_filters = filter_positive_finite_weight_rows(
vehicle_train,
weight_col="household_weight",
target_filters=vehicle_target_filters,
context_name="SIPP source vehicle donor",
)
vehicle_train, vehicle_target_filters = cap_training_sample(
vehicle_train,
max_train_samples=20_000,
Expand Down
62 changes: 59 additions & 3 deletions policyengine_us_data/datasets/sipp/sipp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from policyengine_us_data.utils.source_quality import (
cap_training_sample,
filter_positive_finite_weight_rows,
filter_observed_source_rows,
require_columns_present,
sipp_allocation_flag_for,
Expand Down Expand Up @@ -188,6 +189,12 @@ def train_tip_model():
]

sipp = sipp[~sipp.isna().any(axis=1)]
sipp, tip_target_filters = filter_positive_finite_weight_rows(
sipp,
weight_col="household_weight",
target_filters=tip_target_filters,
context_name="SIPP tip donor",
)
sipp, tip_target_filters = cap_training_sample(
sipp,
max_train_samples=10_000,
Expand Down Expand Up @@ -232,9 +239,40 @@ def get_tip_model() -> QRF:
"stock_assets": ["TVAL_STMF"],
"bond_assets": ["TVAL_BOND"],
}
SIPP_BANK_ACCOUNT_ASSET_ALLOCATION_COLUMNS = [
"AJSSAVVAL",
"AJOSAVVAL",
"AOSAVVAL",
"AJSMMVAL",
"AJOMMVAL",
"AOMMVAL",
"AJSCDVAL",
"AJOCDVAL",
"AOCDVAL",
"AJSCHKVAL",
"AJOCHKVAL",
"AOCHKVAL",
]
SIPP_STOCK_ASSET_ALLOCATION_COLUMNS = [
"AJSSTVAL",
"AJOSTVAL",
"AOSTVAL",
"AJSMFVAL",
"AJOMFVAL",
"AOMFVAL",
]
SIPP_BOND_ASSET_ALLOCATION_COLUMNS = [
"AJSGOVSVAL",
"AJOGOVSVAL",
"AOGOVSVAL",
"AJSMCBDVAL",
"AJOMCBDVAL",
"AOMCBDVAL",
]
SIPP_ASSET_TARGET_ALLOCATION_COLUMNS = {
target: [sipp_allocation_flag_for(column) for column in columns]
for target, columns in SIPP_ASSET_TARGET_SOURCE_COLUMNS.items()
"bank_account_assets": SIPP_BANK_ACCOUNT_ASSET_ALLOCATION_COLUMNS,
"stock_assets": SIPP_STOCK_ASSET_ALLOCATION_COLUMNS,
"bond_assets": SIPP_BOND_ASSET_ALLOCATION_COLUMNS,
}
SIPP_ASSET_ALLOCATION_COLUMNS = sorted(
{
Expand Down Expand Up @@ -326,7 +364,7 @@ def get_tip_model() -> QRF:

SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS = {
"household_vehicles_owned": [sipp_allocation_flag_for("TVEH_NUM")],
"household_vehicles_value": [sipp_allocation_flag_for("THVAL_VEH")],
"household_vehicles_value": ["AVEH1VAL", "AVEH2VAL", "AVEH3VAL"],
}

VEHICLE_COLUMNS = [
Expand All @@ -347,6 +385,9 @@ def get_tip_model() -> QRF:
"THVAL_HOME",
"AVEH_NUM",
"AHVAL_VEH",
"AVEH1VAL",
"AVEH2VAL",
"AVEH3VAL",
]


Expand Down Expand Up @@ -652,6 +693,12 @@ def train_asset_model():
target_source_columns=SIPP_ASSET_TARGET_SOURCE_COLUMNS,
target_allocation_flag_columns=SIPP_ASSET_TARGET_ALLOCATION_COLUMNS,
)
sipp, asset_target_filters = filter_positive_finite_weight_rows(
sipp,
weight_col="household_weight",
target_filters=asset_target_filters,
context_name="SIPP asset donor",
)
sipp, asset_target_filters = cap_training_sample(
sipp,
max_train_samples=20_000,
Expand Down Expand Up @@ -799,6 +846,9 @@ def build_vehicle_training_frame() -> pd.DataFrame:
"household_vehicles_value": grouped["THVAL_VEH"].first().fillna(0),
"AVEH_NUM": grouped["AVEH_NUM"].max().fillna(0),
"AHVAL_VEH": grouped["AHVAL_VEH"].first().fillna(0),
"AVEH1VAL": grouped["AVEH1VAL"].max().fillna(0),
"AVEH2VAL": grouped["AVEH2VAL"].max().fillna(0),
"AVEH3VAL": grouped["AVEH3VAL"].max().fillna(0),
"is_homeowner": (grouped["THVAL_HOME"].first().fillna(0) > 0).astype(
np.float32
),
Expand Down Expand Up @@ -839,6 +889,12 @@ def train_vehicle_model():
targets=vehicle_vars,
target_allocation_flag_columns=SIPP_VEHICLE_TARGET_ALLOCATION_COLUMNS,
)
sipp, vehicle_target_filters = filter_positive_finite_weight_rows(
sipp,
weight_col="household_weight",
target_filters=vehicle_target_filters,
context_name="SIPP vehicle donor",
)
sipp, vehicle_target_filters = cap_training_sample(
sipp,
max_train_samples=20_000,
Expand Down
81 changes: 77 additions & 4 deletions policyengine_us_data/utils/source_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@

logger = logging.getLogger(__name__)

SIPP_OBSERVED_STATUS_VALUES = frozenset((0, 1, 9))
SIPP_STATUS_FLAG_PREFIXES = (
"AJB",
"AJS",
"AJO",
"AO",
"ASSI",
"AVAL",
"AVEH",
"AHVAL",
)


def sipp_allocation_flag_for(source_column: str) -> str:
"""Return the SIPP allocation flag name for a source variable."""
Expand All @@ -20,6 +32,11 @@ def sipp_allocation_flag_for(source_column: str) -> str:
return f"A{source_column[1:]}"


def is_sipp_status_flag_column(column: str) -> bool:
"""Return whether a column name looks like a Census SIPP status flag."""
return column.startswith(SIPP_STATUS_FLAG_PREFIXES)


def require_columns_present(
available_columns: Container[str],
required_columns: Sequence[str],
Expand Down Expand Up @@ -47,9 +64,13 @@ def observed_source_mask(
) -> pd.Series:
"""Mask rows whose donor source values are observed for one target.

Source-survey allocation flags conventionally use ``0`` for not allocated
and non-zero values for allocated/imputed. Missing flag columns are ignored
so callers can use this helper across sources with different flag coverage.
Generic allocation flags use ``0`` for not allocated and non-zero values
for allocated/imputed. Census SIPP ``A*`` status flags instead encode
``0`` as not in universe, ``1`` as reported, and ``9`` as derivable from
component flags; values ``2`` through ``8`` indicate imputation.

Missing flag columns are ignored so callers can use this helper across
sources with different flag coverage.
"""
mask = pd.Series(True, index=df.index)

Expand All @@ -62,7 +83,10 @@ def observed_source_mask(
if column not in df:
continue
flag = pd.to_numeric(df[column], errors="coerce").fillna(0)
mask &= flag.eq(0)
if is_sipp_status_flag_column(column):
mask &= flag.isin(SIPP_OBSERVED_STATUS_VALUES)
else:
mask &= flag.eq(0)

return mask

Expand Down Expand Up @@ -225,3 +249,52 @@ def cap_training_sample(
for target, mask in filters.items()
}
return sampled_df, sampled_filters


def filter_positive_finite_weight_rows(
df: pd.DataFrame,
*,
weight_col: str,
target_filters: Mapping[str, pd.Series] | None = None,
context_name: str = "donor training frame",
) -> tuple[pd.DataFrame, dict[str, pd.Series]]:
"""Drop rows whose fit weight cannot be passed to microimpute."""
if weight_col not in df:
raise KeyError(f"{context_name} is missing weight column {weight_col!r}")

filters = {}
for target, mask in (target_filters or {}).items():
aligned = mask.reindex(df.index)
if aligned.isna().any():
raise ValueError(f"target_filters[{target!r}] contains missing values")
filters[target] = aligned.astype(bool)

weights = pd.to_numeric(df[weight_col], errors="coerce")
valid_weight = np.isfinite(weights) & weights.gt(0)
dropped = int((~valid_weight).sum())
if dropped:
logger.info(
"Dropped %d/%d %s rows with non-positive or non-finite %s",
dropped,
len(df),
context_name,
weight_col,
)

filtered_df = df.loc[valid_weight].copy().reset_index(drop=True)
filtered_filters = {
target: pd.Series(
mask.loc[valid_weight].to_numpy(dtype=bool),
index=filtered_df.index,
)
for target, mask in filters.items()
}

for target, mask in filtered_filters.items():
if not mask.any():
raise ValueError(
f"No observed donor rows with positive finite {weight_col} "
f"available for {target}"
)

return filtered_df, filtered_filters
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
"Programming Language :: Python :: 3.14",
]
dependencies = [
"policyengine-us==1.702.1",
"policyengine-us==1.703.1",
# policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for
# PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost
# after _invalidate_all_caches) and is required by policyengine-us 1.682.1+.
Expand Down
12 changes: 7 additions & 5 deletions tests/unit/calibration/test_source_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ def test_calibration_sipp_qrf_passes_target_filters(self, monkeypatch):
for column in source_impute.SIPP_TIP_AMOUNT_COLUMNS:
tip_columns[column] = [10.0, 5.0, 0.0]
for column in source_impute.SIPP_TIP_AMOUNT_TO_ALLOCATION_COLUMN.values():
tip_columns[column] = [0, 1, 0]
tip_columns[column] = [1, 2, 0]
for column in source_impute.SIPP_JOB_OCCUPATION_COLUMNS:
tip_columns[column] = [0, 0, 0]
tip_source = pd.DataFrame(tip_columns)
Expand All @@ -437,8 +437,8 @@ def test_calibration_sipp_qrf_passes_target_filters(self, monkeypatch):
asset_columns[column] = [1_000.0, 2_000.0, 0.0]
for column in source_impute.SIPP_ASSET_ALLOCATION_COLUMNS:
asset_columns[column] = [0, 0, 0]
asset_columns["AVAL_BANK"] = [0, 1, 0]
asset_columns["AVAL_STMF"] = [0, 0, 1]
asset_columns["AJSSAVVAL"] = [0, 2, 0]
asset_columns["AJSSTVAL"] = [0, 0, 6]
asset_source = pd.DataFrame(asset_columns)

vehicle_train = pd.DataFrame(
Expand All @@ -449,8 +449,10 @@ def test_calibration_sipp_qrf_passes_target_filters(self, monkeypatch):
},
"household_vehicles_owned": [1.0, 2.0, 3.0],
"household_vehicles_value": [5_000.0, 10_000.0, 15_000.0],
"AVEH_NUM": [0, 1, 0],
"AHVAL_VEH": [0, 0, 1],
"AVEH_NUM": [1, 2, 1],
"AVEH1VAL": [1, 1, 5],
"AVEH2VAL": [0, 0, 0],
"AVEH3VAL": [0, 0, 0],
"household_weight": [1.0, 1.0, 1.0],
}
)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/datasets/test_sipp_ssi_disability.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_ssi_disability_training_usecols_include_label_and_income_columns():

def test_build_ssi_disability_training_frame_excludes_allocated_label_source():
frame = _base_sipp_frame()
frame.loc[0, "ASSI_YRYN"] = 1
frame.loc[0, "ASSI_YRYN"] = 3
frame.loc[1:, "ASSI_YRYN"] = 0
frame["ASSI_BRSN"] = 0

Expand Down
Loading