Skip to content

Commit

Permalink
Refactor rate-limiting code
Browse files Browse the repository at this point in the history
This refactoring introduces a couple of changes compared to the previous implementation:
- Rate limits are checked twice:
  - First, there's a preliminary check from large scale to small scale
     to allow short-circuiting when a bucket is already empty.
  - Secondly, the rate check goes from small scalle to large scale
     to hit buckets in the correct order.
- check_sender_rate_limit/1 has been moved to the Mailer module.
  It feels better to have it there as a companion function for
  deliver_with_sender/2 and shouldn't be in Sender because it has
  side-effects.
- Shared Senders are now also supported correctly
  • Loading branch information
wmnnd committed Aug 26, 2022
1 parent 0e48ebc commit 5c42e6c
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 47 deletions.
4 changes: 4 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ config :keila, Keila.Mailings.SenderAdapters,
Keila.Mailings.SenderAdapters.SES,
Keila.Mailings.SenderAdapters.Mailgun,
Keila.Mailings.SenderAdapters.Local
],
shared_adapters: [
Keila.Mailings.SenderAdapters.Shared.SES,
Keila.Mailings.SenderAdapters.Shared.Local
]

# ## SSL Support
Expand Down
100 changes: 100 additions & 0 deletions lib/keila/mailer.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule Keila.Mailer do
use Swoosh.Mailer, otp_app: :keila
alias Keila.Mailings.Sender
alias Keila.Mailings.SharedSender

