Skip to content

Commit

Permalink
Invalidation du cache de ports lors des mises à jour de référentiel (#…
Browse files Browse the repository at this point in the history
…3985)

## Linked issues

- Resolve #3975

## TODO
- [x] @louptheron passer les caches de ports en durées infinies
  • Loading branch information
louptheron authored Jan 14, 2025
2 parents 2d26c59 + 40d9035 commit 337a105
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ class CaffeineConfiguration {
val missionControlUnitsCache = buildMinutesCache(missionControlUnits, ticker, 120)

// Ports
val portsCache = buildMinutesCache(ports, ticker, oneWeek)
val activePortsCache = buildMinutesCache(activePorts, ticker, oneDay)
val portCache = buildMinutesCache(port, ticker, oneWeek)
val portsCache = buildPermanentCache(ports)
val activePortsCache = buildPermanentCache(activePorts)
val portCache = buildPermanentCache(port)

// Risk Factors
val riskFactorCache = buildMinutesCache(riskFactor, ticker, 1)
Expand Down Expand Up @@ -245,6 +245,17 @@ class CaffeineConfiguration {
)
}

private fun buildPermanentCache(
name: String,
): CaffeineCache {
return CaffeineCache(
name,
Caffeine.newBuilder()
.recordStats()
.build(),
)
}

private fun buildSecondsCache(
name: String,
ticker: Ticker,
Expand Down
1 change: 1 addition & 0 deletions datascience/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
REPORTING_ARCHIVING_ENDPOINT_TEMPLATE = (
API_ENDPOINT + "reportings/{reporting_id}/archive"
)
PORTS_CACHE_INVALIDATION_ENDPOINT = API_ENDPOINT + "ports/invalidate"

# Backend api key
BACKEND_API_KEY = os.environ.get("MONITORFISH_BACKEND_API_KEY")
Expand Down
9 changes: 8 additions & 1 deletion datascience/src/pipeline/flows/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from config import (
IS_INTEGRATION,
PORTS_CACHE_INVALIDATION_ENDPOINT,
PORTS_CSV_RESOURCE_ID,
PORTS_CSV_RESOURCE_TITLE,
PORTS_DATASET_ID,
Expand Down Expand Up @@ -863,6 +864,11 @@ def transform_ports_open_data(ports: pd.DataFrame) -> pd.DataFrame:
return ports_open_data


@task(checkpoint=False)
def invalidate_cache():
requests.put(PORTS_CACHE_INVALIDATION_ENDPOINT)


@task(checkpoint=False)
def load_ports(ports):
load(
Expand Down Expand Up @@ -900,7 +906,8 @@ def load_ports(ports):
ports_open_data = transform_ports_open_data(ports)

# Load
load_ports(ports)
loaded_ports = load_ports(ports)
invalidate_cache(upstream_tasks=[loaded_ports])

ports_open_data_csv_file = get_csv_file_object(ports_open_data)
update_resource(
Expand Down
18 changes: 17 additions & 1 deletion datascience/tests/test_pipeline/test_flows/test_ports.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from io import BytesIO
from unittest.mock import patch

import pandas as pd
import pytest
from prefect import task

from src.pipeline.flows.ports import flow
from src.pipeline.flows.ports import flow, invalidate_cache
from src.read_query import read_query
from tests.mocks import mock_check_flow_not_running, mock_update_resource

Expand Down Expand Up @@ -57,9 +58,18 @@ def mock_extract_local_ports() -> pd.DataFrame:
return local_ports_data


@task(checkpoint=False)
def mock_invalidate_cache() -> pd.DataFrame:
with patch("src.pipeline.flows.ports.requests") as mock_requests:
invalidate_cache.run()

return mock_requests


flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running)
flow.replace(flow.get_tasks("extract_local_ports")[0], mock_extract_local_ports)
flow.replace(flow.get_tasks("update_resource")[0], mock_update_resource)
flow.replace(flow.get_tasks("invalidate_cache")[0], mock_invalidate_cache)


def test_flow(reset_test_data, expected_ports_open_data, expected_loaded_ports):
Expand All @@ -86,3 +96,9 @@ def test_flow(reset_test_data, expected_ports_open_data, expected_loaded_ports):
expected_ports_open_data.convert_dtypes(),
check_like=True,
)
mock_invalidate_cache_result = state.result[
flow.get_tasks("mock_invalidate_cache")[0]
].result
mock_invalidate_cache_result.put.assert_called_once_with(
"https://monitor.fish/api/v1/ports/invalidate"
)

0 comments on commit 337a105

Please sign in to comment.