Skip to content

Commit

Permalink
Ajout de la notion de SIREN (#1105)
Browse files: browse the repository at this point in the history
  • Loading branch information
kolok authored Dec 5, 2024
1 parent 108f0b7 commit 7ade97e
Show file tree
Hide file tree
Showing 28 changed files with 266 additions and 132 deletions.
1 change: 1 addition & 0 deletions dags/create_final_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def write_data_to_postgres(**kwargs):
"telephone",
"nom_commercial",
"nom_officiel",
"siren",
"siret",
"identifiant_externe",
"acteur_type_id",
Expand Down
12 changes: 10 additions & 2 deletions dags/sources/config/airflow_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
from pathlib import Path

import requests
from sources.tasks.transform.transform_column import convert_opening_hours
from sources.tasks.transform.transform_column import (
clean_siren,
clean_siret,
convert_opening_hours,
)

PATH_NOMENCLARURE_DECHET = (
"https://data.ademe.fr/data-fair/api/v1/datasets/sinoe-r-nomenclature-dechets/lines"
Expand All @@ -13,7 +17,11 @@
KEY_LIBELLE_DECHET_ALT = "LST_TYP_DECHET"


TRANSFORMATION_MAPPING = {"convert_opening_hours": convert_opening_hours}
TRANSFORMATION_MAPPING = {
"convert_opening_hours": convert_opening_hours,
"clean_siren": clean_siren,
"clean_siret": clean_siret,
}


# TODO: dataclass à implémenter pour la validation des paramètres des DAGs
Expand Down
16 changes: 7 additions & 9 deletions dags/sources/dags/source_aliapur.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@
" for Aliapur dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
7 changes: 7 additions & 0 deletions dags/sources/dags/source_citeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
" sur de Koumoul"
),
params={
"column_transformations": [
{
"origin": "siren",
"transformation": "clean_siren",
"destination": "siren",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
Expand Down
7 changes: 7 additions & 0 deletions dags/sources/dags/source_cma.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
" for CMA reparacteur dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"name": "nom",
"reparactor_description": "description",
Expand Down
17 changes: 7 additions & 10 deletions dags/sources/dags/source_corepile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@
" for Corepile dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"siret": "siret",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"enseigne_commerciale": "nom_commercial",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
5 changes: 0 additions & 5 deletions dags/sources/dags/source_ecodds.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"nom_de_lorganisme": "nom",
"enseigne_commerciale": "nom_commercial",
"longitudewgs84": "longitude",
Expand Down
9 changes: 0 additions & 9 deletions dags/sources/dags/source_ecologic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,8 @@
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
19 changes: 7 additions & 12 deletions dags/sources/dags/source_ecomaison.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,20 @@
" for Ecomaison dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"enseigne_commerciale": "nom_commercial",
"telephone": "telephone",
"email": "email",
"siret": "siret",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"site_web": "url",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
12 changes: 12 additions & 0 deletions dags/sources/dags/source_ecosystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
" for Ecosystem dataset"
),
params={
"column_transformations": [
{
"origin": "siren",
"transformation": "clean_siren",
"destination": "siren",
},
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
Expand Down
17 changes: 7 additions & 10 deletions dags/sources/dags/source_ocab.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,18 @@
" for OCAB dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"siret": "siret",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
12 changes: 12 additions & 0 deletions dags/sources/dags/source_ocad3e.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
" for OCAD3E dataset"
),
params={
"column_transformations": [
{
"origin": "siren",
"transformation": "clean_siren",
"destination": "siren",
},
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
Expand Down
16 changes: 7 additions & 9 deletions dags/sources/dags/source_pyreo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@
" for Pyreo dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
19 changes: 7 additions & 12 deletions dags/sources/dags/source_refashion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,21 @@
" for Refashion dataset"
),
params={
"column_transformations": [
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"adresse_complement": "adresse_complement",
"type_de_point_de_collecte": "acteur_type_id",
"telephone": "telephone",
"siret": "siret",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"reprise": "reprise",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"enseigne_commerciale": "nom_commercial",
"site_web": "url",
"email": "email",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
"horaires_douverture": "horaires_description",
Expand Down
11 changes: 1 addition & 10 deletions dags/sources/dags/source_soren.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,13 @@
"origin": "horaires_douverture",
"transformation": "convert_opening_hours",
"destination": "horaires_description",
}
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
"exclusivite_de_reprisereparation": "exclusivite_de_reprisereparation",
"uniquement_sur_rdv": "uniquement_sur_rdv",
"public_accueilli": "public_accueilli",
"reprise": "reprise",
"produitsdechets_acceptes": "produitsdechets_acceptes",
"labels_etou_bonus": "labels_etou_bonus",
"point_de_reparation": "point_de_reparation",
"ecoorganisme": "source_id",
"adresse_format_ban": "adresse_format_ban",
"nom_de_lorganisme": "nom",
"perimetre_dintervention": "perimetre_dintervention",
"longitudewgs84": "longitude",
"latitudewgs84": "latitude",
},
Expand Down
12 changes: 12 additions & 0 deletions dags/sources/dags/source_valdelia.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
" for Valdelia dataset"
),
params={
"column_transformations": [
{
"origin": "siren",
"transformation": "clean_siren",
"destination": "siren",
},
{
"origin": "siret",
"transformation": "clean_siret",
"destination": "siret",
},
],
"column_mapping": {
"id_point_apport_ou_reparation": "identifiant_externe",
"type_de_point_de_collecte": "acteur_type_id",
Expand Down
9 changes: 3 additions & 6 deletions dags/sources/tasks/business_logic/propose_acteur_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import numpy as np
import pandas as pd
from sources.tasks.transform.transform_df import clean_phone_number
from utils.base_utils import transform_location
from utils.mapping_utils import parse_float, process_phone_number, process_siret
from utils.mapping_utils import parse_float

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,15 +50,11 @@ def propose_acteur_changes(
# On met à jour le modifie_le de qfdmo_acteur
df["modifie_le"] = datetime.now()

# TODO : à déplacer dans la source_data_normalize
if "siret" in df.columns:
df["siret"] = df["siret"].apply(process_siret)

# TODO : à déplacer dans la source_data_normalize
if "telephone" in df.columns and "code_postal" in df.columns:
df["telephone"] = df.apply(
lambda row: pd.Series(
process_phone_number(row["telephone"], row["code_postal"])
clean_phone_number(row["telephone"], row["code_postal"])
),
axis=1,
)
Expand Down
Loading

0 comments on commit 7ade97e

Please sign in to comment.