Skip to content

Commit

Permalink
Invalidate ports caches in ports flow
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentAntoine committed Dec 20, 2024
1 parent a0ded4d commit b045446
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
1 change: 1 addition & 0 deletions datascience/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
# URL template of the reporting archiving endpoint, built on API_ENDPOINT.
# It contains a "{reporting_id}" placeholder.
REPORTING_ARCHIVING_ENDPOINT_TEMPLATE = (
API_ENDPOINT + "reportings/{reporting_id}/archive"
)
# URL of the ports cache invalidation endpoint, built on API_ENDPOINT.
PORTS_CACHE_INVALIDATION_ENDPOINT = API_ENDPOINT + "ports/invalidate"

# Backend api key
BACKEND_API_KEY = os.environ.get("MONITORFISH_BACKEND_API_KEY")
Expand Down
9 changes: 8 additions & 1 deletion datascience/src/pipeline/flows/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from config import (
IS_INTEGRATION,
PORTS_CACHE_INVALIDATION_ENDPOINT,
PORTS_CSV_RESOURCE_ID,
PORTS_CSV_RESOURCE_TITLE,
PORTS_DATASET_ID,
Expand Down Expand Up @@ -863,6 +864,11 @@ def transform_ports_open_data(ports: pd.DataFrame) -> pd.DataFrame:
return ports_open_data


@task(checkpoint=False)
def invalidate_cache():
    """Ask the backend to invalidate its ports caches.

    Sends a PUT request to ``PORTS_CACHE_INVALIDATION_ENDPOINT``.

    Raises:
        requests.HTTPError: If the backend answers with a 4xx or 5xx status.
    """
    response = requests.put(PORTS_CACHE_INVALIDATION_ENDPOINT)
    # Without this check an error response would be silently discarded and the
    # task reported as successful even though the caches were not invalidated.
    response.raise_for_status()


@task(checkpoint=False)
def load_ports(ports):
load(
Expand Down Expand Up @@ -900,7 +906,8 @@ def load_ports(ports):
ports_open_data = transform_ports_open_data(ports)

# Load
load_ports(ports)
loaded_ports = load_ports(ports)
invalidate_cache(upstream_tasks=[loaded_ports])

ports_open_data_csv_file = get_csv_file_object(ports_open_data)
update_resource(
Expand Down
18 changes: 17 additions & 1 deletion datascience/tests/test_pipeline/test_flows/test_ports.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from io import BytesIO
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from prefect import task

from src.pipeline.flows.ports import flow
from src.pipeline.flows.ports import flow, invalidate_cache
from src.read_query import read_query
from tests.mocks import mock_check_flow_not_running, mock_update_resource

Expand Down Expand Up @@ -57,9 +58,18 @@ def mock_extract_local_ports() -> pd.DataFrame:
return local_ports_data


@task(checkpoint=False)
def mock_invalidate_cache() -> MagicMock:
    """Run the real ``invalidate_cache`` task body with ``requests`` patched.

    Returns:
        MagicMock: The mock that replaced ``src.pipeline.flows.ports.requests``
        while ``invalidate_cache`` ran, so that the calls recorded on it can be
        inspected afterwards.
    """
    with patch("src.pipeline.flows.ports.requests") as mock_requests:
        invalidate_cache.run()

    return mock_requests


# Swap selected tasks of the flow, looked up by name, for their mock
# counterparts. The order of replacement is kept as declared here.
for task_name, replacement_task in (
    ("check_flow_not_running", mock_check_flow_not_running),
    ("extract_local_ports", mock_extract_local_ports),
    ("update_resource", mock_update_resource),
    ("invalidate_cache", mock_invalidate_cache),
):
    flow.replace(flow.get_tasks(task_name)[0], replacement_task)


def test_flow(reset_test_data, expected_ports_open_data, expected_loaded_ports):
Expand All @@ -86,3 +96,9 @@ def test_flow(reset_test_data, expected_ports_open_data, expected_loaded_ports):
expected_ports_open_data.convert_dtypes(),
check_like=True,
)
mock_invalidate_cache_result = state.result[
flow.get_tasks("mock_invalidate_cache")[0]
].result
mock_invalidate_cache_result.put.assert_called_once_with(
"https://monitor.fish/api/v1/ports/invalidate"
)

0 comments on commit b045446

Please sign in to comment.