diff --git a/changelog.d/consolidate-economic-impact.added.md b/changelog.d/consolidate-economic-impact.added.md new file mode 100644 index 00000000..ae1ea94f --- /dev/null +++ b/changelog.d/consolidate-economic-impact.added.md @@ -0,0 +1 @@ +Shared compute functions for economic impact analysis: AnalysisStrategy protocol, economic_impact_analysis, BudgetSummaryItem, compute_program_statistics, compute_decile_impacts, and PolicyReformAnalysis diff --git a/examples/us_budgetary_impact.py b/examples/us_budgetary_impact.py new file mode 100644 index 00000000..f16fb051 --- /dev/null +++ b/examples/us_budgetary_impact.py @@ -0,0 +1,155 @@ +"""Example: US budgetary impact comparison between baseline and reform. + +Demonstrates the canonical policyengine.py workflow: +1. Ensure datasets exist (download + compute or load from cache) +2. Define a parametric reform +3. Run baseline and reform simulations +4. Use economic_impact_analysis() for the full analysis +5. Use ChangeAggregate for targeted single-metric queries + +Run: python examples/us_budgetary_impact.py +""" + +import datetime + +from policyengine.core import Parameter, ParameterValue, Policy, Simulation +from policyengine.outputs.change_aggregate import ( + ChangeAggregate, + ChangeAggregateType, +) +from policyengine.tax_benefit_models.us import ( + economic_impact_analysis, + ensure_datasets, + us_latest, +) + + +def main(): + year = 2026 + + # ── Step 1: Get dataset (downloads from HuggingFace on first run) ── + print("Ensuring datasets are available...") + datasets = ensure_datasets( + datasets=["hf://policyengine/policyengine-us-data/enhanced_cps_2024.h5"], + years=[year], + data_folder="./data", + ) + dataset = datasets[f"enhanced_cps_2024_{year}"] + print(f" Loaded: {dataset}") + + # ── Step 2: Define a reform ── + # Example: double the standard deduction for single filers + param = Parameter( + name="gov.irs.deductions.standard.amount.SINGLE", + tax_benefit_model_version=us_latest, + ) + reform = Policy( 
+ name="Double standard deduction (single)", + parameter_values=[ + ParameterValue( + parameter=param, + start_date=datetime.date(year, 1, 1), + end_date=datetime.date(year, 12, 31), + value=30_950, + ), + ], + ) + + # ── Step 3: Create simulations ── + baseline_sim = Simulation( + dataset=dataset, + tax_benefit_model_version=us_latest, + ) + reform_sim = Simulation( + dataset=dataset, + tax_benefit_model_version=us_latest, + policy=reform, + ) + + # ── Step 4a: Quick budgetary number via ChangeAggregate ── + # This requires running the simulations first. + print("\nRunning simulations...") + baseline_sim.run() + reform_sim.run() + + tax_change = ChangeAggregate( + baseline_simulation=baseline_sim, + reform_simulation=reform_sim, + variable="household_tax", + aggregate_type=ChangeAggregateType.SUM, + ) + tax_change.run() + print("\nQuick budgetary result:") + print(f" Tax revenue change: ${tax_change.result / 1e9:.2f}B") + + # Count winners and losers + winners = ChangeAggregate( + baseline_simulation=baseline_sim, + reform_simulation=reform_sim, + variable="household_net_income", + aggregate_type=ChangeAggregateType.COUNT, + change_geq=1, + ) + losers = ChangeAggregate( + baseline_simulation=baseline_sim, + reform_simulation=reform_sim, + variable="household_net_income", + aggregate_type=ChangeAggregateType.COUNT, + change_leq=-1, + ) + winners.run() + losers.run() + print(f" Winners: {winners.result / 1e6:.2f}M households") + print(f" Losers: {losers.result / 1e6:.2f}M households") + + # ── Step 4b: Full analysis via economic_impact_analysis ── + # Note: this calls .ensure() internally, which is a no-op here since + # we already ran the simulations above. If we hadn't called .run(), + # ensure() would run + cache them automatically. 
+ print("\nRunning full economic impact analysis...") + analysis = economic_impact_analysis(baseline_sim, reform_sim) + + print("\n=== Program-by-Program Impact ===") + for prog in analysis.program_statistics.outputs: + print( + f" {prog.program_name:30s} " + f"baseline=${prog.baseline_total / 1e9:8.1f}B " + f"reform=${prog.reform_total / 1e9:8.1f}B " + f"change=${prog.change / 1e9:+8.1f}B" + ) + + print("\n=== Decile Impacts ===") + for d in analysis.decile_impacts.outputs: + print( + f" Decile {d.decile:2d}: " + f"avg change=${d.absolute_change:+8.0f} " + f"relative={d.relative_change:+.2%}" + ) + + print("\n=== Poverty ===") + for bp, rp in zip( + analysis.baseline_poverty.outputs, + analysis.reform_poverty.outputs, + strict=True, + ): + print( + f" {bp.poverty_type:30s} " + f"baseline={bp.rate:.4f} " + f"reform={rp.rate:.4f} " + f"change={rp.rate - bp.rate:+.4f}" + ) + + print("\n=== Inequality ===") + bi = analysis.baseline_inequality + ri = analysis.reform_inequality + print(f" Gini: baseline={bi.gini:.4f} reform={ri.gini:.4f}") + print( + f" Top 10% share: baseline={bi.top_10_share:.4f} reform={ri.top_10_share:.4f}" + ) + print( + f" Top 1% share: baseline={bi.top_1_share:.4f} reform={ri.top_1_share:.4f}" + ) + + +if __name__ == "__main__": + main() diff --git a/src/policyengine/outputs/__init__.py b/src/policyengine/outputs/__init__.py index d426f743..f58ace8c 100644 --- a/src/policyengine/outputs/__init__.py +++ b/src/policyengine/outputs/__init__.py @@ -1,5 +1,15 @@ from policyengine.core import Output, OutputCollection from policyengine.outputs.aggregate import Aggregate, AggregateType +from policyengine.outputs.analysis_strategy import ( + AnalysisStrategy, + InequalityResult, + PovertyResult, + ProgramDefinition, +) +from policyengine.outputs.budget_summary import ( + BudgetSummaryItem, + compute_budget_summary, +) from policyengine.outputs.change_aggregate import ( ChangeAggregate, ChangeAggregateType, @@ -15,6 +25,10 @@ from 
policyengine.outputs.decile_impact import ( DecileImpact, calculate_decile_impacts, + compute_decile_impacts, +) +from policyengine.outputs.economic_impact import ( + economic_impact_analysis, ) from policyengine.outputs.inequality import ( UK_INEQUALITY_INCOME_VARIABLE, @@ -31,6 +45,7 @@ LocalAuthorityImpact, compute_uk_local_authority_impacts, ) +from policyengine.outputs.policy_reform_analysis import PolicyReformAnalysis from policyengine.outputs.poverty import ( AGE_GROUPS, GENDER_GROUPS, @@ -48,6 +63,7 @@ calculate_us_poverty_by_race, calculate_us_poverty_rates, ) +from policyengine.outputs.program_statistics import compute_program_statistics __all__ = [ "Output", @@ -86,4 +102,14 @@ "compute_uk_constituency_impacts", "LocalAuthorityImpact", "compute_uk_local_authority_impacts", + "BudgetSummaryItem", + "compute_budget_summary", + "compute_decile_impacts", + "compute_program_statistics", + "PolicyReformAnalysis", + "AnalysisStrategy", + "ProgramDefinition", + "PovertyResult", + "InequalityResult", + "economic_impact_analysis", ] diff --git a/src/policyengine/outputs/aggregate.py b/src/policyengine/outputs/aggregate.py index 9406a4d7..09189114 100644 --- a/src/policyengine/outputs/aggregate.py +++ b/src/policyengine/outputs/aggregate.py @@ -47,10 +47,15 @@ def run(self): # Get variable object var_obj = next( - v - for v in self.simulation.tax_benefit_model_version.variables - if v.name == self.variable + ( + v + for v in self.simulation.tax_benefit_model_version.variables + if v.name == self.variable + ), + None, ) + if var_obj is None: + raise ValueError(f"Variable '{self.variable}' not found in model") # Get the target entity data target_entity = self.entity or var_obj.entity @@ -68,10 +73,17 @@ def run(self): # Apply filters if self.filter_variable is not None: filter_var_obj = next( - v - for v in self.simulation.tax_benefit_model_version.variables - if v.name == self.filter_variable + ( + v + for v in self.simulation.tax_benefit_model_version.variables + 
if v.name == self.filter_variable + ), + None, ) + if filter_var_obj is None: + raise ValueError( + f"Filter variable '{self.filter_variable}' not found in model" + ) if filter_var_obj.entity != target_entity: filter_mapped = self.simulation.output_dataset.data.map_to_entity( diff --git a/src/policyengine/outputs/analysis_strategy.py b/src/policyengine/outputs/analysis_strategy.py new file mode 100644 index 00000000..b19e832c --- /dev/null +++ b/src/policyengine/outputs/analysis_strategy.py @@ -0,0 +1,88 @@ +"""Strategy protocol and result types for economic impact analysis.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, TypedDict, runtime_checkable + +from pydantic import BaseModel, ConfigDict + +from policyengine.core import OutputCollection +from policyengine.outputs.inequality import Inequality +from policyengine.outputs.poverty import Poverty + +if TYPE_CHECKING: + from policyengine.core.simulation import Simulation + + +class ProgramDefinition(TypedDict): + """Definition of a program for program statistics computation.""" + + entity: str + is_tax: bool + + +class PovertyResult(BaseModel): + """Standardised poverty result returned by a country strategy.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_poverty: OutputCollection[Poverty] + reform_poverty: OutputCollection[Poverty] + baseline_poverty_by_age: OutputCollection[Poverty] | None = None + reform_poverty_by_age: OutputCollection[Poverty] | None = None + baseline_poverty_by_gender: OutputCollection[Poverty] | None = None + reform_poverty_by_gender: OutputCollection[Poverty] | None = None + baseline_poverty_by_race: OutputCollection[Poverty] | None = None + reform_poverty_by_race: OutputCollection[Poverty] | None = None + + +class InequalityResult(BaseModel): + """Standardised inequality result returned by a country strategy.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_inequality: Inequality + 
reform_inequality: Inequality + + +@runtime_checkable +class AnalysisStrategy(Protocol): + """Country-specific strategy for economic impact analysis. + + Each property/method corresponds to a standardised extension point + in the shared analysis pipeline. + """ + + @property + def income_variable(self) -> str: + """Primary income variable for decile / intra-decile analysis.""" + ... + + @property + def budget_variable_names(self) -> list[str]: + """Variable names for budget summary. + + Entities are looked up from the tax-benefit system at runtime. + """ + ... + + @property + def programs(self) -> dict[str, ProgramDefinition]: + """Program definitions: name -> ProgramDefinition.""" + ... + + def compute_poverty( + self, + baseline: Simulation, + reform: Simulation, + ) -> PovertyResult: + """Compute all poverty metrics (overall + demographic breakdowns).""" + ... + + def compute_inequality( + self, + baseline: Simulation, + reform: Simulation, + ) -> InequalityResult: + """Compute inequality metrics.""" + ... 
diff --git a/src/policyengine/outputs/budget_summary.py b/src/policyengine/outputs/budget_summary.py new file mode 100644 index 00000000..eccea659 --- /dev/null +++ b/src/policyengine/outputs/budget_summary.py @@ -0,0 +1,97 @@ +"""Budget summary output — totals for key budget variables under baseline and reform.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pandas as pd +from pydantic import ConfigDict + +from policyengine.core import Output, OutputCollection +from policyengine.outputs.aggregate import Aggregate, AggregateType + +if TYPE_CHECKING: + from policyengine.core.simulation import Simulation + + +class BudgetSummaryItem(Output): + """One row of the budget summary — totals for a single variable.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_simulation: Simulation + reform_simulation: Simulation + variable_name: str + entity: str + + # Results populated by run() + baseline_total: float | None = None + reform_total: float | None = None + change: float | None = None + + def run(self): + baseline_agg = Aggregate( + simulation=self.baseline_simulation, + variable=self.variable_name, + aggregate_type=AggregateType.SUM, + entity=self.entity, + ) + baseline_agg.run() + + reform_agg = Aggregate( + simulation=self.reform_simulation, + variable=self.variable_name, + aggregate_type=AggregateType.SUM, + entity=self.entity, + ) + reform_agg.run() + + self.baseline_total = float(baseline_agg.result) + self.reform_total = float(reform_agg.result) + self.change = self.reform_total - self.baseline_total + + +def compute_budget_summary( + baseline_simulation: Simulation, + reform_simulation: Simulation, + variable_names: list[str], +) -> OutputCollection[BudgetSummaryItem]: + """Compute budget totals for each variable under baseline and reform. + + Args: + baseline_simulation: Already-run baseline simulation. + reform_simulation: Already-run reform simulation. + variable_names: Variable names to aggregate. 
The entity for each + variable is looked up from the tax-benefit model version + attached to the baseline simulation. + + Returns: + OutputCollection of BudgetSummaryItem objects with a DataFrame. + """ + tbm = baseline_simulation.tax_benefit_model_version + results: list[BudgetSummaryItem] = [] + for var_name in variable_names: + entity = tbm.get_variable(var_name).entity + item = BudgetSummaryItem( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + variable_name=var_name, + entity=entity, + ) + item.run() + results.append(item) + + df = pd.DataFrame( + [ + { + "variable_name": r.variable_name, + "entity": r.entity, + "baseline_total": r.baseline_total, + "reform_total": r.reform_total, + "change": r.change, + } + for r in results + ] + ) + + return OutputCollection(outputs=results, dataframe=df) diff --git a/src/policyengine/outputs/change_aggregate.py b/src/policyengine/outputs/change_aggregate.py index e1cd3985..45b685c0 100644 --- a/src/policyengine/outputs/change_aggregate.py +++ b/src/policyengine/outputs/change_aggregate.py @@ -59,10 +59,15 @@ def run(self): # Get variable object var_obj = next( - v - for v in self.baseline_simulation.tax_benefit_model_version.variables - if v.name == self.variable + ( + v + for v in self.baseline_simulation.tax_benefit_model_version.variables + if v.name == self.variable + ), + None, ) + if var_obj is None: + raise ValueError(f"Variable '{self.variable}' not found in model") # Get the target entity data target_entity = self.entity or var_obj.entity @@ -123,10 +128,17 @@ def run(self): # Apply filter_variable filters if self.filter_variable is not None: filter_var_obj = next( - v - for v in self.baseline_simulation.tax_benefit_model_version.variables - if v.name == self.filter_variable + ( + v + for v in self.baseline_simulation.tax_benefit_model_version.variables + if v.name == self.filter_variable + ), + None, ) + if filter_var_obj is None: + raise ValueError( + f"Filter variable 
'{self.filter_variable}' not found in model" + ) if filter_var_obj.entity != target_entity: filter_mapped = ( diff --git a/src/policyengine/outputs/decile_impact.py b/src/policyengine/outputs/decile_impact.py index 9d5e2e43..0379ae1c 100644 --- a/src/policyengine/outputs/decile_impact.py +++ b/src/policyengine/outputs/decile_impact.py @@ -34,10 +34,15 @@ def run(self): """Calculate impact for this specific decile.""" # Get variable object to determine entity var_obj = next( - v - for v in self.baseline_simulation.tax_benefit_model_version.variables - if v.name == self.income_variable + ( + v + for v in self.baseline_simulation.tax_benefit_model_version.variables + if v.name == self.income_variable + ), + None, ) + if var_obj is None: + raise ValueError(f"Variable '{self.income_variable}' not found in model") # Get target entity target_entity = self.entity or var_obj.entity @@ -96,6 +101,63 @@ def run(self): self.count_no_change = float((absolute_change[mask] == 0).sum()) +def compute_decile_impacts( + baseline_simulation: Simulation, + reform_simulation: Simulation, + income_variable: str = "equiv_hbai_household_net_income", + entity: str | None = None, + quantiles: int = 10, +) -> OutputCollection[DecileImpact]: + """Calculate decile-by-decile impact using already-run simulations. + + Unlike ``calculate_decile_impacts`` this does **not** create new + Simulation objects — it works directly with the provided ones. + + Args: + baseline_simulation: Already-run baseline simulation. + reform_simulation: Already-run reform simulation. + income_variable: Variable to measure income changes. + entity: Entity to aggregate on (default: variable's entity). + quantiles: Number of quantiles (default 10 for deciles). + + Returns: + OutputCollection of DecileImpact objects with a DataFrame. 
+ """ + results = [] + for decile in range(1, quantiles + 1): + impact = DecileImpact.model_construct( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + income_variable=income_variable, + entity=entity, + decile=decile, + quantiles=quantiles, + ) + impact.run() + results.append(impact) + + df = pd.DataFrame( + [ + { + "baseline_simulation_id": r.baseline_simulation.id, + "reform_simulation_id": r.reform_simulation.id, + "income_variable": r.income_variable, + "decile": r.decile, + "baseline_mean": r.baseline_mean, + "reform_mean": r.reform_mean, + "absolute_change": r.absolute_change, + "relative_change": r.relative_change, + "count_better_off": r.count_better_off, + "count_worse_off": r.count_worse_off, + "count_no_change": r.count_no_change, + } + for r in results + ] + ) + + return OutputCollection(outputs=results, dataframe=df) + + def calculate_decile_impacts( dataset: Dataset, tax_benefit_model_version: TaxBenefitModelVersion, diff --git a/src/policyengine/outputs/economic_impact.py b/src/policyengine/outputs/economic_impact.py new file mode 100644 index 00000000..f80ef58b --- /dev/null +++ b/src/policyengine/outputs/economic_impact.py @@ -0,0 +1,107 @@ +"""Shared cross-country economic impact analysis.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np + +from policyengine.outputs.analysis_strategy import AnalysisStrategy +from policyengine.outputs.budget_summary import compute_budget_summary +from policyengine.outputs.decile_impact import compute_decile_impacts +from policyengine.outputs.intra_decile_impact import compute_intra_decile_impacts +from policyengine.outputs.policy_reform_analysis import PolicyReformAnalysis +from policyengine.outputs.program_statistics import compute_program_statistics + +if TYPE_CHECKING: + from policyengine.core.simulation import Simulation + + +def economic_impact_analysis( + baseline_simulation: Simulation, + reform_simulation: Simulation, + 
strategy: AnalysisStrategy, +) -> PolicyReformAnalysis: + """Perform comprehensive economic impact analysis of a policy reform. + + Shared implementation that delegates country-specific work to + *strategy* at five standardised extension points. + + ``ensure()`` is called on both simulations before any computation. + """ + if not isinstance(strategy, AnalysisStrategy): + raise TypeError( + f"strategy must implement the AnalysisStrategy protocol, " + f"but got {type(strategy).__name__}. Ensure it defines: " + f"income_variable, budget_variable_names, programs, " + f"compute_poverty(), and compute_inequality()." + ) + + baseline_simulation.ensure() + reform_simulation.ensure() + + # --- shared computations ------------------------------------------------ + + # Decile impacts + decile_impacts = compute_decile_impacts( + baseline_simulation, + reform_simulation, + income_variable=strategy.income_variable, + ) + + # Intra-decile impacts + intra_decile_impacts = compute_intra_decile_impacts( + baseline_simulation, + reform_simulation, + income_variable=strategy.income_variable, + ) + + # Budget summary (entity looked up from TBM inside compute_budget_summary) + budget = compute_budget_summary( + baseline_simulation, + reform_simulation, + strategy.budget_variable_names, + ) + + # Household counts — raw weight sums to avoid MicroSeries double-weighting + hh_weight_baseline = baseline_simulation.output_dataset.data.household[ + "household_weight" + ] + hh_weight_reform = reform_simulation.output_dataset.data.household[ + "household_weight" + ] + household_count_baseline = float(np.array(hh_weight_baseline).sum()) + household_count_reform = float(np.array(hh_weight_reform).sum()) + + # Program statistics + programs = compute_program_statistics( + baseline_simulation, + reform_simulation, + strategy.programs, + ) + + # --- strategy extension points ------------------------------------------ + + poverty = strategy.compute_poverty(baseline_simulation, reform_simulation) + 
inequality = strategy.compute_inequality(baseline_simulation, reform_simulation) + + # --- assemble result ---------------------------------------------------- + + return PolicyReformAnalysis( + decile_impacts=decile_impacts, + intra_decile_impacts=intra_decile_impacts, + budget_summary=budget, + household_count_baseline=household_count_baseline, + household_count_reform=household_count_reform, + program_statistics=programs, + baseline_poverty=poverty.baseline_poverty, + reform_poverty=poverty.reform_poverty, + baseline_poverty_by_age=poverty.baseline_poverty_by_age, + reform_poverty_by_age=poverty.reform_poverty_by_age, + baseline_poverty_by_gender=poverty.baseline_poverty_by_gender, + reform_poverty_by_gender=poverty.reform_poverty_by_gender, + baseline_poverty_by_race=poverty.baseline_poverty_by_race, + reform_poverty_by_race=poverty.reform_poverty_by_race, + baseline_inequality=inequality.baseline_inequality, + reform_inequality=inequality.reform_inequality, + ) diff --git a/src/policyengine/outputs/policy_reform_analysis.py b/src/policyengine/outputs/policy_reform_analysis.py new file mode 100644 index 00000000..710bab89 --- /dev/null +++ b/src/policyengine/outputs/policy_reform_analysis.py @@ -0,0 +1,60 @@ +"""Unified result container for a complete policy reform analysis.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict + +from policyengine.core import OutputCollection +from policyengine.outputs.decile_impact import DecileImpact +from policyengine.outputs.inequality import Inequality +from policyengine.outputs.intra_decile_impact import IntraDecileImpact +from policyengine.outputs.poverty import Poverty + +if TYPE_CHECKING: + from policyengine.outputs.budget_summary import BudgetSummaryItem + + +class PolicyReformAnalysis(BaseModel): + """Complete result of an economic impact analysis. + + This is a pure result container — it does no computation itself. 
+ ``economic_impact_analysis()`` (in each country's ``analysis.py``) + builds and returns an instance of this class. + + Geographic outputs (constituency, local authority, congressional + district) and wealth deciles are **not** included here because + they depend on external data or optional dataset variables and + must be able to fail independently of the core analysis. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + # Distributional + decile_impacts: OutputCollection[DecileImpact] + intra_decile_impacts: OutputCollection[IntraDecileImpact] + + # Budget + budget_summary: OutputCollection[BudgetSummaryItem] + household_count_baseline: float + household_count_reform: float + + # Programs + program_statistics: ( + OutputCollection # US ProgramStatistics or UK ProgrammeStatistics + ) + + # Poverty — overall always present, demographics optional + baseline_poverty: OutputCollection[Poverty] + reform_poverty: OutputCollection[Poverty] + baseline_poverty_by_age: OutputCollection[Poverty] | None = None + reform_poverty_by_age: OutputCollection[Poverty] | None = None + baseline_poverty_by_gender: OutputCollection[Poverty] | None = None + reform_poverty_by_gender: OutputCollection[Poverty] | None = None + baseline_poverty_by_race: OutputCollection[Poverty] | None = None + reform_poverty_by_race: OutputCollection[Poverty] | None = None + + # Inequality + baseline_inequality: Inequality + reform_inequality: Inequality diff --git a/src/policyengine/outputs/program_statistics.py b/src/policyengine/outputs/program_statistics.py new file mode 100644 index 00000000..8f49c9ba --- /dev/null +++ b/src/policyengine/outputs/program_statistics.py @@ -0,0 +1,109 @@ +"""Shared compute function for program/programme statistics.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import pandas as pd + +from policyengine.core import OutputCollection +from policyengine.outputs.analysis_strategy import ProgramDefinition + +if 
TYPE_CHECKING: + from policyengine.core.simulation import Simulation + +logger = logging.getLogger(__name__) + + +def compute_program_statistics( + baseline_simulation: Simulation, + reform_simulation: Simulation, + programs: dict[str, ProgramDefinition], +) -> OutputCollection: + """Compute per-program statistics for a policy reform. + + Args: + baseline_simulation: Already-run baseline simulation. + reform_simulation: Already-run reform simulation. + programs: Mapping of program name to :class:`ProgramDefinition` + with keys ``"entity"`` (str) and ``"is_tax"`` (bool). + Example:: + + { + "income_tax": {"entity": "tax_unit", "is_tax": True}, + "snap": {"entity": "spm_unit", "is_tax": False}, + } + + Returns: + OutputCollection of ProgramStatistics/ProgrammeStatistics objects. + Programs that raise KeyError or ValueError are skipped with a logged warning. + """ + # Try the US variant first and fall back to the UK variant only when the + # US import fails, so this function works with either country package + # installed. 
+ ProgramStats: type | None = None + try: + from policyengine.tax_benefit_models.us.outputs import ProgramStatistics + + ProgramStats = ProgramStatistics + except ImportError: + pass + if ProgramStats is None: + try: + from policyengine.tax_benefit_models.uk.outputs import ProgrammeStatistics + + ProgramStats = ProgrammeStatistics + except ImportError: + pass + if ProgramStats is None: + raise ImportError( + "Neither ProgramStatistics (US) nor ProgrammeStatistics (UK) could be imported" + ) + + # Determine the field name for the program name attribute + # US uses "program_name", UK uses "programme_name" + if hasattr(ProgramStats, "model_fields"): + name_field = ( + "program_name" + if "program_name" in ProgramStats.model_fields + else "programme_name" + ) + else: + name_field = "program_name" + + results = [] + for prog_name, prog_info in programs.items(): + try: + stats = ProgramStats( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + **{name_field: prog_name}, + entity=prog_info["entity"], + is_tax=prog_info.get("is_tax", False), + ) + stats.run() + results.append(stats) + except (KeyError, ValueError) as exc: + logger.warning("Skipping program %s: %s", prog_name, exc, exc_info=True) + continue + + df = pd.DataFrame( + [ + { + "program_name": getattr(r, name_field), + "entity": r.entity, + "is_tax": r.is_tax, + "baseline_total": r.baseline_total, + "reform_total": r.reform_total, + "change": r.change, + "baseline_count": r.baseline_count, + "reform_count": r.reform_count, + "winners": r.winners, + "losers": r.losers, + } + for r in results + ] + ) + + return OutputCollection(outputs=results, dataframe=df) diff --git a/src/policyengine/tax_benefit_models/uk/__init__.py b/src/policyengine/tax_benefit_models/uk/__init__.py index 09e697b7..73be9e70 100644 --- a/src/policyengine/tax_benefit_models/uk/__init__.py +++ b/src/policyengine/tax_benefit_models/uk/__init__.py @@ -4,8 +4,17 @@ if find_spec("policyengine_uk") is not None: from 
policyengine.core import Dataset + from policyengine.core.simulation import Simulation + from policyengine.outputs.analysis_strategy import ( + AnalysisStrategy, + InequalityResult, + PovertyResult, + ) + from policyengine.outputs.budget_summary import BudgetSummaryItem + from policyengine.outputs.policy_reform_analysis import PolicyReformAnalysis from .analysis import ( + UKAnalysisStrategy, UKHouseholdInput, UKHouseholdOutput, calculate_household_impact, @@ -31,6 +40,11 @@ UKYearData.model_rebuild() PolicyEngineUKDataset.model_rebuild() PolicyEngineUKLatest.model_rebuild() + ProgrammeStatistics.model_rebuild(_types_namespace={"Simulation": Simulation}) + BudgetSummaryItem.model_rebuild(_types_namespace={"Simulation": Simulation}) + PolicyReformAnalysis.model_rebuild( + _types_namespace={"BudgetSummaryItem": BudgetSummaryItem} + ) __all__ = [ "UKYearData", @@ -47,6 +61,11 @@ "UKHouseholdInput", "UKHouseholdOutput", "ProgrammeStatistics", + "PolicyReformAnalysis", + "UKAnalysisStrategy", + "AnalysisStrategy", + "PovertyResult", + "InequalityResult", ] else: __all__ = [] diff --git a/src/policyengine/tax_benefit_models/uk/analysis.py b/src/policyengine/tax_benefit_models/uk/analysis.py index c4b32016..be77de61 100644 --- a/src/policyengine/tax_benefit_models/uk/analysis.py +++ b/src/policyengine/tax_benefit_models/uk/analysis.py @@ -8,24 +8,26 @@ from microdf import MicroDataFrame from pydantic import BaseModel, Field, create_model -from policyengine.core import OutputCollection, Simulation +from policyengine.core import Simulation from policyengine.core.policy import Policy -from policyengine.outputs.decile_impact import ( - DecileImpact, - calculate_decile_impacts, +from policyengine.outputs.analysis_strategy import ( + InequalityResult, + PovertyResult, + ProgramDefinition, ) -from policyengine.outputs.inequality import ( - Inequality, - calculate_uk_inequality, +from policyengine.outputs.economic_impact import ( + economic_impact_analysis as 
_shared_economic_impact_analysis, ) +from policyengine.outputs.inequality import calculate_uk_inequality +from policyengine.outputs.policy_reform_analysis import PolicyReformAnalysis from policyengine.outputs.poverty import ( - Poverty, + calculate_uk_poverty_by_age, + calculate_uk_poverty_by_gender, calculate_uk_poverty_rates, ) from .datasets import PolicyEngineUKDataset, UKYearData from .model import uk_latest -from .outputs import ProgrammeStatistics def _create_entity_output_model(entity: str, variables: list[str]) -> type[BaseModel]: @@ -170,115 +172,76 @@ def safe_convert(value): ) -class PolicyReformAnalysis(BaseModel): - """Complete policy reform analysis result.""" +# --------------------------------------------------------------------------- +# UK analysis strategy +# --------------------------------------------------------------------------- - decile_impacts: OutputCollection[DecileImpact] - programme_statistics: OutputCollection[ProgrammeStatistics] - baseline_poverty: OutputCollection[Poverty] - reform_poverty: OutputCollection[Poverty] - baseline_inequality: Inequality - reform_inequality: Inequality +class UKAnalysisStrategy: + """Country-specific strategy for UK economic impact analysis.""" -def economic_impact_analysis( - baseline_simulation: Simulation, - reform_simulation: Simulation, -) -> PolicyReformAnalysis: - """Perform comprehensive analysis of a policy reform. 
- - Returns: - PolicyReformAnalysis containing decile impacts and programme statistics - """ - baseline_simulation.ensure() - reform_simulation.ensure() + @property + def income_variable(self) -> str: + return "equiv_hbai_household_net_income" - assert len(baseline_simulation.dataset.data.household) > 100, ( - "Baseline simulation must have more than 100 households" - ) - assert len(reform_simulation.dataset.data.household) > 100, ( - "Reform simulation must have more than 100 households" - ) - - # Decile impact - decile_impacts = calculate_decile_impacts( - dataset=baseline_simulation.dataset, - tax_benefit_model_version=baseline_simulation.tax_benefit_model_version, - baseline_policy=baseline_simulation.policy, - reform_policy=reform_simulation.policy, - dynamic=baseline_simulation.dynamic, - ) + @property + def budget_variable_names(self) -> list[str]: + return [ + "household_tax", + "household_benefits", + "household_net_income", + ] - # Major programmes to analyse - programmes = { - # Tax - "income_tax": {"entity": "person", "is_tax": True}, - "national_insurance": {"entity": "person", "is_tax": True}, - "vat": {"entity": "household", "is_tax": True}, - "council_tax": {"entity": "household", "is_tax": True}, - # Benefits - "universal_credit": {"entity": "person", "is_tax": False}, - "child_benefit": {"entity": "person", "is_tax": False}, - "pension_credit": {"entity": "person", "is_tax": False}, - "income_support": {"entity": "person", "is_tax": False}, - "working_tax_credit": {"entity": "person", "is_tax": False}, - "child_tax_credit": {"entity": "person", "is_tax": False}, - } + @property + def programs(self) -> dict[str, ProgramDefinition]: + return { + "income_tax": ProgramDefinition(entity="person", is_tax=True), + "national_insurance": ProgramDefinition(entity="person", is_tax=True), + "vat": ProgramDefinition(entity="household", is_tax=True), + "council_tax": ProgramDefinition(entity="household", is_tax=True), + "universal_credit": 
ProgramDefinition(entity="person", is_tax=False), + "child_benefit": ProgramDefinition(entity="person", is_tax=False), + "pension_credit": ProgramDefinition(entity="person", is_tax=False), + "income_support": ProgramDefinition(entity="person", is_tax=False), + "working_tax_credit": ProgramDefinition(entity="person", is_tax=False), + "child_tax_credit": ProgramDefinition(entity="person", is_tax=False), + } + + def compute_poverty( + self, + baseline: Simulation, + reform: Simulation, + ) -> PovertyResult: + return PovertyResult( + baseline_poverty=calculate_uk_poverty_rates(baseline), + reform_poverty=calculate_uk_poverty_rates(reform), + baseline_poverty_by_age=calculate_uk_poverty_by_age(baseline), + reform_poverty_by_age=calculate_uk_poverty_by_age(reform), + baseline_poverty_by_gender=calculate_uk_poverty_by_gender(baseline), + reform_poverty_by_gender=calculate_uk_poverty_by_gender(reform), + ) - programme_statistics = [] + def compute_inequality( + self, + baseline: Simulation, + reform: Simulation, + ) -> InequalityResult: + return InequalityResult( + baseline_inequality=calculate_uk_inequality(baseline), + reform_inequality=calculate_uk_inequality(reform), + ) - for programme_name, programme_info in programmes.items(): - entity = programme_info["entity"] - is_tax = programme_info["is_tax"] - stats = ProgrammeStatistics( - baseline_simulation=baseline_simulation, - reform_simulation=reform_simulation, - programme_name=programme_name, - entity=entity, - is_tax=is_tax, - ) - stats.run() - programme_statistics.append(stats) - - # Create DataFrame - programme_df = pd.DataFrame( - [ - { - "baseline_simulation_id": p.baseline_simulation.id, - "reform_simulation_id": p.reform_simulation.id, - "programme_name": p.programme_name, - "entity": p.entity, - "is_tax": p.is_tax, - "baseline_total": p.baseline_total, - "reform_total": p.reform_total, - "change": p.change, - "baseline_count": p.baseline_count, - "reform_count": p.reform_count, - "winners": p.winners, - 
"losers": p.losers, - } - for p in programme_statistics - ] - ) +UK_STRATEGY = UKAnalysisStrategy() - programme_collection = OutputCollection( - outputs=programme_statistics, dataframe=programme_df - ) - # Calculate poverty rates for both simulations - baseline_poverty = calculate_uk_poverty_rates(baseline_simulation) - reform_poverty = calculate_uk_poverty_rates(reform_simulation) - - # Calculate inequality for both simulations - baseline_inequality = calculate_uk_inequality(baseline_simulation) - reform_inequality = calculate_uk_inequality(reform_simulation) - - return PolicyReformAnalysis( - decile_impacts=decile_impacts, - programme_statistics=programme_collection, - baseline_poverty=baseline_poverty, - reform_poverty=reform_poverty, - baseline_inequality=baseline_inequality, - reform_inequality=reform_inequality, +def economic_impact_analysis( + baseline_simulation: Simulation, + reform_simulation: Simulation, +) -> PolicyReformAnalysis: + """Perform comprehensive economic impact analysis of a UK policy reform.""" + return _shared_economic_impact_analysis( + baseline_simulation, + reform_simulation, + UK_STRATEGY, ) diff --git a/src/policyengine/tax_benefit_models/us/__init__.py b/src/policyengine/tax_benefit_models/us/__init__.py index b5a95b3f..bf92cbaf 100644 --- a/src/policyengine/tax_benefit_models/us/__init__.py +++ b/src/policyengine/tax_benefit_models/us/__init__.py @@ -4,8 +4,17 @@ if find_spec("policyengine_us") is not None: from policyengine.core import Dataset + from policyengine.core.simulation import Simulation + from policyengine.outputs.analysis_strategy import ( + AnalysisStrategy, + InequalityResult, + PovertyResult, + ) + from policyengine.outputs.budget_summary import BudgetSummaryItem + from policyengine.outputs.policy_reform_analysis import PolicyReformAnalysis from .analysis import ( + USAnalysisStrategy, USHouseholdInput, USHouseholdOutput, calculate_household_impact, @@ -31,6 +40,11 @@ USYearData.model_rebuild() 
PolicyEngineUSDataset.model_rebuild() PolicyEngineUSLatest.model_rebuild() + ProgramStatistics.model_rebuild(_types_namespace={"Simulation": Simulation}) + BudgetSummaryItem.model_rebuild(_types_namespace={"Simulation": Simulation}) + PolicyReformAnalysis.model_rebuild( + _types_namespace={"BudgetSummaryItem": BudgetSummaryItem} + ) __all__ = [ "USYearData", @@ -47,6 +61,11 @@ "USHouseholdInput", "USHouseholdOutput", "ProgramStatistics", + "PolicyReformAnalysis", + "USAnalysisStrategy", + "AnalysisStrategy", + "PovertyResult", + "InequalityResult", ] else: __all__ = [] diff --git a/src/policyengine/tax_benefit_models/us/analysis.py b/src/policyengine/tax_benefit_models/us/analysis.py index 4b14a93f..85b5d77c 100644 --- a/src/policyengine/tax_benefit_models/us/analysis.py +++ b/src/policyengine/tax_benefit_models/us/analysis.py @@ -8,24 +8,27 @@ from microdf import MicroDataFrame from pydantic import BaseModel, Field -from policyengine.core import OutputCollection, Simulation +from policyengine.core import Simulation from policyengine.core.policy import Policy -from policyengine.outputs.decile_impact import ( - DecileImpact, - calculate_decile_impacts, +from policyengine.outputs.analysis_strategy import ( + InequalityResult, + PovertyResult, + ProgramDefinition, ) -from policyengine.outputs.inequality import ( - Inequality, - calculate_us_inequality, +from policyengine.outputs.economic_impact import ( + economic_impact_analysis as _shared_economic_impact_analysis, ) +from policyengine.outputs.inequality import calculate_us_inequality +from policyengine.outputs.policy_reform_analysis import PolicyReformAnalysis from policyengine.outputs.poverty import ( - Poverty, + calculate_us_poverty_by_age, + calculate_us_poverty_by_gender, + calculate_us_poverty_by_race, calculate_us_poverty_rates, ) from .datasets import PolicyEngineUSDataset, USYearData from .model import us_latest -from .outputs import ProgramStatistics class USHouseholdOutput(BaseModel): @@ -186,118 +189,75 
@@ def extract_entity_outputs( ) -class PolicyReformAnalysis(BaseModel): - """Complete policy reform analysis result.""" +# --------------------------------------------------------------------------- +# US analysis strategy +# --------------------------------------------------------------------------- - decile_impacts: OutputCollection[DecileImpact] - program_statistics: OutputCollection[ProgramStatistics] - baseline_poverty: OutputCollection[Poverty] - reform_poverty: OutputCollection[Poverty] - baseline_inequality: Inequality - reform_inequality: Inequality +class USAnalysisStrategy: + """Country-specific strategy for US economic impact analysis.""" -def economic_impact_analysis( - baseline_simulation: Simulation, - reform_simulation: Simulation, -) -> PolicyReformAnalysis: - """Perform comprehensive analysis of a policy reform. - - Returns: - PolicyReformAnalysis containing decile impacts and program statistics - """ - baseline_simulation.ensure() - reform_simulation.ensure() + @property + def income_variable(self) -> str: + return "household_net_income" - assert len(baseline_simulation.dataset.data.household) > 100, ( - "Baseline simulation must have more than 100 households" - ) - assert len(reform_simulation.dataset.data.household) > 100, ( - "Reform simulation must have more than 100 households" - ) - - # Decile impact (using household_net_income for US) - decile_impacts = calculate_decile_impacts( - dataset=baseline_simulation.dataset, - tax_benefit_model_version=baseline_simulation.tax_benefit_model_version, - baseline_policy=baseline_simulation.policy, - reform_policy=reform_simulation.policy, - dynamic=baseline_simulation.dynamic, - income_variable="household_net_income", - ) + @property + def budget_variable_names(self) -> list[str]: + return [ + "household_tax", + "household_benefits", + "household_net_income", + "household_state_income_tax", + ] - # Major programs to analyse - programs = { - # Federal taxes - "income_tax": {"entity": "tax_unit", 
"is_tax": True}, - "payroll_tax": {"entity": "person", "is_tax": True}, - # State and local taxes - "state_income_tax": {"entity": "tax_unit", "is_tax": True}, - # Benefits - "snap": {"entity": "spm_unit", "is_tax": False}, - "tanf": {"entity": "spm_unit", "is_tax": False}, - "ssi": {"entity": "person", "is_tax": False}, - "social_security": {"entity": "person", "is_tax": False}, - "medicare": {"entity": "person", "is_tax": False}, - "medicaid": {"entity": "person", "is_tax": False}, - "eitc": {"entity": "tax_unit", "is_tax": False}, - "ctc": {"entity": "tax_unit", "is_tax": False}, - } + @property + def programs(self) -> dict[str, ProgramDefinition]: + return { + "income_tax": ProgramDefinition(entity="tax_unit", is_tax=True), + "employee_payroll_tax": ProgramDefinition(entity="person", is_tax=True), + "snap": ProgramDefinition(entity="spm_unit", is_tax=False), + "tanf": ProgramDefinition(entity="spm_unit", is_tax=False), + "ssi": ProgramDefinition(entity="spm_unit", is_tax=False), + "social_security": ProgramDefinition(entity="person", is_tax=False), + } + + def compute_poverty( + self, + baseline: Simulation, + reform: Simulation, + ) -> PovertyResult: + return PovertyResult( + baseline_poverty=calculate_us_poverty_rates(baseline), + reform_poverty=calculate_us_poverty_rates(reform), + baseline_poverty_by_age=calculate_us_poverty_by_age(baseline), + reform_poverty_by_age=calculate_us_poverty_by_age(reform), + baseline_poverty_by_gender=calculate_us_poverty_by_gender(baseline), + reform_poverty_by_gender=calculate_us_poverty_by_gender(reform), + baseline_poverty_by_race=calculate_us_poverty_by_race(baseline), + reform_poverty_by_race=calculate_us_poverty_by_race(reform), + ) - program_statistics = [] + def compute_inequality( + self, + baseline: Simulation, + reform: Simulation, + ) -> InequalityResult: + return InequalityResult( + baseline_inequality=calculate_us_inequality(baseline), + reform_inequality=calculate_us_inequality(reform), + ) - for program_name, 
program_info in programs.items(): - entity = program_info["entity"] - is_tax = program_info["is_tax"] - stats = ProgramStatistics( - baseline_simulation=baseline_simulation, - reform_simulation=reform_simulation, - program_name=program_name, - entity=entity, - is_tax=is_tax, - ) - stats.run() - program_statistics.append(stats) - - # Create DataFrame - program_df = pd.DataFrame( - [ - { - "baseline_simulation_id": p.baseline_simulation.id, - "reform_simulation_id": p.reform_simulation.id, - "program_name": p.program_name, - "entity": p.entity, - "is_tax": p.is_tax, - "baseline_total": p.baseline_total, - "reform_total": p.reform_total, - "change": p.change, - "baseline_count": p.baseline_count, - "reform_count": p.reform_count, - "winners": p.winners, - "losers": p.losers, - } - for p in program_statistics - ] - ) +US_STRATEGY = USAnalysisStrategy() - program_collection = OutputCollection( - outputs=program_statistics, dataframe=program_df - ) - # Calculate poverty rates for both simulations - baseline_poverty = calculate_us_poverty_rates(baseline_simulation) - reform_poverty = calculate_us_poverty_rates(reform_simulation) - - # Calculate inequality for both simulations - baseline_inequality = calculate_us_inequality(baseline_simulation) - reform_inequality = calculate_us_inequality(reform_simulation) - - return PolicyReformAnalysis( - decile_impacts=decile_impacts, - program_statistics=program_collection, - baseline_poverty=baseline_poverty, - reform_poverty=reform_poverty, - baseline_inequality=baseline_inequality, - reform_inequality=reform_inequality, +def economic_impact_analysis( + baseline_simulation: Simulation, + reform_simulation: Simulation, +) -> PolicyReformAnalysis: + """Perform comprehensive economic impact analysis of a US policy reform.""" + return _shared_economic_impact_analysis( + baseline_simulation, + reform_simulation, + US_STRATEGY, ) diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index 5b4e8b27..8c0a5c37 100644 --- 
a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -478,7 +478,7 @@ def test_aggregate_invalid_variable(): variable="nonexistent_variable", aggregate_type=AggregateType.SUM, ) - with pytest.raises(StopIteration): + with pytest.raises(ValueError): agg.run() # Invalid filter variable name should raise error on run() @@ -488,5 +488,5 @@ def test_aggregate_invalid_variable(): aggregate_type=AggregateType.SUM, filter_variable="nonexistent_filter", ) - with pytest.raises(StopIteration): + with pytest.raises(ValueError): agg.run() diff --git a/tests/test_economic_impact_outputs.py b/tests/test_economic_impact_outputs.py new file mode 100644 index 00000000..f5a1dd33 --- /dev/null +++ b/tests/test_economic_impact_outputs.py @@ -0,0 +1,340 @@ +"""Tests for the new economic impact output modules.""" + +from unittest.mock import MagicMock, patch + +import numpy as np +import pandas as pd +import pytest +from microdf import MicroDataFrame + +from policyengine.outputs.analysis_strategy import AnalysisStrategy, ProgramDefinition +from policyengine.outputs.budget_summary import compute_budget_summary +from policyengine.outputs.change_aggregate import ChangeAggregate, ChangeAggregateType +from policyengine.outputs.decile_impact import DecileImpact, compute_decile_impacts + +# --------------------------------------------------------------------------- +# Helpers (same pattern as test_intra_decile_impact.py) +# --------------------------------------------------------------------------- + + +def _make_variable_mock(name: str, entity: str) -> MagicMock: + """Create a mock Variable with name and entity attributes.""" + var = MagicMock() + var.name = name + var.entity = entity + return var + + +def _make_sim(household_data: dict, variables: list | None = None) -> MagicMock: + """Create a mock Simulation with household-level data.""" + hh_df = MicroDataFrame( + pd.DataFrame(household_data), + weights="household_weight", + ) + sim = MagicMock() + sim.output_dataset.data.household = 
hh_df + sim.id = "test-sim" + if variables is not None: + sim.tax_benefit_model_version.variables = variables + return sim + + +# --------------------------------------------------------------------------- +# US AnalysisStrategy tests +# --------------------------------------------------------------------------- + + +def test_us_strategy_programs(): + """USAnalysisStrategy should contain expected program keys.""" + from policyengine.tax_benefit_models.us.analysis import USAnalysisStrategy + + strategy = USAnalysisStrategy() + expected = { + "income_tax", + "employee_payroll_tax", + "snap", + "tanf", + "ssi", + "social_security", + } + assert set(strategy.programs.keys()) == expected + + +def test_us_strategy_conforms_to_protocol(): + """USAnalysisStrategy should satisfy the AnalysisStrategy protocol.""" + from policyengine.tax_benefit_models.us.analysis import USAnalysisStrategy + + assert isinstance(USAnalysisStrategy(), AnalysisStrategy) + + +def test_us_strategy_program_structure(): + """Each program entry should be a ProgramDefinition with 'entity' and 'is_tax'.""" + from policyengine.tax_benefit_models.us.analysis import USAnalysisStrategy + + for name, info in USAnalysisStrategy().programs.items(): + assert "entity" in info, f"US program {name} missing 'entity'" + assert "is_tax" in info, f"US program {name} missing 'is_tax'" + + +# --------------------------------------------------------------------------- +# UK AnalysisStrategy tests (conditional on policyengine_uk being installed) +# --------------------------------------------------------------------------- + +uk_installed = pytest.importorskip( + "policyengine_uk", reason="policyengine_uk not installed" +) + + +def test_uk_strategy_programs(): + """UKAnalysisStrategy should contain expected programme keys.""" + from policyengine.tax_benefit_models.uk.analysis import UKAnalysisStrategy + + expected = { + "income_tax", + "national_insurance", + "vat", + "council_tax", + "universal_credit", + 
"child_benefit", + "pension_credit", + "income_support", + "working_tax_credit", + "child_tax_credit", + } + assert set(UKAnalysisStrategy().programs.keys()) == expected + + +def test_uk_strategy_conforms_to_protocol(): + """UKAnalysisStrategy should satisfy the AnalysisStrategy protocol.""" + from policyengine.tax_benefit_models.uk.analysis import UKAnalysisStrategy + + assert isinstance(UKAnalysisStrategy(), AnalysisStrategy) + + +def test_uk_strategy_program_structure(): + """Each programme entry should be a ProgramDefinition with 'entity' and 'is_tax'.""" + from policyengine.tax_benefit_models.uk.analysis import UKAnalysisStrategy + + for name, info in UKAnalysisStrategy().programs.items(): + assert "entity" in info, f"UK programme {name} missing 'entity'" + assert "is_tax" in info, f"UK programme {name} missing 'is_tax'" + + +# --------------------------------------------------------------------------- +# Shared economic_impact_analysis tests +# --------------------------------------------------------------------------- + + +def test_economic_impact_analysis_rejects_bad_strategy(): + """economic_impact_analysis should raise TypeError for non-strategy objects.""" + from policyengine.outputs.economic_impact import economic_impact_analysis + + sim = MagicMock() + with pytest.raises(TypeError, match="AnalysisStrategy protocol"): + economic_impact_analysis(sim, sim, "not_a_strategy") + + +def test_economic_impact_analysis_calls_ensure(): + """economic_impact_analysis should call ensure() on both simulations.""" + from policyengine.outputs.economic_impact import economic_impact_analysis + + baseline = MagicMock() + reform = MagicMock() + + strategy = MagicMock(spec=AnalysisStrategy) + strategy.income_variable = "household_net_income" + strategy.budget_variable_names = ["household_tax"] + strategy.programs = { + "income_tax": ProgramDefinition(entity="tax_unit", is_tax=True) + } + strategy.compute_poverty.return_value = MagicMock() + 
strategy.compute_inequality.return_value = MagicMock() + + with ( + patch( + "policyengine.outputs.economic_impact.compute_decile_impacts" + ) as mock_decile, + patch( + "policyengine.outputs.economic_impact.compute_intra_decile_impacts" + ) as mock_intra, + patch( + "policyengine.outputs.economic_impact.compute_budget_summary" + ) as mock_budget, + patch( + "policyengine.outputs.economic_impact.compute_program_statistics" + ) as mock_prog, + patch("policyengine.outputs.economic_impact.PolicyReformAnalysis"), + ): + economic_impact_analysis(baseline, reform, strategy) + + baseline.ensure.assert_called_once() + reform.ensure.assert_called_once() + mock_decile.assert_called_once() + mock_intra.assert_called_once() + mock_budget.assert_called_once() + mock_prog.assert_called_once() + strategy.compute_poverty.assert_called_once_with(baseline, reform) + strategy.compute_inequality.assert_called_once_with(baseline, reform) + + +# --------------------------------------------------------------------------- +# compute_budget_summary tests +# --------------------------------------------------------------------------- + + +def _make_budget_sim(variable_data: dict, variables: list) -> MagicMock: + """Create a mock simulation for budget summary testing.""" + sim = MagicMock() + sim.output_dataset.data.household = MicroDataFrame( + pd.DataFrame(variable_data), + weights="household_weight", + ) + sim.id = "test-budget-sim" + sim.tax_benefit_model_version.variables = variables + + def get_variable(name): + for v in variables: + if v.name == name: + return v + raise ValueError(f"Variable '{name}' not found in model") + + sim.tax_benefit_model_version.get_variable = get_variable + return sim + + +def test_compute_budget_summary_looks_up_entity_from_tbm(): + """compute_budget_summary should resolve entity from TBM, not from caller.""" + variables = [ + _make_variable_mock("household_tax", "household"), + _make_variable_mock("household_benefits", "household"), + ] + sim = 
_make_budget_sim( + { + "household_tax": [5000.0], + "household_benefits": [2000.0], + "household_weight": [1.0], + }, + variables, + ) + + # Patch BudgetSummaryItem + OutputCollection to bypass Pydantic validation + with ( + patch("policyengine.outputs.budget_summary.BudgetSummaryItem") as MockBSI, + patch("policyengine.outputs.budget_summary.OutputCollection"), + ): + MockBSI.return_value = MagicMock() + compute_budget_summary(sim, sim, ["household_tax", "household_benefits"]) + + assert MockBSI.call_count == 2 + calls = MockBSI.call_args_list + assert calls[0].kwargs["entity"] == "household" + assert calls[1].kwargs["entity"] == "household" + + +def test_compute_budget_summary_variable_not_found(): + """compute_budget_summary should raise ValueError for unknown variable.""" + variables = [_make_variable_mock("household_tax", "household")] + sim = _make_budget_sim( + {"household_tax": [5000.0], "household_weight": [1.0]}, + variables, + ) + + with pytest.raises(ValueError, match="not found in model"): + compute_budget_summary(sim, sim, ["nonexistent_variable"]) + + +# --------------------------------------------------------------------------- +# DecileImpact tests +# --------------------------------------------------------------------------- + + +def test_decile_impact_variable_not_found(): + """DecileImpact.run() should raise ValueError for a nonexistent variable.""" + variables = [_make_variable_mock("household_net_income", "household")] + sim = _make_sim( + {"household_net_income": [50000.0], "household_weight": [1.0]}, + variables=variables, + ) + + di = DecileImpact.model_construct( + baseline_simulation=sim, + reform_simulation=sim, + income_variable="nonexistent_variable", + entity="household", + decile=1, + ) + with pytest.raises(ValueError, match="not found in model"): + di.run() + + +def test_compute_decile_impacts_returns_10(): + """compute_decile_impacts should return 10 DecileImpact objects by default.""" + n = 100 + incomes = np.linspace(10000, 
100000, n) + reform_incomes = incomes + 500 + variables = [_make_variable_mock("household_net_income", "household")] + + baseline = _make_sim( + {"household_net_income": incomes, "household_weight": np.ones(n)}, + variables=variables, + ) + reform = _make_sim( + {"household_net_income": reform_incomes, "household_weight": np.ones(n)}, + variables=variables, + ) + + result = compute_decile_impacts( + baseline, reform, income_variable="household_net_income", entity="household" + ) + + assert len(result.outputs) == 10 + assert len(result.dataframe) == 10 + + # Each decile should have absolute_change ~500 + for di in result.outputs: + assert abs(di.absolute_change - 500.0) < 1e-6 + + +def test_compute_decile_impacts_custom_quantiles(): + """compute_decile_impacts with quantiles=5 should return 5 outputs.""" + n = 100 + incomes = np.linspace(10000, 100000, n) + variables = [_make_variable_mock("household_net_income", "household")] + + sim = _make_sim( + {"household_net_income": incomes, "household_weight": np.ones(n)}, + variables=variables, + ) + + result = compute_decile_impacts( + sim, + sim, + income_variable="household_net_income", + entity="household", + quantiles=5, + ) + + assert len(result.outputs) == 5 + + +# --------------------------------------------------------------------------- +# ChangeAggregate error test +# --------------------------------------------------------------------------- + + +def test_change_aggregate_variable_not_found(): + """ChangeAggregate should raise ValueError for a nonexistent variable.""" + variables = [_make_variable_mock("employment_income", "person")] + sim = _make_sim( + {"household_net_income": [50000.0], "household_weight": [1.0]}, + variables=variables, + ) + + ca = ChangeAggregate.model_construct( + baseline_simulation=sim, + reform_simulation=sim, + variable="nonexistent_variable", + aggregate_type=ChangeAggregateType.COUNT, + ) + with pytest.raises(ValueError, match="not found in model"): + ca.run()