Skip to content

Commit

Permalink
keep dagrun managment and create suggestion aside it
Browse files Browse the repository at this point in the history
  • Loading branch information
kolok committed Jan 16, 2025
1 parent 65b77f4 commit b657043
Show file tree
Hide file tree
Showing 24 changed files with 445 additions and 100 deletions.
2 changes: 1 addition & 1 deletion core/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class PaginatedSitemap(GenericSitemap):
path("dsfr/", include(("dsfr_hacks.urls", "dsfr_hacks"), namespace="dsfr_hacks")),
path("", include(("qfdmo.urls", "qfdmo"), namespace="qfdmo")),
path("", include(("qfdmd.urls", "qfdmd"), namespace="qfdmd")),
path("", include(("data.urls", "data"), namespace="data")),
path("data/", include(("data.urls", "data"), namespace="data")),
path("docs/", TemplateView.as_view(template_name="techdocs.html"), name="techdocs"),
]

Expand Down
2 changes: 1 addition & 1 deletion dags/annuaire_entreprise_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def db_data_prepare(**kwargs):
serialized_data = {}
for key, df in data.items():
df["event"] = "UPDATE_ACTOR"
df["suggestion"] = df[columns].apply(
df["row_updates"] = df[columns].apply(
lambda row: json.dumps(row.to_dict(), default=str), axis=1
)
serialized_data[key] = {"df": df, "metadata": {"updated_rows": len(df)}}
Expand Down
4 changes: 2 additions & 2 deletions dags/shared/tasks/business_logic/write_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from utils.dag_eo_utils import insert_suggestion_and_process_df
from utils.dag_eo_utils import insert_dagrun_and_process_df


def write_data(
Expand All @@ -20,4 +20,4 @@ def write_data(
run_name = run_id.replace("__", " - ")
df = data["df"]
metadata.update(data.get("metadata", {}))
insert_suggestion_and_process_df(df, metadata, dag_name_suffixed, run_name)
insert_dagrun_and_process_df(df, metadata, dag_name_suffixed, run_name)
6 changes: 6 additions & 0 deletions dags/sources/config/shared_constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# DEPRECATED DagRun statuts
DAGRUN_TOVALIDATE = "TO_VALIDATE"
DAGRUN_TOINSERT = "TO_INSERT"
DAGRUN_REJECTED = "REJECTED"
DAGRUN_FINISHED = "FINISHED"

# Suggestion statuts (pour cohorte et unitaire)
SUGGESTION_AVALIDER = "AVALIDER"
SUGGESTION_REJETER = "REJETER"
Expand Down
1 change: 1 addition & 0 deletions dags/sources/tasks/transform/transform_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def compute_location(row: pd.Series, _):
lng_column = row.keys()[1]
row[lat_column] = parse_float(row[lat_column])
row[lng_column] = parse_float(row[lng_column])
print(row[lat_column], row[lng_column])
row["location"] = transform_location(row[lng_column], row[lat_column])
return row[["location"]]

Expand Down
3 changes: 2 additions & 1 deletion dags/utils/base_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ def extract_details(row, col="adresse_format_ban"):


def transform_location(longitude, latitude):
if not longitude or not latitude:
if not longitude or not latitude or math.isnan(longitude) or math.isnan(latitude):
print("Longitude or latitude is missing.")
return None
return wkb.dumps(Point(longitude, latitude)).hex()

Expand Down
36 changes: 13 additions & 23 deletions dags/utils/dag_eo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,38 @@
logger = logging.getLogger(__name__)


def insert_suggestion_and_process_df(df_acteur_updates, metadata, dag_name, run_name):
def insert_dagrun_and_process_df(df_acteur_updates, metadata, dag_name, run_name):
if df_acteur_updates.empty:
return
engine = PostgresConnectionManager().engine
current_date = datetime.now()

with engine.connect() as conn:
# Insert a new suggestion
# Insert a new dagrun
result = conn.execute(
"""
INSERT INTO data_suggestioncohorte
(
identifiant_action,
identifiant_execution,
type_action,
statut,
metadata,
cree_le,
modifie_le
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
INSERT INTO qfdmo_dagrun
(dag_id, run_id, status, meta_data, created_date, updated_date)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING ID;
""",
(
dag_name,
run_name,
constants.SUGGESTION_SOURCE,
constants.SUGGESTION_AVALIDER,
"TO_VALIDATE",
json.dumps(metadata),
current_date,
current_date,
),
)
suggestion_cohorte_id = result.fetchone()[0]
dag_run_id = result.fetchone()[0]

# Insert dag_run_change
df_acteur_updates["type_action"] = df_acteur_updates["event"]
df_acteur_updates["suggestion_cohorte_id"] = suggestion_cohorte_id
df_acteur_updates["statut"] = constants.SUGGESTION_AVALIDER
df_acteur_updates[
["suggestion", "suggestion_cohorte_id", "type_action", "statut"]
].to_sql(
"data_suggestionunitaire",
df_acteur_updates["change_type"] = df_acteur_updates["event"]
df_acteur_updates["dag_run_id"] = dag_run_id
df_acteur_updates["status"] = constants.DAGRUN_TOVALIDATE
df_acteur_updates[["row_updates", "dag_run_id", "change_type", "status"]].to_sql(
"qfdmo_dagrunchange",
engine,
if_exists="append",
index=False,
Expand Down
8 changes: 3 additions & 5 deletions dags/utils/dag_ingest_validated_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def handle_update_actor_event(df_actors, dag_run_id):
]

current_time = datetime.now().astimezone().isoformat(timespec="microseconds")
df_actors = df_actors[df_actors["status"] == shared_constants.SUGGESTION_ATRAITER]
df_actors = df_actors[df_actors["status"] == shared_constants.DAGRUN_TOINSERT]
df_actors = df_actors.apply(mapping_utils.replace_with_selected_candidat, axis=1)
df_actors[["adresse", "code_postal", "ville"]] = df_actors.apply(
lambda row: base_utils.extract_details(row, col="adresse_candidat"), axis=1
Expand Down Expand Up @@ -312,9 +312,7 @@ def handle_write_data_update_actor_event(connection, df_actors):


def update_dag_run_status(
connection, dag_run_id, statut=shared_constants.SUGGESTION_SUCCES
connection, dag_run_id, statut=shared_constants.DAGRUN_FINISHED
):
query = f"""
UPDATE data_suggestioncohorte SET statut = '{statut}' WHERE id = {dag_run_id}
"""
query = f"UPDATE qfdmo_dagrun SET status = '{statut}' WHERE id = {dag_run_id}"
connection.execute(query)
31 changes: 31 additions & 0 deletions dags_unit_tests/sources/tasks/transform/test_transform_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
clean_label_codes,
clean_siret_and_siren,
clean_telephone,
compute_location,
get_latlng_from_geopoint,
merge_and_clean_souscategorie_codes,
merge_duplicates,
Expand Down Expand Up @@ -451,3 +452,33 @@ def test_get_latlng_from_geopoint(self):
result = get_latlng_from_geopoint(row, None)
assert result["latitude"] == 48.8588443
assert result["longitude"] == 2.2943506


PARIS_LOCATION = (
"0101000000a835cd3b4ed1024076e09c11a56d4840" # pragma: allowlist secret
)


LONDON_LOCATION = (
"0101000000ebe2361ac05bc0bfc5feb27bf2c04940" # pragma: allowlist secret
)


class TestComputeLocation:

@pytest.mark.parametrize(
"latitude, longitude, expected_location",
[
(48.8566, 2.3522, PARIS_LOCATION),
("48.8566", "2.3522", PARIS_LOCATION),
(51.5074, -0.1278, LONDON_LOCATION),
(None, None, None), # Missing lat and long
],
)
def test_compute_location(self, latitude, longitude, expected_location):

result = compute_location(
pd.Series({"latitude": latitude, "longitude": longitude}), None
)
print(result["location"])
assert result["location"] == expected_location
8 changes: 4 additions & 4 deletions data/urls.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from django.urls import path

from data.views import DagsValidation
from data.views import SuggestionManagment

urlpatterns = [
path(
"dags/validations",
DagsValidation.as_view(),
name="dags_validations",
"suggestions",
SuggestionManagment.as_view(),
name="suggestions",
),
]
2 changes: 1 addition & 1 deletion data/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def dispatch(self, request, *args, **kwargs):
}


class DagsValidation(IsStaffMixin, FormView):
class SuggestionManagment(IsStaffMixin, FormView):
form_class = SuggestionCohorteForm
template_name = "data/dags_validations.html"
success_url = "/dags/validations"
Expand Down
16 changes: 8 additions & 8 deletions jinja2/qfdmo/create_actor_event.html
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
{% if suggestion_unitaires|length > 0 and suggestion_unitaires[0].change_type == 'CREATE' %}
{% if dagrun_lines|length > 0 and dagrun_lines[0].change_type == 'CREATE' %}
<thead>

<tr>
<th scope="col">change_type</th>
<th scope="col">meta_data</th>
<th scope="col">Acteur</th>
<th scope="col">Proposition de service</th>
<th scope="col">suggestion</th>
<th scope="col">row_updates</th>
</tr>

</thead>
<tbody>
{% for suggestion_unitaire in suggestion_unitaires if suggestion_unitaire.change_type == 'CREATE' %}
{% for dagrun_line in dagrun_lines if dagrun_line.change_type == 'CREATE' %}
<tr>
<td>{{ suggestion_unitaire.get_change_type_display() }}</td>
<td>{{ suggestion_unitaire.meta_data if suggestion_unitaire.meta_data else "-" }}</td>
<td>{{ dagrun_line.get_change_type_display() }}</td>
<td>{{ dagrun_line.meta_data if dagrun_line.meta_data else "-" }}</td>
<td>
{% for key, value in suggestion_unitaire.display_acteur_details().items() %}
{% for key, value in dagrun_line.display_acteur_details().items() %}
<p><strong>{{ key }}</strong> : {{ value }}</p>
{% endfor %}
</td>
Expand All @@ -26,7 +26,7 @@
<th>Action</th>
<th>Sous-Catégories</th>
</tr>
{% for service in suggestion_unitaire.display_proposition_service() %}
{% for service in dagrun_line.display_proposition_service() %}
<tr>
<td>{{ service.action }}</td>
<td>
Expand All @@ -43,7 +43,7 @@
<td>
<details>
<summary>Données brutes</summary>
<pre class="qf-text-wrap">{{ suggestion_unitaire.suggestion }}</pre>
<pre class="qf-text-wrap">{{ dagrun_line.row_updates }}</pre>
</details>
</td>
</tr>
Expand Down
51 changes: 51 additions & 0 deletions jinja2/qfdmo/dags_validations.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{% extends 'layout/base.html' %}

{% block content %}

<div class="fr-m-3w">
<h1>Interface déprécier</h1>

La nouvelle interface est disponible à l'adresse suivante : <a href="{{ url('data:suggestions') }}">/data/suggestions</a>
<h2>Validations des «DAGs»</h2>

<p>
Cette page permet de valider les données des «DAGs».
</p>

<form>
{{ csrf_input }}
{{ form }}
<div class="fr-my-3w">
<button type="submit" class="fr-btn" method='GET'>Afficher le résumé du dag</button>
</div>

{% if dagrun_instance %}
<h2>Instance du DAG : {{ dagrun_instance }}</h2>
<h3>Meta données</h3>
{% for (meta_title, meta_data) in dagrun_instance.display_meta_data().items() %}
<p><strong>{{ meta_title }}</strong> : {{meta_data}}</p>
{% endfor %}
<details>
<summary>meta_data brutes</summary>
<pre class="qf-text-wrap">{{ dagrun_instance.meta_data }}</pre>
</details>
<h3>Exemples</h3>

<div class="fr-table">
<table>
<caption>Résumé du tableau (accessibilité)</caption>
{% include 'qfdmo/update_actor_event.html' %}
{% include 'qfdmo/create_actor_event.html' %}

</table>
</div>
<div class="fr-my-3w">
<button type="submit" class="fr-btn" name='dag_valid' value='1' formmethod='POST'>Valider les modifications</button>
<button type="submit" class="fr-btn fr-btn--secondary" name='dag_valid' value='0' formmethod='POST'>Refuser</button>
</div>
{% endif %}
</form>

</div>

{% endblock %}
10 changes: 5 additions & 5 deletions jinja2/qfdmo/partials/candidat_row.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
<th>Meilleure proposition</th>
<th>Map Link</th>
</tr>
{% for candidat in suggestion_unitaire.suggestion.ae_result %}
{% for candidat in dagrun_line.row_updates.ae_result %}
{% if candidat.etat_admin_candidat != 'F' %}
{% with comparison_result=(suggestion_unitaire.suggestion.best_candidat_index and loop.index == suggestion_unitaire.suggestion.best_candidat_index|int) %}
{% with comparison_result=(dagrun_line.row_updates.best_candidat_index and loop.index == dagrun_line.row_updates.best_candidat_index|int) %}
{# Ces valeurs sont définies dans dags/utils/shared_constants.py, à garder synchronisées entre Django et Airflow #}
<tr class="{% if suggestion_unitaire.status == 'ATRAITER' and comparison_result %}validated{% elif suggestion_unitaire.status == 'REJETER' %}rejected{% endif %}">
<tr class="{% if dagrun_line.status == 'TO_INSERT' and comparison_result %}validated{% elif dagrun_line.status == 'REJECTED' %}rejected{% endif %}">

<td>
<form method="post" action="{{ request.path }}" target="form-frame-{{identifiant_unique}}-{{ loop.index }}">
{{ csrf_input }}
<input type="hidden" name="id" value="{{ suggestion_unitaire.id }}">
<input type="hidden" name="suggestion_cohorte" value="{{ suggestion_cohorte }}">
<input type="hidden" name="id" value="{{ dagrun_line.id }}">
<input type="hidden" name="dagrun" value="{{ dagrun }}">
<input type="hidden" name="identifiant_unique" value="{{ identifiant_unique }}">
<input type="hidden" name="index" value="{{ loop.index }}">
<button class="fr-btn" type="submit" name="action" value="validate">O</button>
Expand Down
26 changes: 13 additions & 13 deletions jinja2/qfdmo/update_actor_event.html
Original file line number Diff line number Diff line change
@@ -1,52 +1,52 @@
{% if suggestion_unitaires|length > 0 and suggestion_unitaires[0].change_type == 'UPDATE_ACTOR' %}
{% if dagrun_lines|length > 0 and dagrun_lines[0].change_type == 'UPDATE_ACTOR' %}
<thead>

<tr>
<th scope="col">change_type</th>
<th scope="col">meta_data</th>
<th scope="col">Identifiant Unique</th>
<th scope="col">Candidats</th>
<th scope="col">suggestion</th>
<th scope="col">row_updates</th>
</tr>

</thead>
<tbody>
{% for suggestion_unitaire in suggestion_unitaires if suggestion_unitaire.change_type == 'UPDATE_ACTOR' %}
{% for dagrun_line in dagrun_lines if dagrun_line.change_type == 'UPDATE_ACTOR' %}
<tr>
<td>{{ suggestion_unitaire.get_change_type_display() }}</td>
<td>{{ suggestion_unitaire.meta_data if suggestion_unitaire.meta_data else "-" }}</td>
<td>{{ dagrun_line.get_change_type_display() }}</td>
<td>{{ dagrun_line.meta_data if dagrun_line.meta_data else "-" }}</td>
<td>
{% with identifiant_unique=suggestion_unitaire.display_acteur_details().identifiant_unique %}
{% with identifiant_unique=dagrun_line.display_acteur_details().identifiant_unique %}
<a href="{{ url('admin:qfdmo_displayedacteur_change', args=[identifiant_unique]) }}" target="_blank" rel="noreferrer">{{ identifiant_unique }}</a>
{% endwith %}
</td>
<td>
{% with candidat=candidat, index=loop.index, suggestion_cohorte=request.GET.suggestion_cohorte,
identifiant_unique=suggestion_unitaire.display_acteur_details().identifiant_unique %}
{% with candidat=candidat, index=loop.index, dagrun=request.GET.dagrun,
identifiant_unique=dagrun_line.display_acteur_details().identifiant_unique %}
{% include 'qfdmo/partials/candidat_row.html' %}

{% endwith %}
</td>
<td>
<details>
<summary>Données brutes</summary>
<pre class="qf-text-wrap">{{ suggestion_unitaire.suggestion }}</pre>
<pre class="qf-text-wrap">{{ dagrun_line.row_updates }}</pre>
</details>
</td>
</tr>
{% endfor %}
</tbody>
{% if suggestion_unitaires.has_other_pages %}
{% if dagrun_lines.has_other_pages %}
<form method="get" action="">
<label for="page-select">Go to page:</label>
<select id="page-select" class="fr-select" name="page" onchange="this.form.submit()">
{% for num in suggestion_unitaires.paginator.page_range %}
<option value="{{ num }}" {% if suggestion_unitaires.number == num %}selected{% endif %}>
{% for num in dagrun_lines.paginator.page_range %}
<option value="{{ num }}" {% if dagrun_lines.number == num %}selected{% endif %}>
{{ num }}
</option>
{% endfor %}
</select>
<input type="hidden" name="suggestion_cohorte" value="{{ request.GET.suggestion_cohorte }}">
<input type="hidden" name="dagrun" value="{{ request.GET.dagrun }}">
<noscript>
<input type="submit" value="Go">
</noscript>
Expand Down
Loading

0 comments on commit b657043

Please sign in to comment.