Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions modal_app/worker_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def main():
sys.stdout = sys.stderr

from policyengine_us_data.calibration.publish_local_area import (
build_h5,
build_output_dataset,
NYC_COUNTIES,
NYC_CDS,
AT_LARGE_DISTRICTS,
Expand Down Expand Up @@ -104,11 +104,11 @@ def main():
continue
states_dir = output_dir / "states"
states_dir.mkdir(parents=True, exist_ok=True)
path = build_h5(
result = build_output_dataset(
weights=weights,
geography=geography,
dataset_path=dataset_path,
output_path=states_dir / f"{item_id}.h5",
output_base=states_dir / item_id,
cd_subset=cd_subset,
takeup_filter=takeup_filter,
)
Expand Down Expand Up @@ -147,11 +147,11 @@ def main():

districts_dir = output_dir / "districts"
districts_dir.mkdir(parents=True, exist_ok=True)
path = build_h5(
result = build_output_dataset(
weights=weights,
geography=geography,
dataset_path=dataset_path,
output_path=districts_dir / f"{friendly_name}.h5",
output_base=districts_dir / friendly_name,
cd_subset=[geoid],
takeup_filter=takeup_filter,
)
Expand All @@ -166,11 +166,11 @@ def main():
continue
cities_dir = output_dir / "cities"
cities_dir.mkdir(parents=True, exist_ok=True)
path = build_h5(
result = build_output_dataset(
weights=weights,
geography=geography,
dataset_path=dataset_path,
output_path=cities_dir / "NYC.h5",
output_base=cities_dir / "NYC",
cd_subset=cd_subset,
county_filter=NYC_COUNTIES,
takeup_filter=takeup_filter,
Expand All @@ -179,16 +179,16 @@ def main():
elif item_type == "national":
national_dir = output_dir / "national"
national_dir.mkdir(parents=True, exist_ok=True)
path = build_h5(
result = build_output_dataset(
weights=weights,
geography=geography,
dataset_path=dataset_path,
output_path=national_dir / "US.h5",
output_base=national_dir / "US",
)
else:
raise ValueError(f"Unknown item type: {item_type}")

if path:
if result:
results["completed"].append(f"{item_type}:{item_id}")
print(
f"Completed {item_type}:{item_id}",
Expand Down
130 changes: 80 additions & 50 deletions policyengine_us_data/calibration/publish_local_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
python publish_local_area.py [--skip-download] [--states-only] [--upload]
"""

import os
import numpy as np
from pathlib import Path
from typing import List

from policyengine_us import Microsimulation
from policyengine_us_data.utils.hdfstore import (
DatasetResult,
save_h5,
save_hdfstore,
)
from policyengine_us_data.utils.huggingface import download_calibration_inputs
from policyengine_us_data.utils.data_upload import (
upload_local_area_file,
Expand Down Expand Up @@ -105,39 +111,39 @@ def record_completed_city(city_name: str):
f.write(f"{city_name}\n")


def build_h5(
def build_output_dataset(
weights: np.ndarray,
geography,
dataset_path: Path,
output_path: Path,
output_base: Path,
cd_subset: List[str] = None,
county_filter: set = None,
takeup_filter: List[str] = None,
) -> Path:
"""Build an H5 file by cloning records for each nonzero weight.
) -> DatasetResult:
"""Assemble a dataset and serialize to h5py + HDFStore.

Args:
weights: Clone-level weight vector, shape (n_clones_total * n_hh,).
geography: GeographyAssignment from assign_random_geography.
dataset_path: Path to base dataset H5 file.
output_path: Where to write the output H5 file.
output_base: Path stem **without** file extension.
Serializers append ``.h5`` and ``.hdfstore.h5``.
cd_subset: If provided, only include clones for these CDs.
county_filter: If provided, scale weights by P(target|CD)
for city datasets.
takeup_filter: List of takeup vars to apply.

Returns:
Path to the output H5 file.
A :class:`DatasetResult` with the assembled data.
"""
import h5py
from collections import defaultdict
from policyengine_core.enums import Enum
from policyengine_us.variables.household.demographic.geographic.county.county_enum import (
County,
)

output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_base = Path(output_base)
output_base.parent.mkdir(parents=True, exist_ok=True)

blocks = np.asarray(geography.block_geoid)
clone_cds = np.asarray(geography.cd_geoid, dtype=str)
Expand Down Expand Up @@ -182,7 +188,7 @@ def build_h5(
else f"{n_clones_total} clone rows"
)
print(f"\n{'=' * 60}")
print(f"Building {output_path.name} ({label}, {n_hh} households)")
print(f"Building {output_base.name} ({label}, {n_hh} households)")
print(f"{'=' * 60}")

# === Identify active clones ===
Expand Down Expand Up @@ -539,31 +545,16 @@ def build_h5(
for var_name, bools in takeup_results.items():
data[var_name] = {time_period: bools}

# === Write H5 ===
with h5py.File(str(output_path), "w") as f:
for variable, periods in data.items():
grp = f.create_group(variable)
for period, values in periods.items():
grp.create_dataset(str(period), data=values)

print(f"\nH5 saved to {output_path}")

with h5py.File(str(output_path), "r") as f:
tp = str(time_period)
if "household_id" in f and tp in f["household_id"]:
n = len(f["household_id"][tp][:])
print(f"Verified: {n:,} households in output")
if "person_id" in f and tp in f["person_id"]:
n = len(f["person_id"][tp][:])
print(f"Verified: {n:,} persons in output")
if "household_weight" in f and tp in f["household_weight"]:
hw = f["household_weight"][tp][:]
print(f"Total population (HH weights): {hw.sum():,.0f}")
if "person_weight" in f and tp in f["person_weight"]:
pw = f["person_weight"][tp][:]
print(f"Total population (person weights): {pw.sum():,.0f}")

return output_path
# === Serialize ===
result = DatasetResult(
data=data,
time_period=time_period,
system=sim.tax_benefit_system,
)
save_h5(result, str(output_base))
save_hdfstore(result, str(output_base))

return result


AT_LARGE_DISTRICTS = {0, 98}
Expand Down Expand Up @@ -613,22 +604,35 @@ def build_states(
print(f"No CDs found for {state_code}, skipping")
continue

output_path = states_dir / f"{state_code}.h5"
output_base = states_dir / state_code

try:
build_h5(
build_output_dataset(
weights=w,
geography=geography,
dataset_path=dataset_path,
output_path=output_path,
output_base=output_base,
cd_subset=cd_subset,
takeup_filter=takeup_filter,
)

h5_path = str(output_base) + ".h5"
hdfstore_path = str(output_base) + ".hdfstore.h5"
has_hdfstore = os.path.exists(hdfstore_path)

if upload:
print(f"Uploading {state_code}.h5 to GCP...")
upload_local_area_file(str(output_path), "states", skip_hf=True)
hf_queue.append((str(output_path), "states"))
upload_local_area_file(h5_path, "states", skip_hf=True)

if has_hdfstore:
print(f"Uploading {state_code}.hdfstore.h5 to GCP...")
upload_local_area_file(
hdfstore_path, "states_hdfstore", skip_hf=True
)

hf_queue.append((h5_path, "states"))
if has_hdfstore:
hf_queue.append((hdfstore_path, "states_hdfstore"))

record_completed_state(state_code)
print(f"Completed {state_code}")
Expand Down Expand Up @@ -680,23 +684,36 @@ def build_districts(
print(f"Skipping {friendly_name} (already completed)")
continue

output_path = districts_dir / f"{friendly_name}.h5"
output_base = districts_dir / friendly_name
print(f"\n[{i + 1}/{len(all_cds)}] Building {friendly_name}")

try:
build_h5(
build_output_dataset(
weights=w,
geography=geography,
dataset_path=dataset_path,
output_path=output_path,
output_base=output_base,
cd_subset=[cd_geoid],
takeup_filter=takeup_filter,
)

h5_path = str(output_base) + ".h5"
hdfstore_path = str(output_base) + ".hdfstore.h5"
has_hdfstore = os.path.exists(hdfstore_path)

if upload:
print(f"Uploading {friendly_name}.h5 to GCP...")
upload_local_area_file(str(output_path), "districts", skip_hf=True)
hf_queue.append((str(output_path), "districts"))
upload_local_area_file(h5_path, "districts", skip_hf=True)

if has_hdfstore:
print(f"Uploading {friendly_name}.hdfstore.h5 to GCP...")
upload_local_area_file(
hdfstore_path, "districts_hdfstore", skip_hf=True
)

hf_queue.append((h5_path, "districts"))
if has_hdfstore:
hf_queue.append((hdfstore_path, "districts_hdfstore"))

record_completed_district(friendly_name)
print(f"Completed {friendly_name}")
Expand Down Expand Up @@ -743,23 +760,36 @@ def build_cities(
if not cd_subset:
print("No NYC-related CDs found, skipping")
else:
output_path = cities_dir / "NYC.h5"
output_base = cities_dir / "NYC"

try:
build_h5(
build_output_dataset(
weights=w,
geography=geography,
dataset_path=dataset_path,
output_path=output_path,
output_base=output_base,
cd_subset=cd_subset,
county_filter=NYC_COUNTIES,
takeup_filter=takeup_filter,
)

h5_path = str(output_base) + ".h5"
hdfstore_path = str(output_base) + ".hdfstore.h5"
has_hdfstore = os.path.exists(hdfstore_path)

if upload:
print("Uploading NYC.h5 to GCP...")
upload_local_area_file(str(output_path), "cities", skip_hf=True)
hf_queue.append((str(output_path), "cities"))
upload_local_area_file(h5_path, "cities", skip_hf=True)

if has_hdfstore:
print("Uploading NYC.hdfstore.h5 to GCP...")
upload_local_area_file(
hdfstore_path, "cities_hdfstore", skip_hf=True
)

hf_queue.append((h5_path, "cities"))
if has_hdfstore:
hf_queue.append((hdfstore_path, "cities_hdfstore"))

record_completed_city("NYC")
print("Completed NYC")
Expand Down
20 changes: 10 additions & 10 deletions policyengine_us_data/calibration/stacked_dataset_builder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
CLI for creating CD-stacked datasets from calibration artifacts.

Thin wrapper around build_h5/build_states/build_districts/build_cities
Thin wrapper around build_output_dataset/build_states/build_districts/build_cities
in publish_local_area.py. Loads a GeographyAssignment from geography.npz
and delegates all H5 building logic.
"""
Expand All @@ -19,7 +19,7 @@

from policyengine_us import Microsimulation
from policyengine_us_data.calibration.publish_local_area import (
build_h5,
build_output_dataset,
build_states,
build_districts,
build_cities,
Expand Down Expand Up @@ -111,13 +111,13 @@

# === Dispatch ===
if mode == "national":
output_path = output_dir / "US.h5"
print(f"\nCreating national dataset: {output_path}")
build_h5(
output_base = output_dir / "US"
print(f"\nCreating national dataset: {output_base}")
build_output_dataset(
weights=w,
geography=geography,
dataset_path=dataset_path,
output_path=output_path,
output_base=output_base,
takeup_filter=takeup_filter,
)

Expand Down Expand Up @@ -160,13 +160,13 @@
calibrated_cds = sorted(set(cd_geoid))
if args.cd not in calibrated_cds:
raise ValueError(f"CD {args.cd} not in calibrated CDs")
output_path = output_dir / f"{args.cd}.h5"
print(f"\nCreating single CD dataset: {output_path}")
build_h5(
output_base = output_dir / args.cd
print(f"\nCreating single CD dataset: {output_base}")
build_output_dataset(
weights=w,
geography=geography,
dataset_path=dataset_path,
output_path=output_path,
output_base=output_base,
cd_subset=[args.cd],
takeup_filter=takeup_filter,
)
Expand Down
1 change: 0 additions & 1 deletion policyengine_us_data/db/etl_age.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def transform_age_data(age_data, docs):


def load_age_data(df_long, geo, year):

# Quick data quality check before loading ----
if geo == "National":
assert len(set(df_long.ucgid_str)) == 1
Expand Down
1 change: 0 additions & 1 deletion policyengine_us_data/db/etl_irs_soi.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def extract_soi_data() -> pd.DataFrame:


def transform_soi_data(raw_df):

TARGETS = [
dict(code="59661", name="eitc", breakdown=("eitc_child_count", 0)),
dict(code="59662", name="eitc", breakdown=("eitc_child_count", 1)),
Expand Down
1 change: 0 additions & 1 deletion policyengine_us_data/db/etl_medicaid.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def transform_survey_medicaid_data(cd_survey_df):


def load_medicaid_data(long_state, long_cd, year):

DATABASE_URL = f"sqlite:///{STORAGE_FOLDER / 'calibration' / 'policy_data.db'}"
engine = create_engine(DATABASE_URL)

Expand Down
1 change: 0 additions & 1 deletion policyengine_us_data/db/etl_snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ def transform_survey_snap_data(raw_df):


def load_administrative_snap_data(df_states, year):

DATABASE_URL = f"sqlite:///{STORAGE_FOLDER / 'calibration' / 'policy_data.db'}"
engine = create_engine(DATABASE_URL)

Expand Down
Loading
Loading