Skip to content

Commit

Permalink
Begin working on native helm repository client
Browse files Browse the repository at this point in the history
This does not cover various forms of authentication.  It also doesn't cover including this in the attributes for a helm service quite yet.
  • Loading branch information
michaeljguarino committed May 2, 2024
1 parent d750f18 commit 354db97
Show file tree
Hide file tree
Showing 23 changed files with 467 additions and 11 deletions.
11 changes: 11 additions & 0 deletions assets/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,12 @@ export type ConsoleConfiguration = {
vpnEnabled?: Maybe<Scalars['Boolean']['output']>;
};

export enum ConstraintEnforcement {
Deny = 'DENY',
DryRun = 'DRY_RUN',
Warn = 'WARN'
}

export type ConstraintRef = {
__typename?: 'ConstraintRef';
kind: Scalars['String']['output'];
Expand Down Expand Up @@ -1832,6 +1838,7 @@ export type HelmConfigAttributes = {
/** pointer to a Plural GitRepository */
repositoryId?: InputMaybe<Scalars['ID']['input']>;
set?: InputMaybe<HelmValueAttributes>;
url?: InputMaybe<Scalars['String']['input']>;
values?: InputMaybe<Scalars['String']['input']>;
valuesFiles?: InputMaybe<Array<InputMaybe<Scalars['String']['input']>>>;
version?: InputMaybe<Scalars['String']['input']>;
Expand Down Expand Up @@ -1876,6 +1883,8 @@ export type HelmSpec = {
repositoryId?: Maybe<Scalars['ID']['output']>;
/** a list of helm name/value pairs to precisely set individual values */
set?: Maybe<Array<Maybe<HelmValue>>>;
/** the helm repository url to use */
url?: Maybe<Scalars['String']['output']>;
/** a helm values file to use with this service, requires auth and so is heavy to query */
values?: Maybe<Scalars['String']['output']>;
/** a list of relative paths to values files to use for helm applies */
Expand Down Expand Up @@ -3196,6 +3205,7 @@ export type PolicyConstraint = {
__typename?: 'PolicyConstraint';
cluster?: Maybe<Cluster>;
description?: Maybe<Scalars['String']['output']>;
enforcement?: Maybe<ConstraintEnforcement>;
id: Scalars['ID']['output'];
insertedAt?: Maybe<Scalars['DateTime']['output']>;
name: Scalars['String']['output'];
Expand All @@ -3212,6 +3222,7 @@ export type PolicyConstraint = {
/** inputs to add constraint data from an OPA gatekeeper constraint CRD */
export type PolicyConstraintAttributes = {
description?: InputMaybe<Scalars['String']['input']>;
enforcement?: InputMaybe<ConstraintEnforcement>;
name: Scalars['String']['input'];
recommendation?: InputMaybe<Scalars['String']['input']>;
/** pointer to the group/name for the CR */
Expand Down
2 changes: 2 additions & 0 deletions lib/console/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ defmodule Console.Application do
{Registry, [keys: :unique, name: Console.Deployments.Git.Agent.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Pipelines.Supervisor.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Stacks.Worker.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Helm.Agent.registry()]},
{Cluster.Supervisor, [topologies, [name: Console.ClusterSupervisor]]},
Console.Deployments.Git.Supervisor,
Console.Deployments.Stacks.Supervisor,
Console.Deployments.Helm.Server,
Console.Deployments.Pipelines.Supervisor,
Console.Deployments.Helm.Supervisor,
Console.Deployments.Git.Kick,
Console.Deployments.Deprecations.Table,
Console.Deployments.Compatibilities.Table,
Expand Down
2 changes: 1 addition & 1 deletion lib/console/deployments/compatibilities/table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Console.Deployments.Compatibilities.Table do
end

def handle_info(:poll, %State{table: table, static: true} = state) do
table = Enum.reduce(Static.compatibilities(), table, &KeyValueSet.put(&2, &1.name, &1))
table = Enum.reduce(Static.compatibilities(), table, &KeyValueSet.put!(&2, &1.name, &1))
{:noreply, %{state | table: table}}
end

Expand Down
16 changes: 15 additions & 1 deletion lib/console/deployments/git.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ defmodule Console.Deployments.Git do
ScmWebhook,
PrAutomation,
PullRequest,
DependencyManagementService
DependencyManagementService,
HelmRepository
}

@cache Console.conf(:cache_adapter)
@ttl :timer.minutes(30)

@type repository_resp :: {:ok, GitRepository.t} | Console.error
@type helm_resp :: {:ok, HelmRepository.t} | Console.error
@type connection_resp :: {:ok, ScmConnection.t} | Console.error
@type webhook_resp :: {:ok, ScmWebhook.t} | Console.error
@type automation_resp :: {:ok, PrAutomation.t} | Console.error
Expand All @@ -30,6 +32,8 @@ defmodule Console.Deployments.Git do

def get_repository!(id), do: Repo.get!(GitRepository, id)

def get_helm_repository(url), do: Repo.get_by(HelmRepository, url: url)

def get_by_url!(url), do: Repo.get_by!(GitRepository, url: url)

def get_by_url(url), do: Repo.get_by(GitRepository, url: url)
Expand Down Expand Up @@ -327,6 +331,16 @@ defmodule Console.Deployments.Git do
end
end

@spec upsert_helm_repository(binary) :: helm_resp
def upsert_helm_repository(url) do
case Console.Repo.get_by(HelmRepository, url: url) do
%HelmRepository{} = repo -> repo
nil -> %HelmRepository{url: url}
end
|> HelmRepository.changeset()
|> Console.Repo.insert_or_update()
end

@doc """
Sets up a service to run the renovate cron given an scm connection and target repositories
"""
Expand Down
86 changes: 86 additions & 0 deletions lib/console/deployments/helm/agent.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Console.Deployments.Helm.Agent do
use GenServer, restart: :transient
alias Console.Deployments.Git
alias Console.Deployments.Helm.{AgentCache, Discovery}
alias Console.Schema.HelmRepository
require Logger

defmodule State, do: defstruct [:repo, :cache]

@poll :timer.minutes(2)
@jitter 15

def registry(), do: __MODULE__

def fetch(pid, chart, vsn), do: GenServer.call(pid, {:fetch, chart, vsn})

def start(url) do
GenServer.start(__MODULE__, url, name: via(url))
end

def start_link([url]), do: start_link(url)
def start_link(url) do
GenServer.start_link(__MODULE__, url, name: via(url))
end

defp via(url), do: {:via, Registry, {registry(), {:helm, url}}}

def init(url) do
{:ok, repo} = Git.upsert_helm_repository(url)
schedule_pull()
:timer.send_interval(@poll, :move)
send self(), :pull
{:ok, %State{repo: repo, cache: AgentCache.new(repo)}}
end

def handle_call({:fetch, c, v}, _, %State{cache: cache} = state) do
with {:ok, l, cache} <- AgentCache.fetch(cache, c, v),
{:ok, f} <- File.open(l.file) do
{:reply, {:ok, f, l.digest}, %{state | cache: cache}}
else
err -> {:reply, err, state}
end
end

def handle_info(:pull, %State{repo: repo, cache: cache} = state) do
with {:ok, repo} <- Git.upsert_helm_repository(repo.url),
{:ok, cache} <- AgentCache.refresh(cache),
{:ok, repo} <- refresh(repo) do
schedule_pull()
{:noreply, %{state | cache: cache, repo: repo}}
else
err ->
schedule_pull()
Logger.error "Failed to resync helm repo: #{inspect(err)}"
{:noreply, state}
end
end

def handle_info({:refresh, c, v}, %State{cache: cache} = state) do
case AgentCache.write(cache, c, v) do
{:ok, _, cache} -> {:noreply, %{state | cache: cache}}
_ -> {:noreply, state}
end
end

def handle_info(:move, %State{repo: repo} = state) do
case Discovery.local?(repo.url) do
true -> {:noreply, state}
false -> {:stop, {:shutdown, :moved}, state}
end
end

def handle_info(_, state), do: {:noreply, state}

defp refresh(%HelmRepository{} = repo) do
HelmRepository.changeset(repo, %{pulled_at: Timex.now(), health: :pullable})
|> Console.Repo.update()
end

defp schedule_pull(), do: Process.send_after(self(), :pull, @poll + jitter())

defp jitter() do
:rand.uniform(@jitter)
|> :timer.seconds()
end
end
67 changes: 67 additions & 0 deletions lib/console/deployments/helm/agent_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Console.Deployments.Helm.AgentCache do
alias Console.Helm.Client
alias Console.Deployments.Helm.Utils
require Logger

defstruct [:repo, :index, :dir, cache: %{}]

defmodule Line do
@expiry [minutes: -10]
defstruct [:file, :chart, :vsn, :digest, :touched]

def new(file, chart, vsn, digest) do
%__MODULE__{file: file, chart: chart, vsn: vsn, digest: digest, touched: Timex.now()}
end

def touch(%__MODULE__{} = mod), do: %{mod | touched: Timex.now()}

def expire(%__MODULE__{file: f}), do: File.rm(f)

def expired?(%__MODULE__{touched: touched}) do
Timex.now()
|> Timex.shift(@expiry)
|> Timex.after?(touched)
end
end

def new(repo) do
{:ok, dir} = Briefly.create(directory: true)
%__MODULE__{repo: repo, dir: dir, cache: %{}}
end

def refresh(%__MODULE__{} = cache) do
case Client.index(cache.repo.url) do
{:ok, indx} -> {:ok, sweep(%{cache | index: indx})}
_ -> {:error, "could not fetch index"}
end
end

def fetch(%__MODULE__{index: nil} = cache, chart, vsn) do
with {:ok, cache} <- refresh(cache),
do: fetch(cache, chart, vsn)
end

def fetch(%__MODULE__{cache: lines} = cache, chart, vsn) do
case lines[{chart, vsn}] do
%Line{} = l -> {:ok, l, put_in(cache.cache[{chart, vsn}], Line.touch(l))}
nil -> write(cache, chart, vsn)
end
end

def write(%__MODULE__{} = cache, chart, vsn) do
path = Path.join(cache.dir, "#{chart}.#{vsn}.tgz")
with {:ok, url, digest} <- Client.chart(cache.index, chart, vsn),
{:ok, tmp} <- Briefly.create(),
{:ok, _} <- Client.download(url, File.stream!(tmp)),
:ok <- Utils.clean_chart(tmp, path, chart),
line <- Line.new(path, chart, vsn, digest),
do: {:ok, line, put_in(cache.cache[{chart, vsn}], line)}
end

defp sweep(%__MODULE__{cache: lines} = cache) do
{keep, expire} = Enum.split_with(lines, fn {_, l} -> !Line.expired?(l) end)
Enum.each(expire, &Line.expire/1)
Enum.each(keep, fn l -> send(self(), {:refresh, l.chart, l.vsn}) end)
%{cache | cache: Map.new(keep)}
end
end
13 changes: 9 additions & 4 deletions lib/console/deployments/helm/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Console.Deployments.Helm.Cache do
require Logger
alias Kube.HelmChart
alias Kube.HelmChart.Status
alias Console.Deployments.Tar
alias Console.Deployments.{Helm.Utils, Tar}

defstruct [:dir, :touched]

Expand Down Expand Up @@ -48,12 +48,17 @@ defmodule Console.Deployments.Helm.Cache do
end

defp build_tarball(url, cache, path, chart) do
with {:ok, f} <- Tar.from_url(url),
{:ok, contents} <- Tar.tar_stream(f),
:ok <- Tar.tarball(path, remove_prefix(contents, chart)),
with {:ok, path} <- download_to(url, path, chart),
do: open(cache, path)
end

def download_to(url, path, chart) do
with {:ok, tmp} <- Tar.from_url(url),
:ok <- File.open!(tmp) |> Utils.clean_chart(path, chart),
:ok <- File.rm(tmp),
do: {:ok, path}
end

def refresh(%__MODULE__{touched: touched} = cache) do
Logger.info "expiring helm chart cache..."
expires = Timex.now() |> Timex.shift(minutes: -30)
Expand Down
33 changes: 33 additions & 0 deletions lib/console/deployments/helm/discovery.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Console.Deployments.Helm.Discovery do
alias Console.Deployments.Helm.{Supervisor, Agent}

def agent(url) do
case maybe_rpc(url, Supervisor, :start_child, [url]) do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
err -> err
end
end

def fetch(url, chart, vsn) do
with {:ok, pid} <- agent(url),
do: Agent.fetch(pid, chart, vsn)
end

defp maybe_rpc(id, module, func, args) do
me = node()
case worker_node(id) do
^me -> apply(module, func, args)
node -> :rpc.call(node, module, func, args)
end
end

def worker_node(url), do: HashRing.key_to_node(ring(), url)

def local?(url), do: worker_node(url) == node()

defp ring() do
HashRing.new()
|> HashRing.add_nodes([node() | Node.list()])
end
end
17 changes: 17 additions & 0 deletions lib/console/deployments/helm/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Console.Deployments.Helm.Supervisor do
use DynamicSupervisor
alias Console.Deployments.Helm.Agent

def start_link(init_arg \\ :ok) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start_child(url) do
DynamicSupervisor.start_child(__MODULE__, {Agent, url})
end

@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end
23 changes: 23 additions & 0 deletions lib/console/deployments/helm/utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Console.Deployments.Helm.Utils do
alias Console.Deployments.Tar

def clean_chart(path, to, chart) when is_binary(path) do
file = File.open!(path)
try do
clean_chart(file, to, chart)
after
File.close(file)
end
end

def clean_chart(f, to, chart) do
with {:ok, contents} <- Tar.tar_stream(f),
do: Tar.tarball(to, remove_prefix(contents, chart))
end

defp remove_prefix(contents, chart) do
Enum.map(contents, fn {path, content} ->
{String.trim_leading(path, "#{chart}/"), content}
end)
end
end
11 changes: 10 additions & 1 deletion lib/console/deployments/services.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,16 @@ defmodule Console.Deployments.Services do
Git.get_repository!(id)
|> Git.Discovery.fetch(git)
end
defp tarfile(%Service{helm: %Service.Helm{chart: c, version: v}} = svc) when is_binary(c) and is_binary(v) do

defp tarfile(%Service{helm: %Service.Helm{chart: c, version: v, url: url}} = svc)
when is_binary(c) and is_binary(v) and is_binary(url) do
with {:ok, f, sha} <- Helm.Discovery.fetch(url, c, v),
{:ok, _} <- update_sha_without_revision(svc, sha),
do: {:ok, f}
end

defp tarfile(%Service{helm: %Service.Helm{chart: c, version: v}} = svc)
when is_binary(c) and is_binary(v) do
with {:ok, f, sha} <- Helm.Charts.artifact(svc),
{:ok, _} <- update_sha_without_revision(svc, sha),
do: {:ok, f}
Expand Down
Loading

0 comments on commit 354db97

Please sign in to comment.