diff --git a/pyproject.toml b/pyproject.toml index 693bbc0..3f66e27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ dependencies = [ "xlrd >=2.0.1", # For reading Excel files "iso639-lang >=2.2.3", # For checking ISO639-3 language codes "aiohttp >=3.9.5", # Async HTTP + "more-itertools >= 10.3.0", # For batched iterator ] diff --git a/python/popgetter/__init__.py b/python/popgetter/__init__.py index 8fca310..e769339 100644 --- a/python/popgetter/__init__.py +++ b/python/popgetter/__init__.py @@ -17,11 +17,15 @@ AzureGeoIOManager, AzureMetadataIOManager, AzureMetricsIOManager, + AzureMetricsMetadataIOManager, + AzureMetricsPartitionedIOManager, ) from popgetter.io_managers.local import ( LocalGeoIOManager, LocalMetadataIOManager, LocalMetricsIOManager, + LocalMetricsMetadataIOManager, + LocalMetricsPartitionedIOManager, ) from popgetter.utils import StagingDirResource @@ -81,12 +85,16 @@ def resources_by_env(): "geometry_io_manager": AzureGeoIOManager(), "metrics_io_manager": AzureMetricsIOManager(), "azure_general_io_manager": AzureGeneralIOManager(".bin"), + "metrics_partitioned_io_manager": AzureMetricsPartitionedIOManager(), + "metrics_metadata_io_manager": AzureMetricsMetadataIOManager(), } if PROD else { "metadata_io_manager": LocalMetadataIOManager(), "geometry_io_manager": LocalGeoIOManager(), "metrics_io_manager": LocalMetricsIOManager(), + "metrics_partitioned_io_manager": LocalMetricsPartitionedIOManager(), + "metrics_metadata_io_manager": LocalMetricsMetadataIOManager(), } ) @@ -106,6 +114,9 @@ def resources_by_env(): cloud_outputs.metadata_sensor, cloud_outputs.geometry_sensor, cloud_outputs.metrics_sensor, + cloud_outputs.metrics_metadata_sensor, + # TODO: commented out until implemented for partitioned assets + # cloud_outputs.metrics_partitioned_sensor, ], resources=resources, jobs=jobs, diff --git a/python/popgetter/assets/__init__.py b/python/popgetter/assets/__init__.py index 655c9b0..84d63a4 100644 --- a/python/popgetter/assets/__init__.py +++ b/python/popgetter/assets/__init__.py @@ -1,9 +1,9 @@ from __future__ import annotations -from . import bel, gb_eaw, gb_nir, gb_sct, uk, us +from . 
import bel, gb_eaw, gb_nir, gb_sct, uk, usa countries = [ - (mod, mod.__name__.split(".")[-1]) for mod in [bel, gb_nir, uk, us, gb_eaw, gb_sct] + (mod, mod.__name__.split(".")[-1]) for mod in [bel, gb_nir, uk, usa, gb_eaw, gb_sct] ] __all__ = ["countries"] diff --git a/python/popgetter/assets/us/__init__.py b/python/popgetter/assets/us/__init__.py deleted file mode 100644 index 28af083..0000000 --- a/python/popgetter/assets/us/__init__.py +++ /dev/null @@ -1,213 +0,0 @@ -from __future__ import annotations - -import os -import tempfile -import urllib -from pathlib import Path - -import docker -import geopandas as gp -import pandas as pd -from dagster import ( - AssetOut, - DynamicPartitionsDefinition, - asset, - multi_asset, -) - -from .config import ACS_METADATA, SUMMARY_LEVELS - -year = 2019 -summary_level = "fiveYear" - - -@asset(key_prefix="us2", name="non_unique_name") -def non_unique_name(): - return "USA" - - -# @asset(key_prefix="us", name="non_unique_name_2") -# def non_unique_name_2(unique_name): -# return f"country name is: '{unique_name}'" - -raw_table_files_partition = DynamicPartitionsDefinition(name="raw_table_files") - -geo_dir = tempfile.mkdtemp() -# -# tractDir = os.path.join(workdir,"tracts") -# blockGroupDir = os.path.join(workdir,"block_groups") -# countyDir = os.path.join(workdir,"counties") -# -# os.mkdir(blockGroupDir) -# os.mkdir(tractDir) -# os.mkdir(countyDir) - - -def get_summary_table(table_name: str, year: int, summary_level: str): - base = ACS_METADATA[year]["base"] - summary_file_dir = base + ACS_METADATA[year][summary_level]["tables"] - data = pd.read_csv(f"{summary_file_dir}/{table_name}", sep="|") - return data - - -def extract_values_at_specified_levels(df: pd.DataFrame, geoids: pd.DataFrame): - joined = pd.merge( - df, - geoids[["DADSID", "SUMLEVEL"]], - left_on="GEO_ID", - right_on="DADSID", - how="left", - ) - result = {} - - for level, id in SUMMARY_LEVELS.items(): - result[level] = joined[joined["SUMLEVEL"] == id].drop( - ["DADSID", "SUMLEVEL"], axis=1 - ) - return result - - -def merge_parquet_files(file_names): - result = pd.DataFrame() - for batch in tqdm(batched(file_names, 20)): - newDFS = [ - select_estimates(pd.read_parquet(file).set_index("GEO_ID")) - for file in batch - ] - result = pd.concat([result] + newDFS, axis=1) - return result - - -@asset -def generate_variable_dictionary(): - metadata = ACS_METADATA[year] - base = metadata["base"] - shells = metadata[summary_level]["shells"] - raw = pd.read_csv(base + shells, encoding="latin") - result = ( - [] - ) # pd.DataFrame(columns=["tableID","uniqueID", "universe","tableName", "variableName", "variableExtedndedName"]) - - universe = "" - tableName = "" - path = [] - previousWasEdge = True - for index, row in raw.iterrows(): - if (type(row["Table ID"]) == str and len(row["Table ID"].strip()) == 0) or type( - row["Table ID"] - ) == float: - # path=[] - # previousWasEdge = True - continue - - stub = row["Stub"] - - if row[["UniqueID"]].isna().all(): - if "Universe" in stub: - universe = stub.split("Universe:")[1].strip() - else: - tableName = stub - else: - if ":" in stub: - if previousWasEdge: - path.append(stub.replace(":", "")) - else: - path.pop() - path.append(stub.replace(":", "")) - else: - previousWasEdge = False - extendedName = "|".join(path) - if ":" not in stub: - extendedName = extendedName + "|" + stub - result.append( - { - "tableID": row["Table ID"], - "uniqueID": row["UniqueID"], - "universe": universe, - "variableName": stub, - "variableExtendedName": extendedName, - } - ) - - 
return pd.DataFrame.from_records(result) - - -@multi_asset( - outs={ - "tract_files": AssetOut(), - "blockGroup_files": AssetOut(), - "county_files": AssetOut(), - }, - partitions_def=raw_table_files_partition, -) -def aws_table_files(context, geometry_ids, summary_table_names): - base = ACS_METADATA[year]["base"] - summary_file_dir = base + ACS_METADATA[year][summary_level]["tables"] - - context.log.info("Trying to get partition name") - table = context.partition_key - context.log.info(f"table is {table}") - - data = get_summary_table(table, year, summary_level) - - context.log.info("got summary table") - values = extract_values_at_specified_levels(data, geometry_ids) - context.log.info("extracted values ", values) - # values['tract'].to_parquet(os.path.join(tractDir,table.replace(".dat",".parquet"))) - # values['county'].to_parquet(os.path.join(countyDir,table.replace(".dat",".parquet"))) - # values['block_group'].to_parquet(os.path.join(blockGroupDir,table.replace(".dat",".parquet"))) - - return values["tract"], values["county"], values["block_group"] - - -# @asset -# def merge_parquet_files(tract_parquet_files, blockGroup_parquet_files, county_parquet_files): -# # merge_parquet_files([os.path.join(countyDir,file) for file in os.listdir(countyDir)]).to_parquet(f"county_{year}_{summary_level}.parquet" -# merge_parquet_files([os.path.join(tract_parquet_files,file) for file in os.listdir(tractDir)]).to_parquet(f"tracts_{year}_{summary_level}.parquet") -# # merge_parquet_files([os.path.join(blockGroupDir,file) for file in os.listdir(blockGroupDir)]).to_parquet(f"block_groups_{year}_{summary_level}.parquet") - - -# -# @asset -# def cartographic_file() -# metadata = ACS_METADATA[year] -# url = metadata['geors'][admin_level] -# if(work_dir == None): -# work_dir = os.tmpdir() -# local_dir = os.path.join(work_dir, admin_level+".zip") -# urllib.request.urlretrieve(url, local_dir) -# return local_dir -# -@asset -def geometry_ids(): - path = ACS_METADATA[year]["base"] + ACS_METADATA[year][summary_level]["geoIds"] - sep = ACS_METADATA[year]["geoIdsSep"] if "geoIdsSep" in ACS_METADATA[year] else "," - table = pd.read_csv(path, encoding="latin", sep=sep) - return table - - -# -# -@asset -def summary_table_names(context): - metadata = ACS_METADATA[year] - base = metadata["base"] - table_path = base + metadata[summary_level]["tables"] - - table = pd.read_html(table_path)[0] - filtered = table[table["Name"].str.startswith("acs", na=False)] - - partition = context.instance.get_dynamic_partitions("raw_table_files") - - # [context.instance.delete_dynamic_partition('raw_table_files', part) for part in parts_to_del] - - context.instance.add_dynamic_partitions("raw_table_files", list(filtered["Name"])) - - # for name in filtered["Name"] - # raw_table_files_partition.build_add_request(name) - - return list(filtered["Name"]) - - - - diff --git a/python/popgetter/assets/us/config.py b/python/popgetter/assets/us/config.py deleted file mode 100644 index d70c5db..0000000 --- a/python/popgetter/assets/us/config.py +++ /dev/null @@ -1,86 +0,0 @@ - -SUMMARY_LEVELS={ - "tract": 140 , - "block_group":150, - "county":50 -} - -ACS_METADATA={ - ## 2018 needs additional logic for joining together geometry files - ## 2018:{ - ## base:"https://www2.census.gov/programs-surveys/acs/summary_file/2018", - ## type:'table', - ## geoms:{ - ## tract:, - ## block:"", - ## county:"https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip": - ## } - ## oneYear:{ - ## tables:"prototype/1YRData/", - ## 
geoIds:"https://www2.census.gov/programs-surveys/acs/summary_file/2018/prototype/20181YRGeos.csv", - ## shells:"prototype/ACS2018_Table_Shells.xlsx" - ## }, - ## fiveYear:{ - ## tables:"prototype/5YRData/", - ## geoIds:"https://www2.census.gov/programs-surveys/acs/summary_file/2018/prototype/20185YRGeos.csv", - ## shells:"prototype/ACS2018_Table_Shells.xlsx" - ## } - ## }, - 2019:{ - "base" : "https://www2.census.gov/programs-surveys/acs/summary_file/2019/", - "type":'table', - "geoms":{ - "tracts": "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_tract_500k.zip", - "blockGroups":"https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_bg_500k.zip", - "county": "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_county_500k.zip" - }, - "oneYear":{ - "tables":"prototype/1YRData/", - "geoIds":"prototype/Geos20191YR.csv", - "shells":"prototype/ACS2019_Table_Shells.csv" - }, - "fiveYear":{ - "tables":"prototype/5YRData/", - "geoIds":"prototype/Geos20195YR.csv", - "shells":"prototype/ACS2019_Table_Shells.csv" - } - }, - # Note 1 year esimates are not avaliable because of covid. More details of the exeprimental estimates - # are here : https://www.census.gov/programs-surveys/acs/technical-documentation/table-and-geography-changes/2020/1-year.html - 2020:{ - "base":"https://www2.census.gov/programs-surveys/acs/summary_file/2020/", - "type":'table', - "geoms":{ - "tracts":"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip", - "blockGroups":"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_bg_500k.zip", - "county":"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip" - }, - "shells":"prototype/ACS2020_Table_Shells.csv", - "oneYear":None, - "fiveYear":{ - "shells":"prototype/ACS2020_Table_Shells.csv", - "tables":"prototype/5YRData/", - "geoIds":"prototype/Geos20205YR.csv", - } - }, - 2021:{ - "base":"https://www2.census.gov/programs-surveys/acs/summary_file/2021/table-based-SF/", - "type":'table', - "geoIdsSep":"|", - "geoms":{ - "tracts":"https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_tract_500k.zip", - "blockGroups":"https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_bg_500k.zip", - "county":"https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_county_500k.zip" - }, - "oneYear":{ - "tables":"data/1YRData/", - "geoIds":"documentation/Geos20211YR.txt", - "shells":"documentation/ACS20211YR_Table_Shells.txt" - }, - "fiveYear":{ - "tables":"data/5YRData/", - "geoIds":"documentation/Geos20215YR.txt", - "shells":"documentation/ACS20215YR_Table_Shells.txt" - } - } -} diff --git a/python/popgetter/assets/usa/__init__.py b/python/popgetter/assets/usa/__init__.py new file mode 100644 index 0000000..1adb7cd --- /dev/null +++ b/python/popgetter/assets/usa/__init__.py @@ -0,0 +1,818 @@ +from dataclasses import dataclass +import itertools +from popgetter.assets.country import Country +from typing import Callable +from popgetter.metadata import ( + CountryMetadata, + DataPublisher, + GeometryMetadata, + MetricMetadata, + SourceDataRelease, + metadata_to_dataframe, +) +from popgetter.utils import add_metadata, markdown_from_plot +import geopandas as gpd +from popgetter.cloud_outputs import ( + GeometryOutput, + MetricsOutput, + send_to_metrics_metadata_sensor, + send_to_metrics_partitioned_sensor, +) +import pandas as pd +from typing import Any, ClassVar, Tuple +from dagster import MetadataValue, asset, multi_asset, AssetOut, AssetIn +from functools import reduce +from .census_tasks import ( + 
get_geom_ids_table_for_summary, + generate_variable_dictionary, + generate_variable_dictionary, + get_geom_ids_table_for_summary, + get_summary_table_file_names, + get_summary_table, + extract_values_at_specified_levels, + generate_variable_dictionary, + select_estimates, + select_errors, +) +from datetime import date +from more_itertools import batched +from icecream import ic +from popgetter.metadata import COL +from .census_tasks import ACS_METADATA, SUMMARY_LEVELS + +SUMMARY_LEVEL_STRINGS = ["oneYear", "fiveYear"] +GEOMETRY_COL = "AFFGEOID" +METRICS_COL = "GEO_ID" + +# For testing +# REQUIRED_TABLES = [ +# "acsdt1y2019-b01001.dat", +# "acsdt1y2019-b01001.dat", +# "acsdt1y2019-b01001a.dat", +# "acsdt1y2019-b01001b.dat", +# "acsdt1y2019-b01001d.dat", +# ] +# BATCH_SIZE = 2 + +# For prod +REQUIRED_TABLES = None +BATCH_SIZE = 10 + + +@dataclass +class DerivedColumn: + hxltag: str + extended_variable_names: list[str] + filter_func: Callable[[pd.DataFrame, list[str]], pd.DataFrame] + output_column_name: str + human_readable_name: str + table_id: str + + +INFANTS = ["Total|Male|Under 5 years", "Total|Female|Under 5 years"] +CHILDREN = [ + "Total|Male|Under 5 years", + "Total|Male|5 to 9 years", + "Total|Male|10 to 14 years", + "Total|Male|15 to 17 years", + "Total|Female|Under 5 years", + "Total|Female|5 to 9 years", + "Total|Female|10 to 14 years", + "Total|Female|15 to 17 years", +] +CHILDREN_5_TO_17 = [ + "Total|Male|5 to 9 years", + "Total|Male|10 to 14 years", + "Total|Male|15 to 17 years", + "Total|Female|5 to 9 years", + "Total|Female|10 to 14 years", + "Total|Female|15 to 17 years", +] +ADULTS_MALE = [ + "Total|Male|18 and 19 years", + "Total|Male|20 years", + "Total|Male|21 years", + "Total|Male|22 to 24 years", + "Total|Male|25 to 29 years", + "Total|Male|30 to 34 years", + "Total|Male|35 to 39 years", + "Total|Male|40 to 44 years", + "Total|Male|45 to 49 years", + "Total|Male|50 to 54 years", + "Total|Male|55 to 59 years", + "Total|Male|60 and 61 years", + "Total|Male|62 to 64 years", + "Total|Male|65 and 66 years", + "Total|Male|67 to 69 years", + "Total|Male|70 to 74 years", + "Total|Male|75 to 79 years", + "Total|Male|80 to 84 years", + "Total|Male|85 years and over", +] +ADULTS_FEMALE = [ + "Total|Female|18 and 19 years", + "Total|Female|20 years", + "Total|Female|21 years", + "Total|Female|22 to 24 years", + "Total|Female|25 to 29 years", + "Total|Female|30 to 34 years", + "Total|Female|35 to 39 years", + "Total|Female|40 to 44 years", + "Total|Female|45 to 49 years", + "Total|Female|50 to 54 years", + "Total|Female|55 to 59 years", + "Total|Female|60 and 61 years", + "Total|Female|62 to 64 years", + "Total|Female|65 and 66 years", + "Total|Female|67 to 69 years", + "Total|Female|70 to 74 years", + "Total|Female|75 to 79 years", + "Total|Female|80 to 84 years", + "Total|Female|85 years and over", +] +ADULTS = ADULTS_MALE + ADULTS_FEMALE +INDIVIDUALS = ["Total"] + + +# Config for each partition to be derived +age_code = "`Age Code`" +sex_label = "`Sex Label`" +DERIVED_COLUMNS = [ + DerivedColumn( + hxltag="#population+children+age5_17", + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + extended_variable_names=CHILDREN_5_TO_17, + output_column_name="children_5_17", + human_readable_name="Children aged 5 to 17", + table_id="B01001", + ), + DerivedColumn( + hxltag="#population+infants+age0_4", + extended_variable_names=INFANTS, + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + output_column_name="infants_0_4", + human_readable_name="Infants aged 0 
to 4", + table_id="B01001", + ), + DerivedColumn( + hxltag="#population+children+age0_17", + extended_variable_names=CHILDREN, + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + output_column_name="children_0_17", + human_readable_name="Children aged 0 to 17", + table_id="B01001", + ), + DerivedColumn( + hxltag="#population+adults+f", + extended_variable_names=ADULTS_FEMALE, + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + output_column_name="adults_f", + human_readable_name="Female adults", + table_id="B01001", + ), + DerivedColumn( + hxltag="#population+adults+m", + extended_variable_names=ADULTS_MALE, + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + output_column_name="adults_m", + human_readable_name="Male adults", + table_id="B01001", + ), + DerivedColumn( + hxltag="#population+adults", + extended_variable_names=ADULTS, + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + output_column_name="adults", + human_readable_name="Adults", + table_id="B01001", + ), + DerivedColumn( + hxltag="#population+ind", + extended_variable_names=INDIVIDUALS, + filter_func=lambda df, cols: df[cols].sum(axis=1).to_frame(), + output_column_name="individuals", + human_readable_name="Total individuals", + table_id="B01001", + ), +] + +DERIVED_COLUMN_SPECIFICATIONS: dict[str, list[DerivedColumn]] = { + f"{year}/{summary_level}/{geo_level}/0": DERIVED_COLUMNS + for year, summary_level, geo_level in itertools.product( + ["2019", "2020", "2021"], + ["oneYear", "fiveYear"], + ["tract", "block_group", "county"], + ) +} + + +class USA(Country): + country_metadata: ClassVar[CountryMetadata] = CountryMetadata( + name_short_en="United States", + name_official="United States of America", + iso2="US", + iso3="USA", + iso3166_2=None, + ) + required_tables: list[str] | None = None + + def _country_metadata(self, _context) -> CountryMetadata: + return self.country_metadata + + def _data_publisher( + self, _context, country_metadata: CountryMetadata + ) -> DataPublisher: + return DataPublisher( + name="United States Census Bureau", + url="https://www.census.gov/programs-surveys/acs", + description=( + """ + The United States Census Bureau, officially the Bureau of the Census, + is a principal agency of the U.S. Federal Statistical System, responsible + for producing data about the American people and economy. + """ + ), + countries_of_interest=[country_metadata.id], + ) + + def _catalog(self, context) -> pd.DataFrame: + self.remove_all_partition_keys(context) + catalog_list = [] + for year, _ in ACS_METADATA.items(): + for summary_level in SUMMARY_LEVEL_STRINGS: + for geo_level in SUMMARY_LEVELS: + # If year and summary level has no data, skip it. 
+ if (ACS_METADATA[year][summary_level] is None) or ( + summary_level == "oneYear" + and (geo_level == "tract" or geo_level == "block_group") + ): + continue + + table_names_list = [ + table_name + for table_name in get_summary_table_file_names( + year, summary_level + ) + if ( + REQUIRED_TABLES is not None + and table_name in REQUIRED_TABLES + ) + or REQUIRED_TABLES is None + ] + + table_names_list = list(batched(table_names_list, BATCH_SIZE)) + + # Catalog + table_names = pd.DataFrame({"table_names_batch": table_names_list}) + table_names["year"] = year + table_names["summary_level"] = summary_level + table_names["geo_level"] = geo_level + table_names["batch"] = range(len(table_names_list)) + table_names["partition_key"] = ( + table_names["year"].astype(str) + + "/" + + table_names["summary_level"].astype(str) + + "/" + + table_names["geo_level"].astype(str) + + "/" + + table_names["batch"].astype(str) + # .apply(lambda x: x.split(".")[0]) + ) + catalog_list.append(table_names) + + catalog = pd.concat(catalog_list, axis=0).reset_index(drop=True) + self.add_partition_keys(context, catalog["partition_key"].to_list()) + add_metadata(context, catalog, "Catalog") + return catalog + + def _geometry( + self, context + ) -> list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]]: + geometries_to_return = [] + for year, metadata in ACS_METADATA.items(): + context.log.debug(ic(year)) + context.log.debug(ic(metadata)) + names_col = ACS_METADATA[year]["geoIdCol"] + # Combine fiveYear and oneYear geoIDs + if year != 2020: + geo_ids5 = get_geom_ids_table_for_summary(year, "fiveYear") + geo_ids1 = get_geom_ids_table_for_summary(year, "oneYear") + geo_ids = ( + pd.concat( + [ + geo_ids5, + geo_ids1[~geo_ids1[names_col].isin(geo_ids5[names_col])], + ], + axis=0, + ) + .reset_index(drop=True) + .rename(columns={names_col: "GEO_ID", "NAME": "eng"}) + ) + else: + geo_ids = ( + get_geom_ids_table_for_summary(year, "fiveYear") + .rename(columns={names_col: "GEO_ID", "NAME": "eng"}) + .loc[:, ["GEO_ID", "eng"]] + ) + + for geo_level, url in metadata["geoms"].items(): + geometry_metadata = GeometryMetadata( + country_metadata=self.country_metadata, + validity_period_start=date(year, 1, 1), + validity_period_end=date(year, 1, 1), + level=geo_level, + # TODO: what should hxl_tag be? + hxl_tag=geo_level, + ) + + region_geometries_raw: gpd.GeoDataFrame = gpd.read_file(url) + + # Copy names + region_names = geo_ids.copy() + region_geometries_raw = region_geometries_raw.dissolve( + by=GEOMETRY_COL + ).reset_index() + + context.log.debug(ic(region_geometries_raw.head())) + context.log.debug(ic(region_geometries_raw.columns)) + region_geometries = region_geometries_raw.rename( + columns={GEOMETRY_COL: "GEO_ID"} + ).loc[:, ["geometry", "GEO_ID"]] + context.log.debug(ic(region_geometries.head())) + context.log.debug(ic(region_geometries.columns)) + + # TODO: Merge names. + # TODO: Check this step. Is this subsetting giving the correct GEO_IDs? 
+ region_geometries = region_geometries.loc[ + region_geometries["GEO_ID"].isin(region_names["GEO_ID"]) + ] + + # Rename cols for names + region_names = region_names.loc[ + # Subset to row in geoms file + region_names["GEO_ID"].isin(region_geometries["GEO_ID"]), + ["GEO_ID", "eng"], + ].drop_duplicates() + context.log.debug(ic(region_names.head())) + context.log.debug(ic(region_names.columns)) + + geometries_to_return.append( + GeometryOutput( + metadata=geometry_metadata, + gdf=region_geometries, + names_df=region_names, + ) + ) + + return geometries_to_return + + def _source_data_releases( + self, _context, geometry, data_publisher + ) -> dict[str, SourceDataRelease]: + source_data_releases = {} + + idx = 0 + for year, metadata in ACS_METADATA.items(): + for _, url in metadata["geoms"].items(): + for summary_level in ["oneYear", "fiveYear"]: + geo = geometry[idx] + source_data_release: SourceDataRelease = SourceDataRelease( + name=( + f"ACS {year} 1 year" + if summary_level == "oneYear" + else f"ACS {year} 5 year" + ), + date_published=date(year, 1, 1), + reference_period_start=date(year, 1, 1), + reference_period_end=date(year, 1, 1), + collection_period_start=date(year, 1, 1), + collection_period_end=date(year, 1, 1), + expect_next_update=date(year, 1, 1), + url=url, + data_publisher_id=data_publisher.id, + description=""" + The American Community Survey (ACS) helps local officials, + community leaders, and businesses understand the changes + taking place in their communities. It is the premier source + for detailed population and housing information about our nation. + """, + geometry_metadata_id=geo.metadata.id, + ) + source_data_releases[ + f"{year}_{summary_level}_{geo.metadata.level}" + ] = source_data_release + idx += 1 + + return source_data_releases + + def _census_tables(self, context, catalog) -> pd.DataFrame: + partition = context.asset_partition_key_for_output() + ic(partition) + ic(catalog.loc[catalog["partition_key"].eq(partition), "table_names_batch"]) + row = catalog.loc[catalog["partition_key"].eq(partition), :] + table_names_batch = row.iloc[0]["table_names_batch"] + year = row.iloc[0].loc["year"] + summary_level = row.iloc[0].loc["summary_level"] + geo_level = row.iloc[0].loc["geo_level"] + + # TODO: generate as an asset to cache result per year and summary_level + geoids = get_geom_ids_table_for_summary(year, summary_level) + census_tables = [] + for table_name in table_names_batch: + df = get_summary_table(table_name, year, summary_level) + values = extract_values_at_specified_levels( + df, geoids, ACS_METADATA[int(year)]["geoIdCol"] + ) + try: + table = values[geo_level] + context.log.info(ic(table)) + context.log.info(ic(table.columns)) + census_tables.append(table) + except Exception as err: + msg = ( + f"Could not get table ({table_name}) at geo level ({geo_level}) " + f"for summary level ({summary_level}) in year ({year}) with " + f"error: {err}" + ) + context.log.warning(msg) + + if len(census_tables) > 0: + census_tables = reduce( + lambda left, right: left.merge( + right, on=METRICS_COL, how="outer", validate="one_to_one" + ), + census_tables, + ) + else: + census_tables = pd.DataFrame() + msg = ( + f"No tables at geo level ({geo_level}) " + f"for summary level ({summary_level}) in year ({year})." 
+ ) + context.log.warning(msg) + + add_metadata( + context, census_tables, title=context.asset_partition_key_for_output() + ) + return census_tables + + def _source_metric_metadata( + self, + context, + catalog: pd.DataFrame, + source_data_releases: dict[str, SourceDataRelease], + ) -> MetricMetadata: ... + + def _gen_url( + self, col: str, table_names: list[str], year: int, summary_level: str + ) -> str: + base = ACS_METADATA[year]["base"] + summary_file_dir = base + ACS_METADATA[year][summary_level]["tables"] + for table_name in table_names: + table_id = table_name.split("-")[1].split(".")[0].upper() + col_start = col.split("_")[0] + if col_start == table_id: + return f"{summary_file_dir}/{table_name}" + # If no URL can be generated, return "TBD" + return "TBD" + + def _gen_parquet_path(self, partition_key: str) -> str: + return "/".join( + [ + self.key_prefix, + "metrics", + f"{''.join(c for c in partition_key if c.isalnum()) + '.parquet'}", + ] + ) + + @staticmethod + def _column_to_variable(name: str) -> str: + split = name.split("_") + return split[0] + "_" + split[1][1:] + + @staticmethod + def _variable_to_column(variable: str, type: str = "M") -> str: + split = variable.split("_") + return split[0] + f"_{type}" + split[1] + + def make_partial_metric_metadata( + self, + column: str, + variable_dictionary: pd.DataFrame, + source_data_release: SourceDataRelease, + partition_key: str, + table_names: Any, + year: str, + summary_level: str, + ) -> MetricMetadata: + variable = self._column_to_variable(column) + info = ( + variable_dictionary.loc[variable_dictionary["uniqueID"].eq(variable)] + .iloc[0] + .to_dict() + ) + + def gen_description(info: dict[str, str]) -> str: + return "; ".join( + [f"Key: {key}, Value: {value}" for key, value in info.items()] + ) + + def gen_hxl_tag(info: dict[str, str]) -> str: + return ( + "#" + + "".join( + [c for c in info["universe"].title() if c != " " and c.isalnum()] + ) + + "+" + + "+".join( + "".join(c for c in split.title() if c.isalnum() and c != " ") + for split in info["variableExtendedName"].split("|") + ) + ) + + def gen_human_readable_name() -> str: + return ( + f"{info['universe']}, {info['tableName']}, {year}, {summary_level}, " + f"{info['variableName']}" + ) + + return MetricMetadata( + human_readable_name=gen_human_readable_name(), + description=gen_description(info), + hxl_tag=gen_hxl_tag(info), + metric_parquet_path=self._gen_parquet_path(partition_key), + parquet_column_name=column, + parquet_margin_of_error_column=self._variable_to_column(variable, "E"), + parquet_margin_of_error_file=self._variable_to_column(variable, "M"), + potential_denominator_ids=None, + # TODO: get value + source_metric_id="TBD", + parent_metric_id=None, + source_data_release_id=source_data_release.id, + # TODO: check this works + source_download_url=self._gen_url( + column, table_names, int(year), summary_level + ), + source_archive_file_path=None, + # TODO: get value + source_documentation_url="TBD", + ) + + def create_derived_metrics(self): + """ + Creates an asset providing the metrics derived from the census tables and the + corresponding source metric metadata. 
+ """ + + @multi_asset( + partitions_def=self.dataset_node_partition, + ins={ + "catalog": AssetIn(key_prefix=self.key_prefix), + "census_tables": AssetIn(key_prefix=self.key_prefix), + "source_data_releases": AssetIn(key_prefix=self.key_prefix), + }, + outs={ + "metrics_metadata_partitioned": AssetOut(key_prefix=self.key_prefix), + "metrics_partitioned": AssetOut(key_prefix=self.key_prefix), + }, + ) + def derived_metrics( + context, + catalog: pd.DataFrame, + census_tables: pd.DataFrame, + source_data_releases: dict[str, SourceDataRelease], + ) -> Tuple[list[MetricMetadata], MetricsOutput]: + + partition_key = context.partition_key + + row = catalog[catalog["partition_key"] == partition_key].iloc[0].to_dict() + year = row["year"] + summary_level = row["summary_level"] + geo_level = row["geo_level"] + table_names = row["table_names_batch"] + + # TODO: consider refactoring as asset + variable_dictionary = generate_variable_dictionary(year, summary_level) + if census_tables.shape[0] == 0 or census_tables.shape[1] == 0: + context.log.warning(f"No data found in parition: {partition_key}") + return [], MetricsOutput(metadata=[], metrics=pd.DataFrame()) + + metrics = census_tables.copy().set_index(METRICS_COL) + + if metrics.shape[1] == 0: + context.log.warning(f"No metrics found in parition: {partition_key}") + return [], MetricsOutput(metadata=[], metrics=pd.DataFrame()) + + estimates = select_estimates(metrics) + # TODO: No need to select errors, unless to check there is an error column + # errors = select_errors(metrics) + + if estimates.shape[1] == 0: + context.log.warning(f"No estimates found in parition: {partition_key}") + return [], MetricsOutput(metadata=[], metrics=pd.DataFrame()) + + derived_mmd = [] + metrics_out = [] + # Construct derived metrics if any + try: + # Make copy for derived estimates + for derived_column in DERIVED_COLUMN_SPECIFICATIONS[partition_key]: + # Log derived column + ic(context.log.debug(derived_column)) + derived = estimates.copy() + # Get uniqueID for metrics of the extended variable name + # from dictionary + cols = variable_dictionary[ + variable_dictionary["variableExtendedName"].isin( + derived_column.extended_variable_names + ) + & variable_dictionary["universe"] + .str.lower() + .eq("total population") + & variable_dictionary["tableID"].str.startswith( + derived_column.table_id + ) + ]["uniqueID"].to_list() + assert len(set(cols)) == len( + set(derived_column.extended_variable_names) + ) + + # Convert metric table column names to variable names + derived = derived.rename( + columns={ + col: self._column_to_variable(col) + for col in derived.columns + } + ) + derived_metric_out = ( + derived[cols] + .sum(axis=1) + .to_frame() + .rename(columns={0: derived_column.output_column_name}) + .reset_index() + ) + source_data_release = source_data_releases[ + f"{year}_{summary_level}_{geo_level}" + ] + derived_metric_metadata_out = MetricMetadata( + human_readable_name=derived_column.human_readable_name, + description=derived_column.human_readable_name, + hxl_tag=derived_column.hxltag, + metric_parquet_path=self._gen_parquet_path(partition_key), + parquet_column_name=derived_column.output_column_name, + # TODO: add error from combination of columns + parquet_margin_of_error_column=None, + parquet_margin_of_error_file=None, + potential_denominator_ids=None, + # TODO: get value + source_metric_id="TBD", + parent_metric_id=None, + source_data_release_id=source_data_release.id, + # TODO: check this works + source_download_url=self._gen_url( + cols[0], table_names, 
int(year), summary_level + ), + source_archive_file_path=None, + # TODO: get value + source_documentation_url="TBD", + ) + metrics_out.append(derived_metric_out) + derived_mmd.append(derived_metric_metadata_out) + # Log results + ic(context.log.debug(derived_metric_out)) + ic(context.log.debug(derived_metric_metadata_out)) + + except: + context.log.debug( + f"Partition key ({partition_key}) has no additional derived metric specifications." + ) + pass + + # Add remaining metrics + for col in estimates.columns: + metric_metadata = self.make_partial_metric_metadata( + col, + variable_dictionary, + source_data_releases[f"{year}_{summary_level}_{geo_level}"], + partition_key, + table_names, + year, + summary_level, + ) + derived_mmd.append(metric_metadata) + + metrics_out.append(metrics) + metrics = reduce( + lambda left, right: left.merge( + right, on=COL.GEO_ID.value, how="inner", validate="one_to_one" + ), + metrics_out, + ) + + context.add_output_metadata( + output_name="metrics_metadata_partitioned", + metadata={ + "metadata_preview": MetadataValue.md( + metadata_to_dataframe(derived_mmd).head().to_markdown() + ), + }, + ) + context.add_output_metadata( + output_name="metrics_partitioned", + metadata={ + "metadata_preview": MetadataValue.md( + metadata_to_dataframe(derived_mmd).head().to_markdown() + ), + "metrics_shape": f"{metrics.shape[0]} rows x {metrics.shape[1]} columns", + "metrics_preview": MetadataValue.md(metrics.head().to_markdown()), + }, + ) + return derived_mmd, MetricsOutput(metadata=derived_mmd, metrics=metrics) + + return derived_metrics + + # Implementation not required since overridden + def _derived_metrics(self, _census_tables): ... + + def create_metrics_metadata_output(self): + + @send_to_metrics_metadata_sensor + @asset(key_prefix=self.key_prefix) + def metrics_metadata_output( + context, + metrics_metadata_partitioned, + ) -> list[MetricMetadata]: + partition_names = context.instance.get_dynamic_partitions( + self.partition_name + ) + if len(partition_names) == 1: + metrics_metadata_partitioned = { + partition_names[0]: metrics_metadata_partitioned + } + + outputs = [ + mmd + for output in metrics_metadata_partitioned.values() + if len(metrics_metadata_partitioned) > 0 + for mmd in output + ] + context.add_output_metadata( + metadata={ + # TODO: check values are correct + "num_metrics": len(outputs), + "num_parquets": len(partition_names), + }, + ) + return outputs + + return metrics_metadata_output + + def create_metrics_partitioned_output(self): + # TODO: need to implement partitioned assets to be found by sensor + # For now use IO manager directly + # @send_to_metrics_partitioned_sensor + @asset( + key_prefix=self.key_prefix, + partitions_def=self.dataset_node_partition, + io_manager_key="metrics_partitioned_io_manager", + ) + def metrics_partitioned_output( + context, + metrics_partitioned: MetricsOutput, + ) -> MetricsOutput: + metadata, metrics = ( + metrics_partitioned.metadata, + metrics_partitioned.metrics, + ) + if len(metadata) == 0: + err_msg = ( + f"No metrics output for partition key: {context.partition_key}" + ) + context.log.warning(err_msg) + return metrics_partitioned + + context.add_output_metadata( + metadata={ + "metadata_preview": MetadataValue.md( + metadata_to_dataframe(metadata).head().to_markdown() + ), + "metrics_shape": f"{metrics.shape[0]} rows x {metrics.shape[1]} columns", + "metrics_preview": MetadataValue.md(metrics.head().to_markdown()), + }, + ) + return metrics_partitioned + + return metrics_partitioned_output + + +# Assets +usa 
= USA() +country_metadata = usa.create_country_metadata() +data_publisher = usa.create_data_publisher() +geometry = usa.create_geometry() +source_data_releases = usa.create_source_data_releases() +catalog = usa.create_catalog() +census_tables = usa.create_census_tables() +derived_metrics = usa.create_derived_metrics() +metrics_metadata_output = usa.create_metrics_metadata_output() +metrics_partitioned_output = usa.create_metrics_partitioned_output() diff --git a/python/popgetter/assets/usa/census_tasks.py b/python/popgetter/assets/usa/census_tasks.py new file mode 100644 index 0000000..d20fa10 --- /dev/null +++ b/python/popgetter/assets/usa/census_tasks.py @@ -0,0 +1,352 @@ +import pandas as pd +import urllib.request +import tempfile +import os +from functools import reduce +from tqdm import tqdm +from more_itertools import batched +import geopandas as gp +import subprocess +import docker +from pathlib import Path +import os +import tempfile + +SUMMARY_LEVELS={ + "tract": 140 , + "block_group":150, + "county":50 +} + +ACS_METADATA={ + # 2018 needs additional logic for joining together geometry files + # 2018:{ + # base:"https://www2.census.gov/programs-surveys/acs/summary_file/2018", + # type:'table', + # geoms:{ + # tract:, + # block:"", + # county:"https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip": + # } + # oneYear:{ + # tables:"prototype/1YRData/", + # geoIds:"https://www2.census.gov/programs-surveys/acs/summary_file/2018/prototype/20181YRGeos.csv", + # shells:"prototype/ACS2018_Table_Shells.xlsx" + # }, + # fiveYear:{ + # tables:"prototype/5YRData/", + # geoIds:"https://www2.census.gov/programs-surveys/acs/summary_file/2018/prototype/20185YRGeos.csv", + # shells:"prototype/ACS2018_Table_Shells.xlsx" + # } + # }, + 2019:{ + "base" : "https://www2.census.gov/programs-surveys/acs/summary_file/2019/", + "type":'table', + "geoms":{ + "tract": "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_tract_500k.zip", + "block_group":"https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_bg_500k.zip", + "county": "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_county_500k.zip" + }, + "oneYear":{ + "tables":"prototype/1YRData/", + "geoIds":"prototype/Geos20191YR.csv", + "shells":"prototype/ACS2019_Table_Shells.csv" + }, + "fiveYear":{ + "tables":"prototype/5YRData/", + "geoIds":"prototype/Geos20195YR.csv", + "shells":"prototype/ACS2019_Table_Shells.csv" + }, + "geoIdCol": "DADSID" + }, + # Note 1 year esimates are not avaliable because of covid. 
More details of the exeprimental estimates + # are here : https://www.census.gov/programs-surveys/acs/technical-documentation/table-and-geography-changes/2020/1-year.html + 2020:{ + "base":"https://www2.census.gov/programs-surveys/acs/summary_file/2020/", + "type":'table', + "geoms":{ + "tract":"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip", + "block_group":"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_bg_500k.zip", + "county":"https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip" + }, + "shells":"prototype/ACS2020_Table_Shells.csv", + "oneYear":None, + "fiveYear":{ + "shells":"prototype/ACS2020_Table_Shells.csv", + "tables":"prototype/5YRData/", + "geoIds":"prototype/Geos20205YR.csv", + }, + "geoIdCol": "DADSID" + }, + 2021:{ + "base":"https://www2.census.gov/programs-surveys/acs/summary_file/2021/table-based-SF/", + "type":'table', + "geoIdsSep":"|", + "geoms":{ + "tract":"https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_tract_500k.zip", + "block_group":"https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_bg_500k.zip", + "county":"https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_county_500k.zip" + }, + "oneYear":{ + "tables":"data/1YRData/", + "geoIds":"documentation/Geos20211YR.txt", + "shells":"documentation/ACS20211YR_Table_Shells.txt" + }, + "fiveYear":{ + "tables":"data/5YRData/", + "geoIds":"documentation/Geos20215YR.txt", + "shells":"documentation/ACS20215YR_Table_Shells.txt" + }, + "geoIdCol": "GEO_ID" + } +} + +def generate_variable_dictionary(year:int, summary_level:str): + metadata = ACS_METADATA[year] + base = metadata["base"] + shells = metadata[summary_level]["shells"] + + # Config for each year + if int(year) == 2019: + raw= pd.read_csv(base+shells, encoding="latin") + unique_id_col_name = "UniqueID" + elif int(year) == 2020: + raw= pd.read_csv(base+shells, encoding="latin") + unique_id_col_name = "Unique ID" + elif int(year) == 2021: + raw= pd.read_csv(base+shells, sep="|") + unique_id_col_name = "Unique ID" + raw = raw.rename(columns={"Label": "Stub"}) + else: + raise ValueError(f"generate_variable_dictionary() not implemented for year: {year}") + + result = [] # pd.DataFrame(columns=["tableID",unique_id_col_name, "universe","tableName", "variableName", "variableExtedndedName"]) + universe ="" + tableName = "" + path =[] + previousWasEdge = True + for (index,row) in raw.iterrows(): + if(( type(row["Table ID"]) == str and len(row["Table ID"].strip())==0) or type(row["Table ID"]) == float): + # path=[] + # previousWasEdge = True + continue + + stub = row["Stub"] + # Get table name + if year != 2021: + if row[[unique_id_col_name]].isna().all() and not row[["Data Release"]].isna().all(): + table_name = row["Stub"].title() + elif row[[unique_id_col_name]].isna().all(): + if("Universe" in stub): + universe = stub.split("Universe:")[1].strip() + else: + universe=stub + # Get universe + if not (row[[unique_id_col_name]].isna().all()): + # Universe is a column for 2021 + if year == 2021: + universe = row["Universe"] + table_name = row["Title"].title() + if (":" in stub): + if(previousWasEdge): + path.append(stub.replace(":","")) + else: + path.pop() + path.append(stub.replace(":","")) + else: + previousWasEdge = False + extendedName = "|".join(path) + if(":" not in stub): + extendedName = extendedName + "|"+stub + result.append({ + "tableID": row["Table ID"], + "uniqueID":row[unique_id_col_name], + "universe":universe, + "tableName": table_name, + "variableName":stub, + "variableExtendedName": 
extendedName + }) + + + + + return pd.DataFrame.from_records(result) + + +def download_cartography_file(year: int, admin_level: str, work_dir: str| None = None): + metadata = ACS_METADATA[year] + url = metadata['geoms'][admin_level] + if(work_dir == None): + work_dir = tempfile.mkdtemp() + local_dir = os.path.join(work_dir, admin_level+".zip") + urllib.request.urlretrieve(url, local_dir) + return local_dir + +def convert_cartography_file_to_formats(path: str): + data= gp.read_file(f"zip://{path}") + data.to_parquet(path.replace(".zip",".parquet")) + data.to_file(path.replace(".zip",".flatgeobuff"), driver="FlatGeobuf") + data.to_file(path.replace(".zip", ".geojsonseq"), driver="GeoJSONSeq") + +def generate_pmtiles(path:str): + client = docker.from_env() + mount_folder = Path(path).resolve() + container =client.containers.run("stuartlynn/tippecanoe:latest", + "tippecanoe -o tracts.pmtiles tracts.geojsonseq", + volumes={mount_folder: {"bind":"/app","mode":"rw"} }, + detach=True, + remove=True) + + output = container.attach(stdout=True, stream=True, logs=True) + for line in output: + print(line) + +"""" + Return the fips codes for states as two digit zero padded strings. + Exclude state id which are placeholders for future potential states. +""" +def state_fips(): + return [ f'{n:02d}' for n in range(1,56) if n not in [3,7,14,43,52]] + +""" + Convert a data table to parquet and output +""" +def convert_to_parquet(files: list[str], metrics: [list[str]]): # type: ignore + pass + # variableDescs = get_variable_definitions() + # for metric in metrics: + # print(variableDescs[metric]) + + # for file in files: + # data = pd.read_csv(file, dtype={"GEO_ID":str}) + # data.drop(["state","county","tract"],axis=1).to_parquet(file.replace(".csv",".parquet")) + +""" + get the names of each summary file for a given year and summary level +""" +def get_summary_table_file_names(year:int, summary_level:str="fiveYear"): + metadata = ACS_METADATA[year] + base = metadata['base'] + table_path = base + metadata[summary_level]['tables'] + + table = pd.read_html(table_path)[0] + filtered = table[table['Name'].str.startswith("acs",na=False)] + return list(filtered["Name"]) + +""" + Get the geometry identifier table for a given year and summary_level +""" +def get_geom_ids_table_for_summary(year:int, summary_level:str): + path = ACS_METADATA[year]["base"] + ACS_METADATA[year][summary_level]['geoIds'] + sep = ACS_METADATA[year]["geoIdsSep"] if "geoIdsSep" in ACS_METADATA[year] else "," + table = pd.read_csv(path, encoding='latin', sep=sep, low_memory=False) + return table + +""" + Extract variables for the geographies we are interested in from the summary table +""" +def extract_values_at_specified_levels( + df: pd.DataFrame, geoids: pd.DataFrame, geo_ids_col: str = "DADSID" + ): + joined = pd.merge(df,geoids[[geo_ids_col,"SUMLEVEL"]], left_on="GEO_ID", right_on=geo_ids_col, how='left') + result = {} + + for (level, id) in SUMMARY_LEVELS.items(): + result[level]=( + joined[joined['SUMLEVEL']==id] + .drop( + ["SUMLEVEL"] + + ([geo_ids_col] if geo_ids_col != "GEO_ID" else []), axis=1) + ) + return result + + +""" + Download and process a specific summary table +""" +def get_summary_table(table_name: str, year:int, summary_level:str): + base = ACS_METADATA[year]["base"] + summary_file_dir = base + ACS_METADATA[year][summary_level]['tables'] + data = pd.read_csv(f"{summary_file_dir}/{table_name}", sep="|") + return data + +def select_estimates(df): + estimate_columns = [ col for col in df.columns if col[0]=="B" and 
col.split("_")[1][0]=="E"] + return df[estimate_columns] + +def select_errors(df): + error_columns = [ col for col in df.columns if col[0]=="B" and col.split("_")[1][0]=="M"] + return df[error_columns] + + +def merge_parquet_files(file_names): + result=pd.DataFrame() + for batch in tqdm(batched(file_names,20)): + newDFS = [select_estimates(pd.read_parquet(file).set_index("GEO_ID")) for file in batch] + result = pd.concat([result] + newDFS ,axis=1) + return result + +""" + Generate the metirc parquet files for a given year and summary level +""" +def process_year_summary_level(year:int, summary_level:str ="fiveYear"): + workdir = tempfile.mkdtemp() + tractDir = os.path.join(workdir,"tract") + blockGroupDir = os.path.join(workdir,"block_groups") + countyDir = os.path.join(workdir,"counties") + os.mkdir(blockGroupDir) + os.mkdir(tractDir) + os.mkdir(countyDir) + + + table_names = get_summary_table_file_names(year,summary_level) + geoids = get_geom_ids_table_for_summary(year,summary_level) + print("temp dir is ", workdir) + for table in tqdm(table_names): + df = get_summary_table(table, year,summary_level) + values = extract_values_at_specified_levels(df, geoids) + values['tract'].to_parquet(os.path.join(tractDir,table.replace(".dat",".parquet"))) + values['county'].to_parquet(os.path.join(countyDir,table.replace(".dat",".parquet"))) + values['block_group'].to_parquet(os.path.join(blockGroupDir,table.replace(".dat",".parquet"))) + + merge_parquet_files([os.path.join(countyDir,file) for file in os.listdir(countyDir)]).to_parquet(f"county_{year}_{summary_level}.parquet") + merge_parquet_files([os.path.join(tractDir,file) for file in os.listdir(tractDir)]).to_parquet(f"tract_{year}_{summary_level}.parquet") + merge_parquet_files([os.path.join(blockGroupDir,file) for file in os.listdir(blockGroupDir)]).to_parquet(f"block_groups_{year}_{summary_level}.parquet") + +if __name__ == "__main__": + process_year_summary_level(2019,"fiveYear") +# """ +# Old code for getting the metrics from the Census API +# """ +# def get_census_merics(years:list[int], admin_level:str, metrics:list[str]): +# import requests +# import json +# import csv + +# metrics = get_variable_definitions(); +# metrics = [k for k in metrics.keys() if k[0]=='D'] +# print(metrics) + +# for year in years: +# allValues =[] +# for state in state_fips(): +# print(f"Downloading {year} for {state}") +# api_call = f'https://api.census.gov/data/{year}/acs/acs5/profile?get=GEO_ID,{",".join(metrics)}&for={admin_level}:*&in=state:{state}&in=county:*' + +# print(api_call) +# try: +# r = requests.get(api_call) +# r.raise_for_status + +# data = r.json() +# if(len(allValues) > 0): +# allValues = allValues + data[1:] +# else: +# allValues = data +# except requests.exceptions.HTTPError as errh: +# print(errh) + + +# with open(f'acs5_{year}_by_tract.csv','w',newline='') as csvfile: +# csv.writer(csvfile).writerows(allValues) +# print("Got metric") diff --git a/python/popgetter/cloud_outputs/__init__.py b/python/popgetter/cloud_outputs/__init__.py index 40269e5..907b866 100644 --- a/python/popgetter/cloud_outputs/__init__.py +++ b/python/popgetter/cloud_outputs/__init__.py @@ -62,6 +62,25 @@ class MetricsOutput: metrics_sensor = metrics_factory.create_sensor() metrics_asset = metrics_factory.create_publishing_asset() +metrics_partitioned_factory = CloudAssetSensor( + io_manager_key="metrics_partitioned_io_manager", + prefix="metrics_partitioned", + interval=60, +) + +# TODO: commented out until implemented for partitioned assets +# 
metrics_partitioned_sensor = metrics_partitioned_factory.create_sensor() +# metrics_partitioned_asset = metrics_partitioned_factory.create_publishing_asset() + +metrics_metadata_factory = CloudAssetSensor( + io_manager_key="metrics_metadata_io_manager", + prefix="metrics_metadata", + interval=60, +) + +metrics_metadata_sensor = metrics_metadata_factory.create_sensor() +metrics_metadata_asset = metrics_metadata_factory.create_publishing_asset() + def send_to_metadata_sensor(asset: AssetsDefinition): metadata_factory.monitored_asset_keys.append(asset.key) @@ -76,3 +95,15 @@ def send_to_geometry_sensor(asset: AssetsDefinition): def send_to_metrics_sensor(asset: AssetsDefinition): metrics_factory.monitored_asset_keys.append(asset.key) return asset + + +# TODO: need to implement handling for partitions for this sensor +def send_to_metrics_partitioned_sensor(asset: AssetsDefinition): + # TODO: add partition key here + metrics_partitioned_factory.monitored_asset_keys.append(asset.key) + return asset + + +def send_to_metrics_metadata_sensor(asset: AssetsDefinition): + metrics_metadata_factory.monitored_asset_keys.append(asset.key) + return asset diff --git a/python/popgetter/io_managers/__init__.py b/python/popgetter/io_managers/__init__.py index 6f722b6..bcede0f 100644 --- a/python/popgetter/io_managers/__init__.py +++ b/python/popgetter/io_managers/__init__.py @@ -14,6 +14,7 @@ CountryMetadata, DataPublisher, GeometryMetadata, + MetricMetadata, SourceDataRelease, metadata_to_dataframe, ) @@ -337,3 +338,115 @@ def handle_output( ), } ) + + +class MetricsPartitionedIOManager(PopgetterIOManager): + def get_full_path_metrics( + self, + parquet_path: str, + ) -> UPath: + base_path = self.get_base_path() + return base_path / UPath(parquet_path) + + def handle_output( + self, + context: OutputContext, + metrics_output: MetricsOutput, + ) -> None: + # Check if empty + if metrics_output.metrics.shape == (0, 0): + err_msg = ( + "The dataframe of metrics passed to MetricsPartitionedIOManager" + f" is empty for partition key: {context.partition_key}" + ) + # TODO: consider whether better approach than raise (error for output) + # and returning early (no indication directly of empty dataframe + # aside from log). + # raise ValueError + context.log.warning(err_msg) + return + + # Check GEO_ID col + metrics_cols = set(metrics_output.metrics.columns) + if COL.GEO_ID.value not in metrics_cols: + # Check if it's the index, reset index if so + if metrics_output.metrics.index.name == COL.GEO_ID.value: + metrics_output.metrics = metrics_output.metrics.reset_index() + metrics_cols = set(metrics_output.metrics.columns) + else: + err_msg = ( + "The dataframe of metrics passed to MetricsIOManager" + f" must have a {COL.GEO_ID.value} column. It only has columns" + f" {metrics_cols}." + ) + raise ValueError(err_msg) + + # In each tuple, the list of MetricMetadata must all have the same + # filepath, as the corresponding dataframe is saved to that path + this_mmd_filepaths = { + mmd.metric_parquet_path for mmd in metrics_output.metadata + } + if len(this_mmd_filepaths) != 1: + err_msg = ( + "The list of MetricMetadata in each tuple passed to" + " MetricsIOManager must all have the same" + " `metric_parquet_path`." 
+ ) + raise ValueError(err_msg) + + # Check that it's not already been used + # TODO: this check is not possible for partitioned assets since the + # data is confined to each partition + this_filepath = this_mmd_filepaths.pop() + + # Convert GEO_ID cols to strings + metrics_output.metrics[COL.GEO_ID.value] = metrics_output.metrics[ + COL.GEO_ID.value + ].astype("string") + + rel_path = metrics_output.metadata[0].metric_parquet_path + full_path = self.get_full_path_metrics(rel_path) + self.handle_df(context, metrics_output.metrics, full_path) + + # Add metadata + context.add_output_metadata( + metadata={ + "metric_parquet_path": this_filepath, + "num_metrics": metrics_output.metrics.shape[1] - 1, + } + ) + + def load_input(self, _context: InputContext) -> pd.DataFrame: + return super().load_input(_context) + + +class MetricsMetdataIOManager(PopgetterIOManager): + def get_full_path_metadata( + self, + context: OutputContext, + ) -> UPath: + base_path = self.get_base_path() + asset_prefix = list(context.partition_key.split("/"))[:-1] + return base_path / UPath("/".join([*asset_prefix, "metric_metadata.parquet"])) + + def handle_output( + self, + context: OutputContext, + obj: list[MetricMetadata], + ) -> None: + all_metadatas_df = metadata_to_dataframe(obj) + metadata_df_filepath = self.get_full_path_metadata(context) + self.handle_df(context, all_metadatas_df, metadata_df_filepath) + + context.add_output_metadata( + metadata={ + "num_metrics": all_metadatas_df.shape[0], + "metric_human_readable_names": all_metadatas_df[ + COL.METRIC_HUMAN_READABLE_NAME.value + ].tolist(), + "metadata_parquet_path": str(metadata_df_filepath), + "metadata_preview": MetadataValue.md( + all_metadatas_df.head().to_markdown() + ), + } + ) diff --git a/python/popgetter/io_managers/azure.py b/python/popgetter/io_managers/azure.py index fdc8155..f14e81e 100644 --- a/python/popgetter/io_managers/azure.py +++ b/python/popgetter/io_managers/azure.py @@ -27,7 +27,13 @@ from icecream import ic from upath import UPath -from . import GeoIOManager, MetadataIOManager, MetricsIOManager +from . import ( + GeoIOManager, + MetadataIOManager, + MetricsIOManager, + MetricsMetdataIOManager, + MetricsPartitionedIOManager, +) # Set no time limit on lease duration to enable large files to be uploaded _LEASE_DURATION = -1 @@ -181,6 +187,14 @@ class AzureMetricsIOManager(AzureMixin, MetricsIOManager): pass +class AzureMetricsPartitionedIOManager(AzureMixin, MetricsPartitionedIOManager): + pass + + +class AzureMetricsMetadataIOManager(AzureMixin, MetricsMetdataIOManager): + pass + + class AzureGeneralIOManager(AzureMixin, IOManager): """This class is used only for an asset which tests the Azure functionality (see cloud_outputs/azure_test.py). It is not used for publishing any diff --git a/python/popgetter/io_managers/local.py b/python/popgetter/io_managers/local.py index a1f6163..b04889c 100644 --- a/python/popgetter/io_managers/local.py +++ b/python/popgetter/io_managers/local.py @@ -7,7 +7,13 @@ from dagster import OutputContext from upath import UPath -from . import GeoIOManager, MetadataIOManager, MetricsIOManager +from . 
import (
+    GeoIOManager,
+    MetadataIOManager,
+    MetricsIOManager,
+    MetricsMetdataIOManager,
+    MetricsPartitionedIOManager,
+)
 
 
 class LocalMixin:
@@ -49,3 +55,11 @@ def handle_geojsonseq(
 
 class LocalMetricsIOManager(LocalMixin, MetricsIOManager):
     pass
+
+
+class LocalMetricsMetadataIOManager(LocalMixin, MetricsMetdataIOManager):
+    pass
+
+
+class LocalMetricsPartitionedIOManager(LocalMixin, MetricsPartitionedIOManager):
+    pass
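
Note on the catalog partitioning scheme (editor's sketch, not part of the patch): the new more-itertools dependency is used to group summary-table file names into fixed-size batches, and each batch becomes one dynamic partition keyed as <year>/<summary_level>/<geo_level>/<batch> in USA._catalog. The sketch below uses illustrative table names and a BATCH_SIZE of 2; in the patch the names come from get_summary_table_file_names(year, summary_level) and BATCH_SIZE is 10.

# Minimal sketch of the partition-key construction used by USA._catalog.
# The table names and BATCH_SIZE here are illustrative stand-ins.
from more_itertools import batched
import pandas as pd

BATCH_SIZE = 2
table_names = [
    "acsdt5y2019-b01001.dat",
    "acsdt5y2019-b01001a.dat",
    "acsdt5y2019-b01001b.dat",
    "acsdt5y2019-b01001d.dat",
    "acsdt5y2019-b02001.dat",
]

# Group table names into batches; each batch becomes one partition.
batches = list(batched(table_names, BATCH_SIZE))
catalog = pd.DataFrame({"table_names_batch": batches})
catalog["year"] = "2019"
catalog["summary_level"] = "fiveYear"
catalog["geo_level"] = "tract"
catalog["batch"] = range(len(batches))
catalog["partition_key"] = (
    catalog["year"]
    + "/"
    + catalog["summary_level"]
    + "/"
    + catalog["geo_level"]
    + "/"
    + catalog["batch"].astype(str)
)
print(catalog["partition_key"].tolist())
# ['2019/fiveYear/tract/0', '2019/fiveYear/tract/1', '2019/fiveYear/tract/2']

Since DERIVED_COLUMN_SPECIFICATIONS is keyed on the same strings and only lists keys ending in "/0", only the first batch of each year/summary level/geo level combination picks up the extra derived age/sex metrics; for the remaining partition keys the lookup raises a KeyError, which derived_metrics catches and logs before emitting the undervived estimates only.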