diff --git a/datascience/config.py b/datascience/config.py index 56789858b3..add704158c 100644 --- a/datascience/config.py +++ b/datascience/config.py @@ -167,6 +167,9 @@ RISK_FACTOR_VERIFICATION_THRESHOLD = 2.3 FLAG_STATES_WITHOUT_SYSTEMATIC_VERIFICATION = ["FRA"] +# Missing DEP alerts configuration +MISSING_DEP_TRACK_ANALYSIS_HOURS = 48 + # App URL MONITORFISH_URL = os.getenv("MONITORFISH_URL") # http://monitor.fish/ BACKOFFICE_REGULATION_URL = MONITORFISH_URL + "backoffice/regulation" diff --git a/datascience/src/pipeline/flows/missing_dep_alerts.py b/datascience/src/pipeline/flows/missing_dep_alerts.py index c09f519800..8e5dac8e0a 100644 --- a/datascience/src/pipeline/flows/missing_dep_alerts.py +++ b/datascience/src/pipeline/flows/missing_dep_alerts.py @@ -1,8 +1,9 @@ from pathlib import Path -from prefect import Flow, case, task +from prefect import Flow, Parameter, case, task from prefect.executors import LocalDaskExecutor +from config import MISSING_DEP_TRACK_ANALYSIS_HOURS from src.pipeline.entities.alerts import AlertType from src.pipeline.generic_tasks import extract from src.pipeline.shared_tasks.alerts import ( @@ -16,23 +17,28 @@ @task(checkpoint=False) -def extract_missing_deps(): +def extract_missing_deps(hours_from_now: int): return extract( - db_name="monitorfish_remote", query_filepath="monitorfish/missing_deps.sql" + db_name="monitorfish_remote", + query_filepath="monitorfish/missing_deps.sql", + params={"hours_from_now": hours_from_now}, ) with Flow("Missing DEP alerts", executor=LocalDaskExecutor()) as flow: flow_not_running = check_flow_not_running() with case(flow_not_running, True): - vessels_with_missing_deps = extract_missing_deps() + hours_from_now = Parameter("hours_from_now", MISSING_DEP_TRACK_ANALYSIS_HOURS) + vessels_with_missing_deps = extract_missing_deps(hours_from_now) alerts = make_alerts( vessels_with_missing_deps, AlertType.MISSING_DEP_ALERT.value, AlertType.MISSING_DEP_ALERT.value, ) - silenced_alerts = extract_silenced_alerts(AlertType.MISSING_DEP_ALERT.value) + silenced_alerts = extract_silenced_alerts( + AlertType.MISSING_DEP_ALERT.value, number_of_hours=hours_from_now + ) active_reportings = extract_active_reportings(AlertType.MISSING_DEP_ALERT.value) filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings) diff --git a/datascience/src/pipeline/flows/missing_far_alerts.py b/datascience/src/pipeline/flows/missing_far_alerts.py index 1cdc1868f4..93383268fe 100644 --- a/datascience/src/pipeline/flows/missing_far_alerts.py +++ b/datascience/src/pipeline/flows/missing_far_alerts.py @@ -27,14 +27,17 @@ @task(checkpoint=False) -def get_dates(days_without_far: int) -> Tuple[datetime, datetime, datetime, datetime]: +def get_dates( + days_without_far: int, +) -> Tuple[datetime, datetime, datetime, datetime, float]: """ - Returns the dates used in the flow as a 4-tuple : + Returns the dates used in the flow as a 5-tuple : - - `days_without_far` days ago at 00:00 (beginning of the day) in UTC + - `days_without_far` days ago at 00:00 (beginning of the day) in UTC (1) - Yesterday at 8pm in UTC - Today at 00:00 (beginning of the day) in UTC - - Current datetime in UTC + - Current datetime in UTC (2) + - The number of hours that separate 1 and 2 Returns: Tuple[datetime, datetime, datetime] @@ -43,12 +46,16 @@ def get_dates(days_without_far: int) -> Tuple[datetime, datetime, datetime, date today_at_zero_hours = utcnow.replace(hour=0, minute=0, second=0, microsecond=0) period_start_at_zero_hours = today_at_zero_hours - timedelta(days=days_without_far) yesterday_at_eight_pm = today_at_zero_hours - timedelta(hours=4) + period_start_hours_from_now = ( + utcnow - period_start_at_zero_hours + ).total_seconds() / 3600 return ( period_start_at_zero_hours, yesterday_at_eight_pm, today_at_zero_hours, utcnow, + period_start_hours_from_now, ) @@ -301,10 +308,16 @@ def get_vessels_at_sea(positions_at_sea: pd.DataFrame, min_days: int) -> pd.Data "vessel_name", "flag_state", "facade", + "date_time", "latitude", "longitude", ] ] + .rename( + columns={ + "date_time": "triggering_behaviour_datetime_utc", + } + ) .reset_index(drop=True) ) return vessels_at_sea @@ -426,6 +439,7 @@ def merge_risk_factor( yesterday_at_eight_pm, today_at_zero_hours, utcnow, + period_start_hours_from_now, ) = get_dates(days_without_far) positions_at_sea_yesterday_everywhere_query = make_positions_at_sea_query( @@ -497,7 +511,9 @@ def merge_risk_factor( districts_columns_to_add=["dml"], ) alerts = make_alerts(vessels_with_missing_fars, alert_type, alert_config_name) - silenced_alerts = extract_silenced_alerts(alert_type) + silenced_alerts = extract_silenced_alerts( + alert_type, number_of_hours=period_start_hours_from_now + ) alert_without_silenced = filter_alerts(alerts, silenced_alerts) # Load diff --git a/datascience/src/pipeline/flows/position_alerts.py b/datascience/src/pipeline/flows/position_alerts.py index a9ec775415..b1e5c2e5ef 100644 --- a/datascience/src/pipeline/flows/position_alerts.py +++ b/datascience/src/pipeline/flows/position_alerts.py @@ -439,7 +439,7 @@ def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame: ] .rename( columns={ - "date_time": "creation_date", + "date_time": "triggering_behaviour_datetime_utc", } ) .reset_index(drop=True) @@ -526,7 +526,9 @@ def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame: districts_columns_to_add=["dml"], ) alerts = make_alerts(vessels_in_alert, alert_type, alert_config_name) - silenced_alerts = extract_silenced_alerts(alert_type) + silenced_alerts = extract_silenced_alerts( + alert_type, number_of_hours=hours_from_now + ) alert_without_silenced = filter_alerts(alerts, silenced_alerts) load_alerts(alert_without_silenced, alert_config_name) diff --git a/datascience/src/pipeline/flows/suspicions_of_under_declaration_alerts.py b/datascience/src/pipeline/flows/suspicions_of_under_declaration_alerts.py index 0231ce7e6a..0b6ec59ded 100644 --- a/datascience/src/pipeline/flows/suspicions_of_under_declaration_alerts.py +++ b/datascience/src/pipeline/flows/suspicions_of_under_declaration_alerts.py @@ -36,7 +36,10 @@ def extract_suspicions_of_under_declaration(): AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value, ) silenced_alerts = extract_silenced_alerts( - AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value + AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value, + # 8 days, to cover the date range analyzed in + # `extract_suspicions_of_under_declaration` + number_of_hours=192, ) active_reportings = extract_active_reportings( AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value diff --git a/datascience/src/pipeline/queries/monitorfish/missing_deps.sql b/datascience/src/pipeline/queries/monitorfish/missing_deps.sql index 5d9fde426a..601c52acaf 100644 --- a/datascience/src/pipeline/queries/monitorfish/missing_deps.sql +++ b/datascience/src/pipeline/queries/monitorfish/missing_deps.sql @@ -17,7 +17,7 @@ WITH detected_recent_deps AS ( LEFT JOIN districts d ON d.district_code = v.district_code WHERE - p.date_time >= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '48 hours' + p.date_time >= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':hours_from_now hours' AND p.date_time < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '2 hours' AND p.is_at_port = false AND time_emitting_at_sea = INTERVAL '0' @@ -37,6 +37,7 @@ SELECT d.dml, d.flag_state, lp.risk_factor, + d.date_time AS triggering_behaviour_datetime_utc, d.latitude, d.longitude FROM detected_recent_deps d diff --git a/datascience/src/pipeline/queries/monitorfish/silenced_alerts.sql b/datascience/src/pipeline/queries/monitorfish/silenced_alerts.sql index 3248ba3041..11cf197751 100644 --- a/datascience/src/pipeline/queries/monitorfish/silenced_alerts.sql +++ b/datascience/src/pipeline/queries/monitorfish/silenced_alerts.sql @@ -1,8 +1,11 @@ -SELECT DISTINCT +SELECT internal_reference_number, external_reference_number, - ircs + ircs, + MAX(silenced_before_date) AT TIME ZONE 'UTC' AS silenced_before_date FROM silenced_alerts WHERE - NOW() < silenced_before_date + silenced_before_date > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_hours HOURS' AND value->>'type' = :alert_type +GROUP BY 1, 2, 3 +ORDER BY 1, 2, 3 \ No newline at end of file diff --git a/datascience/src/pipeline/queries/monitorfish/suspicions_of_under_declaration.sql b/datascience/src/pipeline/queries/monitorfish/suspicions_of_under_declaration.sql index d305ea7b96..0fd9f50cdb 100644 --- a/datascience/src/pipeline/queries/monitorfish/suspicions_of_under_declaration.sql +++ b/datascience/src/pipeline/queries/monitorfish/suspicions_of_under_declaration.sql @@ -51,6 +51,7 @@ SELECT fe.dml, fe.flag_state, lp.risk_factor, + DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') - INTERVAL '7 days' AS triggering_behaviour_datetime_utc, lp.latitude, lp.longitude FROM fishing_efforts fe diff --git a/datascience/src/pipeline/shared_tasks/alerts.py b/datascience/src/pipeline/shared_tasks/alerts.py index 20a98d380f..b6023f549c 100644 --- a/datascience/src/pipeline/shared_tasks/alerts.py +++ b/datascience/src/pipeline/shared_tasks/alerts.py @@ -15,22 +15,31 @@ from src.pipeline.generic_tasks import extract, load from src.pipeline.processing import ( df_to_dict_series, + join_on_multiple_keys, left_isin_right_by_decreasing_priority, ) from src.pipeline.utils import delete_rows, get_table @task(checkpoint=False) -def extract_silenced_alerts(alert_type: str) -> pd.DataFrame: +def extract_silenced_alerts(alert_type: str, number_of_hours: int = 0) -> pd.DataFrame: """ Return DataFrame of vessels with active silenced alerts of the given type. + + Args: + alert_type (str): Type of alert for which to extract silenced alerts + number_of_hours (int, optional): Number of hours from current time to extract. + Defaults to 0. + + Returns: + pd.DataFrame: Silenced alerts with columns """ alert_type = AlertType(alert_type) return extract( db_name="monitorfish_remote", query_filepath="monitorfish/silenced_alerts.sql", - params={"alert_type": alert_type.value}, + params={"alert_type": alert_type.value, "number_of_hours": number_of_hours}, ) @@ -119,10 +128,8 @@ def make_alerts( - `dml` - `flag_state` - `risk_factor` - - and optionally, `creation_date`, `latitude` and `longitude` - - If `creation_date` is not one of the columns, it will be added and filled with - `datetime.utcnow`. + - `triggering_behaviour_datetime_utc` + - and optionally, `latitude` and `longitude` If `latitude` and `longitude` are not columns of the input, they are added and filled with null values in the result. @@ -132,7 +139,6 @@ def make_alerts( create an alert. alert_type (str): `type` to specify in the built alerts. alert_config_name (str): `alert_config_name` to specify in the built alerts. - creation_date (datetime): `creation_date` to specify in the built alerts. Returns: pd.DataFrame: `DataFrame` of alerts. @@ -145,8 +151,7 @@ def make_alerts( } ) - if "creation_date" not in alerts: - alerts["creation_date"] = datetime.utcnow() + alerts["creation_date"] = datetime.utcnow() if "latitude" not in alerts: alerts["latitude"] = None @@ -175,6 +180,7 @@ def make_alerts( "flag_state", "vessel_id", "vessel_identifier", + "triggering_behaviour_datetime_utc", "creation_date", "latitude", "longitude", @@ -209,12 +215,16 @@ def filter_alerts( - vessel_identifier - flag_state - facade + - triggering_behaviour_datetime_utc - creation_date - latitude - longitude - value - alert_config_name + and the `silenced_alerts` DataFrame must have a `silenced_before_date` + column. + Args: alerts (pd.DataFrame): positions alerts. vessels_with_silenced_alerts (pd.DataFrame): vessels with silenced alerts. @@ -224,18 +234,14 @@ def filter_alerts( """ vessel_id_cols = ["internal_reference_number", "external_reference_number", "ircs"] - if isinstance(vessels_with_active_reportings, pd.DataFrame): - vessels_to_remove = ( - pd.concat([vessels_with_silenced_alerts, vessels_with_active_reportings]) - .drop_duplicates() - .reset_index(drop=True) - ) - else: - vessels_to_remove = vessels_with_silenced_alerts + alerts = join_on_multiple_keys( + alerts, vessels_with_silenced_alerts, or_join_keys=vessel_id_cols, how="left" + ) alerts = alerts.loc[ - ~left_isin_right_by_decreasing_priority( - alerts[vessel_id_cols], vessels_to_remove[vessel_id_cols] + ( + (alerts.silenced_before_date.isna()) + | (alerts.triggering_behaviour_datetime_utc > alerts.silenced_before_date) ), [ "vessel_name", @@ -251,7 +257,16 @@ def filter_alerts( "value", "alert_config_name", ], - ].reset_index(drop=True) + ] + + if isinstance(vessels_with_active_reportings, pd.DataFrame): + alerts = alerts.loc[ + ~left_isin_right_by_decreasing_priority( + alerts[vessel_id_cols], vessels_with_active_reportings[vessel_id_cols] + ) + ] + + alerts = alerts.sort_values("internal_reference_number").reset_index(drop=True) return alerts diff --git a/datascience/tests/test_data/remote_database/V666.19__Reset_test_silenced_alerts.sql b/datascience/tests/test_data/remote_database/V666.19__Reset_test_silenced_alerts.sql index a2d761447f..adb373040c 100644 --- a/datascience/tests/test_data/remote_database/V666.19__Reset_test_silenced_alerts.sql +++ b/datascience/tests/test_data/remote_database/V666.19__Reset_test_silenced_alerts.sql @@ -13,4 +13,14 @@ INSERT INTO silenced_alerts ( 'DEVINER FIGURE CONSCIENCE', 'ABC000542519', 'RO237719', 'FQ7058', 'INTERNAL_REFERENCE_NUMBER', NOW() + ('15 DAYS')::interval, 'FR', '{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}' +), +( + 'DEVINER FIGURE CONSCIENCE', 'ABC000542519', 'RO237719', 'FQ7058', 'INTERNAL_REFERENCE_NUMBER', + NOW() + ('7 DAYS')::interval, 'FR', + '{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}' +), +( + 'AUTRE NAVIRE', 'ABC000123456', NULL, NULL, 'INTERNAL_REFERENCE_NUMBER', + NOW() - ('5 HOURS')::interval, 'FR', + '{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}' ); diff --git a/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py b/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py index 4caab16a2c..c8fdc940d6 100644 --- a/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py +++ b/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py @@ -1,4 +1,4 @@ -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone import pandas as pd import pytest @@ -26,6 +26,9 @@ def expected_missing_deps() -> pd.DataFrame: "dml": ["DML 29"], "flag_state": ["FR"], "risk_factor": [2.58], + "triggering_behaviour_datetime_utc": [ + datetime.utcnow() - timedelta(hours=2) + ], "latitude": [49.606], "longitude": [-0.736], } @@ -83,8 +86,23 @@ def reset_test_data_missing_dep_alerts(reset_test_data): def test_extract_missing_deps( reset_test_data_missing_dep_alerts, expected_missing_deps ): - res = extract_missing_deps.run() - pd.testing.assert_frame_equal(res, expected_missing_deps) + res = extract_missing_deps.run(hours_from_now=48) + pd.testing.assert_frame_equal( + res.drop(columns=["triggering_behaviour_datetime_utc"]), + expected_missing_deps.drop(columns=["triggering_behaviour_datetime_utc"]), + ) + + assert ( + ( + ( + res.triggering_behaviour_datetime_utc + - expected_missing_deps.triggering_behaviour_datetime_utc + ) + .map(lambda td: td.total_seconds()) + .abs() + ) + < 10 + ).all() def test_flow(reset_test_data_missing_dep_alerts): diff --git a/datascience/tests/test_pipeline/test_flows/test_missing_far_alerts.py b/datascience/tests/test_pipeline/test_flows/test_missing_far_alerts.py index a1c8fc6999..2c8d1cfbbd 100644 --- a/datascience/tests/test_pipeline/test_flows/test_missing_far_alerts.py +++ b/datascience/tests/test_pipeline/test_flows/test_missing_far_alerts.py @@ -88,6 +88,36 @@ def positions_at_sea(): ) +@pytest.fixture +def expected_vessels_at_sea_1_day() -> pd.DataFrame: + d = datetime(2020, 2, 5, 12, 56, 0) + td = timedelta(hours=1) + + return pd.DataFrame( + { + "cfr": ["A", "B"], + "external_immatriculation": ["AA", "BB"], + "ircs": ["AAA", "BBB"], + "vessel_name": ["vessel_A", "vessel_B"], + "flag_state": ["state_A", "state_B"], + "facade": ["facade_A", "facade_B"], + "triggering_behaviour_datetime_utc": [d + 48 * td, d], + "latitude": [1.5, -5.65], + "longitude": [10.6, -8.96], + } + ) + + +@pytest.fixture +def expected_vessels_at_sea_2_or_3_days(expected_vessels_at_sea_1_day) -> pd.DataFrame: + return expected_vessels_at_sea_1_day.head(1) + + +@pytest.fixture +def expected_vessels_at_sea_4_days(expected_vessels_at_sea_1_day) -> pd.DataFrame: + return expected_vessels_at_sea_1_day.head(0) + + @patch( "src.pipeline.flows.missing_far_alerts.datetime", mock_datetime_utcnow(datetime(2021, 1, 1, 16, 10, 0)), @@ -98,22 +128,26 @@ def test_get_dates(): yesterday_at_eight_pm, today_at_zero_hours, utcnow, + period_start_hours_from_now, ) = get_dates.run(days_without_far=1) assert period_start_at_zero_hours == datetime(2020, 12, 31, 0, 0, 0) assert yesterday_at_eight_pm == datetime(2020, 12, 31, 20, 0, 0) assert today_at_zero_hours == datetime(2021, 1, 1, 0, 0, 0) assert utcnow == datetime(2021, 1, 1, 16, 10, 0) + assert period_start_hours_from_now == pytest.approx(40.16, abs=0.01) ( period_start_at_zero_hours, yesterday_at_eight_pm, today_at_zero_hours, utcnow, + period_start_hours_from_now, ) = get_dates.run(days_without_far=2) assert period_start_at_zero_hours == datetime(2020, 12, 30, 0, 0, 0) assert yesterday_at_eight_pm == datetime(2020, 12, 31, 20, 0, 0) assert today_at_zero_hours == datetime(2021, 1, 1, 0, 0, 0) assert utcnow == datetime(2021, 1, 1, 16, 10, 0) + assert period_start_hours_from_now == pytest.approx(64.16, abs=0.01) def test_make_positions_at_sea_query(): @@ -270,45 +304,25 @@ def test_extract_vessels_that_emitted_fars(reset_test_data): assert vessels_that_emitted_fars == {"ABC000306959", "ABC000542519"} -def test_get_vessels_at_sea(positions_at_sea): +def test_get_vessels_at_sea( + positions_at_sea, + expected_vessels_at_sea_1_day, + expected_vessels_at_sea_2_or_3_days, + expected_vessels_at_sea_4_days, +): vessels_at_sea_1_day = get_vessels_at_sea.run(positions_at_sea, min_days=1) - expected_vessels_at_sea_1_day = pd.DataFrame( - { - "cfr": ["A", "B"], - "external_immatriculation": ["AA", "BB"], - "ircs": ["AAA", "BBB"], - "vessel_name": ["vessel_A", "vessel_B"], - "flag_state": ["state_A", "state_B"], - "facade": ["facade_A", "facade_B"], - "latitude": [1.5, -5.65], - "longitude": [10.6, -8.96], - } - ) - pd.testing.assert_frame_equal(vessels_at_sea_1_day, expected_vessels_at_sea_1_day) - vessels_at_sea_2_days = get_vessels_at_sea.run(positions_at_sea, min_days=2) vessels_at_sea_3_days = get_vessels_at_sea.run(positions_at_sea, min_days=3) - expected_vessels_at_sea_2_or_3_days = pd.DataFrame( - { - "cfr": ["A"], - "external_immatriculation": ["AA"], - "ircs": ["AAA"], - "vessel_name": ["vessel_A"], - "flag_state": ["state_A"], - "facade": ["facade_A"], - "latitude": [1.5], - "longitude": [10.6], - } - ) + vessels_at_sea_4_days = get_vessels_at_sea.run(positions_at_sea, min_days=4) + + pd.testing.assert_frame_equal(vessels_at_sea_1_day, expected_vessels_at_sea_1_day) pd.testing.assert_frame_equal( vessels_at_sea_2_days, expected_vessels_at_sea_2_or_3_days ) pd.testing.assert_frame_equal( vessels_at_sea_3_days, expected_vessels_at_sea_2_or_3_days ) - - vessels_at_sea_4_days = get_vessels_at_sea.run(positions_at_sea, min_days=4) - assert len(vessels_at_sea_4_days) == 0 + pd.testing.assert_frame_equal(vessels_at_sea_4_days, expected_vessels_at_sea_4_days) def test_concat(): diff --git a/datascience/tests/test_pipeline/test_flows/test_position_alerts.py b/datascience/tests/test_pipeline/test_flows/test_position_alerts.py index 9df8e2f5f1..0bd8d04ab8 100644 --- a/datascience/tests/test_pipeline/test_flows/test_position_alerts.py +++ b/datascience/tests/test_pipeline/test_flows/test_position_alerts.py @@ -428,7 +428,7 @@ def test_get_vessels_in_alert(): "INTERNAL_REFERENCE_NUMBER", "INTERNAL_REFERENCE_NUMBER", ], - "creation_date": [now, now - 0.5 * td], + "triggering_behaviour_datetime_utc": [now, now - 0.5 * td], "latitude": [-1.23, -51.23], "longitude": [39.25, -42.25], } @@ -561,11 +561,11 @@ def test_flow_inserts_new_pending_alerts(reset_test_data): "ZZ000000", ], "creation_date": [ - now - timedelta(days=1), - now - timedelta(minutes=10), - now - timedelta(minutes=25), - now - timedelta(minutes=15), - now - timedelta(minutes=10), + now, + now, + now, + now, + now, ], "trip_number": [None, None, None, None, None], "value": [ @@ -706,10 +706,10 @@ def test_flow_inserts_new_pending_alerts_without_silenced_alerts(reset_test_data "ZZ000000", ], "creation_date": [ - now - timedelta(days=1), - now - timedelta(minutes=10), - now - timedelta(minutes=25), - now - timedelta(minutes=10), + now, + now, + now, + now, ], "trip_number": [None, None, None, None], "value": [ @@ -829,8 +829,8 @@ def test_flow_filters_on_gears(reset_test_data): "FQ7058", ], "creation_date": [ - now - timedelta(days=1), - now - timedelta(minutes=25), + now, + now, ], "trip_number": [None, None], "value": [ @@ -940,9 +940,9 @@ def test_flow_filters_on_time(reset_test_data): "ZZ000000", ], "creation_date": [ - now - timedelta(minutes=10), - now - timedelta(minutes=25), - now - timedelta(minutes=10), + now, + now, + now, ], "trip_number": [None, None, None], "value": [ @@ -1051,7 +1051,7 @@ def test_flow_filters_on_flag_states(reset_test_data): "IL2468", ], "creation_date": [ - now - timedelta(days=1), + now, ], "trip_number": [None], "value": [ diff --git a/datascience/tests/test_pipeline/test_flows/test_suspicions_of_under_declaration_alerts.py b/datascience/tests/test_pipeline/test_flows/test_suspicions_of_under_declaration_alerts.py index 403a9df238..a364b61655 100644 --- a/datascience/tests/test_pipeline/test_flows/test_suspicions_of_under_declaration_alerts.py +++ b/datascience/tests/test_pipeline/test_flows/test_suspicions_of_under_declaration_alerts.py @@ -1,4 +1,4 @@ -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone import pandas as pd import pytest @@ -17,6 +17,10 @@ @pytest.fixture def expected_suspicions_of_under_declaration() -> pd.DataFrame: + today_at_zero_hours = datetime.utcnow().replace( + hour=0, minute=0, second=0, microsecond=0 + ) + one_day = timedelta(days=1) return pd.DataFrame( { "cfr": ["ABC000306959"], @@ -29,6 +33,7 @@ def expected_suspicions_of_under_declaration() -> pd.DataFrame: "dml": ["DML 29"], "flag_state": ["FR"], "risk_factor": [2.58], + "triggering_behaviour_datetime_utc": [today_at_zero_hours - 7 * one_day], "latitude": [49.606], "longitude": [-0.736], } diff --git a/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py b/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py index 2d290b6b59..296e3c8d11 100644 --- a/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py +++ b/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py @@ -2,6 +2,7 @@ from unittest.mock import patch import pandas as pd +import pytest from src.pipeline.entities.alerts import AlertType from src.pipeline.shared_tasks.alerts import ( @@ -23,14 +24,29 @@ def test_extract_silenced_alerts(reset_test_data): silenced_alerts = extract_silenced_alerts.run( AlertType.THREE_MILES_TRAWLING_ALERT.value ) + now = datetime.utcnow() + d = timedelta(days=1) + h = timedelta(hours=1) + expected_silenced_alerts = pd.DataFrame( { "internal_reference_number": ["ABC000658985"], "external_reference_number": ["OHMYGOSH"], "ircs": ["OGMJ"], + "silenced_before_date": [now + 15 * d], } ) - pd.testing.assert_frame_equal(silenced_alerts, expected_silenced_alerts) + pd.testing.assert_frame_equal( + silenced_alerts.drop(columns=["silenced_before_date"]), + expected_silenced_alerts.drop(columns=["silenced_before_date"]), + ) + assert ( + ( + silenced_alerts.silenced_before_date + - expected_silenced_alerts.silenced_before_date + ).map(lambda td: td.total_seconds()) + < 10 + ).all() silenced_alerts = extract_silenced_alerts.run(AlertType.MISSING_FAR_ALERT.value) expected_silenced_alerts = pd.DataFrame( @@ -38,9 +54,43 @@ def test_extract_silenced_alerts(reset_test_data): "internal_reference_number": ["ABC000542519"], "external_reference_number": ["RO237719"], "ircs": ["FQ7058"], + "silenced_before_date": [now + 15 * d], } ) - pd.testing.assert_frame_equal(silenced_alerts, expected_silenced_alerts) + pd.testing.assert_frame_equal( + silenced_alerts.drop(columns=["silenced_before_date"]), + expected_silenced_alerts.drop(columns=["silenced_before_date"]), + ) + assert ( + ( + silenced_alerts.silenced_before_date + - expected_silenced_alerts.silenced_before_date + ).map(lambda td: td.total_seconds()) + < 10 + ).all() + + silenced_alerts = extract_silenced_alerts.run( + AlertType.MISSING_FAR_ALERT.value, number_of_hours=6 + ) + expected_silenced_alerts = pd.DataFrame( + { + "internal_reference_number": ["ABC000123456", "ABC000542519"], + "external_reference_number": [None, "RO237719"], + "ircs": [None, "FQ7058"], + "silenced_before_date": [now - 5 * h, now + 15 * d], + } + ) + pd.testing.assert_frame_equal( + silenced_alerts.drop(columns=["silenced_before_date"]), + expected_silenced_alerts.drop(columns=["silenced_before_date"]), + ) + assert ( + ( + silenced_alerts.silenced_before_date + - expected_silenced_alerts.silenced_before_date + ).map(lambda td: td.total_seconds()) + < 10 + ).all() def test_extract_active_reportings(reset_test_data): @@ -118,7 +168,7 @@ def test_make_alerts(): "INTERNAL_REFERENCE_NUMBER", ], "risk_factor": [1.23, 3.56], - "creation_date": [date_1, date_2], + "triggering_behaviour_datetime_utc": [date_1, date_2], "latitude": [9.8, -1.963], "longitude": [65.59, -81.71], "vessel_id": [1, 12], @@ -144,7 +194,8 @@ def test_make_alerts(): "INTERNAL_REFERENCE_NUMBER", "INTERNAL_REFERENCE_NUMBER", ], - "creation_date": [date_1, date_2], + "triggering_behaviour_datetime_utc": [date_1, date_2], + "creation_date": [datetime(2020, 5, 3, 8, 0, 0)] * 2, "latitude": [9.8, -1.963], "longitude": [65.59, -81.71], "type": ["MISSING_FAR_ALERT", "MISSING_FAR_ALERT"], @@ -167,14 +218,12 @@ def test_make_alerts(): "MISSING_FAR_ALERT_CONFIG_1", ], } - ).astype({"creation_date": "datetime64[ns]"}) + ).astype({"creation_date": "datetime64[us]"}) pd.testing.assert_frame_equal(alerts, expected_alerts) # Without optional in input - vessels_in_alert = vessels_in_alert.drop( - columns=["creation_date", "latitude", "longitude"] - ) + vessels_in_alert = vessels_in_alert.drop(columns=["latitude", "longitude"]) alerts = make_alerts.run( vessels_in_alert, alert_type="MISSING_FAR_ALERT", @@ -191,13 +240,22 @@ def test_make_alerts(): pd.testing.assert_frame_equal(alerts, expected_alerts) -def test_filter_alerts(): - now = datetime(2020, 1, 1, 0, 0, 0) - td = timedelta(hours=1) +@pytest.fixture +def first_of_january_2000() -> datetime: + return datetime(2020, 1, 1, 0, 0, 0) + + +@pytest.fixture +def one_day() -> timedelta: + return timedelta(days=1) + + +@pytest.fixture +def alerts_to_filter(first_of_january_2000, one_day) -> pd.DataFrame: alert_type = "USER_DEFINED_ALERT_TYPE" alert_config_name = "ALERTE_CHALUTAGE_CONFIG_1" - alerts = pd.DataFrame( + return pd.DataFrame( { "vessel_name": ["v_A", "v_B", "v_C"], "internal_reference_number": ["A", "B", "C"], @@ -210,7 +268,12 @@ def test_filter_alerts(): "INTERNAL_REFERENCE_NUMBER", "INTERNAL_REFERENCE_NUMBER", ], - "creation_date": [now, now - 0.5 * td, now - td], + "triggering_behaviour_datetime_utc": [ + first_of_january_2000 - 0.5 * one_day, + first_of_january_2000 - 0.5 * one_day, + first_of_january_2000 - one_day, + ], + "creation_date": [first_of_january_2000] * 3, "latitude": [9.8, -1.963, -2.365], "longitude": [65.59, -81.71, 46.894], "type": [alert_type, alert_type, alert_type], @@ -242,51 +305,58 @@ def test_filter_alerts(): } ) - silenced_alerts = pd.DataFrame( + +@pytest.fixture +def silenced_alerts_1(first_of_january_2000, one_day) -> pd.DataFrame: + return pd.DataFrame( { - "internal_reference_number": ["A", "B_ANOTHER_VESSEL"], - "external_reference_number": ["AA", "BB_ANOTHER_VESSEL"], - "ircs": ["AAA", "BBB"], + "internal_reference_number": ["A", "B_ANOTHER_VESSEL", "C"], + "external_reference_number": ["AA", "BB_ANOTHER_VESSEL", "CC"], + "ircs": ["AAA", "BBB", "CCC"], + "silenced_before_date": [first_of_january_2000 - 1.25 * one_day] * 3, } ) - active_alerts = filter_alerts.run(alerts, silenced_alerts) - expected_active_alerts = pd.DataFrame( - { - "vessel_name": ["v_B", "v_C"], - "internal_reference_number": ["B", "C"], - "external_reference_number": ["BB", "CC"], - "ircs": ["BBB", "CCC"], - "flag_state": ["FR", "FR"], - "vessel_id": [12, 15], - "vessel_identifier": [ - "INTERNAL_REFERENCE_NUMBER", - "INTERNAL_REFERENCE_NUMBER", - ], - "creation_date": [now - 0.5 * td, now - td], - "latitude": [-1.963, -2.365], - "longitude": [-81.71, 46.894], - "value": [ - { - "seaFront": "MEMN", - "type": alert_type, - "riskFactor": None, - "dml": "dml B", - }, - { - "seaFront": "MEMN", - "type": alert_type, - "riskFactor": 2.56, - "dml": "dml C", - }, - ], - "alert_config_name": [alert_config_name, alert_config_name], - } - ).reset_index(drop=True) +@pytest.fixture +def silenced_alerts_2( + silenced_alerts_1, first_of_january_2000, one_day +) -> pd.DataFrame: + return silenced_alerts_1.assign( + silenced_before_date=first_of_january_2000 - 0.75 * one_day + ) + + +@pytest.fixture +def silenced_alerts_3( + silenced_alerts_1, first_of_january_2000, one_day +) -> pd.DataFrame: + return silenced_alerts_1.assign( + silenced_before_date=first_of_january_2000 - 0.25 * one_day + ) + + +@pytest.fixture +def expected_filtered_alerts(alerts_to_filter) -> pd.DataFrame: + return alerts_to_filter.drop(columns=["type", "triggering_behaviour_datetime_utc"]) + + +def test_filter_alerts( + alerts_to_filter, + silenced_alerts_1, + silenced_alerts_2, + silenced_alerts_3, + expected_filtered_alerts, +): + filtered_alerts_1 = filter_alerts.run(alerts_to_filter, silenced_alerts_1) + pd.testing.assert_frame_equal(filtered_alerts_1, expected_filtered_alerts) + + filtered_alerts_2 = filter_alerts.run(alerts_to_filter, silenced_alerts_2) + pd.testing.assert_frame_equal(filtered_alerts_2, expected_filtered_alerts.head(2)) + filtered_alerts_3 = filter_alerts.run(alerts_to_filter, silenced_alerts_3) pd.testing.assert_frame_equal( - active_alerts.reset_index(drop=True), expected_active_alerts + filtered_alerts_3, expected_filtered_alerts.loc[[1]].reset_index(drop=True) )