Merge branch 'master' into irve-packing
thbar authored Jan 20, 2025
2 parents 9401472 + f7e359a commit 269db41
Showing 35 changed files with 989 additions and 422 deletions.
25 changes: 17 additions & 8 deletions apps/shared/lib/wrapper/wrapper_req.ex
@@ -35,6 +35,18 @@ defmodule Transport.HTTPClient do
"""

def get!(url, options) do
{req, options} = setup_cache(options)

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
end

def get(url, options) do
{req, options} = setup_cache(options)

Transport.Req.impl().get(req, options |> Keyword.merge(url: url))
end

defp setup_cache(options) do
options =
Keyword.validate!(options, [
:custom_cache_dir,
Expand All @@ -48,13 +60,10 @@ defmodule Transport.HTTPClient do

{enable_cache, options} = options |> Keyword.pop!(:enable_cache)

req =
if enable_cache do
req |> Transport.Shared.ReqCustomCache.attach()
else
req
end

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
if enable_cache do
{req |> Transport.Shared.ReqCustomCache.attach(), options}
else
{req, options}
end
end
end
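
The refactor above extracts setup_cache/1 so that the raising and non-raising variants share the same cache wiring. A minimal usage sketch, not part of the diff (the URL and cache directory are made up, and the return shapes assume Transport.Req.impl() delegates to Req):

# Raising variant: returns the response directly, raises on error.
response = Transport.HTTPClient.get!("https://example.com/feed.zip", enable_cache: false)

# Non-raising variant: returns {:ok, response} or {:error, exception},
# here with the custom file cache attached.
{:ok, response} =
  Transport.HTTPClient.get("https://example.com/feed.zip",
    enable_cache: true,
    custom_cache_dir: "/tmp/transport-cache"
  )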
87 changes: 87 additions & 0 deletions apps/transport/lib/gtfs/utils.ex
@@ -0,0 +1,87 @@
defmodule Transport.GTFS.Utils do
@moduledoc """
Some helpers for handling GTFS archives.
"""

def fetch_position(record, field) do
Map.fetch!(record, field) |> convert_text_to_float()
end

@doc """
Convert textual values to float.
iex> convert_text_to_float("")
nil
iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
if input |> String.trim() != "" do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
else
nil
end
end

@doc """
Variant of csv_get_with_default/3 that raises if a mandatory column is missing.
"""
def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end
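
# Hypothetical usage, not part of the diff: with `mandatory_column`
# left at its default of `true`, a missing key raises a KeyError;
# passing `false` falls back to the default value instead.
#
#     csv_get_with_default!(%{"field" => ""}, "field", 0)  #=> 0
#     csv_get_with_default!(%{}, "field", 0)               #=> ** (KeyError)
#     csv_get_with_default!(%{}, "field", 0, false)        #=> 0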

@doc """
iex> csv_get_with_default(%{}, "field", 0)
0
iex> csv_get_with_default(%{"other_field" => 1}, "field", 0)
0
iex> csv_get_with_default(%{"field" => 2, "other_field" => 1}, "field", 0)
2
iex> csv_get_with_default(%{"field" => "", "other_field" => 1}, "field", 0)
0
"""
def csv_get_with_default(map, field, default_value) do
value = Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end

@doc """
Transform the stream output by Unzip to a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end
end
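
A short usage sketch, not part of the commit: to_stream_of_maps/1 accepts any enumerable of iodata, so it can be exercised with an in-memory CSV. The first row becomes the map keys, and the "\uFEFF" replacement strips the UTF-8 byte order mark that some GTFS exports prepend to the header:

["stop_id,stop_name\r\nS1,Gare Centrale\r\n"]
|> Transport.GTFS.Utils.to_stream_of_maps()
|> Enum.to_list()
#=> [%{"stop_id" => "S1", "stop_name" => "Gare Centrale"}]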
79 changes: 79 additions & 0 deletions apps/transport/lib/jobs/expiration_admin_producer_notification_job.ex
@@ -0,0 +1,79 @@
defmodule Transport.Jobs.ExpirationAdminProducerNotificationJob do
@moduledoc """
This module is in charge of sending notifications to admins and producers when data is outdated.
It is similar to `Transport.Jobs.ExpirationNotificationJob`, which is dedicated to reusers.
Both could be merged in the future.
"""

use Oban.Worker, max_attempts: 3, tags: ["notifications"]
import Ecto.Query

@type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]}
@expiration_reason Transport.NotificationReason.reason(:expiration)
# If delay < 0, the resource is already expired
@default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14]

@impl Oban.Worker
def perform(%Oban.Job{id: job_id}) do
outdated_data(job_id)
:ok
end

def outdated_data(job_id) do
for delay <- possible_delays(),
date = Date.add(Date.utc_today(), delay) do
{delay, gtfs_datasets_expiring_on(date)}
end
|> Enum.reject(fn {_, records} -> Enum.empty?(records) end)
|> send_outdated_data_admin_mail()
|> Enum.map(&send_outdated_data_producer_notifications(&1, job_id))
end

@spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}]
def gtfs_datasets_expiring_on(%Date{} = date) do
DB.Dataset.base_query()
|> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name())
|> where(
[metadata: m, resource: r],
fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS"
)
|> select([dataset: d, resource: r], {d, r})
|> distinct(true)
|> DB.Repo.all()
|> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end)
|> Enum.to_list()
end

def possible_delays do
@default_outdated_data_delays
|> Enum.uniq()
|> Enum.sort()
end

# A different email is sent to producers for each delay, listing all the datasets expiring at that delay
@spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok
def send_outdated_data_producer_notifications({delay, records}, job_id) do
Enum.each(records, fn {%DB.Dataset{} = dataset, resources} ->
@expiration_reason
|> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.expiration_producer(dataset, resources, delay)
|> Transport.Mailer.deliver()

DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id})
end)
end)
end

@spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()]
defp send_outdated_data_admin_mail([] = _records), do: []

defp send_outdated_data_admin_mail(records) do
Transport.AdminNotifier.expiration(records)
|> Transport.Mailer.deliver()

records
end
end
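
A hedged sketch of how this worker could be enqueued by hand, e.g. from IEx; the commit does not show the scheduling configuration, and this assumes the app runs the default Oban instance (in production it presumably runs on a cron-style Oban schedule):

# Enqueue one run; perform/1 then checks every configured delay.
Transport.Jobs.ExpirationAdminProducerNotificationJob.new(%{})
|> Oban.insert()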
2 changes: 2 additions & 0 deletions apps/transport/lib/jobs/expiration_notification_job.ex
@@ -6,6 +6,8 @@ defmodule Transport.Jobs.ExpirationNotificationJob do
It has 2 `perform/1` methods:
- a dispatcher one in charge of identifying contacts we should get in touch with today
- another in charge of building the daily digest for a specific contact (with only their favorited datasets)
It is similar to `Transport.Jobs.ExpirationAdminProducerNotificationJob`, which is dedicated to producers and admins.
"""
use Oban.Worker,
max_attempts: 3,
65 changes: 9 additions & 56 deletions apps/transport/lib/jobs/gtfs_to_db.ex
@@ -3,33 +3,7 @@ defmodule Transport.Jobs.GtfsToDB do
Get the content of a GTFS ResourceHistory, store it in the DB
"""

@doc """
Convert textual values to float.
iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
end

def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end
alias Transport.GTFS.Utils

def import_gtfs_from_resource_history(resource_history_id) do
%{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!()
@@ -61,16 +35,16 @@
def stops_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
# the map is reshaped for Ecto's needs
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
stop_id: r |> Map.fetch!("stop_id"),
stop_name: r |> Map.fetch!("stop_name"),
stop_lat: r |> Map.fetch!("stop_lat") |> convert_text_to_float(),
stop_lon: r |> Map.fetch!("stop_lon") |> convert_text_to_float(),
location_type: r |> csv_get_with_default!("location_type", "0", false) |> String.to_integer()
stop_lat: r |> Utils.fetch_position("stop_lat"),
stop_lon: r |> Utils.fetch_position("stop_lon"),
location_type: r |> Utils.csv_get_with_default!("location_type", "0", false) |> String.to_integer()
}
end)
|> Stream.chunk_every(1000)
Expand All @@ -79,27 +53,6 @@ defmodule Transport.Jobs.GtfsToDB do
end)
end

@doc """
Transform the stream output by Unzip to a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end

def fill_calendar_from_resource_history(resource_history_id, data_import_id) do
file_stream = file_stream(resource_history_id, "calendar.txt")
calendar_stream_insert(file_stream, data_import_id)
Expand All @@ -108,7 +61,7 @@ defmodule Transport.Jobs.GtfsToDB do
def calendar_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
res = %{
data_import_id: data_import_id,
@@ -155,7 +108,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
@@ -209,7 +162,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
Expand All @@ -235,7 +188,7 @@ defmodule Transport.Jobs.GtfsToDB do
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
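The hunks above are truncated, so the insert step itself is not visible in this diff. As an illustration only (the schema module name DB.GTFS.Stop is hypothetical), the visible pattern chunks the mapped rows into batches of 1,000 and bulk-inserts each batch inside the surrounding transaction:

rows
|> Stream.chunk_every(1000)
|> Stream.each(fn chunk -> DB.Repo.insert_all(DB.GTFS.Stop, chunk) end)
|> Stream.run()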
3 changes: 1 addition & 2 deletions apps/transport/lib/jobs/new_datagouv_datasets_job.ex
@@ -68,8 +68,7 @@ defmodule Transport.Jobs.NewDatagouvDatasetsJob do
"infrastructure de recharge",
"borne de recharge",
"irve",
"sdirve",
"électrique"
"sdirve"
]),
formats: MapSet.new([])
}
21 changes: 19 additions & 2 deletions apps/transport/lib/jobs/new_dataset_notifications_job.ex
@@ -4,10 +4,12 @@ defmodule Transport.Jobs.NewDatasetNotificationsJob do
"""
use Oban.Worker, max_attempts: 3, tags: ["notifications"]
import Ecto.Query
@new_dataset_reason Transport.NotificationReason.reason(:new_dataset)

@impl Oban.Worker
def perform(%Oban.Job{inserted_at: %DateTime{} = inserted_at}) do
inserted_at |> relevant_datasets() |> Transport.DataChecker.send_new_dataset_notifications()

def perform(%Oban.Job{id: job_id, inserted_at: %DateTime{} = inserted_at}) do
inserted_at |> relevant_datasets() |> send_new_dataset_notifications(job_id)
:ok
end

Expand All @@ -18,4 +20,19 @@ defmodule Transport.Jobs.NewDatasetNotificationsJob do
|> where([dataset: d], d.inserted_at >= ^datetime_limit)
|> DB.Repo.all()
end

@spec send_new_dataset_notifications([DB.Dataset.t()] | [], pos_integer()) :: no_return() | :ok
def send_new_dataset_notifications([], _job_id), do: :ok

def send_new_dataset_notifications(datasets, job_id) do
@new_dataset_reason
|> DB.NotificationSubscription.subscriptions_for_reason_and_role(:reuser)
|> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription ->
contact
|> Transport.UserNotifier.new_datasets(datasets)
|> Transport.Mailer.deliver()

DB.Notification.insert!(subscription, %{dataset_ids: Enum.map(datasets, & &1.id), job_id: job_id})
end)
end
end
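
A hypothetical invocation sketch, not part of the diff (the dataset structs and job id are made up), showing how the new function sends one digest email per reuser subscription and records a notification for each:

datasets = [%DB.Dataset{id: 1}, %DB.Dataset{id: 2}]
:ok = Transport.Jobs.NewDatasetNotificationsJob.send_new_dataset_notifications(datasets, 123)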