Commit f603f2a

Track the history of the number of entries of the consolidated datasets (IRVE, BNLC, Parc Relais and ZFE), and refactor the consolidated datasets (#3969)

* ugly implementation

* More generic function

* Refactor tests with unique factory to insert IRVE, BNLC etc datasets

* Refactor tests with unique factory to insert IRVE, BNLC etc datasets

* Move IRVE dataset and resource references to new module

* Use new factories in explore controller

* Moves test to new file

* Migrate to new module

* Move to Transport.ConsolidatedDataset

* use central config for IRVE consolidated dataset

* Move test

* Save all stats

* common functions to find consolidated datasets

* Refactor finding consolidated resources

* Apply suggestions from code review

Co-authored-by: Antoine Augusti <[email protected]>

* Use PAN organization_id

* refactor ConsolidatedDataset dataset access

* refactor resource access

* Improve geoquery controller

* improve explore controller

* mandatorily have consolidated datasets for tests

* Setup AOMs for geo_data datasets

* Apply suggestions from code review

Co-authored-by: Antoine Augusti <[email protected]>

* use parkings_relais in js instead of parkings-relais

---------

Co-authored-by: Antoine Augusti <[email protected]>
Co-authored-by: Thibaut Barrère <[email protected]>
3 people authored Jul 17, 2024
1 parent c373e38 commit f603f2a
Showing 28 changed files with 284 additions and 221 deletions.
2 changes: 1 addition & 1 deletion apps/transport/client/javascripts/explore.js

@@ -125,7 +125,7 @@ document.getElementById('bnlc-check').addEventListener('change', (event) => {
 document.getElementById('parkings_relais-check').addEventListener('change', (event) => {
     if (event.currentTarget.checked) {
         trackEvent('parkings-relais')
-        fetch('/api/geo-query?data=parkings-relais')
+        fetch('/api/geo-query?data=parkings_relais')
             .then(data => updateParkingsRelaisLayer(data.json()))
     } else {
         updateParkingsRelaisLayer(null)
4 changes: 2 additions & 2 deletions apps/transport/lib/db/dataset.ex

@@ -451,12 +451,12 @@ defmodule DB.Dataset do
   end

   def order_datasets(datasets, _params) do
-    pan_publisher = Application.fetch_env!(:transport, :datagouvfr_transport_publisher_label)
+    pan_publisher = Application.fetch_env!(:transport, :datagouvfr_transport_publisher_id)

     order_by(datasets,
       desc:
         fragment(
-          "case when organization = ? and custom_title ilike 'base nationale%' then 1 else 0 end",
+          "case when organization_id = ? and custom_title ilike 'base nationale%' then 1 else 0 end",
           ^pan_publisher
         ),
       # Gotcha, population can be null for datasets covering France/Europe
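
Throughout this commit, the PAN publisher is matched by its data.gouv.fr organization ID rather than by its display label. A minimal sketch of the assumed runtime configuration: the config keys appear in the diff, but the values below are placeholders, not the real IDs.

    # config/runtime.exs (sketch; actual values live in the app's real config)
    config :transport,
      # replaces the old :datagouvfr_transport_publisher_label lookup
      datagouvfr_transport_publisher_id: "<organization id of the transport.data.gouv.fr publisher>",
      # publisher for datasets consolidated by data.gouv.fr itself (used for IRVE below)
      datagouvfr_publisher_id: "<organization id of data.gouv.fr>"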
7 changes: 7 additions & 0 deletions apps/transport/lib/db/geo_data/geo_data.ex

@@ -33,4 +33,11 @@ defmodule DB.GeoData do

     query |> DB.Repo.one()
   end
+
+  def count_lines_for_geo_data_import(nil), do: 0
+
+  @spec count_lines_for_geo_data_import(DB.GeoDataImport.t()) :: integer()
+  def count_lines_for_geo_data_import(geo_data_import) do
+    from(g in DB.GeoData, where: g.geo_data_import_id == ^geo_data_import.id, select: count(g.id)) |> DB.Repo.one!()
+  end
 end
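
A quick usage sketch of the new counter, mirroring the call chain used by the stats handler further down (dataset_id is a placeholder):

    # Count the rows of a dataset's latest geo_data import;
    # the nil clause above makes this return 0 when no import exists yet.
    dataset_id
    |> DB.GeoDataImport.dataset_latest_geo_data_import()
    |> DB.GeoData.count_lines_for_geo_data_import()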
4 changes: 2 additions & 2 deletions apps/transport/lib/jobs/consolidate_lez_job.ex

@@ -119,7 +119,7 @@ defmodule Transport.Jobs.ConsolidateLEZsJob do
     |> join(:inner, [r], d in Dataset,
       on:
         r.dataset_id == d.id and d.type == @lez_dataset_type and
-          d.organization != ^own_publisher
+          d.organization_id != ^own_publisher
     )
     |> where([r], r.schema_name == @schema_name)
     |> preload(dataset: [:aom])
@@ -191,7 +191,7 @@ defmodule Transport.Jobs.ConsolidateLEZsJob do
   end

   def pan_publisher do
-    Application.fetch_env!(:transport, :datagouvfr_transport_publisher_label)
+    Application.fetch_env!(:transport, :datagouvfr_transport_publisher_id)
   end

   def resource_id_for_type(type) do
22 changes: 2 additions & 20 deletions apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex

@@ -4,29 +4,16 @@ defmodule Transport.Jobs.BNLCToGeoData do
   in the geo_data table
   """
   use Oban.Worker, max_attempts: 3
-  import Ecto.Query
   require Logger

   @impl Oban.Worker
   def perform(%{}) do
-    [%DB.Resource{} = resource] =
-      relevant_dataset()
-      |> Map.fetch!(:resources)
-      |> Enum.filter(fn %DB.Resource{datagouv_id: datagouv_id} -> datagouv_id == bnlc_datagouv_id() end)
+    Transport.ConsolidatedDataset.resource(:bnlc)
+    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

-    Transport.Jobs.BaseGeoData.import_replace_data(resource, &prepare_data_for_insert/2)
     :ok
   end

-  def relevant_dataset do
-    transport_publisher_label = Application.fetch_env!(:transport, :datagouvfr_transport_publisher_label)
-
-    DB.Dataset.base_query()
-    |> preload(:resources)
-    |> where([d], d.type == "carpooling-areas" and d.organization == ^transport_publisher_label)
-    |> DB.Repo.one!()
-  end
-
   def prepare_data_for_insert(body, geo_data_import_id) do
     prepare_data_fn = fn m ->
       %{
@@ -43,9 +30,4 @@ defmodule Transport.Jobs.BNLCToGeoData do

     Transport.Jobs.BaseGeoData.prepare_csv_data_for_import(body, prepare_data_fn)
   end
-
-  defp bnlc_datagouv_id do
-    %{resource_id: resource_id} = Map.fetch!(Application.fetch_env!(:transport, :consolidation), :bnlc)
-    resource_id
-  end
 end
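
All four geo_data import jobs now share this shape: resolve the consolidated resource, then hand it to Transport.Jobs.BaseGeoData.import_replace_data/2. A sketch of a manual enqueue (Oban.Worker generates new/2; queue and scheduling wiring are assumptions, not shown in this diff):

    # One-off BNLC import via Oban
    %{} |> Transport.Jobs.BNLCToGeoData.new() |> Oban.insert()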
20 changes: 2 additions & 18 deletions apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex

@@ -3,21 +3,12 @@ defmodule Transport.Jobs.IRVEToGeoData do
   Job in charge of taking the charge stations stored in the Base nationale des Infrastructures de Recharge pour Véhicules Électriques and storing the result in the `geo_data` table.
   """
   use Oban.Worker, max_attempts: 3
-  import Ecto.Query
   require Logger

-  @datagouv_organization_id "646b7187b50b2a93b1ae3d45"
-  # https://www.data.gouv.fr/fr/datasets/fichier-consolide-des-bornes-de-recharge-pour-vehicules-electriques/#/resources/eb76d20a-8501-400e-b336-d85724de5435
-  @resource_datagouv_id "eb76d20a-8501-400e-b336-d85724de5435"
-
   @impl Oban.Worker
   def perform(%Oban.Job{}) do
-    [resource] =
-      relevant_dataset()
-      |> DB.Dataset.official_resources()
-      |> Enum.filter(&match?(%DB.Resource{datagouv_id: @resource_datagouv_id, format: "csv"}, &1))
-
-    Transport.Jobs.BaseGeoData.import_replace_data(resource, &prepare_data_for_insert/2)
+    Transport.ConsolidatedDataset.resource(:irve)
+    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

     :ok
   end
@@ -38,11 +29,4 @@ defmodule Transport.Jobs.IRVEToGeoData do

     Transport.Jobs.BaseGeoData.prepare_csv_data_for_import(body, prepare_data_fn)
   end
-
-  def relevant_dataset do
-    DB.Dataset.base_query()
-    |> preload(:resources)
-    |> where([d], d.type == "charging-stations" and d.organization_id == @datagouv_organization_id)
-    |> DB.Repo.one!()
-  end
 end
15 changes: 2 additions & 13 deletions apps/transport/lib/jobs/geo_data/lez_to_geo_data.ex

@@ -4,26 +4,15 @@ defmodule Transport.Jobs.LowEmissionZonesToGeoData do
   and storing the result in the `geo_data` table.
   """
   use Oban.Worker, max_attempts: 3
-  import Ecto.Query

   @impl Oban.Worker
   def perform(%{}) do
-    [resource] = relevant_dataset() |> DB.Dataset.official_resources() |> Enum.filter(&(&1.title == "aires.geojson"))
-
-    Transport.Jobs.BaseGeoData.import_replace_data(resource, &prepare_data_for_insert/2)
+    Transport.ConsolidatedDataset.resource(:zfe)
+    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

     :ok
   end

-  def relevant_dataset do
-    transport_publisher_label = Application.fetch_env!(:transport, :datagouvfr_transport_publisher_label)
-
-    DB.Dataset.base_query()
-    |> preload(:resources)
-    |> where([d], d.type == "low-emission-zones" and d.organization == ^transport_publisher_label)
-    |> DB.Repo.one!()
-  end
-
   def prepare_data_for_insert(body, geo_data_import_id) do
     body
     |> Jason.decode!()
19 changes: 2 additions & 17 deletions apps/transport/lib/jobs/geo_data/parkings_relais_to_geo_data.ex

@@ -3,30 +3,15 @@ defmodule Transport.Jobs.ParkingsRelaisToGeoData do
   Job in charge of taking the parking relais stored in the Base nationale des parcs relais and storing the result in the `geo_data` table.
   """
   use Oban.Worker, max_attempts: 3
-  import Ecto.Query

   @impl Oban.Worker
   def perform(%{}) do
-    [resource] = relevant_dataset() |> DB.Dataset.official_resources() |> Enum.filter(&(&1.format == "csv"))
-
-    Transport.Jobs.BaseGeoData.import_replace_data(resource, &prepare_data_for_insert/2)
+    Transport.ConsolidatedDataset.resource(:parkings_relais)
+    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

     :ok
   end

-  def relevant_dataset do
-    transport_publisher_label = Application.fetch_env!(:transport, :datagouvfr_transport_publisher_label)
-
-    DB.Dataset.base_query()
-    |> preload(:resources)
-    |> where(
-      [d],
-      d.type == "private-parking" and d.organization == ^transport_publisher_label and
-        d.custom_title == "Base nationale des parcs relais"
-    )
-    |> DB.Repo.one!()
-  end
-
   defp pr_count(""), do: 0
   defp pr_count(str), do: String.to_integer(str)

85 changes: 85 additions & 0 deletions apps/transport/lib/transport/consolidated_dataset.ex

@@ -0,0 +1,85 @@
+defmodule Transport.ConsolidatedDataset do
+  @moduledoc """
+  A module to hold configuration of the datasets used in geodata queries and access them easily.
+  """
+
+  import Ecto.Query
+
+  @config %{
+    irve: %{dataset_type: "charging-stations", publisher: :datagouvfr},
+    bnlc: %{dataset_type: "carpooling-areas", publisher: :transport},
+    parkings_relais: %{dataset_type: "private-parking", publisher: :transport},
+    zfe: %{dataset_type: "low-emission-zones", publisher: :transport}
+  }
+
+  @available_datasets @config |> Map.keys()
+
+  def geo_data_datasets do
+    # For now, every consolidated dataset here has geographical features imported in the geo_data table
+    @available_datasets
+  end
+
+  def dataset(name) when name in @available_datasets do
+    publisher_id = @config |> get_in([name, :publisher]) |> publisher_id()
+    dataset_type = @config |> get_in([name, :dataset_type])
+
+    DB.Dataset.base_query()
+    |> preload(:resources)
+    |> where([d], d.type == ^dataset_type and d.organization_id == ^publisher_id)
+    |> additional_ecto_query(name)
+    |> DB.Repo.one!()
+  end
+
+  def resource(name) do
+    [resource] =
+      dataset(name)
+      |> DB.Dataset.official_resources()
+      |> filter_official_resources(name)
+
+    resource
+  end
+
+  # This filter has been moved from previous code but is fragile
+  defp additional_ecto_query(query, :parkings_relais) do
+    query |> where([d], d.custom_title == "Base nationale des parcs relais")
+  end
+
+  defp additional_ecto_query(q, _), do: q
+
+  # Following filters have been moved from previous code but are fragile
+  # There should be a better and more unified way to be sure we find the right official resource
+  defp filter_official_resources(resources, :bnlc) do
+    Enum.filter(resources, fn %DB.Resource{datagouv_id: datagouv_id} -> datagouv_id == bnlc_resource_id() end)
+  end
+
+  defp filter_official_resources(resources, :irve) do
+    irve_resource_id = irve_resource_id()
+    Enum.filter(resources, &match?(%DB.Resource{datagouv_id: ^irve_resource_id, format: "csv"}, &1))
+  end
+
+  defp filter_official_resources(resources, :parkings_relais) do
+    Enum.filter(resources, &(&1.format == "csv"))
+  end
+
+  defp filter_official_resources(resources, :zfe) do
+    Enum.filter(resources, &(&1.title == "aires.geojson"))
+  end
+
+  defp publisher_id(:datagouvfr) do
+    Application.fetch_env!(:transport, :datagouvfr_publisher_id)
+  end
+
+  defp publisher_id(:transport) do
+    Application.fetch_env!(:transport, :datagouvfr_transport_publisher_id)
+  end
+
+  defp bnlc_resource_id do
+    %{resource_id: bnlc_resource_id} = Map.fetch!(Application.fetch_env!(:transport, :consolidation), :bnlc)
+    bnlc_resource_id
+  end
+
+  defp irve_resource_id do
+    %{resource_id: irve_resource_id} = Map.fetch!(Application.fetch_env!(:transport, :consolidation), :irve)
+    irve_resource_id
+  end
+end
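
A usage sketch of the new central API (results depend on seeded data, and the key order below is illustrative):

    # Resolve a consolidated dataset and its single official resource
    dataset = Transport.ConsolidatedDataset.dataset(:bnlc)      # %DB.Dataset{}, raises if absent
    resource = Transport.ConsolidatedDataset.resource(:bnlc)    # the matching %DB.Resource{}

    Transport.ConsolidatedDataset.geo_data_datasets()
    #=> [:bnlc, :irve, :parkings_relais, :zfe]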
16 changes: 14 additions & 2 deletions apps/transport/lib/transport/stats_handler.ex

@@ -19,7 +19,7 @@ defmodule Transport.StatsHandler do
   end

   defp store_stat_history(key, values, %DateTime{} = timestamp)
-       when key in [:gtfs_rt_types, :climate_resilience_bill_count] do
+       when key in [:gtfs_rt_types, :climate_resilience_bill_count, :count_geo_data_lines] do
     Enum.map(values, fn {type, count} ->
       store_stat_history("#{key}::#{type}", count, timestamp)
     end)
@@ -87,7 +87,8 @@ defmodule Transport.StatsHandler do
       gtfs_rt_types: count_feed_types_gtfs_rt(),
      climate_resilience_bill_count: count_datasets_climate_resilience_bill(),
       nb_siri: count_dataset_with_format("SIRI"),
-      nb_siri_lite: count_dataset_with_format("SIRI Lite")
+      nb_siri_lite: count_dataset_with_format("SIRI Lite"),
+      count_geo_data_lines: count_geo_data_lines()
     }
   end
@@ -237,4 +238,15 @@ defmodule Transport.StatsHandler do

     sum / nb_aom_with_data
   end
+
+  def count_geo_data_lines do
+    Transport.ConsolidatedDataset.geo_data_datasets()
+    |> Map.new(fn feature -> {feature, count_geo_data_lines(feature)} end)
+  end
+
+  def count_geo_data_lines(feature) do
+    Transport.ConsolidatedDataset.dataset(feature).id
+    |> DB.GeoDataImport.dataset_latest_geo_data_import()
+    |> DB.GeoData.count_lines_for_geo_data_import()
+  end
 end
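
This is the history-tracking piece of the commit: each stats run now stores one row per consolidated feature. A sketch of the resulting stat keys (the counts are invented for illustration):

    # Keys written by store_stat_history/3 for :count_geo_data_lines
    "count_geo_data_lines::irve"             # e.g. 100_000
    "count_geo_data_lines::bnlc"             # e.g. 5_000
    "count_geo_data_lines::parkings_relais"  # e.g. 1_000
    "count_geo_data_lines::zfe"              # e.g. 50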
@@ -3,14 +3,16 @@ defmodule TransportWeb.API.GeoQueryController do
   import Ecto.Query

   def index(%Plug.Conn{} = conn, %{"data" => slug}) do
-    if Map.has_key?(config(), slug) do
-      %{dataset: %DB.Dataset{} = dataset, transform_fn: transform_fn} = Map.fetch!(config(), slug)
+    feature_atom = slug |> String.to_atom()
+
+    if feature_atom in Transport.ConsolidatedDataset.geo_data_datasets() do
+      dataset = Transport.ConsolidatedDataset.dataset(feature_atom)

       get_geojson = fn ->
         dataset
         |> Map.fetch!(:id)
         |> DB.GeoDataImport.dataset_latest_geo_data_import()
-        |> transform_fn.()
+        |> transform_geojson(feature_atom)
       end

       geojson = Transport.Cache.fetch("#{slug}_data", get_geojson, :timer.hours(1))
@@ -24,30 +26,12 @@ defmodule TransportWeb.API.GeoQueryController do

   def render_404(%Plug.Conn{} = conn), do: conn |> put_status(404) |> json(%{"message" => "Not found"})

-  defp config do
-    %{
-      "bnlc" => %{dataset: Transport.Jobs.BNLCToGeoData.relevant_dataset(), transform_fn: &bnlc_geojson/1},
-      "parkings-relais" => %{
-        dataset: Transport.Jobs.ParkingsRelaisToGeoData.relevant_dataset(),
-        transform_fn: &parkings_relais_geojson/1
-      },
-      "zfe" => %{
-        dataset: Transport.Jobs.LowEmissionZonesToGeoData.relevant_dataset(),
-        transform_fn: &zfe_geojson/1
-      },
-      "irve" => %{
-        dataset: Transport.Jobs.IRVEToGeoData.relevant_dataset(),
-        transform_fn: &irve_geojson/1
-      }
-    }
-  end
-
-  def bnlc_geojson(%DB.GeoDataImport{} = geo_data_import) do
+  def transform_geojson(%DB.GeoDataImport{} = geo_data_import, :bnlc) do
     add_fields = fn query -> from(g in query, select_merge: %{nom_lieu: fragment("payload->>'nom_lieu'")}) end
     DB.GeoData.geo_data_as_geojson(geo_data_import, add_fields)
   end

-  def parkings_relais_geojson(%DB.GeoDataImport{} = geo_data_import) do
+  def transform_geojson(%DB.GeoDataImport{} = geo_data_import, :parkings_relais) do
     add_fields = fn query ->
       from(g in query,
         select_merge: %{nom: fragment("payload->>'nom'"), nb_pr: fragment("(payload->>'nb_pr')::int")}
@@ -57,13 +41,13 @@ defmodule TransportWeb.API.GeoQueryController do
     DB.GeoData.geo_data_as_geojson(geo_data_import, add_fields)
   end

-  def zfe_geojson(%DB.GeoDataImport{} = geo_data_import) do
+  def transform_geojson(%DB.GeoDataImport{} = geo_data_import, :zfe) do
     add_fields = fn query -> query end

     DB.GeoData.geo_data_as_geojson(geo_data_import, add_fields)
   end

-  def irve_geojson(%DB.GeoDataImport{} = geo_data_import) do
+  def transform_geojson(%DB.GeoDataImport{} = geo_data_import, :irve) do
     add_fields = fn query ->
       from(g in query,
         select_merge: %{
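
index/2 now converts the slug to an atom and checks it against geo_data_datasets/0, so the valid slugs are exactly the configured feature names; that is why explore.js above switched to parkings_relais. A test-style sketch of the endpoint behaviour (assumes the project's usual Phoenix ConnTest setup and seeded geo_data; the response shapes are assumptions):

    # ExUnit sketch, not a test from this commit
    test "GET /api/geo-query", %{conn: conn} do
      # a known feature returns (cached) GeoJSON
      assert %{"type" => "FeatureCollection"} =
               conn |> get("/api/geo-query?data=parkings_relais") |> json_response(200)

      # anything else falls through to render_404/1
      assert %{"message" => "Not found"} =
               conn |> get("/api/geo-query?data=unknown") |> json_response(404)
    end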