Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow dynamic loop and throttle args #44

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions lib/buffy/throttle.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# credo:disable-for-this-file Credo.Check.Refactor.LongQuoteBlocks
defmodule Buffy.Throttle do
@moduledoc """
The `Buffy.Throttle` module will wait for a specified amount of time before
Expand Down Expand Up @@ -51,7 +52,28 @@ defmodule Buffy.Throttle do

- `:supervisor_name` (`atom`) - Optional. The name of the dynamic supervisor to use. Defaults to the built in Buffy dynamic supervisor, but if you are running in a distributed instance you can set this value to a named `Horde.DynamicSupervisor` process. Defaults to `Buffy.DynamicSupervisor`.

- :throttle (`non_neg_integer`) - Required. The minimum amount of time to wait before invoking the function. This value is in milliseconds. The actual run time could be longer than this value based on the `:jitter` option.
- `:throttle` (`non_neg_integer`) - Optional. The minimum amount of time to wait before invoking the function. This value is in milliseconds. The actual run time could be longer than this value based on the `:jitter` option.

### Dynamic Options

Sometimes you want a different throttle value or jitter value based on the arguments you pass in. To deal with this, there are optional functions you can implement in your throttle module. These functions take in the arguments and will return the throttle and jitter values. For example:

defmodule MyThrottler do
use Buffy.Throttle,
registry_module: Horde.Registry,
registry_name: MyApp.HordeRegistry,
supervisor_module: Horde.DynamicSupervisor,
supervisor_name: MyApp.HordeDynamicSupervisor,
throttle: :timer.minutes(2)

def get_jitter(args) do
case args do
%Cat{} -> :timer.minutes(2)
%Dog{} -> :timer.seconds(10)
_ -> 0
end
end
end

## Using with Horde

Expand Down Expand Up @@ -122,7 +144,17 @@ defmodule Buffy.Throttle do
and wait the configured `throttle` time before calling the `c:handle_throttle/1`
function.
"""
@callback throttle(args :: args()) :: :ok | {:error, term()}
@callback throttle(args()) :: :ok | {:error, term()}

@doc """
Returns the amount of jitter in milliseconds to add to the throttle time.
"""
@callback get_jitter(args()) :: non_neg_integer()

@doc """
Returns the amount of throttle time in milliseconds.
"""
@callback get_throttle(args()) :: non_neg_integer()

@doc """
The function called after the throttle has completed. This function will
Expand All @@ -137,7 +169,7 @@ defmodule Buffy.Throttle do
restart = Keyword.get(opts, :restart, :temporary)
supervisor_module = Keyword.get(opts, :supervisor_module, DynamicSupervisor)
supervisor_name = Keyword.get(opts, :supervisor_name, Buffy.DynamicSupervisor)
throttle = Keyword.fetch!(opts, :throttle)
throttle = Keyword.get(opts, :throttle, 0)

quote do
@behaviour Buffy.Throttle
Expand Down Expand Up @@ -185,6 +217,28 @@ defmodule Buffy.Throttle do
end
end

@doc """
Returns the maximum amount of jitter in milliseconds. This allows
for a bit of random delay before calling the `throttle/1` function
to avoid thundering herd problems.
"""
@impl Buffy.Throttle
@spec get_jitter(Buffy.Throttle.args()) :: non_neg_integer()
def get_jitter(_args), do: unquote(jitter)

defoverridable get_jitter: 1

@doc """
Returns the amount of throttle in milliseconds to wait before calling
the `throttle/1` function. This function can be overridden to provide
dynamic throttling based on the passed in arguments.
"""
@impl Buffy.Throttle
@spec get_throttle(Buffy.Throttle.args()) :: non_neg_integer()
def get_throttle(_args), do: unquote(throttle)

defoverridable get_throttle: 1

@doc """
The function that runs after throttle has completed. This function will
be called with the `t:Buffy.Throttle.key()` and can return anything. The
Expand Down Expand Up @@ -256,7 +310,8 @@ defmodule Buffy.Throttle do
@impl GenServer
@spec handle_info(:timeout, Buffy.Throttle.state()) :: {:stop, :normal, Buffy.Throttle.state()}
def handle_info(:timeout, {key, args}) do
selected_jitter = max(:rand.uniform(unquote(jitter) + 1) - 1, 0)
jitter = get_jitter(args)
selected_jitter = max(:rand.uniform(jitter + 1) - 1, 0)

