From 851e4f0f30af36ef2f643d69184afeacf6fd9074 Mon Sep 17 00:00:00 2001 From: Vincent Degove Date: Mon, 10 Feb 2025 13:22:38 +0100 Subject: [PATCH] =?UTF-8?q?Cache=20pr=C3=A9emptif=20des=20statistiques=20(?= =?UTF-8?q?#4435)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Antoine Augusti --- apps/transport/lib/transport/application.ex | 1 + .../lib/transport/preemptive_api_cache.ex | 35 +++---------- .../lib/transport/preemptive_base_cache.ex | 50 +++++++++++++++++++ .../lib/transport/preemptive_stats_cache.ex | 39 +++++++++++++++ .../api/controllers/stats_controller.ex | 13 +++-- .../controllers/stats_controller.ex | 7 ++- .../controllers/api/stats_controller_test.exs | 6 +-- 7 files changed, 113 insertions(+), 38 deletions(-) create mode 100644 apps/transport/lib/transport/preemptive_base_cache.ex create mode 100644 apps/transport/lib/transport/preemptive_stats_cache.ex diff --git a/apps/transport/lib/transport/application.ex b/apps/transport/lib/transport/application.ex index 98206c9497..50100f1d77 100644 --- a/apps/transport/lib/transport/application.ex +++ b/apps/transport/lib/transport/application.ex @@ -45,6 +45,7 @@ defmodule Transport.Application do |> add_scheduler() |> add_if(fn -> run_web_processes end, Transport.RealtimePoller) |> add_if(fn -> run_web_processes end, Transport.PreemptiveAPICache) + |> add_if(fn -> run_web_processes end, Transport.PreemptiveStatsCache) ## manually add a children supervisor that is not scheduled |> Kernel.++([{Task.Supervisor, name: ImportTaskSupervisor}]) diff --git a/apps/transport/lib/transport/preemptive_api_cache.ex b/apps/transport/lib/transport/preemptive_api_cache.ex index da9a8bc94b..d5c19ad248 100644 --- a/apps/transport/lib/transport/preemptive_api_cache.ex +++ b/apps/transport/lib/transport/preemptive_api_cache.ex @@ -3,36 +3,13 @@ defmodule Transport.PreemptiveAPICache do A module that populates the Cachex cache for the /api/datasets endpoint ("api-datasets-index") """ - use GenServer - require Logger - - @job_delay :timer.seconds(300) - # slightly more than twice `@job_delay` to reduce the risk of parallel computation - @cache_ttl :timer.seconds(700) - - def cache_ttl, do: @cache_ttl - - def start_link(_opts) do - GenServer.start_link(__MODULE__, %{}) - end - - def init(state) do - # initial schedule is immediate, but via the same code path, - # to ensure we jump on the data - schedule_next_occurrence(0) + use Transport.PreemptiveBaseCache, + first_run: 0, + job_delay: :timer.seconds(300), + # more than twice job_delay to reduce the risk of parallel computation + cache_ttl: :timer.seconds(700) - {:ok, state} - end - - def schedule_next_occurrence(delay \\ @job_delay) do - Process.send_after(self(), :tick, delay) - end - - def handle_info(:tick, state) do - schedule_next_occurrence() - populate_cache() - {:noreply, state} - end + require Logger def populate_cache do Logger.info("[preemptive-api-cache] Populating cache for /api/datasets…") diff --git a/apps/transport/lib/transport/preemptive_base_cache.ex b/apps/transport/lib/transport/preemptive_base_cache.ex new file mode 100644 index 0000000000..14478b3c29 --- /dev/null +++ b/apps/transport/lib/transport/preemptive_base_cache.ex @@ -0,0 +1,50 @@ +defmodule Transport.PreemptiveBaseCache do + @moduledoc """ + Common code for preemptive caches. This module is a macro that generates a GenServer, + which will populate a cache at regular intervals (similar to a cron job). + + Usage: + ``` + use Transport.PreemptiveBaseCache, + first_run: 0, + job_delay: :timer.seconds(300), + cache_ttl: :timer.seconds(700) + ``` + The module in which it is used must implement the `populate_cache/0` function, that will be regularly called. + + - First run indicates the time to wait before the first run of the job between the start of the application and the first run. + - Job delay indicates the time to wait between each run of the job. + - Cache TTL indicates the time to keep the cache alive. + """ + defmacro __using__(opts) do + quote do + use GenServer + + @first_run unquote(opts[:first_run]) + @job_delay unquote(opts[:job_delay]) + @cache_ttl unquote(opts[:cache_ttl]) + + def cache_ttl, do: @cache_ttl + + def start_link(_opts) do + GenServer.start_link(__MODULE__, %{}) + end + + def init(state) do + schedule_next_occurrence(@first_run) + + {:ok, state} + end + + def schedule_next_occurrence(delay) do + Process.send_after(self(), :tick, delay) + end + + def handle_info(:tick, state) do + schedule_next_occurrence(@job_delay) + populate_cache() + {:noreply, state} + end + end + end +end diff --git a/apps/transport/lib/transport/preemptive_stats_cache.ex b/apps/transport/lib/transport/preemptive_stats_cache.ex new file mode 100644 index 0000000000..1846379203 --- /dev/null +++ b/apps/transport/lib/transport/preemptive_stats_cache.ex @@ -0,0 +1,39 @@ +defmodule Transport.PreemptiveStatsCache do + @moduledoc """ + A module that populates the Cachex cache for the /stats and /api/stats/* endpoints + """ + + use Transport.PreemptiveBaseCache, + # Let’s give some time for the system to boot up before we start and the API cache to be populated + first_run: :timer.minutes(1), + job_delay: :timer.hours(3), + # more than twice job_delay to reduce the risk of parallel computation + cache_ttl: :timer.hours(7) + + require Logger + + def populate_cache do + Logger.info("[preemptive-stats-cache] Populating cache for stats…") + Transport.Cache.put("stats-page-index", Transport.StatsHandler.compute_stats(), @cache_ttl) + + Transport.Cache.put( + "api-stats-aoms", + TransportWeb.API.StatsController.rendered_geojson(:aoms, timeout: :timer.seconds(60)), + @cache_ttl + ) + + Transport.Cache.put( + "api-stats-regions", + TransportWeb.API.StatsController.rendered_geojson(:regions, timeout: :timer.seconds(60)), + @cache_ttl + ) + + Transport.Cache.put( + "api-stats-quality", + TransportWeb.API.StatsController.rendered_geojson(:quality, timeout: :timer.seconds(60)), + @cache_ttl + ) + + Logger.info("[preemptive-stats-cache] Finished populating cache for stats.") + end +end diff --git a/apps/transport/lib/transport_web/api/controllers/stats_controller.ex b/apps/transport/lib/transport_web/api/controllers/stats_controller.ex index 8daa2126a1..eba713c6a5 100644 --- a/apps/transport/lib/transport_web/api/controllers/stats_controller.ex +++ b/apps/transport/lib/transport_web/api/controllers/stats_controller.ex @@ -265,7 +265,8 @@ defmodule TransportWeb.API.StatsController do # @spec render_features(Plug.Conn.t(), atom(), binary()) :: Plug.Conn.t() defp render_features(conn, item, cache_key) do - data = Transport.Cache.fetch(cache_key, fn -> rendered_geojson(item) end) + data = + Transport.Cache.fetch(cache_key, fn -> rendered_geojson(item) end, Transport.PreemptiveStatsCache.cache_ttl()) render(conn, data: {:skip_json_encoding, data}) end @@ -276,21 +277,23 @@ defmodule TransportWeb.API.StatsController do render(conn, data: {:skip_json_encoding, data}) end - def rendered_geojson(item) when item in [:aoms, :regions, :quality] do + def rendered_geojson(item, ecto_opts \\ []) + + def rendered_geojson(item, ecto_opts) when item in [:aoms, :regions, :quality] do case item do :aoms -> aom_features_query() :regions -> region_features_query() :quality -> quality_features_query() end - |> Repo.all() + |> Repo.all(ecto_opts) |> features() |> geojson() |> Jason.encode!() end - def rendered_geojson(:bike_scooter_sharing) do + def rendered_geojson(:bike_scooter_sharing, ecto_opts) do bike_scooter_sharing_features_query() - |> Repo.all() + |> Repo.all(ecto_opts) |> bike_scooter_sharing_features() |> geojson() |> Jason.encode!() diff --git a/apps/transport/lib/transport_web/controllers/stats_controller.ex b/apps/transport/lib/transport_web/controllers/stats_controller.ex index 89f0c8df14..3fee93524f 100644 --- a/apps/transport/lib/transport_web/controllers/stats_controller.ex +++ b/apps/transport/lib/transport_web/controllers/stats_controller.ex @@ -3,7 +3,12 @@ defmodule TransportWeb.StatsController do @spec index(Plug.Conn.t(), any) :: Plug.Conn.t() def index(conn, _params) do - stats = Transport.Cache.fetch("stats-page-index", fn -> Transport.StatsHandler.compute_stats() end) + stats = + Transport.Cache.fetch( + "stats-page-index", + fn -> Transport.StatsHandler.compute_stats() end, + Transport.PreemptiveStatsCache.cache_ttl() + ) conn = stats diff --git a/apps/transport/test/transport_web/controllers/api/stats_controller_test.exs b/apps/transport/test/transport_web/controllers/api/stats_controller_test.exs index 2db08544e0..3c8930ed8b 100644 --- a/apps/transport/test/transport_web/controllers/api/stats_controller_test.exs +++ b/apps/transport/test/transport_web/controllers/api/stats_controller_test.exs @@ -14,17 +14,17 @@ defmodule TransportWeb.API.StatsControllerTest do for {route, cache_key} <- @cached_features_routes do test "GET #{route} (invokes the cache system)", %{conn: conn} do # return original computed payload - mock = fn unquote(cache_key), x -> x.() end + mock = fn unquote(cache_key), x, _ -> x.() end with_mock Transport.Cache, fetch: mock do conn = conn |> get(unquote(route)) %{"features" => _features} = json_response(conn, 200) - assert_called_exactly(Transport.Cache.fetch(:_, :_), 1) + assert_called_exactly(Transport.Cache.fetch(:_, :_, :_), 1) end end test "GET #{route} (returns the cached value as is)", %{conn: conn} do - mock = fn unquote(cache_key), _ -> %{hello: 123} |> Jason.encode!() end + mock = fn unquote(cache_key), _, _ -> %{hello: 123} |> Jason.encode!() end with_mock Transport.Cache, fetch: mock do conn = conn |> get(unquote(route))