Skip to content

Commit

Permalink
Adding retention period for all node/keys. The code will periodically
check the retention time and delete the keys that are over the period.
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagoesteves committed Jan 24, 2025
1 parent 37fc7b0 commit 23ae64c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 22 deletions.
65 changes: 49 additions & 16 deletions lib/deployex/telemetry/collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ defmodule Deployex.Telemetry.Collector do

alias Deployex.Storage

@metric_key_tables "metric-keys"
@metric_keys "metric-keys"
@nodes_table :nodes_list

@minute_to_milliseconds 60_000
@data_retention_period_min 1

### ==========================================================================
### Callback functions
Expand All @@ -22,7 +23,7 @@ defmodule Deployex.Telemetry.Collector do
end

@impl true
def init(_args) do
def init(args) do
Process.flag(:trap_exit, true)

:ets.new(@nodes_table, [:set, :protected, :named_table])
Expand All @@ -34,11 +35,15 @@ defmodule Deployex.Telemetry.Collector do
node = String.to_atom("#{Storage.sname(instance)}@#{hostname}")
# Create metric tables for the node
:ets.new(node, [:set, :protected, :named_table])
:ets.insert(node, {@metric_key_tables, []})
# Add the node to the nodes list table to avoid dynamic atom creation
:ets.insert(node, {@metric_keys, []})
# Add the node to the nodes list table to improve performance
:ets.insert(@nodes_table, {instance, node})
end)

args
|> Keyword.get(:data_retention_period_min, :timer.minutes(@data_retention_period_min))
|> :timer.send_interval(:prune_expired_entries)

Logger.info("Initialising telemetry collector server")

{:ok, %{}}
Expand All @@ -49,10 +54,8 @@ defmodule Deployex.Telemetry.Collector do
{:telemetry, %{metrics: metrics, reporter: reporter, measurements: measurements}},
state
) do
now_to_minute = fn now -> trunc(now / @minute_to_milliseconds) end

now = System.os_time(:millisecond)
minute = now_to_minute.(now)
minute = unix_to_minute(now)

keys = get_keys_by_node(reporter)

Expand All @@ -62,7 +65,7 @@ defmodule Deployex.Telemetry.Collector do

current_data =
case :ets.lookup(reporter, timed_key) do
[{_, value}] -> [data | value]
[{_, current_list_data}] -> [data | current_list_data]
_ -> [data]
end

Expand All @@ -82,7 +85,7 @@ defmodule Deployex.Telemetry.Collector do
end)

if new_keys != [] do
:ets.insert(reporter, {@metric_key_tables, new_keys ++ keys})
:ets.insert(reporter, {@metric_keys, new_keys ++ keys})

