diff --git a/python/popgetter/assets/bel/__init__.py b/python/popgetter/assets/bel/__init__.py old mode 100755 new mode 100644 index 46f9119..1121bd8 --- a/python/popgetter/assets/bel/__init__.py +++ b/python/popgetter/assets/bel/__init__.py @@ -1,507 +1,547 @@ -#!/usr/bin/python3 from __future__ import annotations -from . import ( - census_derived, # noqa: F401 - census_geometry, # noqa: F401 - census_tables, # noqa: F401 +import os +from collections.abc import Callable +from dataclasses import dataclass +from datetime import date + +import pandas as pd +from dagster import AssetIn, MetadataValue, SpecificPartitionsPartitionMapping, asset +from icecream import ic +from rdflib import Graph, URIRef +from rdflib.namespace import DCAT, DCTERMS + +from popgetter.assets.bel.utils import ( + DOWNLOAD_HANDLERS, + check_not_str, + check_str, + filter_by_language, + get_distribution_url, + get_landpage_url, + married_status_to_string, + nationality_to_string, + no_op_format_handler, +) +from popgetter.assets.country import Country +from popgetter.cloud_outputs import ( + GeometryOutput, + MetricsOutput, + send_to_geometry_sensor, +) +from popgetter.metadata import ( + CountryMetadata, + DataPublisher, + GeometryMetadata, + MetricMetadata, + SourceDataRelease, + metadata_to_dataframe, +) +from popgetter.utils import add_metadata, markdown_from_plot + +KNOWN_FAILING_DATASETS = { + # sqlite compressed as tar.gz + "595", # Census 2011 - Matrix of commutes by statistical sector + # faulty zip file (confirmed by manual download) + "2676", + # Excel only (French and Dutch only) + "2654", # Geografische indelingen 2020 + "3961", # Geografische indelingen 2021 + # AccessDB only! + "4135", # Enterprises subject to VAT according to legal form (English only) + "4136", # Enterprises subject to VAT according to employer class (English only) +} + + +@dataclass +class DatasetSpecification: + # The geography level that the dataset is at. Must be one of the keys of + # `BELGIUM_GEOMETRY_LEVELS`. + geography_level: str + # The column in the source data that contains the metric of interest + source_column: str + # The column in the source data that contains the geoID + geoid_column: str + # The columns that need to be pivoted to generate the derived metrics. If + # this is empty, no pivoting is done, and the only metrics that are + # published will be that of the `source_column`. + pivot_columns: list[str] + # The HXL tag for the derived metrics. If `pivot_columns` is empty, this + # should just be a single string, which is the HXL tag of the original + # `source_column`. If `pivot_columns` is not empty, this should be a + # function which takes the values of the pivot columns in the correct + # order. See below for an example. + derived_hxl: str | Callable + # The human readable name(s) for the derived metrics. Follows the same + # rules as `derived_hxl`. + derived_human_readable_name: str | Callable + + +DATASET_SPECIFICATIONS = { + # Not actually census table -- these are the geometries from 2023 + "4726": DatasetSpecification( + geography_level="statistical_sector", # Lowest level available + source_column="", + geoid_column="", + pivot_columns=[], + derived_hxl="", + derived_human_readable_name="", + ), + # Population in statistical sectors, 2023 + "4796": DatasetSpecification( + geography_level="statistical_sector", + source_column="TOTAL", + geoid_column="CD_SECTOR", + pivot_columns=[], + derived_hxl="population+adm5+total+2023", + derived_human_readable_name="Population, total, 2023", + ), + # Population by nationality, marital status, age, and sex in + # municipalities, 2023 + "4689": DatasetSpecification( + geography_level="municipality", + source_column="MS_POPULATION", + geoid_column="CD_REFNIS", + pivot_columns=["CD_NATLTY", "CD_CIV_STS", "CD_AGE", "CD_SEX"], + derived_hxl=lambda n, ms, a, s: f"#population+adm4+{s.lower()}+age{a}+{nationality_to_string(n).lower()}+{married_status_to_string(ms).lower()}", + derived_human_readable_name=lambda n, ms, a, s: f"Population, {nationality_to_string(n)}, {married_status_to_string(ms)}, {'female' if s == 'F' else 'male'}, and age {a}", + ), +} + +REQUIRED_DATASETS = ( + None if os.getenv("ENV") == "PROD" else list(DATASET_SPECIFICATIONS.keys()) ) -# @asset(key_prefix=asset_prefix) -# def get_population_details_per_municipality(context): -# """ -# Downloads the population breakdown data per Municipality Sector and returns a DataFrame. - -# If the data has already been downloaded, it is not downloaded again and the -# DataFrame is loaded from the cache. - -# returns a DataFrame with one row per, with the number of people satisfying a -# unique combination of age, sex, civic (marital) status, per municipality. -# """ -# output_dir = Path("population") / "demographic_breakdown" - -# # Population -# # "Population by place of residence, nationality, marital status, age and sex" -# # https://statbel.fgov.be/en/open-data/population-place-residence-nationality-marital-status-age-and-sex-13 - -# # Data (excel) -# # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking%20naar%20woonplaats%2C%20nationaliteit%20burgelijke%20staat%20%2C%20leeftijd%20en%20geslacht/TF_SOC_POP_STRUCT_2023.xlsx" -# # Data (zipped txt) -# pop_data_url = "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking%20naar%20woonplaats%2C%20nationaliteit%20burgelijke%20staat%20%2C%20leeftijd%20en%20geslacht/TF_SOC_POP_STRUCT_2023.zip" - -# zip_file_contents = "TF_SOC_POP_STRUCT_2023.txt" - -# url = f"zip://{zip_file_contents}::{pop_data_url}" - -# text_file = get_path_to_cache(url, output_dir, "rt") - -# with text_file.open() as f: -# population_df = pd.read_csv(f, sep="|", encoding="utf-8-sig") - -# population_df.index = population_df.index.astype(str) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(population_df), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join([f"- '`{col}`'" for col in population_df.columns.to_list()]) -# ), -# "preview": MetadataValue.md(population_df.head().to_markdown()), -# } -# ) - -# return population_df - - -# @asset(key_prefix=asset_prefix) -# def get_population_by_statistical_sector(context): -# """ -# Downloads the population data per Statistical Sector and returns a DataFrame. - -# If the data has already been downloaded, it is not downloaded again and the -# DataFrame is loaded from the cache. - -# returns a DataFrame with one row per Statistical Sector, with the total number -# of people per sector. -# """ -# output_dir = Path("population") / "per_sector" - -# # Population -# # Population by Statistical sector -# # https://statbel.fgov.be/en/open-data/population-statistical-sector-10 - -# pop_data_url = "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.zip" - -# # The column descriptions from https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/Columns%20description2020.xlsx -# # are incorrect and do not match the data. The correct column descriptions are taken from the data file itself. - -# # | Naam/Nom | -# # | ------------------- | -# # | CD_REFNIS | -# # | CD_SECTOR | -# # | TOTAL | -# # | DT_STRT_SECTOR | -# # | DT_STOP_SECTOR | -# # | OPPERVLAKKTE IN HM² | -# # | TX_DESCR_SECTOR_NL | -# # | TX_DESCR_SECTOR_FR | -# # | TX_DESCR_NL | -# # | TX_DESCR_FR | - -# zip_file_contents = "OPENDATA_SECTOREN_2022.txt" -# url = f"zip://{zip_file_contents}::{pop_data_url}" - -# text_file = get_path_to_cache(url, output_dir, "rt") - -# with text_file.open() as f: -# population_df = pd.read_csv(f, sep="|", encoding="utf-8-sig") - -# population_df.index = population_df.index.astype(str) -# population_df["CD_REFNIS"] = population_df["CD_REFNIS"].astype(str) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(population_df), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join([f"- '`{col}`'" for col in population_df.columns.to_list()]) -# ), -# "preview": MetadataValue.md(population_df.head().to_markdown()), -# } -# ) - -# return population_df - - -# def aggregate_population_details_per_municipalities( -# pop_per_municipality_df: pd.DataFrame, output_dir: Path -# ) -> pd.DataFrame: -# """ -# Aggregates a DataFrame of the population details per Statistical Sector to Municipalities. - -# The `pop_per_municipality_df` is assumed to be produced by `get_population_details_per_municipality()`. - -# Also saves the result to a CSV file in the output_dir - -# returns a DataFrame with one row per Municipality, with the total number of people - -# TODO: Explore the other ways of aggregating this data. -# """ -# # Columns detail take from https://statbel.fgov.be/sites/default/files/files/opendata/bevolking%20naar%20woonplaats%2C%20nationaliteit%20burgelijke%20staat%20%2C%20leeftijd%20en%20geslacht/Columns%20description.xlsx - -# # | Naam/Nom | Description | -# # | -------------------- | ------------------------------- | -# # | CD_MUNTY_REFNIS | Refnis code of the municipality | -# # | TX_MUNTY_DESCR_NL | Municipality name in Dutch | -# # | TX_MUNTY_DESCR_FR | Municipality name in French | -# # | CD_DSTR_REFNIS | Refnis code of the district | -# # | TX_ADM_DSTR_DESCR_NL | District name in Dutch | -# # | TX_ADM_DSTR_DESCR_FR | District name in French | -# # | CD_PROV_REFNIS | Refnis code of the province | -# # | TX_PROV_DESCR_NL | Province name in Dutch | -# # | TX_PROV_DESCR_FR | Province name in French | -# # | CD_RGN_REFNIS | Refnis code of the region | -# # | TX_RGN_DESCR_NL | Region name in Dutch | -# # | TX_RGN_DESCR_FR | Region name in French | -# # | CD_SEX | Gender | -# # | CD_NATLTY | Nationality code | -# # | TX_NATLTY_FR | Nationality in French | -# # | TX_NATLTY_NL | Nationality in Dutch | -# # | CD_CIV_STS | Civil status code | -# # | TX_CIV_STS_FR | Civil status in French | -# # | TX_CIV_STS_NL | Civil status in Dutch | -# # | CD_AGE | Age | -# # | MS_POPULATION | Number of individuals | -# # | CD_YEAR | Reference year | - -# # Drop all the columns we don't need - -# # TODO there are many different ways top aggregate this data. For now we just take the sum of the population for each municipality -# pop_per_municipality_df = pop_per_municipality_df[["CD_REFNIS", "MS_POPULATION"]] - -# munty_df = pop_per_municipality_df.groupby(by="CD_REFNIS").sum() -# munty_df.to_csv(output_dir / "municipalities.csv", sep="|") -# munty_df.index = munty_df.index.astype(str) - -# return munty_df - - -# @asset( -# key_prefix=asset_prefix, -# ) -# def get_car_per_sector(context): -# """ -# Downloads the number of cars per Statistical Sector and returns a DataFrame. - -# If the data has already been downloaded, it is not downloaded again and the -# DataFrame is loaded from the cache. - -# returns a DataFrame with one row per Statistical Sector, with the total number -# of cars per sector. -# """ -# output_dir = Path("car_ownership") - -# # Number of cars by Statistical sector -# # https://statbel.fgov.be/en/open-data/number-cars-statistical-sector -# cars_url = "https://statbel.fgov.be/sites/default/files/files/opendata/Aantal%20wagens%20per%20statistische%20sector/TF_CAR_HH_SECTOR.zip" - -# # Column names (inferred as the website links for the wrong column descriptions) - -# # | Column Name | -# # | ----------------- | -# # | CD_YEAR | -# # | CD_REFNIS | -# # | TX_MUNTY_DESCR_FR | -# # | TX_MUNTY_DESCR_NL | -# # | TX_MUNTY_DESCR_DE | -# # | TX_MUNTY_DESCR_EN | -# # | cd_sector | -# # | total_huisH | -# # | total_wagens | - -# zip_file_contents = "TF_CAR_HH_SECTOR.txt" -# url = f"zip://{zip_file_contents}::{cars_url}" - -# text_file = get_path_to_cache(url, output_dir, "rt") - -# with text_file.open() as f: -# car_per_sector_df = pd.read_csv(f, sep="|", encoding="utf-8-sig") - -# car_per_sector_df.index = car_per_sector_df.index.astype(str) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(car_per_sector_df), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join( -# [f"- '`{col}`'" for col in car_per_sector_df.columns.to_list()] -# ) -# ), -# "preview": MetadataValue.md(car_per_sector_df.head().to_markdown()), -# } -# ) - -# return car_per_sector_df - - -# @asset( -# key_prefix=asset_prefix, -# ) -# def get_car_ownership_by_housetype(context): -# """ -# Downloads the number of cars per household type by municipality and returns a DataFrame. - -# If the data has already been downloaded, it is not downloaded again and the -# DataFrame is loaded from the cache. - -# returns a DataFrame with one row per Municipality, with the total number -# of cars per household type. -# """ -# output_dir = Path("car_ownership") - -# # Number of cars per household type by municipality -# # https://statbel.fgov.be/en/open-data/number-cars-household-type-municipality -# car_per_household_url = "https://statbel.fgov.be/sites/default/files/files/opendata/Aantal%20wagens%20volgens%20huishoudtype%20per%20gemeente/TF_CAR_HHTYPE_MUNTY.zip" - -# # Column Description from https://statbel.fgov.be/sites/default/files/files/opendata/Aantal%20wagens%20volgens%20huishoudtype%20per%20gemeente/Columns%20description_TF_CAR_HHTYPE_MUNTY.xlsx - -# # | Omschrijving | Description | -# # | --------------------------- | ------------------------------- | -# # | Referentie jaar | Reference year | -# # | Refnis-code van de gemeente | Refnis code of the municipality | -# # | Naam van de gemeente in FR | Name of the municipality in FR | -# # | Naam van de gemeente in NL | Name of the municipality in NL | -# # | Huishoud type | Household type | -# # | Aantal huishoudens | Number of households | -# # | Aantal wagens | Number of cars | - -# # where "Household type" is one of: - -# # | Detail | -# # | ---------------------------------------- | -# # | 1\. People living alone | -# # | 2\. Married couples with no children | -# # | 3\. Married couples with children | -# # | 4\. Not-married couples with no children | -# # | 5\. Non-married couples with children | -# # | 6\. Single parents | -# # | 7\. Other types of household | -# # | 8\. Collective households | - -# zip_file_contents = "TF_CAR_HHTYPE_MUNTY.txt" -# url = f"zip://{zip_file_contents}::{car_per_household_url}" - -# text_file = get_path_to_cache(url, output_dir, "rt") - -# with text_file.open() as f: -# car_per_household_df = pd.read_csv(f, sep="|", encoding="utf-8-sig") - -# car_per_household_df.index = car_per_household_df.index.astype(str) - -# context.add_output_metadata( -# metadata={ -# "num_records": len( -# car_per_household_df -# ), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join( -# [f"- '`{col}`'" for col in car_per_household_df.columns.to_list()] -# ) -# ), -# "preview": MetadataValue.md(car_per_household_df.head().to_markdown()), -# } -# ) - -# return car_per_household_df - - -# @asset( -# key_prefix=asset_prefix, -# ins={ -# "get_geometries": AssetIn(key_prefix=asset_prefix), -# "get_population_by_statistical_sector": AssetIn(key_prefix=asset_prefix), -# }, -# ) -# def sector_populations(context, get_geometries, get_population_by_statistical_sector): -# """ -# Returns a GeoDataFrame of the Statistical Sectors joined with the population per sector. -# """ -# # Population -# pop_gdf = get_geometries.merge( -# get_population_by_statistical_sector, -# right_on="CD_REFNIS", -# left_on="cd_munty_refnis", -# how="inner", -# ) - -# # Plot and convert the image to Markdown to preview it within Dagster -# # Yes we do pass the `plt` object to the markdown_from_plot function and not the `ax` object -# ax = pop_gdf.plot(column="TOTAL", legend=True, scheme="quantiles") -# ax.set_title("Populations per sector in Belgium") -# md_plot = markdown_from_plot(plt) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(pop_gdf), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join([f"- '`{col}`'" for col in pop_gdf.columns.to_list()]) -# ), -# "preview": MetadataValue.md( -# pop_gdf.loc[:, pop_gdf.columns != "geometry"].head().to_markdown() -# ), -# "plot": MetadataValue.md(md_plot), -# } -# ) - -# return pop_gdf - - -# @asset( -# key_prefix=asset_prefix, -# ins={ -# "get_geometries": AssetIn(key_prefix=asset_prefix), -# "get_car_per_sector": AssetIn(key_prefix=asset_prefix), -# }, -# ) -# def sector_car_ownership(context, get_geometries, get_car_per_sector): -# """ -# Returns a GeoDataFrame of the Statistical Sectors joined with the number of cars per sector. -# """ -# # Vehicle Ownership -# cars = get_car_per_sector -# cars["CD_REFNIS"] = cars["CD_REFNIS"].astype(str) -# cars_gdf = get_geometries.merge( -# cars, right_on="CD_REFNIS", left_on="cd_munty_refnis", how="inner" -# ) - -# # Plot and convert the image to Markdown to preview it within Dagster -# # Yes we do pass the `plt` object to the markdown_from_plot function and not the `ax` object -# ax = cars_gdf.plot(column="total_wagens", legend=True, scheme="quantiles") -# ax.set_title("Car ownership per sector in Belgium") -# md_plot = markdown_from_plot(plt) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(cars_gdf), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join([f"- '`{col}`'" for col in cars_gdf.columns.to_list()]) -# ), -# "preview": MetadataValue.md( -# cars_gdf.loc[:, cars_gdf.columns != "geometry"].head().to_markdown() -# ), -# "plot": MetadataValue.md(md_plot), -# } -# ) - -# return cars_gdf - - -# @asset( -# key_prefix=asset_prefix, -# ins={ -# "get_population_details_per_municipality": AssetIn(key_prefix=asset_prefix), -# }, -# ) -# def pivot_population( -# context, -# get_population_details_per_municipality: pd.DataFrame, -# ): -# # For brevity -# pop: pd.DataFrame = get_population_details_per_municipality - -# # Check that the columns we need are present (currently failing on Windows) on CI for some unknown reason -# assert "CD_REFNIS" in pop.columns -# assert "CD_AGE" in pop.columns -# assert "MS_POPULATION" in pop.columns -# assert len(pop) > 0 - -# # Drop all the columns we don't need -# pop = pop[ -# [ -# "CD_REFNIS", # keep -# # "CD_DSTR_REFNIS", # drop -# # "CD_PROV_REFNIS", # drop -# # "CD_RGN_REFNIS", # drop -# "CD_SEX", # keep -# # "CD_NATLTY", # drop -# # "CD_CIV_STS", # drop -# "CD_AGE", # keep -# "MS_POPULATION", # keep -# # "CD_YEAR", # drop -# ] -# ] - -# # Check that the columns we need are present (currently failing on Windows) on CI for some unknown reason -# assert "CD_REFNIS" in pop.columns -# assert "CD_AGE" in pop.columns -# assert "MS_POPULATION" in pop.columns -# assert len(pop) > 0 - -# new_table: pd.DataFrame = pd.DataFrame() - -# # Using HXL tags for variable names (https://hxlstandard.org/standard/1-1final/dictionary/#tag_population) -# columns: dict[str, pd.Series[bool]] = { -# "population_children_age5_17": (pop["CD_AGE"] >= 5) & (pop["CD_AGE"] < 18), -# "population_infants_age0_4": (pop["CD_AGE"] <= 4), -# "population_children_age0_17": (pop["CD_AGE"] >= 0) & (pop["CD_AGE"] < 18), -# "population_adults_f": (pop["CD_AGE"] > 18) & (pop["CD_SEX"] == "F"), -# "population_adults_m": (pop["CD_AGE"] > 18) & (pop["CD_SEX"] == "M"), -# "population_adults": (pop["CD_AGE"] > 18), -# "population_ind": (pop["CD_AGE"] >= 0), -# } - -# for col_name, filter in columns.items(): -# new_col_def = {col_name: pd.NamedAgg(column="MS_POPULATION", aggfunc="sum")} -# temp_table: pd.DataFrame = ( -# pop.loc[filter] -# .groupby(by="CD_REFNIS", as_index=True) -# .agg( -# func=None, -# **new_col_def, # type: ignore TODO, don't know why pyright is complaining here -# ) -# ) - -# if len(new_table) == 0: -# new_table = temp_table -# else: -# new_table = new_table.merge( -# temp_table, left_index=True, right_index=True, how="inner" -# ) - -# # table.set_index("CD_REFNIS", inplace=True, drop=False) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(new_table), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join([f"- '`{col}`'" for col in new_table.columns.to_list()]) -# ), -# "preview": MetadataValue.md(new_table.head().to_markdown()), -# } -# ) - -# return new_table - - -# @asset( -# key_prefix=asset_prefix, -# ins={ -# "aggregate_sectors_to_municipalities": AssetIn(key_prefix=asset_prefix), -# "pivot_population": AssetIn(key_prefix=asset_prefix), -# }, -# ) -# def municipalities_populations( -# context, -# aggregate_sectors_to_municipalities, -# pivot_population, -# ): -# """ -# Returns a GeoDataFrame of the Municipalities joined with the population per municipality. -# """ -# # Population -# population = pivot_population -# population.index = population.index.astype(str) - -# geom = aggregate_sectors_to_municipalities -# pop_gdf = geom.merge( -# population, right_on="CD_REFNIS", left_on="cd_munty_refnis", how="inner" -# ) - -# ax = pop_gdf.plot(column="population_ind", legend=True, scheme="quantiles") -# ax.set_title("Population per Municipality in Belgium") -# md_plot = markdown_from_plot(plt) - -# context.add_output_metadata( -# metadata={ -# "num_records": len(pop_gdf), # Metadata can be any key-value pair -# "columns": MetadataValue.md( -# "\n".join([f"- '`{col}`'" for col in pop_gdf.columns.to_list()]) -# ), -# "preview": MetadataValue.md( -# pop_gdf.loc[:, pop_gdf.columns != "geometry"].head().to_markdown() -# ), -# "plot": MetadataValue.md(md_plot), -# } -# ) - -# return pop_gdf +@dataclass +class BelgiumGeometryLevel: + level: str + hxl_tag: str + geo_id_column: str + name_columns: dict[str, str] # keys = language codes, values = column names + + +BELGIUM_GEOMETRY_LEVELS = { + "province": BelgiumGeometryLevel( + level="province", + hxl_tag="adm1", + geo_id_column="cd_prov_refnis", + name_columns={ + "nld": "tx_prov_descr_nl", + "fra": "tx_prov_descr_fr", + "deu": "tx_prov_descr_de", + }, + ), + "region": BelgiumGeometryLevel( + level="region", + hxl_tag="adm2", + geo_id_column="cd_rgn_refnis", + name_columns={ + "nld": "tx_rgn_descr_nl", + "fra": "tx_rgn_descr_fr", + "deu": "tx_rgn_descr_de", + }, + ), + "arrondisement": BelgiumGeometryLevel( + level="arrondisement", + hxl_tag="adm3", + geo_id_column="cd_dstr_refnis", + name_columns={ + "nld": "tx_adm_dstr_descr_nl", + "fra": "tx_adm_dstr_descr_fr", + "deu": "tx_adm_dstr_descr_de", + }, + ), + "municipality": BelgiumGeometryLevel( + level="municipality", + hxl_tag="adm4", + geo_id_column="cd_munty_refnis", + name_columns={ + "nld": "tx_munty_descr_nl", + "fra": "tx_munty_descr_fr", + "deu": "tx_munty_descr_de", + }, + ), + "statistical_sector": BelgiumGeometryLevel( + level="statistical_sector", + hxl_tag="adm5", + geo_id_column="cd_sector", + name_columns={ + "nld": "tx_sector_descr_nl", + "fra": "tx_sector_descr_fr", + "deu": "tx_sector_descr_de", + }, + ), +} + + +class Belgium(Country): + country_metadata: CountryMetadata = CountryMetadata( + name_short_en="Belgium", + name_official="Kingdom of Belgium", + iso3="BEL", + iso2="BE", + iso3166_2=None, + ) + + def _country_metadata(self, _context) -> CountryMetadata: + return self.country_metadata + + def _data_publisher( + self, _context, country_metadata: CountryMetadata + ) -> DataPublisher: + return DataPublisher( + name="Statbel", + url="https://statbel.fgov.be/en", + description="Statbel is the Belgian statistical office. It is part of the Federal Public Service Economy, SMEs, Self-employed and Energy.", + countries_of_interest=[country_metadata.id], + ) + + @staticmethod + def get_opendata_dataset_list() -> Graph: + """ + Returns a list of all the tables available in the Statbel Open Data portal. + + This document is essential reading for understanding the structure of the data: + https://github.com/belgif/inspire-dcat/blob/main/DCATAPprofil.en.md + """ + CATALOG_URL = ( + "https://doc.statbel.be/publications/DCAT/DCAT_opendata_datasets.ttl" + ) + graph = Graph() + graph.parse(CATALOG_URL, format="ttl") + return graph + + def _catalog(self, context) -> pd.DataFrame: + self.remove_all_partition_keys(context) + + opendata_dataset_list = self.get_opendata_dataset_list() + # Create the schema for the catalog + catalog_summary = { + "node": [], + "human_readable_name": [], + "description": [], + "metric_parquet_path": [], + "parquet_column_name": [], + "parquet_margin_of_error_column": [], + "parquet_margin_of_error_file": [], + "potential_denominator_ids": [], + "parent_metric_id": [], + "source_data_release_id": [], + "source_download_url": [], + "source_format": [], + "source_archive_file_path": [], + "source_documentation_url": [], + } + + # Loop over the datasets in the catalogue Graph + CATALOG_ROOT = URIRef("http://data.gov.be/catalog/statbelopen") + for dataset_id in opendata_dataset_list.objects( + subject=CATALOG_ROOT, predicate=DCAT.dataset, unique=True + ): + catalog_summary["node"].append( + str(dataset_id).removeprefix("https://statbel.fgov.be/node/") + ) + catalog_summary["human_readable_name"].append( + filter_by_language( + graph=opendata_dataset_list, + subject=dataset_id, + predicate=DCTERMS.title, + ) + ) + catalog_summary["description"].append( + filter_by_language( + opendata_dataset_list, + subject=dataset_id, + predicate=DCTERMS.description, + ) + ) + + # This is unknown at this stage + catalog_summary["metric_parquet_path"].append(None) + catalog_summary["parquet_margin_of_error_column"].append(None) + catalog_summary["parquet_margin_of_error_file"].append(None) + catalog_summary["potential_denominator_ids"].append(None) + catalog_summary["parent_metric_id"].append(None) + catalog_summary["source_data_release_id"].append(None) + catalog_summary["parquet_column_name"].append(None) + + download_url, archive_file_path, format = get_distribution_url( + opendata_dataset_list, dataset_id + ) + catalog_summary["source_download_url"].append(download_url) + catalog_summary["source_archive_file_path"].append(archive_file_path) + catalog_summary["source_format"].append(format) + + catalog_summary["source_documentation_url"].append( + get_landpage_url(opendata_dataset_list, dataset_id, language="en") + ) + + # Convert to dataframe and remove datasets which are known to fail + catalog_df = pd.DataFrame(data=catalog_summary, dtype="string") + catalog_df = catalog_df[~catalog_df["node"].isin(KNOWN_FAILING_DATASETS)] + + # If not production, restrict the catalog to only the required datasets + if REQUIRED_DATASETS is not None: + catalog_df = catalog_df[catalog_df["node"].isin(REQUIRED_DATASETS)] + + self.add_partition_keys(context, catalog_df["node"].to_list()) + add_metadata(context, catalog_df, "Catalog") + return catalog_df + + def _census_tables(self, context, catalog: pd.DataFrame) -> pd.DataFrame: + row = catalog[catalog["node"] == context.partition_key] + format = row["source_format"].iloc[0] + ic(format) + handler = DOWNLOAD_HANDLERS.get(format, no_op_format_handler) + return handler(context, row=row) + + def _geometry(self, _context) -> list[GeometryOutput]: + err = "The geometry asset for Belgium has a custom implementation as it depends on the census_tables asset." + raise NotImplementedError(err) + + def _source_data_releases( + self, _context, geometry, data_publisher + ) -> dict[str, SourceDataRelease]: + return { + geo_output.metadata.level: SourceDataRelease( + name="StatBel Open Data", + date_published=date(2015, 10, 22), + reference_period_start=date(2015, 10, 22), + reference_period_end=date(2015, 10, 22), + collection_period_start=date(2015, 10, 22), + collection_period_end=date(2015, 10, 22), + expect_next_update=date(2022, 1, 1), + url="https://statbel.fgov.be/en/open-data", + description="TBC", + data_publisher_id=data_publisher.id, + geometry_metadata_id=geo_output.metadata.id, + ) + for geo_output in geometry + } + + def _source_metric_metadata( + self, + context, + catalog: pd.DataFrame, + source_data_releases: dict[str, SourceDataRelease], + ) -> MetricMetadata: + catalog_row = catalog[catalog["node"] == context.partition_key].to_dict( + orient="records" + )[0] + dataset_spec = DATASET_SPECIFICATIONS[context.partition_key] + + return MetricMetadata( + human_readable_name=catalog_row["human_readable_name"], + source_download_url=catalog_row["source_download_url"], + source_archive_file_path=catalog_row["source_archive_file_path"], + source_documentation_url=catalog_row["source_documentation_url"], + source_data_release_id=source_data_releases[ + dataset_spec.geography_level + ].id, + parent_metric_id=None, + potential_denominator_ids=None, + description=catalog_row["description"].strip(), + source_metric_id=dataset_spec.source_column, + # These are to be replaced at the derived stage + metric_parquet_path="__PLACEHOLDER__", + hxl_tag="__PLACEHOLDER__", + parquet_column_name="__PLACEHOLDER__", + parquet_margin_of_error_file=None, + parquet_margin_of_error_column=None, + ) + + def _derived_metrics( + self, + context, + census_tables: pd.DataFrame, + source_metric_metadata: MetricMetadata, + ) -> MetricsOutput: + # Skip if we don't know what to do with this partition + try: + this_dataset_spec = DATASET_SPECIFICATIONS[context.partition_key] + except KeyError: + skip_reason = ( + f"No action specified for partition key {context.partition_key}" + ) + context.log.warning(skip_reason) + return MetricsOutput(metadata=[], metrics=pd.DataFrame()) + + # Return empty metrics if the source table is empty, if the source + # column is not present + skip_reason = None + if len(census_tables) == 0: + skip_reason = "Skipping as input table is empty" + context.log.warning(skip_reason) + return MetricsOutput(metadata=[], metrics=pd.DataFrame()) + if this_dataset_spec.source_column not in census_tables.columns: + skip_reason = ( + f"Skipping as source column '{this_dataset_spec.source_column}' is not" + f" present in the input table." + ) + context.log.warning(skip_reason) + return MetricsOutput(metadata=[], metrics=pd.DataFrame()) + + # Assign parquet file name + parquet_file_name = ( + f"{self.country_metadata.id}/metrics/" + f"{''.join(c for c in context.partition_key if c.isalnum()) + '.parquet'}" + ) + + # Rename the geoID column to GEO_ID + geo_id_col_name = this_dataset_spec.geoid_column + census_tables = census_tables.rename(columns={geo_id_col_name: "GEO_ID"}) + + # Generate derived metrics through pivoting + if len(this_dataset_spec.pivot_columns) > 0: + needed_columns = [ + "GEO_ID", + this_dataset_spec.source_column, + *this_dataset_spec.pivot_columns, + ] + census_table = census_tables[needed_columns] + census_table = census_table.pivot_table( + index="GEO_ID", + columns=this_dataset_spec.pivot_columns, + values=this_dataset_spec.source_column, + ) + # Generate metadata structs + derived_mmds = [] + for c in census_table.columns.to_flat_index(): + new_mmd = source_metric_metadata.copy() + new_mmd.parent_metric_id = source_metric_metadata.source_metric_id + new_mmd.metric_parquet_path = parquet_file_name + check_not_str(this_dataset_spec.derived_hxl) + new_mmd.hxl_tag = this_dataset_spec.derived_hxl( + *c + ) # type: ignore[reportCallIssue] + check_not_str(this_dataset_spec.derived_human_readable_name) + new_mmd.human_readable_name = ( + this_dataset_spec.derived_human_readable_name(*c) + ) # type: ignore[reportCallIssue] + new_mmd.parquet_column_name = "_".join([str(x) for x in c]) + derived_mmds.append(new_mmd) + # Rename columns + col_names = [m.parquet_column_name for m in derived_mmds] + census_table.columns = col_names + + else: + # No pivoting required. Just extract the column + census_table = census_tables[["GEO_ID", this_dataset_spec.source_column]] + # Generate metadata struct + new_mmd = source_metric_metadata.copy() + new_mmd.parent_metric_id = source_metric_metadata.source_metric_id + new_mmd.metric_parquet_path = parquet_file_name + check_str(this_dataset_spec.derived_hxl) + new_mmd.hxl_tag = this_dataset_spec.derived_hxl # type: ignore[reportAttributeAccessIssue] + check_str(this_dataset_spec.derived_human_readable_name) + new_mmd.human_readable_name = this_dataset_spec.derived_human_readable_name # type: ignore[reportAttributeAccessIssue] + new_mmd.parquet_column_name = this_dataset_spec.source_column + derived_mmds = [new_mmd] + + context.add_output_metadata( + metadata={ + "metadata_preview": MetadataValue.md( + metadata_to_dataframe(derived_mmds).head().to_markdown() + ), + "metrics_shape": f"{census_table.shape[0]} rows x {census_table.shape[1]} columns", + "metrics_preview": MetadataValue.md(census_table.head().to_markdown()), + }, + ) + return MetricsOutput(metadata=derived_mmds, metrics=census_table) + + +# Create assets +bel = Belgium() +country_metadata = bel.create_country_metadata() +data_publisher = bel.create_data_publisher() +catalog = bel.create_catalog() +census_tables = bel.create_census_tables() + + +@send_to_geometry_sensor +@asset( + ins={ + "sector_geometries": AssetIn( + key=[bel.country_metadata.id, "census_tables"], + partition_mapping=SpecificPartitionsPartitionMapping(["4726"]), + ), + }, + key_prefix=bel.country_metadata.id, +) +def geometry(context, sector_geometries) -> list[GeometryOutput]: + """ + Produces the full set of data / metadata associated with Belgian + municipalities. The outputs, in order, are: + + 1. A DataFrame containing a serialised GeometryMetadata object. + 2. A GeoDataFrame containing the geometries of the municipalities. + 3. A DataFrame containing the names of the municipalities (in this case, + they are in Dutch, French, and German). + """ + geometries_to_return = [] + + for level_details in BELGIUM_GEOMETRY_LEVELS.values(): + geometry_metadata = GeometryMetadata( + country_metadata=bel.country_metadata, + validity_period_start=date(2023, 1, 1), + validity_period_end=date(2023, 12, 31), + level=level_details.level, + hxl_tag=level_details.hxl_tag, + ) + + region_geometries = ( + sector_geometries.dissolve(by=level_details.geo_id_column) + .reset_index() + .rename(columns={level_details.geo_id_column: "GEO_ID"}) + .loc[:, ["geometry", "GEO_ID"]] + ) + ic(region_geometries.head()) + + region_names = ( + sector_geometries.rename( + columns={ + level_details.geo_id_column: "GEO_ID", + level_details.name_columns["nld"]: "nld", + level_details.name_columns["fra"]: "fra", + level_details.name_columns["deu"]: "deu", + } + ) + .loc[:, ["GEO_ID", "nld", "fra", "deu"]] + .drop_duplicates() + .astype({"GEO_ID": str}) + ) + ic(region_names.head()) + + geometries_to_return.append( + GeometryOutput( + metadata=geometry_metadata, gdf=region_geometries, names_df=region_names + ) + ) + + # Add output metadata + first_output = geometries_to_return[0] + first_joined_gdf = first_output.gdf.merge(first_output.names_df, on="GEO_ID") + ax = first_joined_gdf.plot(column="nld", legend=False) + ax.set_title(f"Belgium 2023 {first_output.metadata.level}") + md_plot = markdown_from_plot() + context.add_output_metadata( + metadata={ + "all_geom_levels": MetadataValue.md( + ",".join( + [geom_output.metadata.level for geom_output in geometries_to_return] + ) + ), + "first_geometry_plot": MetadataValue.md(md_plot), + "first_names_preview": MetadataValue.md( + first_output.names_df.head().to_markdown() + ), + } + ) + + return geometries_to_return + + +source_data_releases = bel.create_source_data_releases() +source_metric_metadata = bel.create_source_metric_metadata() +derived_metrics = bel.create_derived_metrics() +metrics = bel.create_metrics() diff --git a/python/popgetter/assets/bel/belgium.py b/python/popgetter/assets/bel/belgium.py deleted file mode 100644 index d3d0f89..0000000 --- a/python/popgetter/assets/bel/belgium.py +++ /dev/null @@ -1,16 +0,0 @@ -from __future__ import annotations - -from pathlib import Path - -from popgetter.metadata import CountryMetadata - -country: CountryMetadata = CountryMetadata( - name_short_en="Belgium", - name_official="Kingdom of Belgium", - iso3="BEL", - iso2="BE", - iso3166_2=None, -) - -WORKING_DIR = Path("belgium") -asset_prefix = "bel" diff --git a/python/popgetter/assets/bel/census_derived.py b/python/popgetter/assets/bel/census_derived.py deleted file mode 100644 index 2f4986d..0000000 --- a/python/popgetter/assets/bel/census_derived.py +++ /dev/null @@ -1,350 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass -from functools import reduce - -import pandas as pd -from dagster import ( - AssetIn, - IdentityPartitionMapping, - MetadataValue, - SpecificPartitionsPartitionMapping, - StaticPartitionsDefinition, - asset, -) -from icecream import ic - -from popgetter.cloud_outputs import MetricsOutput, send_to_metrics_sensor -from popgetter.metadata import MetricMetadata, SourceDataRelease, metadata_to_dataframe - -from .belgium import asset_prefix -from .census_tables import dataset_node_partition - -_needed_dataset = [ - { - # Population by Statistical sector, Period: 2023 - "node": "https://statbel.fgov.be/node/4796", - "hxltag": "#population+total+2023", - "source_column": "TOTAL", - }, - { - # Statistical sectors 2023 - "node": "https://statbel.fgov.be/node/4726", - "hxltag": "#geo+bounds+sector+2023", - "source_column": "", - }, - { - # Population by Statistical sector, Period: 2016 - "node": "https://statbel.fgov.be/node/1437", - "hxltag": "#population+total+2016", - "source_column": "POPULATION", - }, - { - # Population by Municipalities, Period: 2023 - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "#population+admn1+total+2023", - "source_column": "MS_POPULATION", - }, -] - -_needed_dataset_nodes: list[str] = [r["node"] for r in _needed_dataset] -needed_dataset_mapping = SpecificPartitionsPartitionMapping(_needed_dataset_nodes) -needed_dataset_partition = StaticPartitionsDefinition(_needed_dataset_nodes) - - -@dataclass -class DerivedColumn: - hxltag: str - filter_func: Callable[[pd.DataFrame], pd.DataFrame] - output_column_name: str - human_readable_name: str - - -# The keys of this dict are the nodes (i.e. partition keys). The values are a -# list of all columns of data derived from this node. -DERIVED_COLUMN_SPECIFICATIONS: dict[str, tuple[str, list[DerivedColumn]]] = { - "https://statbel.fgov.be/node/4689": ( - "CD_REFNIS", - [ - DerivedColumn( - hxltag="#population+children+age5_17", - filter_func=lambda df: df.query("CD_AGE >= 5 and CD_AGE < 18"), - output_column_name="children_5_17", - human_readable_name="Children aged 5 to 17", - ), - DerivedColumn( - hxltag="#population+infants+age0_4", - filter_func=lambda df: df.query("CD_AGE >= 0 and CD_AGE < 5"), - output_column_name="infants_0_4", - human_readable_name="Infants aged 0 to 4", - ), - DerivedColumn( - hxltag="#population+children+age0_17", - filter_func=lambda df: df.query("CD_AGE >= 0 and CD_AGE < 18"), - output_column_name="children_0_17", - human_readable_name="Children aged 0 to 17", - ), - DerivedColumn( - hxltag="#population+adults+f", - filter_func=lambda df: df.query("CD_AGE >= 18 and CD_SEX == 'F'"), - output_column_name="adults_f", - human_readable_name="Female adults", - ), - DerivedColumn( - hxltag="#population+adults+m", - filter_func=lambda df: df.query("CD_AGE >= 18 and CD_SEX == 'M'"), - output_column_name="adults_m", - human_readable_name="Male adults", - ), - DerivedColumn( - hxltag="#population+adults", - filter_func=lambda df: df.query("CD_AGE >= 18"), - output_column_name="adults", - human_readable_name="Adults", - ), - DerivedColumn( - hxltag="#population+ind", - filter_func=lambda df: df, - output_column_name="individuals", - human_readable_name="Total individuals", - ), - ], - ) -} - - -@asset(key_prefix=asset_prefix) -def needed_datasets(context) -> pd.DataFrame: - needed_df = pd.DataFrame( - _needed_dataset, - columns=["node", "hxltag", "source_column", "derived_columns"], - dtype="string", - ) - - # Now add some metadata to the context - context.add_output_metadata( - # Metadata can be any key-value pair - metadata={ - "num_records": len(needed_df), - "columns": MetadataValue.md( - "\n".join([f"- '`{col}`'" for col in needed_df.columns.to_list()]) - ), - "columns_types": MetadataValue.md(needed_df.dtypes.to_markdown()), - "preview": MetadataValue.md(needed_df.to_markdown()), - } - ) - - return needed_df - - -def make_census_table_metadata( - catalog_row: dict, source_data_release: SourceDataRelease -) -> MetricMetadata: - return MetricMetadata( - human_readable_name=catalog_row["human_readable_name"], - source_download_url=catalog_row["source_download_url"], - source_archive_file_path=catalog_row["source_archive_file_path"], - source_documentation_url=catalog_row["source_documentation_url"], - source_data_release_id=source_data_release.id, - parent_metric_id=None, - potential_denominator_ids=None, - parquet_margin_of_error_file=None, - parquet_margin_of_error_column=None, - parquet_column_name=catalog_row["source_column"], - metric_parquet_path="__PLACEHOLDER__", - hxl_tag=catalog_row["hxltag"], - description=catalog_row["description"], - source_metric_id=catalog_row["source_column"], - ) - - -@asset( - key_prefix=asset_prefix, - ins={ - "catalog_as_dataframe": AssetIn(partition_mapping=needed_dataset_mapping), - }, -) -def filter_needed_catalog( - context, needed_datasets, catalog_as_dataframe: pd.DataFrame -) -> pd.DataFrame: - ic(needed_datasets.head()) - ic(needed_datasets.columns) - ic(needed_datasets.dtypes) - - needed_df = needed_datasets.merge(catalog_as_dataframe, how="inner", on="node") - - context.add_output_metadata( - metadata={ - "num_records": len(needed_df), - "columns": MetadataValue.md( - "\n".join([f"- '`{col}`'" for col in needed_df.columns.to_list()]) - ), - "preview": MetadataValue.md(needed_df.to_markdown()), - } - ) - - return needed_df - - -@asset( - ins={ - "individual_census_table": AssetIn( - key_prefix=asset_prefix, partition_mapping=needed_dataset_mapping - ), - "filter_needed_catalog": AssetIn(key_prefix=asset_prefix), - "source_data_releases": AssetIn(key_prefix=asset_prefix), - }, - partitions_def=dataset_node_partition, - key_prefix=asset_prefix, -) -def source_metrics_by_partition( - context, - individual_census_table: dict[str, pd.DataFrame], - filter_needed_catalog: pd.DataFrame, - # TODO: generalise to list or dict of SourceDataReleases as there may be - # tables in here that are not at the same release level - # E.g. keys as Geography level ID - source_data_releases: dict[str, SourceDataRelease], - # TODO: return an intermediate type instead of MetricMetadata -) -> tuple[MetricMetadata, pd.DataFrame]: - input_partition_keys = context.asset_partition_keys_for_input( - input_name="individual_census_table" - ) - output_partition_key = context.partition_key - - if output_partition_key not in input_partition_keys: - skip_reason = f"Skipping as requested partition {output_partition_key} is not part of the 'needed' partitions {input_partition_keys}" - context.log.warning(skip_reason) - raise RuntimeError(skip_reason) - - try: - result_df = individual_census_table[output_partition_key] - except KeyError: - err_msg = ( - f"Partition key {output_partition_key} not found in individual_census_table\n" - f"Available keys are {individual_census_table.keys()}" - ) - raise ValueError(err_msg) from None - - catalog_row = filter_needed_catalog[ - filter_needed_catalog["node"] == output_partition_key - ].to_dict(orient="records")[0] - - # TODO: refine upon more general level handling with derived column config. - # This config is currently called `DERIVED_COLUMN_SPECIFICATIONS` here and the - # level can also be included there. - key = "municipality" - result_mmd = make_census_table_metadata(catalog_row, source_data_releases[key]) - - return result_mmd, result_df - - -@asset( - partitions_def=dataset_node_partition, - ins={ - "source_metrics_by_partition": AssetIn( - key_prefix=asset_prefix, partition_mapping=IdentityPartitionMapping() - ), - }, - key_prefix=asset_prefix, -) -def derived_metrics_by_partition( - context, - source_metrics_by_partition: tuple[MetricMetadata, pd.DataFrame], -) -> MetricsOutput: - node = context.partition_key - - source_mmd, source_table = source_metrics_by_partition - source_column = source_mmd.parquet_column_name - assert source_column in source_table.columns - assert len(source_table) > 0 - - try: - geo_id_col_name, metric_specs = DERIVED_COLUMN_SPECIFICATIONS[node] - except KeyError: - skip_reason = ( - f"Skipping as no derived columns are to be created for node {node}" - ) - context.log.warning(skip_reason) - raise RuntimeError(skip_reason) from None - - # Rename the geoID column to GEO_ID - source_table = source_table.rename(columns={geo_id_col_name: "GEO_ID"}) - - derived_metrics: list[pd.DataFrame] = [] - derived_mmd: list[MetricMetadata] = [] - - parquet_file_name = ( - f"{asset_prefix}/metrics/" - f"{''.join(c for c in node if c.isalnum()) + '.parquet'}" - ) - - for metric_spec in metric_specs: - new_table = ( - source_table.pipe(metric_spec.filter_func) - .groupby(by="GEO_ID", as_index=True) - .sum() - .rename(columns={source_column: metric_spec.output_column_name}) - .filter(items=["GEO_ID", metric_spec.output_column_name]) - ) - derived_metrics.append(new_table) - - new_mmd = source_mmd.copy() - new_mmd.parent_metric_id = source_mmd.source_metric_id - new_mmd.metric_parquet_path = parquet_file_name - new_mmd.hxl_tag = metric_spec.hxltag - new_mmd.parquet_column_name = metric_spec.output_column_name - new_mmd.human_readable_name = metric_spec.human_readable_name - derived_mmd.append(new_mmd) - - joined_metrics = reduce( - lambda left, right: left.merge( - right, on="GEO_ID", how="inner", validate="one_to_one" - ), - derived_metrics, - ) - - context.add_output_metadata( - metadata={ - "metadata_preview": MetadataValue.md( - metadata_to_dataframe(derived_mmd).head().to_markdown() - ), - "metrics_shape": f"{joined_metrics.shape[0]} rows x {joined_metrics.shape[1]} columns", - "metrics_preview": MetadataValue.md(joined_metrics.head().to_markdown()), - }, - ) - - return MetricsOutput(metadata=derived_mmd, metrics=joined_metrics) - - -@send_to_metrics_sensor -@asset( - ins={ - "derived_metrics_by_partition": AssetIn( - key_prefix=asset_prefix, - partition_mapping=SpecificPartitionsPartitionMapping( - ["https://statbel.fgov.be/node/4689"] - ), - ), - }, - key_prefix=asset_prefix, -) -def metrics( - context, - derived_metrics_by_partition: MetricsOutput, -) -> list[MetricsOutput]: - """ - This asset exists solely to aggregate all the derived tables into one - single unpartitioned asset, which the downstream publishing tasks can use. - - Right now it is a bit boring because it only relies on one partition, but - it could be extended when we have more data products. - """ - context.add_output_metadata( - metadata={ - "num_metrics": len(derived_metrics_by_partition.metadata), - "num_parquets": 1, - }, - ) - return [derived_metrics_by_partition] diff --git a/python/popgetter/assets/bel/census_geometry.py b/python/popgetter/assets/bel/census_geometry.py deleted file mode 100644 index a0c22e2..0000000 --- a/python/popgetter/assets/bel/census_geometry.py +++ /dev/null @@ -1,200 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from datetime import date - -import matplotlib.pyplot as plt -from dagster import ( - AssetIn, - MetadataValue, - SpecificPartitionsPartitionMapping, - asset, -) -from icecream import ic - -from popgetter.cloud_outputs import ( - GeometryOutput, - send_to_geometry_sensor, - send_to_metadata_sensor, -) -from popgetter.metadata import ( - GeometryMetadata, - SourceDataRelease, -) -from popgetter.utils import markdown_from_plot - -from .belgium import asset_prefix, country -from .census_tables import publisher - - -@dataclass -class BelgiumGeometryLevel: - level: str - hxl_tag: str - geo_id_column: str - name_columns: dict[str, str] # keys = language codes, values = column names - - -BELGIUM_GEOMETRY_LEVELS = { - "province": BelgiumGeometryLevel( - level="province", - hxl_tag="adm1", - geo_id_column="cd_prov_refnis", - name_columns={ - "nld": "tx_prov_descr_nl", - "fra": "tx_prov_descr_fr", - "deu": "tx_prov_descr_de", - }, - ), - "region": BelgiumGeometryLevel( - level="region", - hxl_tag="adm2", - geo_id_column="cd_rgn_refnis", - name_columns={ - "nld": "tx_rgn_descr_nl", - "fra": "tx_rgn_descr_fr", - "deu": "tx_rgn_descr_de", - }, - ), - "arrondisement": BelgiumGeometryLevel( - level="arrondisement", - hxl_tag="adm3", - geo_id_column="cd_dstr_refnis", - name_columns={ - "nld": "tx_adm_dstr_descr_nl", - "fra": "tx_adm_dstr_descr_fr", - "deu": "tx_adm_dstr_descr_de", - }, - ), - "municipality": BelgiumGeometryLevel( - level="municipality", - hxl_tag="adm4", - geo_id_column="cd_munty_refnis", - name_columns={ - "nld": "tx_munty_descr_nl", - "fra": "tx_munty_descr_fr", - "deu": "tx_munty_descr_de", - }, - ), - "statistical_sector": BelgiumGeometryLevel( - level="statistical_sector", - hxl_tag="adm5", - geo_id_column="cd_sector", - name_columns={ - "nld": "tx_sector_descr_nl", - "fra": "tx_sector_descr_fr", - "deu": "tx_sector_descr_de", - }, - ), -} - - -@send_to_geometry_sensor -@asset( - ins={ - "sector_geometries": AssetIn( - key=[asset_prefix, "individual_census_table"], - partition_mapping=SpecificPartitionsPartitionMapping( - ["https://statbel.fgov.be/node/4726"] - ), - ), - }, - key_prefix=asset_prefix, -) -def geometry(context, sector_geometries) -> list[GeometryOutput]: - """ - Produces the full set of data / metadata associated with Belgian - municipalities. The outputs, in order, are: - - 1. A DataFrame containing a serialised GeometryMetadata object. - 2. A GeoDataFrame containing the geometries of the municipalities. - 3. A DataFrame containing the names of the municipalities (in this case, - they are in Dutch, French, and German). - """ - geometries_to_return = [] - - for level_details in BELGIUM_GEOMETRY_LEVELS.values(): - geometry_metadata = GeometryMetadata( - country_metadata=country, - validity_period_start=date(2023, 1, 1), - validity_period_end=date(2023, 12, 31), - level=level_details.level, - hxl_tag=level_details.hxl_tag, - ) - - region_geometries = ( - sector_geometries.dissolve(by=level_details.geo_id_column) - .reset_index() - .rename(columns={level_details.geo_id_column: "GEO_ID"}) - .loc[:, ["geometry", "GEO_ID"]] - ) - ic(region_geometries.head()) - - region_names = ( - sector_geometries.rename( - columns={ - level_details.geo_id_column: "GEO_ID", - level_details.name_columns["nld"]: "nld", - level_details.name_columns["fra"]: "fra", - level_details.name_columns["deu"]: "deu", - } - ) - .loc[:, ["GEO_ID", "nld", "fra", "deu"]] - .drop_duplicates() - .astype({"GEO_ID": str}) - ) - ic(region_names.head()) - - geometries_to_return.append( - GeometryOutput( - metadata=geometry_metadata, gdf=region_geometries, names_df=region_names - ) - ) - - # Add output metadata - first_output = geometries_to_return[0] - first_joined_gdf = first_output.gdf.merge(first_output.names_df, on="GEO_ID") - ax = first_joined_gdf.plot(column="nld", legend=False) - ax.set_title(f"Belgium 2023 {first_output.metadata.level}") - md_plot = markdown_from_plot(plt) - context.add_output_metadata( - metadata={ - "all_geom_levels": MetadataValue.md( - ",".join( - [geom_output.metadata.level for geom_output in geometries_to_return] - ) - ), - "first_geometry_plot": MetadataValue.md(md_plot), - "first_names_preview": MetadataValue.md( - first_output.names_df.head().to_markdown() - ), - } - ) - - return geometries_to_return - - -@send_to_metadata_sensor -@asset(key_prefix=asset_prefix) -def source_data_releases( - geometry: list[GeometryOutput], -) -> dict[str, SourceDataRelease]: - """ - Returns all SourceDataReleases for each geometry level. - """ - return { - geo_output.metadata.level: SourceDataRelease( - name="StatBel Open Data", - date_published=date(2015, 10, 22), - reference_period_start=date(2015, 10, 22), - reference_period_end=date(2015, 10, 22), - collection_period_start=date(2015, 10, 22), - collection_period_end=date(2015, 10, 22), - expect_next_update=date(2022, 1, 1), - url="https://statbel.fgov.be/en/open-data", - description="TBC", - data_publisher_id=publisher.id, - geometry_metadata_id=geo_output.metadata.id, - ) - for geo_output in geometry - } diff --git a/python/popgetter/assets/bel/census_tables.py b/python/popgetter/assets/bel/utils.py similarity index 64% rename from python/popgetter/assets/bel/census_tables.py rename to python/popgetter/assets/bel/utils.py index 812c241..1571884 100644 --- a/python/popgetter/assets/bel/census_tables.py +++ b/python/popgetter/assets/bel/utils.py @@ -3,197 +3,22 @@ import csv import sqlite3 import zipfile +from collections.abc import Callable from pathlib import Path, PurePath from tempfile import TemporaryDirectory from urllib.parse import urlparse import geopandas as gpd -import matplotlib.pyplot as plt import pandas as pd import requests -from dagster import ( - DynamicPartitionsDefinition, - MetadataValue, - asset, -) +from dagster import MetadataValue from icecream import ic from rdflib import Graph, URIRef from rdflib.namespace import DCAT, DCTERMS, SKOS -from popgetter.cloud_outputs import send_to_metadata_sensor -from popgetter.metadata import ( - CountryMetadata, - DataPublisher, -) from popgetter.utils import extract_main_file_from_zip, markdown_from_plot -from .belgium import asset_prefix, country - -publisher: DataPublisher = DataPublisher( - name="Statbel", - url="https://statbel.fgov.be/en", - description="Statbel is the Belgian statistical office. It is part of the Federal Public Service Economy, SMEs, Self-employed and Energy.", - countries_of_interest=[country.id], -) - -opendata_catalog_root = URIRef("http://data.gov.be/catalog/statbelopen") - -dataset_node_partition = DynamicPartitionsDefinition(name="dataset_nodes") - - -@send_to_metadata_sensor -@asset(key_prefix=asset_prefix) -def country_metadata() -> CountryMetadata: - """ - Returns the CountryMetadata for this country. - """ - return country - - -@send_to_metadata_sensor -@asset(key_prefix=asset_prefix) -def data_publisher() -> DataPublisher: - """ - Returns the DataPublisher for this country. - """ - return publisher - - -@asset(key_prefix=asset_prefix) -def opendata_dataset_list(context) -> Graph: - """ - Returns a list of all the tables available in the Statbel Open Data portal. - - This document is essential reading for understanding the structure of the data: - https://github.com/belgif/inspire-dcat/blob/main/DCATAPprofil.en.md - """ - # URL of datafile - catalog_url = "https://doc.statbel.be/publications/DCAT/DCAT_opendata_datasets.ttl" - - graph = Graph() - graph.parse(catalog_url, format="ttl") - - dataset_nodes_ids = list( - graph.objects( - subject=opendata_catalog_root, predicate=DCAT.dataset, unique=False - ) - ) - - context.add_output_metadata( - metadata={ - "graph_num_records": len(graph), - "num_datasets": len(dataset_nodes_ids), - "dataset_node_ids": "\n".join(iter([str(n) for n in dataset_nodes_ids])), - } - ) - - return graph - - -@asset(key_prefix=asset_prefix) -def catalog_as_dataframe(context, opendata_dataset_list: Graph) -> pd.DataFrame: - # Create the schema for the catalog - catalog_summary = { - "node": [], - "human_readable_name": [], - "description": [], - "metric_parquet_path": [], - "parquet_column_name": [], - "parquet_margin_of_error_column": [], - "parquet_margin_of_error_file": [], - "potential_denominator_ids": [], - "parent_metric_id": [], - "source_data_release_id": [], - "source_download_url": [], - "source_format": [], - "source_archive_file_path": [], - "source_documentation_url": [], - } - - # Loop over the datasets in the catalogue Graph - catalog_root = URIRef("http://data.gov.be/catalog/statbelopen") - for dataset_id in opendata_dataset_list.objects( - subject=catalog_root, predicate=DCAT.dataset, unique=True - ): - catalog_summary["node"].append(str(dataset_id)) - catalog_summary["human_readable_name"].append( - filter_by_language( - graph=opendata_dataset_list, subject=dataset_id, predicate=DCTERMS.title - ) - ) - catalog_summary["description"].append( - filter_by_language( - opendata_dataset_list, subject=dataset_id, predicate=DCTERMS.description - ) - ) - - # This is unknown at this stage - catalog_summary["metric_parquet_path"].append(None) - catalog_summary["parquet_margin_of_error_column"].append(None) - catalog_summary["parquet_margin_of_error_file"].append(None) - catalog_summary["potential_denominator_ids"].append(None) - catalog_summary["parent_metric_id"].append(None) - catalog_summary["source_data_release_id"].append(None) - catalog_summary["parquet_column_name"].append(None) - - download_url, archive_file_path, format = get_distribution_url( - opendata_dataset_list, dataset_id - ) - catalog_summary["source_download_url"].append(download_url) - catalog_summary["source_archive_file_path"].append(archive_file_path) - catalog_summary["source_format"].append(format) - - catalog_summary["source_documentation_url"].append( - get_landpage_url(opendata_dataset_list, dataset_id, language="en") - ) - - catalog_df = pd.DataFrame(data=catalog_summary, dtype="string") - - # Now create the dynamic partitions for later in the pipeline - # First delete the old dynamic partitions from the previous run - for partition in context.instance.get_dynamic_partitions("dataset_nodes"): - context.instance.delete_dynamic_partition("dataset_nodes", partition) - - # Create a dynamic partition for the datasets listed in the catalogue - filter_list = filter_known_failing_datasets(catalog_summary["node"]) - ignored_datasets = [n for n in catalog_summary["node"] if n not in filter_list] - - context.instance.add_dynamic_partitions( - partitions_def_name="dataset_nodes", partition_keys=filter_list - ) - - # Now add some metadata to the context - context.add_output_metadata( - # Metadata can be any key-value pair - metadata={ - "num_records": len(catalog_df), - "ignored_datasets": "\n".join(ignored_datasets), - "columns": MetadataValue.md( - "\n".join([f"- '`{col}`'" for col in catalog_df.columns.to_list()]) - ), - "columns_types": MetadataValue.md(catalog_df.dtypes.to_markdown()), - "preview": MetadataValue.md(catalog_df.to_markdown()), - } - ) - - return catalog_df - - -def filter_known_failing_datasets(node_list: list[str]) -> list[str]: - failing_cases = { - # sqlite compressed as tar.gz - "https://statbel.fgov.be/node/595", # Census 2011 - Matrix of commutes by statistical sector - # faulty zip file (confirmed by manual download) - "https://statbel.fgov.be/node/2676", - # Excel only (French and Dutch only) - "https://statbel.fgov.be/node/2654", # Geografische indelingen 2020 - "https://statbel.fgov.be/node/3961", # Geografische indelingen 2021 - # AccessDB only! - "https://statbel.fgov.be/node/4135", # Enterprises subject to VAT according to legal form (English only) - "https://statbel.fgov.be/node/4136", # Enterprises subject to VAT according to employer class (English only) - } - - return [n for n in node_list if n not in failing_cases] +## Functions to process catalog def filter_by_language(graph, subject, predicate, language="en") -> str: @@ -385,31 +210,7 @@ def get_distribution_url(graph, subject) -> tuple[str, str, str]: return url_str, path.name, format_str -@asset(partitions_def=dataset_node_partition, key_prefix=asset_prefix) -def individual_census_table( - context, catalog_as_dataframe: pd.DataFrame -) -> pd.DataFrame: - handlers = { - "http://publications.europa.eu/resource/authority/file-type/TXT": download_census_table, - "http://publications.europa.eu/resource/authority/file-type/GEOJSON": download_census_geometry, - "http://publications.europa.eu/resource/authority/file-type/GML": download_census_geometry, - "http://publications.europa.eu/resource/authority/file-type/BIN": download_census_database, - "http://publications.europa.eu/resource/authority/file-type/CSV": no_op_format_handler, - "http://publications.europa.eu/resource/authority/file-type/MDB": no_op_format_handler, - "http://publications.europa.eu/resource/authority/file-type/SHP": no_op_format_handler, - "http://publications.europa.eu/resource/authority/file-type/XLSX": no_op_format_handler, - } - - partition_key = context.asset_partition_key_for_output() - ic(partition_key) - row = ic(catalog_as_dataframe.loc[catalog_as_dataframe.node.isin([partition_key])]) - ic(row) - - format = row["source_format"].iloc[0] - ic(format) - - handler = handlers.get(format, no_op_format_handler) - return handler(context, row=row) +## Functions to download individual census tables def no_op_format_handler(context, **kwargs): @@ -525,10 +326,8 @@ def download_census_table(context, **kwargs) -> pd.DataFrame: def download_census_geometry(context, **kwargs) -> gpd.GeoDataFrame: - # def get_geometries(context: AssetExecutionContext) -> gpd.GeoDataFrame: """ Downloads the Statistical Sector for Belgium and returns a GeoDataFrame. - """ table_details = kwargs["row"] # URL of datafile @@ -547,11 +346,9 @@ def download_census_geometry(context, **kwargs) -> gpd.GeoDataFrame: sectors_gdf.index = sectors_gdf.index.astype(str) # Plot and convert the image to Markdown to preview it within Dagster - # Yes we do pass the `plt` object to the markdown_from_plot function and not the `ax` object - # ax = sectors_gdf.plot(color="green") ax = sectors_gdf.plot(legend=False) ax.set_title(table_details["human_readable_name"].iloc[0]) - md_plot = markdown_from_plot(plt) + md_plot = markdown_from_plot() context.add_output_metadata( metadata={ @@ -589,3 +386,50 @@ def download_file(source_download_url, source_archive_file_path, temp_dir) -> st return extract_main_file_from_zip(temp_file, temp_dir, expected_extension) return str(temp_file.resolve()) + + +DOWNLOAD_HANDLERS = { + "http://publications.europa.eu/resource/authority/file-type/TXT": download_census_table, + "http://publications.europa.eu/resource/authority/file-type/GEOJSON": download_census_geometry, + "http://publications.europa.eu/resource/authority/file-type/GML": download_census_geometry, + "http://publications.europa.eu/resource/authority/file-type/BIN": download_census_database, + "http://publications.europa.eu/resource/authority/file-type/CSV": no_op_format_handler, + "http://publications.europa.eu/resource/authority/file-type/MDB": no_op_format_handler, + "http://publications.europa.eu/resource/authority/file-type/SHP": no_op_format_handler, + "http://publications.europa.eu/resource/authority/file-type/XLSX": no_op_format_handler, +} + + +## Functions to process tables + + +def nationality_to_string(n): + if n == "ETR": + return "non-Belgian" + if n == "BEL": + return "Belgian" + raise ValueError + + +def married_status_to_string(cs): + if cs == 1: + return "single" + if cs == 2: + return "married" + if cs == 3: + return "widowed" + if cs == 4: + return "divorced" + raise ValueError + + +def check_not_str(obj: str | Callable): + if isinstance(obj, str): + err_msg = f"Object is a str ('{obj}'), expected a `Callable`" + raise TypeError(err_msg) + + +def check_str(obj: str | Callable): + if not isinstance(obj, str): + err_msg = f"Object ('{obj}') is not a `str` as expected" + raise TypeError(err_msg) diff --git a/python/popgetter/assets/gb_nir/__init__.py b/python/popgetter/assets/gb_nir/__init__.py index 740538b..ded4a7f 100644 --- a/python/popgetter/assets/gb_nir/__init__.py +++ b/python/popgetter/assets/gb_nir/__init__.py @@ -11,7 +11,6 @@ import aiohttp import geopandas as gpd -import matplotlib.pyplot as plt import pandas as pd import requests from bs4 import BeautifulSoup @@ -442,7 +441,7 @@ def _geometry(self, context) -> list[GeometryOutput]: ) ax = first_joined_gdf.plot(column="eng", legend=False) ax.set_title(f"NI 2021 {first_geometry.metadata.level}") - md_plot = markdown_from_plot(plt) + md_plot = markdown_from_plot() context.add_output_metadata( metadata={ "all_geom_levels": MetadataValue.md( diff --git a/python/popgetter/assets/uk/uk_os_opendata.py b/python/popgetter/assets/uk/uk_os_opendata.py index 47aff64..ed3bf09 100644 --- a/python/popgetter/assets/uk/uk_os_opendata.py +++ b/python/popgetter/assets/uk/uk_os_opendata.py @@ -4,7 +4,6 @@ import fiona import geopandas as gpd -import matplotlib.pyplot as plt import pandas as pd import requests from dagster import ( @@ -252,10 +251,9 @@ def get_layer_from_gpkg(context, gpkg_path, layer_name): # Only plot the layer if it's small enough if len(lyr_gdf) < 10000: # Plot and convert the image to Markdown to preview it within Dagster - # Yes we do pass the `plt` object to the markdown_from_plot function and not the `ax` object ax = lyr_gdf.plot(legend=False) ax.set_title(layer_name) - md_plot = markdown_from_plot(plt) + md_plot = markdown_from_plot() # Force the type to be MarkdownMetadataValue # else sometimes Dagster will try to interpret it as a tuple for some unknown reason mdv: MarkdownMetadataValue = MetadataValue.md(md_plot) diff --git a/python/popgetter/utils.py b/python/popgetter/utils.py index 48070cc..7cddc27 100644 --- a/python/popgetter/utils.py +++ b/python/popgetter/utils.py @@ -12,6 +12,7 @@ import fsspec import geopandas as gpd +import matplotlib.pyplot as plt import pandas as pd import requests from dagster import ( @@ -37,12 +38,11 @@ class StagingDirResource(ConfigurableResource): staging_dir: str | None -def markdown_from_plot(plot) -> str: - plot.tight_layout() - +def markdown_from_plot() -> str: + plt.tight_layout() # Convert the image to a saveable format buffer = BytesIO() - plot.savefig(buffer, format="png") + plt.savefig(buffer, format="png") image_data = base64.b64encode(buffer.getvalue()) # Convert the image to Markdown to preview it within Dagster diff --git a/tests/test_bel.py b/tests/test_bel.py index b08a6da..2ee0be0 100644 --- a/tests/test_bel.py +++ b/tests/test_bel.py @@ -3,16 +3,8 @@ from pathlib import Path import geopandas as gpd -import pandas as pd import pytest -from dagster import ( - build_asset_context, -) -from icecream import ic from rdflib import Graph -from rdflib.namespace import DCAT - -from popgetter.assets import bel @pytest.fixture(scope="module") @@ -33,203 +25,205 @@ def demo_catalog() -> Graph: return graph -@pytest.fixture(scope="module") -def demo_catalog_df(demo_catalog) -> pd.DataFrame: - context = build_asset_context() - return bel.census_tables.catalog_as_dataframe(context, demo_catalog) - - -@pytest.mark.skip( - reason="Need to re-implement aggregate_sectors_to_municipalities to work with the sectors coming from the partitioned asset." -) -def test_aggregate_sectors_to_municipalities(demo_sectors): - # Test the that the row count is correctly added to the metadata - context = build_asset_context() - - actual_municipalities = bel.census_geometry.aggregate_sectors_to_municipalities( - context, demo_sectors - ) - - expected_sector_row_count = 7 - expected_municipalities_row_count = 3 +# # TODO: consider revising tests to incorporate changes following change to base class implementation. +# # See issue: https://github.com/Urban-Analytics-Technology-Platform/popgetter/issues/133 +# @pytest.fixture(scope="module") +# def demo_catalog_df(demo_catalog) -> pd.DataFrame: +# context = build_asset_context() +# return bel.census_tables.catalog_as_dataframe(context, demo_catalog) - assert len(demo_sectors) == expected_sector_row_count - assert len(actual_municipalities) == expected_municipalities_row_count - metadata = context.get_output_metadata(output_name="result") - assert metadata["num_records"] == expected_municipalities_row_count +# @pytest.mark.skip( +# reason="Need to re-implement aggregate_sectors_to_municipalities to work with the sectors coming from the partitioned asset." +# ) +# def test_aggregate_sectors_to_municipalities(demo_sectors): +# # Test the that the row count is correctly added to the metadata +# context = build_asset_context() -@pytest.mark.skip(reason="Fix test_get_population_details_per_municipality first") -def test_get_population_details_per_municipality(): - with build_asset_context() as muni_context: - stat_muni = bel.census_tables.get_population_details_per_municipality( - muni_context - ) +# actual_municipalities = bel.census_geometry.aggregate_sectors_to_municipalities( +# context, demo_sectors +# ) - ic(len(stat_muni)) - ic(stat_muni.columns) +# expected_sector_row_count = 7 +# expected_municipalities_row_count = 3 + +# assert len(demo_sectors) == expected_sector_row_count +# assert len(actual_municipalities) == expected_municipalities_row_count +# metadata = context.get_output_metadata(output_name="result") +# assert metadata["num_records"] == expected_municipalities_row_count + + +# @pytest.mark.skip(reason="Fix test_get_population_details_per_municipality first") +# def test_get_population_details_per_municipality(): +# with build_asset_context() as muni_context: +# stat_muni = bel.census_tables.get_population_details_per_municipality( +# muni_context +# ) + +# ic(len(stat_muni)) +# ic(stat_muni.columns) - assert len(stat_muni) > 0 - assert len(stat_muni.columns) > 0 +# assert len(stat_muni) > 0 +# assert len(stat_muni.columns) > 0 - pytest.fail("Not complete") +# pytest.fail("Not complete") -@pytest.mark.skip(reason="Fix test_get_population_details_per_municipality first") -def test_pivot_population(): - # Test the that the row count is correctly added to the metadata - # muni_context = build_asset_context() +# @pytest.mark.skip(reason="Fix test_get_population_details_per_municipality first") +# def test_pivot_population(): +# # Test the that the row count is correctly added to the metadata +# # muni_context = build_asset_context() - with build_asset_context() as muni_context: - # Check that the metadata is empty initially - assert (muni_context.get_output_metadata(output_name="result") is None) | ( - muni_context.get_output_metadata(output_name="result") == {} - ) +# with build_asset_context() as muni_context: +# # Check that the metadata is empty initially +# assert (muni_context.get_output_metadata(output_name="result") is None) | ( +# muni_context.get_output_metadata(output_name="result") == {} +# ) - # Get the geometries - stat_muni = bel.census_tables.get_population_details_per_municipality( - muni_context - ) +# # Get the geometries +# stat_muni = bel.census_tables.get_population_details_per_municipality( +# muni_context +# ) - assert len(stat_muni) > 0 - ic(len(stat_muni)) - ic(stat_muni.head()) +# assert len(stat_muni) > 0 +# ic(len(stat_muni)) +# ic(stat_muni.head()) - # pivot_context = build_asset_context() - - with build_asset_context() as pivot_context: - # Pivot the population - pivoted = bel.pivot_population(pivot_context, stat_muni) - - expected_number_of_municipalities = 581 - - # Now check that the metadata has been updated - metadata = pivot_context.get_output_metadata(output_name="result") - assert len(pivoted) == expected_number_of_municipalities - assert metadata["num_records"] == expected_number_of_municipalities - - -def test_demo_catalog(demo_catalog): - # There are 10 datasets in the demo catalogue - expected_length = 10 - actual_length = len( - list( - demo_catalog.objects( - subject=bel.census_tables.opendata_catalog_root, - predicate=DCAT.dataset, - unique=False, - ) - ) - ) +# # pivot_context = build_asset_context() - assert actual_length == expected_length - - -def test_catalog_metadata_details(demo_catalog_df): - # Get the metadata for a specific dataset in the demo catalogue: - # https://statbel.fgov.be/node/4151 "Population by Statistical sector" - # mmd = bel.census_tables.get_mmd_from_dataset_node( - # demo_catalog, dataset_node=URIRef("https://statbel.fgov.be/node/4151") - # ) - - row = demo_catalog_df[ - demo_catalog_df["node"].eq("https://statbel.fgov.be/node/4151") - ].to_dict(orient="records")[0] - - # Check that the right distribution_url has been selected - # - # This dataset has two distributions: - # (xlsx): , - # (txt/zip): ; - # - # We expect the txt/zip version to be selected. - expected_distribution_url = "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.zip#distribution4151" - wrong_distribution_url = "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.xlsx#distribution4151" - - assert row["source_download_url"] == expected_distribution_url - assert row["source_download_url"] != wrong_distribution_url - - # We expect the title to be in English (not any of the other available languages) - title_english = "Population by Statistical sector" - title_german = "Bevölkerung nach statistischen Sektoren" - title_french = "Population par secteur statistique" - title_dutch = "Bevolking per statistische sector" - - assert row["human_readable_name"] == title_english - assert row["human_readable_name"] != title_german - assert row["human_readable_name"] != title_french - assert row["human_readable_name"] != title_dutch - - -@pytest.mark.skip(reason="Test not implemented") -def test_filter_by_language(): - # Test case - # This dataset is only available in Dutch and French - # https://statbel.fgov.be/node/2654 - pytest.fail("Not implemented") - - -def test_catalog_as_dataframe(demo_catalog_df): - # Check that the catalog has been converted to a DataFrame - assert isinstance(demo_catalog_df, pd.DataFrame) - - # Check that the DataFrame has the expected number of rows - expected_number_of_datasets = 10 - assert len(demo_catalog_df) == expected_number_of_datasets - - # # Convert the demo catalog to a DataFrame - # with build_asset_context() as context: - # catalog_df = bel.census_tables.catalog_as_dataframe(context, demo_catalog_df) - - # # Check that the catalog has been converted to a DataFrame - # assert isinstance(catalog_df, pd.DataFrame) - - # # Check that the DataFrame has the expected number of rows - # expected_number_of_datasets = 10 - # assert len(catalog_df) == expected_number_of_datasets - - # # Also check that the metadata has been updated - # metadata = context.get_output_metadata(output_name="result") - # assert metadata["num_records"] == expected_number_of_datasets - - -@pytest.mark.skip(reason="Test not implemented") -def test_purepath_suffix(): - # examples - # cases = [ - # ( - # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.zip#distribution4151", - # "zip", - # ), - # ( - # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.xlsx#distribution4151", - # "xlsx", - # ), - # ( - # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.txt#distribution4151", - # "txt", - # ), - # ] - pytest.fail("Not implemented") - - -def test_filter_known_failing_datasets(): - mock_catalog = [ - "https://statbel.fgov.be/node/4796", # from census_derived._needed_dataset_nodes - "https://statbel.fgov.be/node/3961", # Known failing dataset - "https://statbel.fgov.be/node/595", # Known failing dataset - "https://statbel.fgov.be/en/node", # Incomplete URL - "2676", # Known failing dataset node number - ] - - expected_list = [ - "https://statbel.fgov.be/node/4796", - "https://statbel.fgov.be/en/node", - "2676", - ] - - actual_list = bel.census_tables.filter_known_failing_datasets(mock_catalog) - - assert mock_catalog != expected_list - assert actual_list != mock_catalog - assert actual_list == expected_list +# with build_asset_context() as pivot_context: +# # Pivot the population +# pivoted = bel.pivot_population(pivot_context, stat_muni) + +# expected_number_of_municipalities = 581 + +# # Now check that the metadata has been updated +# metadata = pivot_context.get_output_metadata(output_name="result") +# assert len(pivoted) == expected_number_of_municipalities +# assert metadata["num_records"] == expected_number_of_municipalities + + +# def test_demo_catalog(demo_catalog): +# # There are 10 datasets in the demo catalogue +# expected_length = 10 +# actual_length = len( +# list( +# demo_catalog.objects( +# subject=bel.census_tables.opendata_catalog_root, +# predicate=DCAT.dataset, +# unique=False, +# ) +# ) +# ) + +# assert actual_length == expected_length + + +# def test_catalog_metadata_details(demo_catalog_df): +# # Get the metadata for a specific dataset in the demo catalogue: +# # https://statbel.fgov.be/node/4151 "Population by Statistical sector" +# # mmd = bel.census_tables.get_mmd_from_dataset_node( +# # demo_catalog, dataset_node=URIRef("https://statbel.fgov.be/node/4151") +# # ) + +# row = demo_catalog_df[ +# demo_catalog_df["node"].eq("https://statbel.fgov.be/node/4151") +# ].to_dict(orient="records")[0] + +# # Check that the right distribution_url has been selected +# # +# # This dataset has two distributions: +# # (xlsx): , +# # (txt/zip): ; +# # +# # We expect the txt/zip version to be selected. +# expected_distribution_url = "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.zip#distribution4151" +# wrong_distribution_url = "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.xlsx#distribution4151" + +# assert row["source_download_url"] == expected_distribution_url +# assert row["source_download_url"] != wrong_distribution_url + +# # We expect the title to be in English (not any of the other available languages) +# title_english = "Population by Statistical sector" +# title_german = "Bevölkerung nach statistischen Sektoren" +# title_french = "Population par secteur statistique" +# title_dutch = "Bevolking per statistische sector" + +# assert row["human_readable_name"] == title_english +# assert row["human_readable_name"] != title_german +# assert row["human_readable_name"] != title_french +# assert row["human_readable_name"] != title_dutch + + +# @pytest.mark.skip(reason="Test not implemented") +# def test_filter_by_language(): +# # Test case +# # This dataset is only available in Dutch and French +# # https://statbel.fgov.be/node/2654 +# pytest.fail("Not implemented") + + +# def test_catalog_as_dataframe(demo_catalog_df): +# # Check that the catalog has been converted to a DataFrame +# assert isinstance(demo_catalog_df, pd.DataFrame) + +# # Check that the DataFrame has the expected number of rows +# expected_number_of_datasets = 10 +# assert len(demo_catalog_df) == expected_number_of_datasets + +# # # Convert the demo catalog to a DataFrame +# # with build_asset_context() as context: +# # catalog_df = bel.census_tables.catalog_as_dataframe(context, demo_catalog_df) + +# # # Check that the catalog has been converted to a DataFrame +# # assert isinstance(catalog_df, pd.DataFrame) + +# # # Check that the DataFrame has the expected number of rows +# # expected_number_of_datasets = 10 +# # assert len(catalog_df) == expected_number_of_datasets + +# # # Also check that the metadata has been updated +# # metadata = context.get_output_metadata(output_name="result") +# # assert metadata["num_records"] == expected_number_of_datasets + + +# @pytest.mark.skip(reason="Test not implemented") +# def test_purepath_suffix(): +# # examples +# # cases = [ +# # ( +# # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.zip#distribution4151", +# # "zip", +# # ), +# # ( +# # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.xlsx#distribution4151", +# # "xlsx", +# # ), +# # ( +# # "https://statbel.fgov.be/sites/default/files/files/opendata/bevolking/sectoren/OPENDATA_SECTOREN_2022.txt#distribution4151", +# # "txt", +# # ), +# # ] +# pytest.fail("Not implemented") + + +# def test_filter_known_failing_datasets(): +# mock_catalog = [ +# "https://statbel.fgov.be/node/4796", # from census_derived._needed_dataset_nodes +# "https://statbel.fgov.be/node/3961", # Known failing dataset +# "https://statbel.fgov.be/node/595", # Known failing dataset +# "https://statbel.fgov.be/en/node", # Incomplete URL +# "2676", # Known failing dataset node number +# ] + +# expected_list = [ +# "https://statbel.fgov.be/node/4796", +# "https://statbel.fgov.be/en/node", +# "2676", +# ] + +# actual_list = bel.census_tables.filter_known_failing_datasets(mock_catalog) + +# assert mock_catalog != expected_list +# assert actual_list != mock_catalog +# assert actual_list == expected_list