Skip to content

Commit

Permalink
Alertes de suspicion de sous-déclaration (#3810)
Browse files Browse the repository at this point in the history
## Linked issues

- Resolve #3647

## Contexte et critères de déclenchement

Cette alerte vient compléter les alertes déjà existantes de FAR manquant
et de DEP manquant sur les manquements aux obligations déclaratives en
détectant les navires dont les déclarations sont bien présentes mais
dans lesquelles les volumes de produits déclarés sont manifestement trop
faibles au regard de l'effort de pêche déployé.

En analysant les données VMS et déclaratives agrégées par navire sur 7
jours révolus (7 jours devant donc avoir déjà fait l'objet de
déclarations FAR), on trouve :


![image](https://github.com/user-attachments/assets/b1d4e6b8-028a-4c89-8879-59a59062e561)

En dessous de la droite `poids = 0.015 * effort de pêche` on trouve :
- des navires sans aucune déclaration de capture sur 7 jours --> ceux-ci
seront identifiés par l'alerte de FAR et / ou DEP manquant s'il n'y a
pas de DEP non plus
- des navires avec uniquement des FAR sans capture (FAR 0). Il peut
arriver qu'un effort de pêche ne donne lieu à aucune capture mais cela
reste l'exception. Si on limite aux navires qui ont fait au moins 2
jours de pêche (sur 7 jours glissants), on trouve uniquement des navires
qui ne font QUE des FAR 0 et qui sont nécessairement incorrects puisque
les déclarations de débarquement associées comportent, elles, des
captures --> ceux-ci seront identifiés par l'alerte de suspicion de
sous-déclaration
- des navires qui font uniquement des déclarations avec des poids non
plausibles (1kg sur chaque espèce...) --> ceux-ci seront identifiés par
l'alerte de suspicion de sous-déclaration
  • Loading branch information
VincentAntoine authored Nov 4, 2024
2 parents e9c3859 + 49065fd commit cfe9fb1
Show file tree
Hide file tree
Showing 19 changed files with 388 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ enum class AlertTypeMapping(
clazz = MissingFAR48HoursAlert::class.java,
alertName = "Non-emission de message \"FAR\" en 48h",
),
SUSPICION_OF_UNDER_DECLARATION_ALERT(
clazz = SuspicionOfUnderDeclarationAlert::class.java,
alertName = "Suspicion de sous-déclaration",
),
;

override fun getImplementation(): Class<out AlertType> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package fr.gouv.cnsp.monitorfish.domain.entities.alerts.type

/**
 * Alert raised when a vessel's declared catch weights are suspiciously low
 * compared to its fishing effort (suspicion of under-declaration).
 *
 * NOTE(review): the natinf code 27689 is hard-coded in the super-constructor
 * call — confirm it is the correct infraction code for under-declaration.
 *
 * @param seaFront sea front (facade) associated with the vessel, if known.
 * @param dml DML of the vessel, if known.
 * @param riskFactor risk factor of the vessel when the alert was raised.
 */
class SuspicionOfUnderDeclarationAlert(
    override var seaFront: String? = null,
    override var dml: String? = null,
    var riskFactor: Double? = null,
) : AlertType(AlertTypeMapping.SUSPICION_OF_UNDER_DECLARATION_ALERT, seaFront, dml, 27689)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class GetPendingAlerts(
AlertTypeMapping.TWELVE_MILES_FISHING_ALERT,
AlertTypeMapping.MISSING_DEP_ALERT,
AlertTypeMapping.MISSING_FAR_48_HOURS_ALERT,
AlertTypeMapping.SUSPICION_OF_UNDER_DECLARATION_ALERT,
),
).map { pendingAlert ->
pendingAlert.value.natinfCode?.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class ArchiveOutdatedReportings(private val reportingRepository: ReportingReposi
reportingCandidatesToArchive.filter {
it.second.type == AlertTypeMapping.MISSING_FAR_ALERT ||
it.second.type == AlertTypeMapping.THREE_MILES_TRAWLING_ALERT ||
it.second.type == AlertTypeMapping.MISSING_DEP_ALERT
it.second.type == AlertTypeMapping.MISSING_DEP_ALERT ||
it.second.type == AlertTypeMapping.SUSPICION_OF_UNDER_DECLARATION_ALERT
}.map { it.first }

logger.info("Found ${filteredReportingIdsToArchive.size} reportings to archive.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class GetPendingAlertsUTests {
AlertTypeMapping.TWELVE_MILES_FISHING_ALERT,
AlertTypeMapping.MISSING_DEP_ALERT,
AlertTypeMapping.MISSING_FAR_48_HOURS_ALERT,
AlertTypeMapping.SUSPICION_OF_UNDER_DECLARATION_ALERT,
),
)
Mockito.verify(infractionRepository, Mockito.times(1)).findInfractionByNatinfCode(eq(7059))
Expand Down
1 change: 1 addition & 0 deletions datascience/src/pipeline/entities/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ class AlertType(Enum):
MISSING_DEP_ALERT = "MISSING_DEP_ALERT"
MISSING_FAR_ALERT = "MISSING_FAR_ALERT"
MISSING_FAR_48_HOURS_ALERT = "MISSING_FAR_48_HOURS_ALERT"
SUSPICION_OF_UNDER_DECLARATION = "SUSPICION_OF_UNDER_DECLARATION"
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from pathlib import Path

from prefect import Flow, case, task
from prefect.executors import LocalDaskExecutor

from src.pipeline.entities.alerts import AlertType
from src.pipeline.generic_tasks import extract
from src.pipeline.shared_tasks.alerts import (
extract_active_reportings,
extract_silenced_alerts,
filter_alerts,
load_alerts,
make_alerts,
)
from src.pipeline.shared_tasks.control_flow import check_flow_not_running


@task(checkpoint=False)
def extract_suspicions_of_under_declaration():
    """Fetch vessels suspected of under-declaring their catches.

    Runs the ``suspicions_of_under_declaration.sql`` query against the
    ``monitorfish_remote`` database and returns its result.
    """
    query_filepath = "monitorfish/suspicions_of_under_declaration.sql"
    return extract(db_name="monitorfish_remote", query_filepath=query_filepath)


# Prefect flow: raise "suspicion of under-declaration" alerts.
# Extract -> make alerts -> filter out silenced/already-reported -> load.
with Flow("Suspicions of under-declaration", executor=LocalDaskExecutor()) as flow:
    # Guard: skip everything if another run of this flow is in progress.
    flow_not_running = check_flow_not_running()
    with case(flow_not_running, True):
        # Extract
        vessels_with_suspicions_of_under_declaration = (
            extract_suspicions_of_under_declaration()
        )

        # Transform: the alert type is used both as the alert type and as the
        # alert config name.
        alerts = make_alerts(
            vessels_with_suspicions_of_under_declaration,
            AlertType.SUSPICION_OF_UNDER_DECLARATION.value,
            AlertType.SUSPICION_OF_UNDER_DECLARATION.value,
        )
        # Drop alerts that users silenced or that already have an active
        # reporting, to avoid duplicates.
        silenced_alerts = extract_silenced_alerts(
            AlertType.SUSPICION_OF_UNDER_DECLARATION.value
        )
        active_reportings = extract_active_reportings(
            AlertType.SUSPICION_OF_UNDER_DECLARATION.value
        )
        filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings)

        # Load
        load_alerts(
            filtered_alerts,
            alert_config_name=AlertType.SUSPICION_OF_UNDER_DECLARATION.value,
        )

flow.file_name = Path(__file__).name
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- Flags vessels whose declared FAR catch weight over the last 7 full days is
-- implausibly low compared to the fishing effort derived from VMS positions
-- (suspicion of under-declaration).
WITH fishing_efforts AS (
    -- Fishing effort (kWh) per vessel over the last 7 complete UTC days:
    -- engine power (kW) times hours spent in fishing activity.
    SELECT
        p.internal_reference_number AS cfr,
        d.dml,
        COALESCE(p.flag_state, v.flag_state) AS flag_state,
        v.power * EXTRACT(epoch FROM SUM(p.time_since_previous_position)) / 3600 AS fishing_effort_kwh
    FROM positions p
    LEFT JOIN vessels v
    ON v.cfr = p.internal_reference_number
    LEFT JOIN districts d
    ON d.district_code = v.district_code
    WHERE
        -- Window: the last 7 complete days (today excluded), in UTC.
        p.date_time >= DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') - INTERVAL '7 days'
        AND p.date_time < DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
        AND p.internal_reference_number IS NOT NULL
        -- Scope: French vessels of 12m or more, equipped with an e-logbook.
        AND p.flag_state = 'FR'
        AND v.length >= 12
        AND v.logbook_equipment_status = 'Equipé'
        AND p.is_fishing
    GROUP BY 1, 2, 3, v.power
    -- Minimum number of days with fishing activity
    HAVING COUNT(DISTINCT DATE_TRUNC('day', date_time)) >= 2
),

catches AS (
    -- Total weight declared in FAR messages per vessel over the same window.
    -- FAR reports with no catches still yield a row with weight 0 (COALESCE).
    SELECT
        lb.cfr,
        COALESCE(SUM(weight), 0) AS weight
    FROM logbook_reports lb
    LEFT JOIN jsonb_array_elements(lb.value->'hauls') haul ON true
    LEFT JOIN LATERAL (
        SELECT
            SUM((catch->>'weight')::DOUBLE PRECISION) AS weight
        FROM jsonb_array_elements(haul->'catches') catch
    ) catch_weight ON true
    WHERE
        lb.operation_datetime_utc >= DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') - INTERVAL '7 days'
        AND lb.activity_datetime_utc >= DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') - INTERVAL '7 days'
        AND lb.log_type = 'FAR'
    GROUP BY 1
)

-- Inner join on catches: vessels with fishing effort but no FAR at all are
-- not flagged here (they are covered by the missing-FAR / missing-DEP alerts).
SELECT
    fe.cfr,
    lp.external_immatriculation,
    lp.ircs,
    lp.vessel_id,
    lp.vessel_identifier,
    lp.vessel_name,
    f.facade,
    fe.dml,
    fe.flag_state,
    lp.risk_factor,
    lp.latitude,
    lp.longitude
FROM fishing_efforts fe
JOIN catches c
ON fe.cfr = c.cfr
LEFT JOIN last_positions lp
ON lp.cfr = fe.cfr
LEFT JOIN facade_areas_subdivided f
ON ST_Intersects(ST_SetSRID(ST_Point(lp.longitude, lp.latitude), 4326), f.geometry)
-- Threshold: less than 0.015 kg declared per kWh of effort is considered
-- implausibly low. NOTE(review): COALESCE(effort, 0) makes a NULL effort
-- yield a threshold of 0, so such vessels can never be flagged — confirm
-- this is intended (effort should be non-NULL given the HAVING clause).
WHERE c.weight < 0.015 * COALESCE(fe.fishing_effort_kWh, 0)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ INSERT INTO public.vessels (
declared_fishing_gears, nav_licence_expiration_date,
vessel_emails, vessel_phones, proprietor_name, proprietor_phones, proprietor_emails, operator_name, operator_phones,
under_charter,
operator_mobile_phone, vessel_mobile_phone, vessel_telex, vessel_fax, operator_fax, operator_email
operator_mobile_phone, vessel_mobile_phone, vessel_telex, vessel_fax, operator_fax, operator_email, logbook_equipment_status
) VALUES
(
1,
Expand All @@ -16,7 +16,7 @@ INSERT INTO public.vessels (
'{GNS,GTR,LLS}', (NOW() AT TIME ZONE 'UTC')::TIMESTAMP + INTERVAL '2 months',
'{}', '{}', NULL, '{}', '{}', 'Le pêcheur de poissons', '{1234567890,"06 06 06 06 06"}',
false,
null, null, null, null, null, '[email protected]'
null, null, null, null, null, '[email protected]', 'Equipé'
),
(
2,
Expand All @@ -25,7 +25,7 @@ INSERT INTO public.vessels (
'{DRB,PS1}', (NOW() AT TIME ZONE 'UTC')::TIMESTAMP + INTERVAL '3 months',
'{[email protected], [email protected]}', '{}', NULL, '{}', '{}', 'Le pêcheur de crevettes', '{9876543210}',
true,
'0600000000', null, null, '0100000000', '0200000000', '[email protected]'
'0600000000', null, null, '0100000000', '0200000000', '[email protected]', 'Equipé'
),
(
3,
Expand All @@ -34,7 +34,7 @@ INSERT INTO public.vessels (
'{OTM,OTB,OTT}', NULL,
'{}', '{}', NULL, '{}', '{}', 'Le pêcheur de fonds', '{0000000000}',
false,
null, null, null, null, null, '[email protected]'
null, null, null, null, null, '[email protected]', 'Equipé'
),
(
4,
Expand All @@ -43,7 +43,7 @@ INSERT INTO public.vessels (
'{OTM,OTB,OTT}', NULL,
'{}', '{}', NULL, '{}', '{}', 'Le pêcheur', '{11111111111}',
false,
null, '0111111111', null, null, null, '[email protected]'
null, '0111111111', null, null, null, '[email protected]', 'Equipé'
),
(
5,
Expand All @@ -52,7 +52,7 @@ INSERT INTO public.vessels (
'{OTT}', NULL,
'{}', '{}', NULL, '{}', '{}', 'Le pêcheur qui se cache', '{2222222222}',
false,
null, null, null, null, null, '[email protected]'
null, null, null, null, null, '[email protected]', 'Equipé'
),
(
6,
Expand All @@ -61,7 +61,7 @@ INSERT INTO public.vessels (
'{OTT}', NULL,
'{}', '{}', NULL, '{}', '{}', 'Le pêcheur qui se fait ses 4h reports', '{3333333333}',
false,
null, null, null, null, null, '[email protected]'
null, null, null, null, null, '[email protected]', 'Equipé'
),
(
7,
Expand All @@ -70,6 +70,6 @@ INSERT INTO public.vessels (
'{LLS}', NULL,
'{}', '{}', NULL, '{}', '{}', 'Pêchou', '{9546458753}',
false,
null, null, null, null, null, 'target@me'
null, null, null, null, null, 'target@me', 'Equipé'
)
;
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ INSERT INTO positions (
( 13639240, 'ABC000055481', 'AS761555', NULL, 'IL2468', 'PLACE SPECTACLE SUBIR', 'NL', 'FR', NULL, NULL, 53.4279999999999973, 5.55299999999999994, 2, 31, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '1 day 1 hour', 'VMS', false, false, 2500, INTERVAL '30 minutes', 2.69, true, INTERVAL '1 day 2 hours 30 minutes'),
( 13640592, 'ABC000055481', 'AS761555', NULL, 'IL2468', 'PLACE SPECTACLE SUBIR', 'NL', 'FR', NULL, NULL, 53.4239999999999995, 5.54900000000000038, 2, 338, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '1 day 30 minutes', 'VMS', false, false, 2500, INTERVAL '30 minutes', 2.69, true, INTERVAL '1 day 3 hours'),
( 13641745, 'ABC000055481', 'AS761555', NULL, 'IL2468', 'PLACE SPECTACLE SUBIR', 'NL', 'FR', NULL, NULL, 53.4350000000000023, 5.55299999999999994, 2, 356, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '1 day', 'VMS', false, false, 2500, INTERVAL '30 minutes', 2.69, NULL, INTERVAL '1 day 3 hours 30 minutes'),
( 13634203, 'ABC000306959', 'RV348407', NULL, 'LLUK', 'ÉTABLIR IMPRESSION LORSQUE', 'FR', 'FR', NULL, NULL, 49.6069999999999993, -0.744999999999999996, 1, 343, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '50 hours 10 minutes', 'VMS', false, true, 2050, INTERVAL '24 hours', 1.107, true, INTERVAL '0 hour'),
( 13634204, 'ABC000306959', 'RV348407', NULL, 'LLUK', 'ÉTABLIR IMPRESSION LORSQUE', 'FR', 'FR', NULL, NULL, 49.6069999999999993, -0.744999999999999996, 1, 343, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '26 hours 10 minutes', 'VMS', false, true, 2050, INTERVAL '24 hours', 1.107, true, INTERVAL '0 hour'),
( 13634205, 'ABC000306959', 'RV348407', NULL, 'LLUK', 'ÉTABLIR IMPRESSION LORSQUE', 'FR', 'FR', NULL, NULL, 49.6069999999999993, -0.744999999999999996, 1, 343, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '2 hours 10 minutes', 'VMS', false, true, 2050, INTERVAL '1 hour', 1.107, true, INTERVAL '0 hour'),
( 13637054, 'ABC000306959', 'RV348407', NULL, 'LLUK', 'ÉTABLIR IMPRESSION LORSQUE', 'FR', 'FR', NULL, NULL, 49.6060000000000016, -0.735999999999999988, 1.5, 351, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '1 hour 10 minutes', 'VMS', false, false, 2050, INTERVAL '1 hour', 1.107, NULL, INTERVAL '0 hour'),
( 13639642, 'ABC000306959', 'RV348407', NULL, 'LLUK', 'ÉTABLIR IMPRESSION LORSQUE', 'FR', 'FR', NULL, NULL, 49.6099999999999994, -0.739999999999999991, 1, 302, (NOW() AT TIME ZONE 'UTC')::TIMESTAMP - INTERVAL '10 minutes', 'VMS', false, NULL, NULL, NULL, NULL, NULL, NULL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,6 @@ VALUES
((now() AT TIME ZONE 'UTC') - INTERVAL '1 month 3 days 23 hours 48 minutes')::TIMESTAMP, NULL, 'ERS'
);

UPDATE logbook_reports
SET value = jsonb_set(
value,
'{departureDatetimeUtc}',
('"' || to_char(CURRENT_TIMESTAMP - INTERVAL '1 month 5 days', 'YYYY-MM-DD') || 'T' || to_char(CURRENT_TIMESTAMP - INTERVAL '1 week 5 days', 'HH24:MI:SS') || 'Z"')::jsonb
)
WHERE operation_number = '3';

UPDATE logbook_reports
SET value = jsonb_set(
value,
'{departureDatetimeUtc}',
('"' || to_char(CURRENT_TIMESTAMP - INTERVAL '1 week 5 days', 'YYYY-MM-DD') || 'T' || to_char(CURRENT_TIMESTAMP - INTERVAL '1 week 5 days', 'HH24:MI:SS') || 'Z"')::jsonb
)
WHERE operation_number = '5';


-- Add FLUX test data
INSERT INTO logbook_reports (
Expand Down Expand Up @@ -286,4 +270,85 @@ INSERT INTO logbook_reports (
((now() AT TIME ZONE 'UTC') - INTERVAL '1 month 10 minutes')::TIMESTAMP, NULL, 'ERS',
false, NULL, NULL
)
;
;
--WHEN log_type = 'FAR' THEN (SELECT MIN((haul->>'farDatetimeUtc')::TIMESTAMPTZ) AT TIME ZONE 'UTC' FROM jsonb_array_elements(value->'hauls') haul)
-- Test fixture maintenance: rewrite each report's activity timestamp field
-- inside the JSON `value` so it matches operation_datetime_utc, keeping the
-- fixtures relative to the current date. The JSON path depends on log_type;
-- PNO predicted arrival is pushed 4 hours after the operation time.
UPDATE logbook_reports
SET value = jsonb_set(
    value,
    CASE
        WHEN log_type = 'DEP' THEN '{departureDatetimeUtc}'
        WHEN log_type = 'COE' THEN '{effortZoneEntryDatetimeUtc}'
        WHEN log_type = 'CPS' THEN '{cpsDatetimeUtc}'
        WHEN log_type = 'DIS' THEN '{discardDatetimeUtc}'
        WHEN log_type = 'COX' THEN '{effortZoneExitDatetimeUtc}'
        WHEN log_type = 'CRO' THEN '{effortZoneExitDatetimeUtc}'
        WHEN log_type = 'EOF' THEN '{endOfFishingDatetimeUtc}'
        WHEN log_type = 'PNO' THEN '{predictedArrivalDatetimeUtc}'
        WHEN log_type = 'LAN' THEN '{landingDatetimeUtc}'
        WHEN log_type = 'RTP' THEN '{returnDatetimeUtc}'
    END::VARCHAR[],
    ('"' || to_char(operation_datetime_utc + CASE WHEN log_type = 'PNO' THEN INTERVAL '4 hours' ELSE INTERVAL '0' END, 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"') || '"')::jsonb
)
WHERE
    transmission_format = 'ERS'
    AND log_type IN ('DEP', 'COE', 'CPS', 'DIS', 'COX', 'CRO', 'EOF', 'PNO', 'LAN', 'RTP');


-- PNO reports also carry a predicted landing time: set it 4 hours after the
-- operation time, like the predicted arrival above.
UPDATE logbook_reports
SET value = jsonb_set(
    value,
    '{predictedLandingDatetimeUtc}'
    ,
    ('"' || to_char(operation_datetime_utc + INTERVAL '4 hours', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"') || '"')::jsonb
)
WHERE
    transmission_format = 'ERS'
    AND log_type = 'PNO';


-- FAR reports store their timestamps per haul inside the `hauls` JSON array,
-- so they cannot be handled by the single-path jsonb_set above. Rebuild each
-- report's hauls array with every haul's farDatetimeUtc set to the report's
-- operation_datetime_utc, then write the array back.
WITH updated_hauls AS (
    SELECT
        report_id,
        jsonb_agg(
            jsonb_set(
                haul,
                '{farDatetimeUtc}',
                ('"' || to_char(operation_datetime_utc, 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"') || '"')::jsonb
            )
        ) AS hauls
    FROM logbook_reports, jsonb_array_elements(value->'hauls') haul
    WHERE
        transmission_format = 'ERS'
        AND log_type = 'FAR'
    GROUP BY report_id
)

UPDATE logbook_reports lr
SET value = jsonb_set(
    value,
    '{hauls}',
    hauls
)
FROM updated_hauls uh
WHERE uh.report_id = lr.report_id;

-- Populate the activity_datetime_utc column from the activity timestamp
-- stored in the JSON `value`, using the log_type-specific field. For FAR,
-- take the earliest haul timestamp. NOT-COE / NOT-COX are FLUX notification
-- variants sharing the same JSON fields as COE / COX.
UPDATE logbook_reports
SET activity_datetime_utc = CASE
    WHEN log_type = 'DEP' THEN (value->>'departureDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'NOT-COE' THEN (value->>'effortZoneEntryDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'COE' THEN (value->>'effortZoneEntryDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'FAR' THEN (SELECT MIN((haul->>'farDatetimeUtc')::TIMESTAMPTZ) AT TIME ZONE 'UTC' FROM jsonb_array_elements(value->'hauls') haul)
    WHEN log_type = 'CPS' THEN (value->>'cpsDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'DIS' THEN (value->>'discardDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'NOT-COX' THEN (value->>'effortZoneExitDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'COX' THEN (value->>'effortZoneExitDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'CRO' THEN (value->>'effortZoneExitDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'EOF' THEN (value->>'endOfFishingDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'PNO' THEN (value->>'predictedArrivalDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'LAN' THEN (value->>'landingDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    WHEN log_type = 'RTP' THEN (value->>'returnDatetimeUtc')::TIMESTAMPTZ AT TIME ZONE 'UTC'
    ELSE NULL
END
WHERE log_type IS NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def current_segments() -> pd.DataFrame:
],
"departure_datetime_utc": [
pd.NaT,
datetime.datetime(2018, 2, 27, 1, 5),
now - datetime.timedelta(days=2),
now - datetime.timedelta(weeks=1, days=5),
pd.NaT,
pd.NaT,
Expand Down
Loading

0 comments on commit cfe9fb1

Please sign in to comment.