Skip to content

Commit

Permalink
Cache préemptif des statistiques (#4435)
Browse files Browse the repository at this point in the history
Co-authored-by: Antoine Augusti <[email protected]>
  • Loading branch information
vdegove and AntoineAugusti authored Feb 10, 2025
1 parent 8d1a169 commit 851e4f0
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 38 deletions.
1 change: 1 addition & 0 deletions apps/transport/lib/transport/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Transport.Application do
|> add_scheduler()
|> add_if(fn -> run_web_processes end, Transport.RealtimePoller)
|> add_if(fn -> run_web_processes end, Transport.PreemptiveAPICache)
|> add_if(fn -> run_web_processes end, Transport.PreemptiveStatsCache)
## manually add a children supervisor that is not scheduled
|> Kernel.++([{Task.Supervisor, name: ImportTaskSupervisor}])

Expand Down
35 changes: 6 additions & 29 deletions apps/transport/lib/transport/preemptive_api_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,13 @@ defmodule Transport.PreemptiveAPICache do
A module that populates the Cachex cache for the /api/datasets endpoint ("api-datasets-index")
"""

use GenServer
require Logger

@job_delay :timer.seconds(300)
# slightly more than twice `@job_delay` to reduce the risk of parallel computation
@cache_ttl :timer.seconds(700)

def cache_ttl, do: @cache_ttl

def start_link(_opts) do
GenServer.start_link(__MODULE__, %{})
end

def init(state) do
# initial schedule is immediate, but via the same code path,
# to ensure we jump on the data
schedule_next_occurrence(0)
use Transport.PreemptiveBaseCache,
first_run: 0,
job_delay: :timer.seconds(300),
# more than twice job_delay to reduce the risk of parallel computation
cache_ttl: :timer.seconds(700)

{:ok, state}
end

def schedule_next_occurrence(delay \\ @job_delay) do
Process.send_after(self(), :tick, delay)
end

def handle_info(:tick, state) do
schedule_next_occurrence()
populate_cache()
{:noreply, state}
end
require Logger

def populate_cache do
Logger.info("[preemptive-api-cache] Populating cache for /api/datasets…")
Expand Down
50 changes: 50 additions & 0 deletions apps/transport/lib/transport/preemptive_base_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
defmodule Transport.PreemptiveBaseCache do
@moduledoc """
Common code for preemptive caches. This module is a macro that generates a GenServer,
which will populate a cache at regular intervals (similar to a cron job).
Usage:
```
use Transport.PreemptiveBaseCache,
first_run: 0,
job_delay: :timer.seconds(300),
cache_ttl: :timer.seconds(700)
```
The module in which it is used must implement the `populate_cache/0` function, that will be regularly called.
- First run indicates the time to wait before the first run of the job between the start of the application and the first run.
- Job delay indicates the time to wait between each run of the job.
- Cache TTL indicates the time to keep the cache alive.
"""
defmacro __using__(opts) do
quote do
use GenServer

@first_run unquote(opts[:first_run])
@job_delay unquote(opts[:job_delay])
@cache_ttl unquote(opts[:cache_ttl])

def cache_ttl, do: @cache_ttl

def start_link(_opts) do
GenServer.start_link(__MODULE__, %{})
end

def init(state) do
schedule_next_occurrence(@first_run)

{:ok, state}
end

def schedule_next_occurrence(delay) do
Process.send_after(self(), :tick, delay)
end

def handle_info(:tick, state) do
schedule_next_occurrence(@job_delay)
populate_cache()
{:noreply, state}
end
end
end
end
39 changes: 39 additions & 0 deletions apps/transport/lib/transport/preemptive_stats_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Transport.PreemptiveStatsCache do
@moduledoc """
A module that populates the Cachex cache for the /stats and /api/stats/* endpoints
"""

use Transport.PreemptiveBaseCache,
# Let’s give some time for the system to boot up before we start and the API cache to be populated
first_run: :timer.minutes(1),
job_delay: :timer.hours(3),
# more than twice job_delay to reduce the risk of parallel computation
cache_ttl: :timer.hours(7)

require Logger

def populate_cache do
Logger.info("[preemptive-stats-cache] Populating cache for stats…")
Transport.Cache.put("stats-page-index", Transport.StatsHandler.compute_stats(), @cache_ttl)

Transport.Cache.put(
"api-stats-aoms",
TransportWeb.API.StatsController.rendered_geojson(:aoms, timeout: :timer.seconds(60)),
@cache_ttl
)

Transport.Cache.put(
"api-stats-regions",
TransportWeb.API.StatsController.rendered_geojson(:regions, timeout: :timer.seconds(60)),
@cache_ttl
)

Transport.Cache.put(
"api-stats-quality",
TransportWeb.API.StatsController.rendered_geojson(:quality, timeout: :timer.seconds(60)),
@cache_ttl
)

Logger.info("[preemptive-stats-cache] Finished populating cache for stats.")
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ defmodule TransportWeb.API.StatsController do
#
@spec render_features(Plug.Conn.t(), atom(), binary()) :: Plug.Conn.t()
defp render_features(conn, item, cache_key) do
data = Transport.Cache.fetch(cache_key, fn -> rendered_geojson(item) end)
data =
Transport.Cache.fetch(cache_key, fn -> rendered_geojson(item) end, Transport.PreemptiveStatsCache.cache_ttl())

render(conn, data: {:skip_json_encoding, data})
end
Expand All @@ -276,21 +277,23 @@ defmodule TransportWeb.API.StatsController do
render(conn, data: {:skip_json_encoding, data})
end

def rendered_geojson(item) when item in [:aoms, :regions, :quality] do
def rendered_geojson(item, ecto_opts \\ [])

def rendered_geojson(item, ecto_opts) when item in [:aoms, :regions, :quality] do
case item do
:aoms -> aom_features_query()
:regions -> region_features_query()
:quality -> quality_features_query()
end
|> Repo.all()
|> Repo.all(ecto_opts)
|> features()
|> geojson()
|> Jason.encode!()
end

def rendered_geojson(:bike_scooter_sharing) do
def rendered_geojson(:bike_scooter_sharing, ecto_opts) do
bike_scooter_sharing_features_query()
|> Repo.all()
|> Repo.all(ecto_opts)
|> bike_scooter_sharing_features()
|> geojson()
|> Jason.encode!()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ defmodule TransportWeb.StatsController do

@spec index(Plug.Conn.t(), any) :: Plug.Conn.t()
def index(conn, _params) do
stats = Transport.Cache.fetch("stats-page-index", fn -> Transport.StatsHandler.compute_stats() end)
stats =
Transport.Cache.fetch(
"stats-page-index",
fn -> Transport.StatsHandler.compute_stats() end,
Transport.PreemptiveStatsCache.cache_ttl()
)

conn =
stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ defmodule TransportWeb.API.StatsControllerTest do
for {route, cache_key} <- @cached_features_routes do
test "GET #{route} (invokes the cache system)", %{conn: conn} do
# return original computed payload
mock = fn unquote(cache_key), x -> x.() end
mock = fn unquote(cache_key), x, _ -> x.() end

with_mock Transport.Cache, fetch: mock do
conn = conn |> get(unquote(route))
%{"features" => _features} = json_response(conn, 200)
assert_called_exactly(Transport.Cache.fetch(:_, :_), 1)
assert_called_exactly(Transport.Cache.fetch(:_, :_, :_), 1)
end
end

test "GET #{route} (returns the cached value as is)", %{conn: conn} do
mock = fn unquote(cache_key), _ -> %{hello: 123} |> Jason.encode!() end
mock = fn unquote(cache_key), _, _ -> %{hello: 123} |> Jason.encode!() end

with_mock Transport.Cache, fetch: mock do
conn = conn |> get(unquote(route))
Expand Down

0 comments on commit 851e4f0

Please sign in to comment.