:telemetry.execute([:buffy, :throttle, :handle, :jitter], %{jitter: selected_jitter}, %{
args: args,
Expand Down
108 changes: 93 additions & 15 deletions lib/buffy/throttle_and_timed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Buffy.ThrottleAndTimed do

## Options

- :throttle (`non_neg_integer`) - Required. The amount of time to wait before invoking the function. This value is in milliseconds.
- `:throttle` (`non_neg_integer`) - Optional. The amount of time to wait before invoking the function. This value is in milliseconds.

- `:registry_module` (`atom`) - Optional. A module that implements the `Registry` behaviour. If you are running in a distributed instance, you can set this value to `Horde.Registry`. Defaults to `Registry`.

Expand All @@ -119,6 +119,27 @@ defmodule Buffy.ThrottleAndTimed do

- `:loop_interval` (`atom`) - Optional. The amount of time that this process will wait while inbox is empty until sending a `:timeout` message (handle via `handle_info`). Resets if message comes in. In milliseconds. Without this, the module would function exactly like `Buffy.Throttle`.

### Dynamic Options

Sometimes you want a different throttle value or loop interval value based on the arguments you pass in. To deal with this, there are optional functions you can implement in your throttle and timed module. These functions take in the arguments and will return the throttle and loop interval values. For example:

defmodule MyThrottler do
use Buffy.ThrottleAndTimed,
registry_module: Horde.Registry,
registry_name: MyApp.HordeRegistry,
supervisor_module: Horde.DynamicSupervisor,
supervisor_name: MyApp.HordeDynamicSupervisor,
throttle: :timer.minutes(2)

def get_loop_interval(args) do
case args do
%Cat{} -> :timer.minutes(2)
%Dog{} -> :timer.seconds(10)
_ -> 0
end
end
end

## Example Usage:

### Have `throttle/1` add to data to state to process in `handle_throttle/1`
Expand Down Expand Up @@ -212,12 +233,19 @@ defmodule Buffy.ThrottleAndTimed do
"""
@type key :: term()

@typedoc """
The amount of time that this process will wait while inbox is empty
until sending a `:timeout` message.
"""
@type loop_interval :: non_neg_integer() | :infinity

@typedoc """
Internal state that `Buffy.ThrottleAndTimed` keeps.
"""
@type state :: %{
args: args(),
key: key(),
loop_interval: loop_interval(),
timer_ref: nil | reference()
}

Expand All @@ -226,7 +254,22 @@ defmodule Buffy.ThrottleAndTimed do
and wait the configured `throttle` time before calling the `c:handle_throttle/1`
function.
"""
@callback throttle(args :: args()) :: :ok | {:error, term()}
@callback throttle(args()) :: :ok | {:error, term()}

@doc """
Generates a unique key for the given arguments.
"""
@callback args_to_key(args()) :: key()

@doc """
Returns the amount of jitter in milliseconds to add to the throttle time.
"""
@callback get_loop_interval(args()) :: loop_interval()

@doc """
Returns the amount of throttle time in milliseconds.
"""
@callback get_throttle(args()) :: non_neg_integer()

@doc """
The function called after the throttle has completed. This function will
Expand All @@ -235,13 +278,13 @@ defmodule Buffy.ThrottleAndTimed do
@callback handle_throttle(args()) :: any()

defmacro __using__(opts) do
loop_interval = Keyword.get(opts, :loop_interval, :infinity)
registry_module = Keyword.get(opts, :registry_module, Registry)
registry_name = Keyword.get(opts, :registry_name, Buffy.Registry)
restart = Keyword.get(opts, :restart, :temporary)
supervisor_module = Keyword.get(opts, :supervisor_module, DynamicSupervisor)
supervisor_name = Keyword.get(opts, :supervisor_name, Buffy.DynamicSupervisor)
throttle = Keyword.fetch!(opts, :throttle)
loop_interval = Keyword.get(opts, :loop_interval, :infinity)

quote do
@behaviour Buffy.ThrottleAndTimed
Expand Down Expand Up @@ -299,16 +342,43 @@ defmodule Buffy.ThrottleAndTimed do
@doc """
Function that returns a key from incoming args.

Defaults to `args |> :erlang.term_to_binary() |> :erlang.phash2()`
Defaults to using `:erlang.term_to_binary/1` and `:erlang.phash2()`
but can be overwritten for more efficient memory usage.
"""
@spec args_to_key(any()) :: non_neg_integer()
def args_to_key(args), do: args |> :erlang.term_to_binary() |> :erlang.phash2()
@impl Buffy.ThrottleAndTimed
@spec args_to_key(Buffy.ThrottleAndTimed.args()) :: non_neg_integer()
def args_to_key(args) do
args
|> :erlang.term_to_binary()
|> :erlang.phash2()
end

defoverridable args_to_key: 1

defp key_to_name(key) do
{:via, unquote(registry_module), {unquote(registry_name), {__MODULE__, key}}}
end

@doc """
Returns the amount of time in milliseconds to wait while inbox is empty
until sending a `:timeout` message. By default, this is `:infinity`
which causes this module to function just like `Throttle`.
"""
@impl Buffy.ThrottleAndTimed
@spec get_loop_interval(Buffy.ThrottleAndTimed.args()) :: Buffy.ThrottleAndTimed.loop_interval()
def get_loop_interval(_args), do: unquote(loop_interval)

defoverridable get_loop_interval: 1

@doc """
Returns the amount of throttle time in milliseconds.
"""
@impl Buffy.ThrottleAndTimed
@spec get_throttle(Buffy.ThrottleAndTimed.args()) :: non_neg_integer()
def get_throttle(_args), do: unquote(throttle)

defoverridable get_throttle: 1

@doc """
The function that runs after throttle has completed. This function will
be called with the `t:Buffy.ThrottleAndTimed.key()` and can return anything. The
Expand Down Expand Up @@ -347,7 +417,8 @@ defmodule Buffy.ThrottleAndTimed do
@impl GenServer
@spec init({Buffy.ThrottleAndTimed.key(), Buffy.ThrottleAndTimed.args()}) :: {:ok, Buffy.ThrottleAndTimed.state()}
def init({key, args}) do
{:ok, schedule_throttle_and_update_state(%{key: key, args: args, timer_ref: nil}), {:continue, :measure_memory}}
state = %{key: key, args: args, timer_ref: nil}
{:ok, schedule_throttle(state), {:continue, :measure_memory}}
end

@doc false
Expand Down Expand Up @@ -379,21 +450,26 @@ defmodule Buffy.ThrottleAndTimed do
@doc """
Function to invoke the throttle logic if process already exists.
It will only schedule a throttle if `timer_ref` is `nil`.

"""
@impl GenServer
@spec handle_cast({:throttle, new_args :: any()}, Buffy.ThrottleAndTimed.state()) ::
@spec handle_cast({:throttle, any()}, Buffy.ThrottleAndTimed.state()) ::
{:noreply, Buffy.ThrottleAndTimed.state()}
def handle_cast({:throttle, new_args}, %{timer_ref: nil, args: args} = state) do
{:noreply, state |> schedule_throttle_and_update_state() |> Map.put(:args, update_args(args, new_args))}
def handle_cast({:throttle, new_args}, %{timer_ref: nil} = state) do
state =
state
|> Map.put(:args, update_args(state.args, new_args))
|> schedule_throttle()

{:noreply, state}
end

def handle_cast({:throttle, new_args}, %{args: args} = state) do
{:noreply, %{state | args: update_args(args, new_args)}}
end

defp schedule_throttle_and_update_state(state) do
timer_ref = Process.send_after(self(), :execute_throttle_callback, unquote(throttle))
defp schedule_throttle(state) do
throttle = get_throttle(state.args)
timer_ref = Process.send_after(self(), :execute_throttle_callback, throttle)
%{state | timer_ref: timer_ref}
end

Expand Down Expand Up @@ -431,14 +507,16 @@ defmodule Buffy.ThrottleAndTimed do
)

new_state = %{update_state_with_work_result(state, result) | timer_ref: nil}
loop_interval = get_loop_interval(new_state.args)

{:noreply, new_state, unquote(loop_interval)}
{:noreply, new_state, loop_interval}
rescue
e ->
Logger.error("Error in throttle: #{inspect(e)}")
new_state = %{state | timer_ref: nil}
loop_interval = get_loop_interval(new_state.args)

{:noreply, new_state, unquote(loop_interval)}
{:noreply, new_state, loop_interval}
end

@doc """
Expand Down
Loading