@doc """
Delivers an email using a given sender.
Expand All @@ -18,4 +20,102 @@ defmodule Keila.Mailer do
|> adapter.put_provider_options(sender)
|> deliver(config)
end

@doc """
Checks the rate limit of a Sender. If the sender is using a shared
sender, the `:shared_sender` association must be preloaded.
Returns `:ok` if rate limit has not been exceeded or
`{:error, seconds_to_next_bucket}` otherwise.
`seconds_to_next_bucket` is the time until the rate limit that was exceeded
is reset.
"""
@spec check_sender_rate_limit(Sender.t()) :: :ok | {:error, integer()}
def check_sender_rate_limit(sender) do
sender = sender.shared_sender || sender

rate_limits = get_rate_limits(sender)

with :ok <- precheck_rate_limits(rate_limits) do
check_rate_limits(rate_limits)
end
end

defp get_rate_limits(%Sender{shared_sender: %SharedSender{} = shared_sender}) do
get_rate_limits(shared_sender)
end

# List rate limits with larger scale first
defp get_rate_limits(sender) do
[
{:hour, sender.config.rate_limit_per_hour, bucket_name(sender, :hour)},
{:minute, sender.config.rate_limit_per_minute, bucket_name(sender, :minute)},
{:second, sender.config.rate_limit_per_second, bucket_name(sender, :second)}
]
end

# Inspect buckets to see if buckets with large scales have already been exhausted
defp precheck_rate_limits(rate_limits) do
rate_limits
|> Enum.filter(fn {scale_name, _, _} -> scale_name != :second end)
|> Enum.reduce_while(:ok, fn rate_limit, :ok ->
case do_precheck_rate_limit(rate_limit) do
:ok -> {:cont, :ok}
{:error, seconds_to_next_bucket} -> {:halt, {:error, seconds_to_next_bucket}}
end
end)
end

# Check rate limits in reversed order (starting with smaller scales)
defp check_rate_limits(rate_limits) do
rate_limits
|> Enum.reverse()
|> Enum.reduce_while(:ok, fn rate_limit, :ok ->
case do_check_rate_limit(rate_limit) do
:ok -> {:cont, :ok}
{:error, seconds_to_next_bucket} -> {:halt, {:error, seconds_to_next_bucket}}
end
end)
end

defp do_precheck_rate_limit({scale_name, limit, bucket}) when is_integer(limit) and limit > 0 do
scale = scale(scale_name)

{_, remaining, ms_to_next_bucket, _, _} = ExRated.inspect_bucket(bucket, scale, limit)

if remaining == 0 do
seconds_to_next_bucket = max(1, div(ms_to_next_bucket, 1000))
{:error, seconds_to_next_bucket}
else
:ok
end
end

defp do_precheck_rate_limit(_), do: :ok

defp do_check_rate_limit({scale_name, limit, bucket}) when is_integer(limit) and limit > 0 do
scale = scale(scale_name)

case ExRated.check_rate(bucket, scale, limit) do
{:ok, _} ->
:ok

{:error, _} ->
{_, _, ms_to_next_bucket, _, _} = ExRated.inspect_bucket(bucket, scale, limit)
seconds_to_next_bucket = max(1, div(ms_to_next_bucket, 1000))
{:error, seconds_to_next_bucket}
end
end

defp do_check_rate_limit(_), do: :ok

defp bucket_name(sender, scale_name) do
"sender-bucket-per-#{scale_name}-#{sender.id}"
end

defp scale(scale_name)
defp scale(:second), do: 1_000
defp scale(:minute), do: 60_000
defp scale(:hour), do: 3_600_000
end
26 changes: 0 additions & 26 deletions lib/keila/mailings/schemas/sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,6 @@ defmodule Keila.Mailings.Sender do
|> apply_constraints()
end

@spec check_rate(%__MODULE__{}) :: {:error, integer} | {:ok, integer}
def check_rate(struct) do
with {:ok, hour_calls} <-
ExRated.check_rate(
"sender-bucket-per-hour-#{struct.id}",
3_600_000,
struct.config.rate_limit_per_hour
),
{:ok, minute_calls} <-
ExRated.check_rate(
"sender-bucket-per-minute-#{struct.id}",
60_000,
struct.config.rate_limit_per_minute
),
{:ok, second_calls} <-
ExRated.check_rate(
"sender-bucket-per-second-#{struct.id}",
1_000,
struct.config.rate_limit_per_second
) do
{:ok, hour_calls + minute_calls + second_calls}
else
{:error, calls} -> {:error, calls}
end
end

defp lowercase_emails(changeset) do
changeset
|> update_change(:from_email, &String.downcase/1)
Expand Down
34 changes: 27 additions & 7 deletions lib/keila/mailings/worker.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
defmodule Keila.Mailings.Worker do
use Oban.Worker, queue: :mailer
use Keila.Repo
alias Keila.Mailings.{Recipient, Builder, Sender}
alias Keila.Mailings.{Recipient, Builder}
require ExRated
require Logger

@impl true
def perform(%Oban.Job{args: args}) do
%{"recipient_id" => recipient_id, "recipient_count" => recipient_count} = args
# TODO: recipient_count is currently not used but could be used to improve
# the algorithm snoozing delivery when running for the first time.
# This would also require check_sender_rate/1 to return information about
# active rate limits (bucket sizes and scales).
%{"recipient_id" => recipient_id, "recipient_count" => _recipient_count} = args

recipient =
from(r in Recipient,
Expand All @@ -17,19 +22,34 @@ defmodule Keila.Mailings.Worker do

sender = recipient.campaign.sender

case Sender.check_rate(sender) do
{:error, _} ->
# wait is proportional to the number of workers
{:snooze, 1 * recipient_count}
case Keila.Mailer.check_sender_rate_limit(sender) do
{:error, min_delay} ->
# wait until the minimum delay + add randomness to even out load
random_delay = :rand.uniform(60)
delay = min_delay + :rand.uniform(60)

{:ok, _} ->
Logger.debug(
"Snoozing email to #{recipient.contact.email} for campaign #{recipient.campaign.id} for #{min_delay} + #{random_delay} s"
)

{:snooze, delay}

:ok ->
if recipient.contact.status == :active && recipient.campaign.sender do
Logger.debug(
"Sending email to #{recipient.contact.email} for campaign #{recipient.campaign.id}"
)

recipient.campaign
|> Builder.build(recipient, %{})
|> tap(&ensure_valid!/1)
|> Keila.Mailer.deliver_with_sender(sender)
|> maybe_update_recipient(recipient)
else
Logger.debug(
"Skipping sending email to #{recipient.contact.email} for campaign #{recipient.campaign.id}"
)

from(r in Recipient, where: r.id == ^recipient.id) |> Repo.delete_all()

:ok
Expand Down
14 changes: 7 additions & 7 deletions lib/keila_web/templates/sender/_config.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,21 @@
<%= render_sender_adapter_form(fc, adapter) %>

<div class="form-row">
<%= label(fc, :rate_limit_per_hour, "Rate limit per hour") %>
<%= with_validation(fc, :rate_limit_per_hour) do %>
<%= number_input(fc, :rate_limit_per_hour, placeholder: gettext("10,000"), class: "text-black") %>
<%= label(fc, :rate_limit_per_second, "Rate limit per second") %>
<%= with_validation(fc, :rate_limit_per_second) do %>
<%= number_input(fc, :rate_limit_per_second, class: "text-black", min: 0) %>
<% end %>
</div>
<div class="form-row">
<%= label(fc, :rate_limit_per_minute, "Rate limit per minute") %>
<%= with_validation(fc, :rate_limit_per_minute) do %>
<%= number_input(fc, :rate_limit_per_minute, placeholder: gettext("1,000"), class: "text-black") %>
<%= number_input(fc, :rate_limit_per_minute, class: "text-black", min: 0) %>
<% end %>
</div>
<div class="form-row">
<%= label(fc, :rate_limit_per_second, "Rate limit per second") %>
<%= with_validation(fc, :rate_limit_per_second) do %>
<%= number_input(fc, :rate_limit_per_second, placeholder: gettext("100"), class: "text-black") %>
<%= label(fc, :rate_limit_per_hour, "Rate limit per hour") %>
<%= with_validation(fc, :rate_limit_per_hour) do %>
<%= number_input(fc, :rate_limit_per_hour, class: "text-black", min: 0) %>
<% end %>
</div>
</div>
Expand Down
14 changes: 7 additions & 7 deletions test/keila/mailings/mailings_senders_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ defmodule Keila.Mailings.SenderTest do
assert rate_limit_per_second == sender.config.rate_limit_per_second

for _ <- 1..rate_limit_per_second do
assert {:ok, _} = Sender.check_rate(sender)
assert :ok = Keila.Mailer.check_sender_rate_limit(sender)
end

assert {:error, ^rate_limit_per_second} = Sender.check_rate(sender)
assert {:error, _min_delay} = Keila.Mailer.check_sender_rate_limit(sender)
end

test "using check rate limit by minutes of new sender" do
Expand All @@ -167,10 +167,10 @@ defmodule Keila.Mailings.SenderTest do
assert rate_limit_per_minute == sender.config.rate_limit_per_minute

for _ <- 1..rate_limit_per_minute do
assert {:ok, _} = Sender.check_rate(sender)
assert :ok = Keila.Mailer.check_sender_rate_limit(sender)
end

assert {:error, ^rate_limit_per_minute} = Sender.check_rate(sender)
assert {:error, _min_delay} = Keila.Mailer.check_sender_rate_limit(sender)
end

test "using check rate limit by hours of new sender" do
Expand All @@ -193,10 +193,10 @@ defmodule Keila.Mailings.SenderTest do
assert rate_limit_per_hour == sender.config.rate_limit_per_hour

for _ <- 1..rate_limit_per_hour do
assert {:ok, _} = Sender.check_rate(sender)
assert :ok = Keila.Mailer.check_sender_rate_limit(sender)
end

assert {:error, ^rate_limit_per_hour} = Sender.check_rate(sender)
assert {:error, _min_delay} = Keila.Mailer.check_sender_rate_limit(sender)
end

test "using check rate without limit of new sender" do
Expand All @@ -210,7 +210,7 @@ defmodule Keila.Mailings.SenderTest do
assert sender.config.rate_limit_per_hour == nil

for _ <- 1..50 do
assert {:ok, _} = Sender.check_rate(sender)
assert :ok = Keila.Mailer.check_sender_rate_limit(sender)
end
end
end
Expand Down

0 comments on commit 5c42e6c

Please sign in to comment.