Phoenix.PubSub.broadcast(
Deployex.PubSub,
Expand All @@ -94,6 +97,30 @@ defmodule Deployex.Telemetry.Collector do
{:noreply, state}
end

# Periodic retention sweep, triggered by the `:prune_expired_entries` timer
# message scheduled in `init/1`.
#
# Every timed entry (key `"<metric>|<minute>"`) whose minute bucket is at or
# before `now - @data_retention_period_min - 1` is removed from each node table.
# The whole table is inspected rather than a fixed three-minute window, so
# entries are still reclaimed when a tick is delayed or when the sweep interval
# passed to `init/1` is longer than that window. The state is returned unchanged.
@impl true
def handle_info(:prune_expired_entries, state) do
  # Newest minute bucket that already falls outside the retention period.
  cutoff_minute = unix_to_minute() - @data_retention_period_min - 1

  # A key is expired when it has the "<metric>|<minute>" shape and its minute
  # part is an integer at or before the cutoff. The metric-keys index entry has
  # no "|<minute>" suffix, so it never matches and is always kept.
  expired_key? = fn
    key when is_binary(key) ->
      with [_metric, _ | _] = parts <- String.split(key, "|"),
           {minute, ""} <- parts |> List.last() |> Integer.parse() do
        minute <= cutoff_minute
      else
        _ -> false
      end

    _key ->
      false
  end

  Enum.each(Storage.instance_list(), fn instance ->
    node = node_by_instance(instance)

    # Select only the keys so the stored data lists are not copied out of ETS.
    node
    |> :ets.select([{{:"$1", :_}, [], [:"$1"]}])
    |> Enum.filter(expired_key?)
    |> Enum.each(&:ets.delete(node, &1))
  end)

  {:noreply, state}
end

### ==========================================================================
### Public functions
### ==========================================================================
Expand All @@ -109,11 +136,11 @@ defmodule Deployex.Telemetry.Collector do
Phoenix.PubSub.unsubscribe(Deployex.PubSub, keys_topic())
end

def subscribe_for_updates(service, key) do
def subscribe_for_new_data(service, key) do
Phoenix.PubSub.subscribe(Deployex.PubSub, metrics_topic(service, key))
end

def unsubscribe_for_updates(service, key) do
def unsubscribe_for_new_data(service, key) do
Phoenix.PubSub.unsubscribe(Deployex.PubSub, metrics_topic(service, key))
end

Expand Down Expand Up @@ -141,12 +168,12 @@ defmodule Deployex.Telemetry.Collector do
from = Keyword.get(options, :from, 15)
order = Keyword.get(options, :order, :asc)

now_minutes = trunc(System.os_time(:millisecond) / @minute_to_milliseconds)
now_minutes = unix_to_minute()
from_minutes = now_minutes - from

result =
Enum.reduce(from_minutes..now_minutes, [], fn minute, acc ->
case :ets.lookup(service, "#{key}|#{minute}") do
case :ets.lookup(service, metric_key(key, minute)) do
[{_, value}] ->
value ++ acc

Expand Down Expand Up @@ -174,8 +201,14 @@ defmodule Deployex.Telemetry.Collector do
### ==========================================================================
### Private functions
### ==========================================================================

# Builds the ETS key for one metric bucket: the metric name and the timestamp
# (callers pass a minute value) joined by "|".
defp metric_key(metric, timestamp) do
  to_string(metric) <> "|" <> to_string(timestamp)
end

# Converts a Unix timestamp in milliseconds into whole minutes, truncating any
# partial minute. Defaults to the current OS time when no value is given.
defp unix_to_minute(time \\ System.os_time(:millisecond)) do
  minutes = time / @minute_to_milliseconds
  trunc(minutes)
end

defp get_keys_by_node(node) do
case :ets.lookup(node, @metric_key_tables) do
case :ets.lookup(node, @metric_keys) do
[{_, value}] -> value
_ -> []
end
Expand All @@ -189,12 +222,12 @@ defmodule Deployex.Telemetry.Collector do
### ==========================================================================
defp prepare_timeseries_data(%{name: name} = metric, measurements, now, minute)
when name in ["vm.memory.total"] do
{name, "#{name}|#{minute}",
{name, metric_key(name, minute),
%{timestamp: now, value: metric.value, unit: metric.unit, metadata: measurements}}
end

defp prepare_timeseries_data(%{name: name} = metric, _measurements, now, minute) do
{name, "#{name}|#{minute}",
{name, metric_key(name, minute),
%{timestamp: now, value: metric.value, unit: metric.unit, tags: metric.tags}}
end
end
12 changes: 6 additions & 6 deletions lib/deployex_web/live/metrics/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ defmodule DeployexWeb.MetricsLive do

socket =
Enum.reduce(node_info.selected_metrics_keys, socket, fn metric_key, acc ->
Collector.unsubscribe_for_updates(service_key, metric_key)
Collector.unsubscribe_for_new_data(service_key, metric_key)

data_key = data_key(service_key, metric_key)

Expand All @@ -216,7 +216,7 @@ defmodule DeployexWeb.MetricsLive do

socket =
Enum.reduce(node_info.selected_services_keys, socket, fn service_key, acc ->
Collector.unsubscribe_for_updates(service_key, metric_key)
Collector.unsubscribe_for_new_data(service_key, metric_key)

data_key = data_key(service_key, metric_key)

Expand All @@ -243,7 +243,7 @@ defmodule DeployexWeb.MetricsLive do

socket =
Enum.reduce(node_info.selected_metrics_keys, socket, fn metric_key, acc ->
Collector.subscribe_for_updates(service_key, metric_key)
Collector.subscribe_for_new_data(service_key, metric_key)

data_key = data_key(service_key, metric_key)

Expand Down Expand Up @@ -274,7 +274,7 @@ defmodule DeployexWeb.MetricsLive do

socket =
Enum.reduce(node_info.selected_services_keys, socket, fn service_key, acc ->
Collector.subscribe_for_updates(service_key, metric_key)
Collector.subscribe_for_new_data(service_key, metric_key)

data_key = data_key(service_key, metric_key)

Expand Down Expand Up @@ -376,8 +376,8 @@ defmodule DeployexWeb.MetricsLive do
service = Collector.node_by_instance(instance) |> to_string
[name, _hostname] = String.split(service, "@")

metrics_keys = (metrics_keys ++ instance_metrics_keys) |> Enum.uniq()
services_keys = services_keys ++ [service]
metrics_keys = (metrics_keys ++ instance_metrics_keys) |> Enum.sort() |> Enum.uniq()
services_keys = Enum.sort(services_keys ++ [service])

node =
if service in selected_services_keys do
Expand Down

0 comments on commit 23ae64c

Please sign in to comment.